Using an SDK to consume data from Log Service in real time can be challenging because it requires you to manage specific implementation details, load balancing, and failover between consumers. To simplify this, you can use a consumer group, which lets you consume data in near real-time, typically within seconds. This topic explains how.
Overview
A Logstore contains multiple shards. A consumer group consumes data by assigning these shards to its consumers based on the following rules:
-
Each shard is assigned to only one consumer.
-
A consumer can be assigned multiple shards.
When a new consumer joins a consumer group, the group reassigns the shards among all of its consumers to balance the load. The reassignment follows the same rules.
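The effect of a new consumer joining can be pictured with a small simulation. The following is a minimal sketch, not the service's actual allocation algorithm: it assumes a plain round-robin policy in which every shard is owned by exactly one consumer, and the names `ShardAssignmentSketch` and `assign` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ShardAssignmentSketch {
    // Distributes shard IDs 0..shardCount-1 across the consumers in round-robin order.
    // Assumption: each shard is owned by exactly one consumer at a time.
    public static Map<String, List<Integer>> assign(int shardCount, List<String> consumers) {
        Map<String, List<Integer>> owners = new LinkedHashMap<>();
        for (String consumer : consumers) {
            owners.put(consumer, new ArrayList<>());
        }
        if (consumers.isEmpty()) {
            return owners;
        }
        for (int shard = 0; shard < shardCount; shard++) {
            String owner = consumers.get(shard % consumers.size());
            owners.get(owner).add(shard);
        }
        return owners;
    }

    public static void main(String[] args) {
        // Two consumers share four shards.
        System.out.println(assign(4, Arrays.asList("consumer_1", "consumer_2")));
        // A third consumer joins, so the shards are redistributed among all three.
        System.out.println(assign(4, Arrays.asList("consumer_1", "consumer_2", "consumer_3")));
    }
}
```

In this sketch a consumer can own several shards, but no shard ever appears under two consumers, which is what keeps consumers in one group from reading the same data twice.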
Key concepts
| Term | Description |
| --- | --- |
| consumer group | Log Service supports data consumption through consumer groups. A consumer group consists of multiple consumers that jointly consume data from the same Logstore without duplication. Important: You can create up to 30 consumer groups for each Logstore. |
| consumer | The basic unit of a consumer group. A consumer performs the actual consumption task. Important: Consumers in the same consumer group must have unique names. |
| Logstore | A unit for collecting, storing, and querying data. For more information, see Logstore. |
| shard | A unit that controls the read and write capacity of a Logstore. Data is always stored in a shard. For more information, see shard. |
| checkpoint | The position in a data stream marking the latest data a consumer has processed. This allows the consumer to resume processing from that point after a restart. Note: When you consume data by using a consumer group, checkpoints are automatically saved if the program fails. After the program recovers, it can continue consuming data from the last checkpoint. This prevents duplicate consumption. |
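To make the checkpoint concept concrete, the following sketch simulates a consumer that stops partway through a shard and later resumes. It is an in-memory illustration only: the shard is a plain list, the checkpoint is a list index, and the names `CheckpointSketch` and `consume` are hypothetical rather than part of any SDK.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CheckpointSketch {
    // Processes up to maxRecords records starting at `checkpoint`, appends them to `processed`,
    // and returns the new checkpoint (the index of the next unprocessed record).
    public static int consume(List<String> shardData, int checkpoint, int maxRecords, List<String> processed) {
        int position = checkpoint;
        int handled = 0;
        while (position < shardData.size() && handled < maxRecords) {
            processed.add(shardData.get(position));
            position++;
            handled++;
        }
        return position;
    }

    public static void main(String[] args) {
        List<String> shard = Arrays.asList("log-0", "log-1", "log-2", "log-3", "log-4");

        // First run: the consumer handles two records, and the returned checkpoint is saved.
        List<String> firstRun = new ArrayList<>();
        int savedCheckpoint = consume(shard, 0, 2, firstRun);
        System.out.println("first run: " + firstRun + ", checkpoint = " + savedCheckpoint);

        // After a restart: consumption continues from the saved checkpoint, not from log-0.
        List<String> secondRun = new ArrayList<>();
        savedCheckpoint = consume(shard, savedCheckpoint, 10, secondRun);
        System.out.println("after restart: " + secondRun + ", checkpoint = " + savedCheckpoint);
    }
}
```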
Step 1: Create a consumer group
You can create a consumer group using the SDK, API, or CLI.
SDK
The following code shows how to create a consumer group:
CreateConsumerGroup.java
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.ConsumerGroup;
import com.aliyun.openservices.log.exception.LogException;

public class CreateConsumerGroup {
    public static void main(String[] args) throws LogException {
        // This example obtains the AccessKey ID and AccessKey secret from environment variables.
        String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
        // Enter the project name.
        String projectName = "ali-test-project";
        // Enter the Logstore name.
        String logstoreName = "ali-test-logstore";
        // Set the endpoint for Simple Log Service. This example uses the endpoint of the China (Hangzhou) region. Replace it with the actual endpoint.
        String host = "https://cn-hangzhou.log.aliyuncs.com";
        // Create a Simple Log Service client.
        Client client = new Client(host, accessId, accessKey);
        try {
            // Set the consumer group name.
            String consumerGroupName = "ali-test-consumergroup2";
            System.out.println("ready to create consumergroup");
            ConsumerGroup consumerGroup = new ConsumerGroup(consumerGroupName, 300, true);
            client.CreateConsumerGroup(projectName, logstoreName, consumerGroup);
            System.out.println(String.format("create consumergroup %s success", consumerGroupName));
        } catch (LogException e) {
            System.out.println("LogException e :" + e.toString());
            System.out.println("error code :" + e.GetErrorCode());
            System.out.println("error message :" + e.GetErrorMessage());
            throw e;
        }
    }
}
For sample code on managing consumer groups, see Use the Java SDK to manage consumer groups and Use Simple Log Service SDK for Python to manage consumer groups.
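The example above passes `300` and `true` to the `ConsumerGroup` constructor. Step 3 of this topic prints the corresponding properties as the heartbeat timeout and the ordered consumption flag. The following sketch illustrates one plausible reading of the timeout, under the assumptions that it is measured in seconds and that a consumer whose last heartbeat is older than the timeout is treated as offline. The names `HeartbeatSketch` and `expired` are hypothetical, and the check is a simulation, not the server's implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class HeartbeatSketch {
    // Returns the names of consumers whose last heartbeat is older than timeoutSeconds.
    // Assumption: all times are UNIX timestamps in seconds.
    public static List<String> expired(Map<String, Long> lastHeartbeatSeconds, long nowSeconds, int timeoutSeconds) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Long> entry : lastHeartbeatSeconds.entrySet()) {
            if (nowSeconds - entry.getValue() > timeoutSeconds) {
                result.add(entry.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> lastHeartbeat = new LinkedHashMap<>();
        lastHeartbeat.put("consumer_1", 1_000L); // silent for 400 seconds at the time of the check
        lastHeartbeat.put("consumer_2", 1_350L); // silent for 50 seconds at the time of the check
        // With a 300-second timeout, only consumer_1 is reported as expired.
        System.out.println(expired(lastHeartbeat, 1_400L, 300));
    }
}
```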
Step 2: Consume log data
How it works
When a consumer using the consumer group SDK starts for the first time, the SDK creates a consumer group if it does not already exist. The start checkpoint defines the initial consumption position, used only when the consumer group is first created. On subsequent restarts, the consumer resumes from the last checkpoint the server saved. For example:
-
LogHubConfig.ConsumePosition.BEGIN_CURSOR: The consumer group starts consuming from the first log in the Logstore.
-
LogHubConfig.ConsumePosition.END_CURSOR: The consumer group starts consuming after the last log in the Logstore.
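The precedence between a saved checkpoint and the configured start position can be expressed as a short decision function. The following is a minimal sketch of the rule described above, using list indexes in place of real cursors. The enum is a local stand-in for `LogHubConfig.ConsumePosition`, and `StartPositionSketch` and `resolveStart` are hypothetical names.

```java
public class StartPositionSketch {
    // Local stand-in for LogHubConfig.ConsumePosition, so that the sketch has no SDK dependency.
    public enum ConsumePosition { BEGIN_CURSOR, END_CURSOR }

    // Returns the index at which consumption starts for a shard that holds logCount logs.
    // savedCheckpoint is null when the consumer group has just been created.
    public static int resolveStart(Integer savedCheckpoint, ConsumePosition configured, int logCount) {
        if (savedCheckpoint != null) {
            // A saved checkpoint always wins over the configured start position.
            return savedCheckpoint;
        }
        return configured == ConsumePosition.BEGIN_CURSOR ? 0 : logCount;
    }

    public static void main(String[] args) {
        // New consumer group: the configured position decides.
        System.out.println(resolveStart(null, ConsumePosition.BEGIN_CURSOR, 10));
        System.out.println(resolveStart(null, ConsumePosition.END_CURSOR, 10));
        // Existing consumer group: the saved checkpoint decides, whatever is configured.
        System.out.println(resolveStart(4, ConsumePosition.END_CURSOR, 10));
    }
}
```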
Examples
You can use the Java, C++, Python, and Go SDKs to consume data using a consumer group. The following examples use the Java SDK.
Example 1: SDK consumption
-
Add Maven dependencies.
In your pom.xml file, add the following dependencies:
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>2.5.0</version>
</dependency>
<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>loghub-client-lib</artifactId>
    <version>0.6.50</version>
</dependency>
-
Create a class to define your log processing logic.
SampleLogHubProcessor.java
import com.aliyun.openservices.log.common.FastLog;
import com.aliyun.openservices.log.common.FastLogContent;
import com.aliyun.openservices.log.common.FastLogGroup;
import com.aliyun.openservices.log.common.FastLogTag;
import com.aliyun.openservices.log.common.LogGroupData;
import com.aliyun.openservices.loghub.client.ILogHubCheckPointTracker;
import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException;
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;

import java.util.List;

public class SampleLogHubProcessor implements ILogHubProcessor {
    private int shardId;
    // Tracks the last time a checkpoint was saved.
    private long mLastSaveTime = 0;

    // Called once upon processor initialization.
    public void initialize(int shardId) {
        this.shardId = shardId;
    }

    // The main logic for consuming data. All exceptions must be handled within this method. Do not throw exceptions directly.
    public String process(List<LogGroupData> logGroups, ILogHubCheckPointTracker checkPointTracker) {
        // Print the fetched data.
        for (LogGroupData logGroup : logGroups) {
            FastLogGroup fastLogGroup = logGroup.GetFastLogGroup();
            System.out.println("Tags");
            for (int i = 0; i < fastLogGroup.getLogTagsCount(); ++i) {
                FastLogTag logTag = fastLogGroup.getLogTags(i);
                System.out.printf("%s : %s\n", logTag.getKey(), logTag.getValue());
            }
            for (int i = 0; i < fastLogGroup.getLogsCount(); ++i) {
                FastLog log = fastLogGroup.getLogs(i);
                System.out.println("--------\nLog: " + i + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount());
                for (int j = 0; j < log.getContentsCount(); ++j) {
                    FastLogContent content = log.getContents(j);
                    System.out.println(content.getKey() + "\t:\t" + content.getValue());
                }
            }
        }
        long curTime = System.currentTimeMillis();
        // Save a checkpoint to the server every 30 seconds. If the worker terminates unexpectedly, the new worker resumes from the last checkpoint, potentially reprocessing a small amount of data.
        try {
            if (curTime - mLastSaveTime > 30 * 1000) {
                // A 'true' parameter flushes the checkpoint to the server immediately. By default, the in-memory checkpoint is automatically flushed to the server every 60 seconds.
                checkPointTracker.saveCheckPoint(true);
                mLastSaveTime = curTime;
            } else {
                // A 'false' parameter caches the checkpoint locally. It will be flushed to the server by the auto-update mechanism.
                checkPointTracker.saveCheckPoint(false);
            }
        } catch (LogHubCheckPointException e) {
            e.printStackTrace();
        }
        return null;
    }

    // This method is called when the worker is shutting down. You can perform cleanup tasks here.
    public void shutdown(ILogHubCheckPointTracker checkPointTracker) {
        // Save the checkpoint to the server immediately.
        try {
            checkPointTracker.saveCheckPoint(true);
        } catch (LogHubCheckPointException e) {
            e.printStackTrace();
        }
    }
}
For more code examples, see the aliyun-log-consumer-java and Aliyun LOG Go Consumer repositories.
-
Create a factory to generate instances of your log processor.
SampleLogHubProcessorFactory.java
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessorFactory;

class SampleLogHubProcessorFactory implements ILogHubProcessorFactory {
    public ILogHubProcessor generatorProcessor() {
        // Generate a consumer instance. Note: Each call to the generatorProcessor method should return a new SampleLogHubProcessor object.
        return new SampleLogHubProcessor();
    }
}
-
Create a main class to configure and start the worker thread.
Main.java
import com.aliyun.openservices.loghub.client.ClientWorker;
import com.aliyun.openservices.loghub.client.config.LogHubConfig;
import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException;

public class Main {
    // The endpoint of Simple Log Service. Replace with your actual endpoint.
    private static String Endpoint = "cn-hangzhou.log.aliyuncs.com";
    // The project name. Replace with the name of an existing project.
    private static String Project = "ali-test-project";
    // The Logstore name. Replace with the name of an existing Logstore.
    private static String Logstore = "ali-test-logstore";
    // You do not need to create the consumer group beforehand; the program automatically creates it on the first run.
    private static String ConsumerGroup = "ali-test-consumergroup2";
    // This example retrieves the AccessKey ID and AccessKey Secret from environment variables.
    private static String AccessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    private static String AccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");

    public static void main(String[] args) throws LogHubClientWorkerException, InterruptedException {
        // "consumer_1" is the consumer name, which must be unique within a consumer group. To balance consumption across multiple machines, you can use unique identifiers like the machine's IP address as the consumer name.
        // maxFetchLogGroupSize sets the maximum number of LogGroups to fetch per request. The default value is usually sufficient. You can adjust it by using config.setMaxFetchLogGroupSize(100). The valid range is (0, 1000].
        LogHubConfig config = new LogHubConfig(ConsumerGroup, "consumer_1", Endpoint, Project, Logstore, AccessKeyId, AccessKeySecret, LogHubConfig.ConsumePosition.BEGIN_CURSOR, 1000);
        ClientWorker worker = new ClientWorker(new SampleLogHubProcessorFactory(), config);
        Thread thread = new Thread(worker);
        // After the thread starts, the ClientWorker runs automatically. ClientWorker implements the Runnable interface.
        thread.start();
        Thread.sleep(60 * 60 * 1000);
        // Call the worker's shutdown() method to stop the consumer instance and its associated thread.
        worker.shutdown();
        // The ClientWorker creates asynchronous tasks. After shutdown(), wait for these tasks to complete gracefully. A 30-second sleep is recommended.
        Thread.sleep(30 * 1000);
    }
}
-
Run Main.java.
The following is an example of the output from consuming NGINX logs:
: GET
request_uri : /request/path-3/file-7
status : 200
body_bytes_sent : 3820
host : www.example.com
request_time : 43
request_length : 1987
http_user_agent : Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36
http_referer : www.example.com
http_x_forwarded_for : 192.168.10.196
upstream_response_time : 0.02
--------
Log: 158, time: 1635629778, GetContentCount: 14
......
category : null
source : 127.0.0.1
topic : nginx_access_log
machineUUID : null
Tags
__receive_time__ : 1635629815
--------
Log: 0, time: 1635629788, GetContentCount: 14
......
category : null
source : 127.0.0.1
topic : nginx_access_log
machineUUID : null
Tags
__receive_time__ : 1635629877
--------
......
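The processor in Example 1 flushes its checkpoint to the server at most once every 30 seconds and only caches it in memory otherwise. The following sketch isolates that timing decision so that it can be exercised without the SDK. The class `CheckpointThrottleSketch` and the method `shouldFlush` are hypothetical helpers, and the clock value is passed in explicitly to keep the behavior deterministic.

```java
public class CheckpointThrottleSketch {
    private final long intervalMillis;
    // Time of the last flush to the server, in milliseconds. Zero means "never flushed".
    private long lastFlushMillis = 0;

    public CheckpointThrottleSketch(long intervalMillis) {
        this.intervalMillis = intervalMillis;
    }

    // Returns true if the checkpoint should be flushed to the server now,
    // and false if it should only be cached in memory.
    public boolean shouldFlush(long nowMillis) {
        if (nowMillis - lastFlushMillis > intervalMillis) {
            lastFlushMillis = nowMillis;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        CheckpointThrottleSketch throttle = new CheckpointThrottleSketch(30_000);
        // Simulated times at which three batches finish processing.
        long[] batchTimes = {31_000, 40_000, 62_000};
        for (long time : batchTimes) {
            System.out.println(time + " ms -> flush to server: " + throttle.shouldFlush(time));
        }
    }
}
```

Inside a processor, the result could be passed straight to the tracker, for example `checkPointTracker.saveCheckPoint(throttle.shouldFlush(System.currentTimeMillis()))`. A shorter interval reduces the amount of data reprocessed after a crash at the cost of more requests to the server.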
Example 2: SDK consumption with SPL
-
Add Maven dependencies.
In your pom.xml file, add the following dependencies:
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>2.5.0</version>
</dependency>
<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>loghub-client-lib</artifactId>
    <version>0.6.50</version>
</dependency>
-
Create a class to define your log processing logic.
SPLLogHubProcessor.java
import com.aliyun.openservices.log.common.FastLog;
import com.aliyun.openservices.log.common.FastLogContent;
import com.aliyun.openservices.log.common.FastLogGroup;
import com.aliyun.openservices.log.common.FastLogTag;
import com.aliyun.openservices.log.common.LogGroupData;
import com.aliyun.openservices.loghub.client.ILogHubCheckPointTracker;
import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException;
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;

import java.util.List;

public class SPLLogHubProcessor implements ILogHubProcessor {
    private int shardId;
    // Tracks the last time a checkpoint was saved.
    private long mLastSaveTime = 0;

    // Called once upon processor initialization.
    public void initialize(int shardId) {
        this.shardId = shardId;
    }

    // The main logic for consuming data. All exceptions must be handled within this method. Do not throw exceptions directly.
    public String process(List<LogGroupData> logGroups, ILogHubCheckPointTracker checkPointTracker) {
        // Print the fetched data.
        for (LogGroupData logGroup : logGroups) {
            FastLogGroup fastLogGroup = logGroup.GetFastLogGroup();
            System.out.println("Tags");
            for (int i = 0; i < fastLogGroup.getLogTagsCount(); ++i) {
                FastLogTag logTag = fastLogGroup.getLogTags(i);
                System.out.printf("%s : %s\n", logTag.getKey(), logTag.getValue());
            }
            for (int i = 0; i < fastLogGroup.getLogsCount(); ++i) {
                FastLog log = fastLogGroup.getLogs(i);
                System.out.println("--------\nLog: " + i + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount());
                for (int j = 0; j < log.getContentsCount(); ++j) {
                    FastLogContent content = log.getContents(j);
                    System.out.println(content.getKey() + "\t:\t" + content.getValue());
                }
            }
        }
        long curTime = System.currentTimeMillis();
        // Save a checkpoint to the server every 30 seconds. If the worker terminates unexpectedly, the new worker resumes from the last checkpoint, potentially reprocessing a small amount of data.
        try {
            if (curTime - mLastSaveTime > 30 * 1000) {
                // A 'true' parameter flushes the checkpoint to the server immediately. By default, the in-memory checkpoint is automatically flushed to the server every 60 seconds.
                checkPointTracker.saveCheckPoint(true);
                mLastSaveTime = curTime;
            } else {
                // A 'false' parameter caches the checkpoint locally. It will be flushed to the server by the auto-update mechanism.
                checkPointTracker.saveCheckPoint(false);
            }
        } catch (LogHubCheckPointException e) {
            e.printStackTrace();
        }
        return null;
    }

    // This method is called when the worker is shutting down. You can perform cleanup tasks here.
    public void shutdown(ILogHubCheckPointTracker checkPointTracker) {
        // Save the checkpoint to the server immediately.
        try {
            checkPointTracker.saveCheckPoint(true);
        } catch (LogHubCheckPointException e) {
            e.printStackTrace();
        }
    }
}
-
Create a factory to generate instances of your log processor.
SPLLogHubProcessorFactory.java
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessorFactory;

class SPLLogHubProcessorFactory implements ILogHubProcessorFactory {
    public ILogHubProcessor generatorProcessor() {
        // Generate a consumer instance. Note: Each call to the generatorProcessor method should return a new SPLLogHubProcessor object.
        return new SPLLogHubProcessor();
    }
}
-
Create a main class to configure and start the worker thread.
Main.java
import com.aliyun.openservices.loghub.client.ClientWorker;
import com.aliyun.openservices.loghub.client.config.LogHubConfig;
import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException;

public class Main {
    // The endpoint of Simple Log Service. Replace with your actual endpoint.
    private static String Endpoint = "cn-hangzhou.log.aliyuncs.com";
    // The project name. Replace with the name of an existing project.
    private static String Project = "ali-test-project";
    // The Logstore name. Replace with the name of an existing Logstore.
    private static String Logstore = "ali-test-logstore";
    // You do not need to create the consumer group beforehand; the program automatically creates it on the first run.
    private static String ConsumerGroup = "ali-test-consumergroup2";
    // This example retrieves the AccessKey ID and AccessKey Secret from environment variables.
    private static String AccessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    private static String AccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");

    public static void main(String[] args) throws LogHubClientWorkerException, InterruptedException {
        // "consumer_1" is the consumer name, which must be unique within a consumer group. To balance consumption across multiple machines, you can use unique identifiers like the machine's IP address as the consumer name.
        // maxFetchLogGroupSize sets the maximum number of LogGroups to fetch per request. The default value is usually sufficient. You can adjust it by using config.setMaxFetchLogGroupSize(100). The valid range is (0, 1000].
        LogHubConfig config = new LogHubConfig(ConsumerGroup, "consumer_1", Endpoint, Project, Logstore, AccessKeyId, AccessKeySecret, LogHubConfig.ConsumePosition.BEGIN_CURSOR, 1000);
        // Use setQuery to specify an SPL statement to filter logs during consumption.
        config.setQuery("* | where cast(body_bytes_sent as bigint) > 14000");
        ClientWorker worker = new ClientWorker(new SPLLogHubProcessorFactory(), config);
        Thread thread = new Thread(worker);
        // After the thread starts, the ClientWorker runs automatically. ClientWorker implements the Runnable interface.
        thread.start();
        Thread.sleep(60 * 60 * 1000);
        // Call the worker's shutdown() method to stop the consumer instance and its associated thread.
        worker.shutdown();
        // The ClientWorker creates asynchronous tasks. After shutdown(), wait for these tasks to complete gracefully. A 30-second sleep is recommended.
        Thread.sleep(30 * 1000);
    }
}
-
Run Main.java.
The following is an example of the output from consuming NGINX logs:
: GET
request_uri : /request/path-3/file-7
status : 200
body_bytes_sent : 3820
host : www.example.com
request_time : 43
request_length : 1987
http_user_agent : Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36
http_referer : www.example.com
http_x_forwarded_for : 192.168.10.196
upstream_response_time : 0.02
--------
Log: 158, time: 1635629778, GetContentCount: 14
......
category : null
source : 127.0.0.1
topic : nginx_access_log
machineUUID : null
Tags
__receive_time__ : 1635629815
--------
Log: 0, time: 1635629788, GetContentCount: 14
......
category : null
source : 127.0.0.1
topic : nginx_access_log
machineUUID : null
Tags
__receive_time__ : 1635629877
--------
......
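The SPL statement in Example 2, `* | where cast(body_bytes_sent as bigint) > 14000`, keeps only the logs whose `body_bytes_sent` field, read as an integer, exceeds 14000. The following sketch expresses the same condition in plain Java over key-value maps, purely to illustrate what the statement selects. It is not how the filter is evaluated in practice. `SplFilterSketch` and `filter` are hypothetical names, and dropping logs with a missing or non-numeric field is an assumption of this sketch rather than documented SPL behavior.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class SplFilterSketch {
    // Keeps the logs whose body_bytes_sent value, parsed as a long, is greater than threshold.
    public static List<Map<String, String>> filter(List<Map<String, String>> logs, long threshold) {
        List<Map<String, String>> kept = new ArrayList<>();
        for (Map<String, String> log : logs) {
            String raw = log.get("body_bytes_sent");
            if (raw == null) {
                continue; // Assumption: logs without the field are dropped.
            }
            try {
                if (Long.parseLong(raw.trim()) > threshold) {
                    kept.add(log);
                }
            } catch (NumberFormatException e) {
                // Assumption: logs whose value is not an integer are dropped.
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Map<String, String>> logs = Arrays.asList(
                Collections.singletonMap("body_bytes_sent", "3820"),
                Collections.singletonMap("body_bytes_sent", "20000"),
                Collections.singletonMap("status", "200"));
        // Only the log with body_bytes_sent = 20000 passes the filter.
        System.out.println(filter(logs, 14000));
    }
}
```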
Step 3: View consumer group status
You can view the status of a consumer group using one of the following methods:
Java SDK
-
View the consumption checkpoint for each shard. The following code provides an example:
ConsumerGroupTest.java
import java.util.List;

import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.Consts.CursorMode;
import com.aliyun.openservices.log.common.ConsumerGroup;
import com.aliyun.openservices.log.common.ConsumerGroupShardCheckPoint;
import com.aliyun.openservices.log.exception.LogException;

public class ConsumerGroupTest {
    static String endpoint = "cn-hangzhou.log.aliyuncs.com";
    static String project = "ali-test-project";
    static String logstore = "ali-test-logstore";
    static String accesskeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    static String accesskey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");

    public static void main(String[] args) throws LogException {
        Client client = new Client(endpoint, accesskeyId, accesskey);
        // Get all consumer groups in the Logstore. The list is empty if no consumer groups exist.
        List<ConsumerGroup> consumerGroups = client.ListConsumerGroup(project, logstore).GetConsumerGroups();
        for (ConsumerGroup c : consumerGroups) {
            // Print the consumer group properties: name, heartbeat timeout, and ordered consumption status.
            System.out.println("Name: " + c.getConsumerGroupName());
            System.out.println("heartbeat timeout: " + c.getTimeout());
            System.out.println("ordered consumption: " + c.isInOrder());
            for (ConsumerGroupShardCheckPoint cp : client.GetCheckPoint(project, logstore, c.getConsumerGroupName()).GetCheckPoints()) {
                System.out.println("shard: " + cp.getShard());
                // The time is a long integer, accurate to the microsecond.
                System.out.println("Last checkpoint update time: " + cp.getUpdateTime());
                System.out.println("Consumer name: " + cp.getConsumer());
                String consumerPrg = "";
                if (cp.getCheckPoint().isEmpty()) {
                    consumerPrg = "Consumption has not started";
                } else {
                    // A UNIX timestamp in seconds. Format the output as needed.
                    try {
                        int prg = client.GetPrevCursorTime(project, logstore, cp.getShard(), cp.getCheckPoint()).GetCursorTime();
                        consumerPrg = "" + prg;
                    } catch (LogException e) {
                        // Compare strings with equals(); == compares object references in Java.
                        if ("InvalidCursor".equals(e.GetErrorCode())) {
                            consumerPrg = "Invalid. The consumption checkpoint is older than the data retention period.";
                        } else {
                            // internal server error
                            throw e;
                        }
                    }
                }
                System.out.println("consumption checkpoint: " + consumerPrg);
                String endCursor = client.GetCursor(project, logstore, cp.getShard(), CursorMode.END).GetCursor();
                int endPrg = 0;
                try {
                    endPrg = client.GetPrevCursorTime(project, logstore, cp.getShard(), endCursor).GetCursorTime();
                } catch (LogException e) {
                    // Ignored on purpose: endPrg stays 0 if the time cannot be retrieved.
                }
                // A UNIX timestamp in seconds. Format the output as needed.
                System.out.println("Arrival time of the last record: " + endPrg);
            }
        }
    }
}
-
The following is the sample output:
Name: ali-test-consumergroup2
heartbeat timeout: 60
ordered consumption: false
shard: 0
Last checkpoint update time: 0
Consumer name: consumer_1
consumption checkpoint: Consumption has not started
Arrival time of the last record: 1729583617
shard: 1
Last checkpoint update time: 0
Consumer name: consumer_1
consumption checkpoint: Consumption has not started
Arrival time of the last record: 1729583738
Process finished with exit code 0
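The example prints the consumption checkpoint and the arrival time of the last record as raw UNIX timestamps in seconds. The following sketch shows one way to format such values and to estimate how far a consumer lags behind the newest data. It uses only `java.time`. The class `CheckpointTimeSketch`, its method names, and the choice of UTC are assumptions made for illustration.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class CheckpointTimeSketch {
    // Formats timestamps in UTC. Change the zone if local time is preferred.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneOffset.UTC);

    // Converts a UNIX timestamp in seconds to a readable UTC string.
    public static String format(long unixSeconds) {
        return FORMAT.format(Instant.ofEpochSecond(unixSeconds));
    }

    // Returns how many seconds the checkpoint lags behind the last record, never less than zero.
    public static long lagSeconds(long checkpointSeconds, long lastArrivalSeconds) {
        return Math.max(0, lastArrivalSeconds - checkpointSeconds);
    }

    public static void main(String[] args) {
        long checkpoint = 1729583617L;  // sample value taken from the output above
        long lastArrival = 1729583738L; // sample value taken from the output above
        System.out.println("checkpoint:   " + format(checkpoint));
        System.out.println("last arrival: " + format(lastArrival));
        System.out.println("lag (s):      " + lagSeconds(checkpoint, lastArrival));
    }
}
```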
Console
-
Log in to the Simple Log Service console.
In the Projects section, click the target project.

-
On the tab, click the icon to the left of the target Logstore, then click the icon to the left of Data Consumption.
-
In the consumer group list, click the target consumer group.
-
On the Consumer Group Status page, view the consumption checkpoint for each shard. This page shows details for each shard, including its ID (shard), Last Consumed Time, and the consuming Client. You can also use the Refresh and Reset Checkpoint buttons.
Related operations
-
RAM user authorization
To use a RAM user to manage consumer groups, you must grant the required permissions to the RAM user. For more information, see Create and authorize a RAM user.
The following table describes the required Actions.
| Action | Description | Resource |
| --- | --- | --- |
| log:GetCursorOrData(GetCursor) | Gets a cursor based on a specified time. | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${LogStoreName} |
| log:CreateConsumerGroup(CreateConsumerGroup) | Creates a consumer group in a specified Logstore. | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${LogStoreName}/consumergroup/${consumerGroupName} |
| log:ListConsumerGroup(ListConsumerGroup) | Lists all consumer groups in a specified Logstore. | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${LogStoreName}/consumergroup/* |
| log:ConsumerGroupUpdateCheckPoint(UpdateCheckPoint) | Updates the checkpoint on a shard for a specified consumer group. | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${LogStoreName}/consumergroup/${consumerGroupName} |
| log:ConsumerGroupHeartBeat(ConsumerGroupHeartBeat) | Sends a heartbeat from a specified consumer to the server. | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${LogStoreName}/consumergroup/${consumerGroupName} |
| log:UpdateConsumerGroup(UpdateConsumerGroup) | Modifies the properties of a specified consumer group. | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${LogStoreName}/consumergroup/${consumerGroupName} |
| log:GetConsumerGroupCheckPoint(GetConsumerGroupCheckPoint) | Gets the checkpoint of one or all shards for a specified consumer group. | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${LogStoreName}/consumergroup/${consumerGroupName} |
To grant a RAM user permissions for a consumer group with the properties listed below, use the following sample policy.
-
Alibaba Cloud account ID: 174649****602745
-
Region ID: cn-hangzhou
-
Project name: project-test
-
Logstore name: logstore-test
-
Consumer group name: consumergroup-test
Sample policy:
{
    "Version": "1",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "log:GetCursorOrData"
            ],
            "Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test"
        },
        {
            "Effect": "Allow",
            "Action": [
                "log:CreateConsumerGroup",
                "log:ListConsumerGroup"
            ],
            "Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test/consumergroup/*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "log:ConsumerGroupUpdateCheckPoint",
                "log:ConsumerGroupHeartBeat",
                "log:UpdateConsumerGroup",
                "log:GetConsumerGroupCheckPoint"
            ],
            "Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test/consumergroup/consumergroup-test"
        }
    ]
}
-
Troubleshooting
For easier troubleshooting, configure Log4j for your consumer application to log exceptions from the consumer group. The following is a sample log4j.properties configuration:
log4j.rootLogger = info,stdout
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n
After you configure Log4j, the consumer application outputs exception information similar to the following:
[WARN ] 2018-03-14 12:01:52,747 method:com.aliyun.openservices.loghub.client.LogHubConsumer.sampleLogError(LogHubConsumer.java:159)
com.aliyun.openservices.log.exception.LogException: Invalid loggroup count, (0,1000]
-
Consume data from a specified time
// consumerStartTimeInSeconds indicates the time from which to start consuming data.
public LogHubConfig(String consumerGroupName,
                    String consumerName,
                    String loghubEndPoint,
                    String project, String logStore,
                    String accessId, String accessKey,
                    int consumerStartTimeInSeconds);

// position is an enumeration. LogHubConfig.ConsumePosition.BEGIN_CURSOR starts consumption from the oldest data. LogHubConfig.ConsumePosition.END_CURSOR starts consumption from the latest data.
public LogHubConfig(String consumerGroupName,
                    String consumerName,
                    String loghubEndPoint,
                    String project, String logStore,
                    String accessId, String accessKey,
                    ConsumePosition position);
Note
-
Choose a constructor based on your requirements.
-
If a checkpoint is already saved on the server, consumption resumes from that checkpoint.
-
Simple Log Service prioritizes a saved checkpoint for consumption. If you specify a start time, ensure the consumerStartTimeInSeconds value falls within the data retention period (TTL). Otherwise, the specified start time is ignored.
-
Reset a checkpoint
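import java.sql.Timestamp;

import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.Shard;
import com.aliyun.openservices.log.request.ListShardRequest;
import com.aliyun.openservices.log.response.ListShardResponse;

// host, accessId, accessKey, project, logStore, and consumerGroup are assumed to be fields of the enclosing class.
public static void updateCheckpoint() throws Exception {
    Client client = new Client(host, accessId, accessKey);
    // The timestamp must be a UNIX timestamp in seconds. If your timestamp is in milliseconds, divide it by 1000.
    // Timestamp.valueOf interprets the string in the JVM's default time zone.
    long timestamp = Timestamp.valueOf("2017-11-15 00:00:00").getTime() / 1000;
    ListShardResponse response = client.ListShard(new ListShardRequest(project, logStore));
    // Move the checkpoint of every shard to the cursor that corresponds to the timestamp.
    for (Shard shard : response.GetShards()) {
        int shardId = shard.GetShardId();
        String cursor = client.GetCursor(project, logStore, shardId, timestamp).GetCursor();
        client.UpdateCheckPoint(project, logStore, consumerGroup, shardId, cursor);
    }
}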
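The reset example derives its UNIX timestamp from a date-time string, and the result depends on the time zone in which that string is interpreted. The following sketch shows a conversion with an explicit zone based on `java.time`, as an alternative to relying on the JVM default. `CursorTimeSketch` and `toUnixSeconds` are hypothetical names, and the zone IDs are examples only.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class CursorTimeSketch {
    private static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Parses "yyyy-MM-dd HH:mm:ss" in the given time zone and returns a UNIX timestamp in seconds.
    public static long toUnixSeconds(String dateTime, String zoneId) {
        return LocalDateTime.parse(dateTime, FORMAT)
                .atZone(ZoneId.of(zoneId))
                .toEpochSecond();
    }

    public static void main(String[] args) {
        // The same wall-clock time maps to different instants in different zones.
        System.out.println(toUnixSeconds("2017-11-15 00:00:00", "UTC"));
        System.out.println(toUnixSeconds("2017-11-15 00:00:00", "+08:00"));
    }
}
```

The returned value is already in seconds, so it can be passed as the time argument when requesting a cursor without dividing by 1000.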
References
-
API
-
SDK
|
Language
|
References
|
|
Java
|
|
|
Python
|
|
-
CLI