Consuming logs with a consumer group

Using an SDK to consume data from Log Service in real time can be challenging because it requires you to manage specific implementation details, load balancing, and failover between consumers. To simplify this, you can use a consumer group, which lets you consume data in near real-time, typically within seconds. This topic explains how.

Overview

A Logstore contains multiple shards. A consumer group consumes data by assigning these shards to its consumers based on the following rules:

  • A shard is assigned to only one consumer at a time.

  • A single consumer can be assigned multiple shards.

When a new consumer joins a consumer group, the shards are rebalanced among all consumers in the group to keep the load even. The new assignment follows the same rules.
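
The two assignment rules and the effect of a new consumer joining can be illustrated with a small, self-contained sketch. It assumes a simple round-robin distribution; the algorithm that the server actually uses is not described in this topic, and the class and method names below are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: the real assignment is decided by the service, not by this code.
class ShardAssignmentSketch {
    // Distributes shard IDs round-robin so that every shard has exactly one
    // owner, while a single consumer may own several shards.
    static Map<String, List<Integer>> assign(List<Integer> shardIds, List<String> consumers) {
        Map<String, List<Integer>> plan = new LinkedHashMap<>();
        for (String consumer : consumers) {
            plan.put(consumer, new ArrayList<>());
        }
        if (consumers.isEmpty()) {
            return plan; // no consumer is available to take a shard
        }
        for (int i = 0; i < shardIds.size(); i++) {
            String owner = consumers.get(i % consumers.size());
            plan.get(owner).add(shardIds.get(i));
        }
        return plan;
    }

    public static void main(String[] args) {
        List<Integer> shards = List.of(0, 1, 2, 3);
        // A single consumer owns all four shards.
        System.out.println(assign(shards, List.of("consumer_1")));
        // A second consumer joins, so the shards are redistributed.
        System.out.println(assign(shards, List.of("consumer_1", "consumer_2")));
    }
}
```

With four shards, the first call gives all shards to consumer_1, and the second call gives shards 0 and 2 to consumer_1 and shards 1 and 3 to consumer_2. In both cases no shard has more than one owner.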

Key concepts

| Term | Description |
| --- | --- |
| consumer group | Log Service supports data consumption through consumer groups. A consumer group consists of multiple consumers that jointly consume data from the same Logstore without duplication. Important: You can create up to 30 consumer groups for each Logstore. |
| consumer | The basic unit of a consumer group. A consumer performs the actual consumption task. Important: Consumers in the same consumer group must have unique names. |
| Logstore | A unit for collecting, storing, and querying data. For more information, see Logstore. |
| shard | A unit that controls the read and write capacity of a Logstore. Data is always stored in a shard. For more information, see shard. |
| checkpoint | The position in a data stream marking the latest data a consumer has processed. This allows the consumer to resume processing from that point after a restart. Note: When you consume data by using a consumer group, checkpoints are automatically saved if the program fails. After the program recovers, it can continue consuming data from the last checkpoint. This prevents duplicate consumption. |
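
The role of a checkpoint can be sketched with an in-memory stand-in for a shard. This is only a model under simple assumptions: the shard is a list, the checkpoint is an index into it, and all names are hypothetical. It shows that a restarted consumer resumes from the last saved checkpoint, so anything processed after that save is read again.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: an in-memory stand-in for a shard and its saved checkpoint.
class CheckpointResumeSketch {
    // Index of the next record to read, as last saved.
    private int savedCheckpoint = 0;

    // Reads up to maxRecords starting at the saved checkpoint and returns them.
    // The checkpoint advances only when save is true.
    List<String> consume(List<String> shard, int maxRecords, boolean save) {
        int end = Math.min(shard.size(), savedCheckpoint + maxRecords);
        List<String> batch = new ArrayList<>(shard.subList(savedCheckpoint, end));
        if (save) {
            savedCheckpoint = end;
        }
        return batch;
    }

    int savedCheckpoint() {
        return savedCheckpoint;
    }

    public static void main(String[] args) {
        List<String> shard = List.of("log-0", "log-1", "log-2", "log-3", "log-4");
        CheckpointResumeSketch group = new CheckpointResumeSketch();
        // Batch is processed and the checkpoint is saved.
        System.out.println(group.consume(shard, 2, true));
        // Batch is processed, but the consumer stops before saving.
        System.out.println(group.consume(shard, 2, false));
        // A restarted consumer resumes from the saved checkpoint.
        System.out.println(group.consume(shard, 2, true));
    }
}
```

The second and third calls return the same two records, because the checkpoint was not saved in between. Saving checkpoints more often narrows that window at the cost of more requests.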

Prerequisites

Step 1: Create a consumer group

You can create a consumer group using the SDK, API, or CLI.

SDK

The following code shows how to create a consumer group:

CreateConsumerGroup.java

import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.ConsumerGroup;
import com.aliyun.openservices.log.exception.LogException;

public class CreateConsumerGroup {
    public static void main(String[] args) throws LogException {
        // This example obtains the AccessKey ID and AccessKey secret from environment variables.
        String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
        // Enter the project name.
        String projectName = "ali-test-project";
        // Enter the Logstore name.
        String logstoreName = "ali-test-logstore";
        // Set the endpoint for Simple Log Service. This example uses the endpoint of the China (Hangzhou) region. Replace it with the actual endpoint.
        String host = "https://cn-hangzhou.log.aliyuncs.com";

        // Create a Simple Log Service client.
        Client client = new Client(host, accessId, accessKey);

        try {
            // Set the consumer group name.
            String consumerGroupName = "ali-test-consumergroup2";
            System.out.println("ready to create consumergroup");

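            // Arguments: the consumer group name, the heartbeat timeout in seconds, and whether to consume in order.
            ConsumerGroup consumerGroup = new ConsumerGroup(consumerGroupName, 300, true);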

            client.CreateConsumerGroup(projectName, logstoreName, consumerGroup);

            System.out.println(String.format("create consumergroup %s success", consumerGroupName));

        } catch (LogException e) {
            System.out.println("LogException e :" + e.toString());
            System.out.println("error code :" + e.GetErrorCode());
            System.out.println("error message :" + e.GetErrorMessage());
            throw e;
        }
    }
}

For sample code on managing consumer groups, see Use the Java SDK to manage consumer groups and Use Simple Log Service SDK for Python to manage consumer groups.

API

To create a consumer group using the API, see CreateConsumerGroup.

To verify that the consumer group was created, see ListConsumerGroup.

CLI

To create a consumer group using the CLI, see create_consumer_group.

To verify that the consumer group was created, see list_consumer_group.

Step 2: Consume log data

How it works

When a consumer using the consumer group SDK starts for the first time, the SDK creates a consumer group if it does not already exist. The start checkpoint defines the initial consumption position, used only when the consumer group is first created. On subsequent restarts, the consumer resumes from the last checkpoint the server saved. For example:

  • LogHubConfig.ConsumePosition.BEGIN_CURSOR: The consumer group starts consuming from the first log in the Logstore.

  • LogHubConfig.ConsumePosition.END_CURSOR: The consumer group starts consuming after the last log in the Logstore.
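
The precedence described above, where a saved checkpoint wins and the configured start position applies only on the first run, can be sketched as follows. The enum, method, and numeric offsets are hypothetical stand-ins used for illustration; real cursors are opaque strings managed by the service.

```java
import java.util.Optional;

// Illustrative only: these names are hypothetical and are not part of the SDK.
class StartPositionSketch {
    enum StartPosition { BEGIN, END }

    // Returns the offset at which consumption starts for one shard.
    // savedCheckpoint is empty only when the consumer group has just been created.
    static long resolveStart(Optional<Long> savedCheckpoint, StartPosition configured,
                             long firstOffset, long endOffset) {
        if (savedCheckpoint.isPresent()) {
            return savedCheckpoint.get(); // a saved checkpoint takes precedence
        }
        return configured == StartPosition.BEGIN ? firstOffset : endOffset;
    }

    public static void main(String[] args) {
        // First start: no checkpoint exists, so the configured position applies.
        System.out.println(resolveStart(Optional.empty(), StartPosition.BEGIN, 0, 500));
        System.out.println(resolveStart(Optional.empty(), StartPosition.END, 0, 500));
        // Restart: the saved checkpoint is used and the configured position is ignored.
        System.out.println(resolveStart(Optional.of(120L), StartPosition.END, 0, 500));
    }
}
```

The three calls print 0, 500, and 120. A practical consequence is that changing the start position in the configuration has no effect on a consumer group that already has saved checkpoints.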

Examples

You can use the Java, C++, Python, and Go SDKs to consume data using a consumer group. The following examples use the Java SDK.

Example 1: SDK consumption

  1. Add Maven dependencies.

    In your pom.xml file, add the following dependencies:

    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
      <version>2.5.0</version>
    </dependency>
    <dependency>
      <groupId>com.aliyun.openservices</groupId>
      <artifactId>loghub-client-lib</artifactId>
      <version>0.6.50</version>
    </dependency>
  2. Create a class to define your log processing logic.

    SampleLogHubProcessor.java

    import com.aliyun.openservices.log.common.FastLog;
    import com.aliyun.openservices.log.common.FastLogContent;
    import com.aliyun.openservices.log.common.FastLogGroup;
    import com.aliyun.openservices.log.common.FastLogTag;
    import com.aliyun.openservices.log.common.LogGroupData;
    import com.aliyun.openservices.loghub.client.ILogHubCheckPointTracker;
    import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException;
    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
    import java.util.List;
    public class SampleLogHubProcessor implements ILogHubProcessor {
        private int shardId;
        // Tracks the last time a checkpoint was saved.
        private long mLastSaveTime = 0;
        // Called once upon processor initialization.
        public void initialize(int shardId) {
            this.shardId = shardId;
        }
        // The main logic for consuming data. All exceptions must be handled within this method. Do not throw exceptions directly.
        public String process(List<LogGroupData> logGroups, ILogHubCheckPointTracker checkPointTracker) {
            // Print the fetched data.
            for (LogGroupData logGroup : logGroups) {
                FastLogGroup fastLogGroup = logGroup.GetFastLogGroup();
                System.out.println("Tags");
                for (int i = 0; i < fastLogGroup.getLogTagsCount(); ++i) {
                    FastLogTag logTag = fastLogGroup.getLogTags(i);
                    System.out.printf("%s : %s\n", logTag.getKey(), logTag.getValue());
                }
                for (int i = 0; i < fastLogGroup.getLogsCount(); ++i) {
                    FastLog log = fastLogGroup.getLogs(i);
                    System.out.println("--------\nLog: " + i + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount());
                    for (int j = 0; j < log.getContentsCount(); ++j) {
                        FastLogContent content = log.getContents(j);
                        System.out.println(content.getKey() + "\t:\t" + content.getValue());
                    }
                }
            }
            long curTime = System.currentTimeMillis();
            // Save a checkpoint to the server every 30 seconds. If the worker terminates unexpectedly, the new worker resumes from the last checkpoint, potentially reprocessing a small amount of data.
            try {
                if (curTime - mLastSaveTime > 30 * 1000) {
                    // A 'true' parameter flushes the checkpoint to the server immediately. By default, the in-memory checkpoint is automatically flushed to the server every 60 seconds.
                    checkPointTracker.saveCheckPoint(true);
                    mLastSaveTime = curTime;
                } else {
                    // A 'false' parameter caches the checkpoint locally. It will be flushed to the server by the auto-update mechanism.
                    checkPointTracker.saveCheckPoint(false);
                }
            } catch (LogHubCheckPointException e) {
                e.printStackTrace();
            }
            return null;
        }
        // This method is called when the worker is shutting down. You can perform cleanup tasks here.
        public void shutdown(ILogHubCheckPointTracker checkPointTracker) {
            // Save the checkpoint to the server immediately.
            try {
                checkPointTracker.saveCheckPoint(true);
            } catch (LogHubCheckPointException e) {
                e.printStackTrace();
            }
        }
    }

    For more code examples, see the aliyun-log-consumer-java and Aliyun LOG Go Consumer repositories.

  3. Create a factory to generate instances of your log processor.

    SampleLogHubProcessorFactory.java

    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessorFactory;
    class SampleLogHubProcessorFactory implements ILogHubProcessorFactory {
        public ILogHubProcessor generatorProcessor() {
            // Generate a consumer instance. Note: Each call to the generatorProcessor method should return a new SampleLogHubProcessor object.
            return new SampleLogHubProcessor();
        }
    }
  4. Create a main class to configure and start the worker thread.

    Main.java

    import com.aliyun.openservices.loghub.client.ClientWorker;
    import com.aliyun.openservices.loghub.client.config.LogHubConfig;
    import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException;
    public class Main {
        // The endpoint of Simple Log Service. Replace with your actual endpoint.
        private static String Endpoint = "cn-hangzhou.log.aliyuncs.com";
        // The project name. Replace with the name of an existing project.
        private static String Project = "ali-test-project";
        // The Logstore name. Replace with the name of an existing Logstore.
        private static String Logstore = "ali-test-logstore";
        // You do not need to create the consumer group beforehand; the program automatically creates it on the first run.
        private static String ConsumerGroup = "ali-test-consumergroup2";
        // This example retrieves the AccessKey ID and AccessKey Secret from environment variables.
        private static String AccessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        private static String AccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
        public static void main(String[] args) throws LogHubClientWorkerException, InterruptedException {
            // "consumer_1" is the consumer name, which must be unique within a consumer group. To balance consumption across multiple machines, you can use unique identifiers like the machine's IP address as the consumer name.
            // maxFetchLogGroupSize sets the maximum number of LogGroups to fetch per request. The default value is usually sufficient. You can adjust it by using config.setMaxFetchLogGroupSize(100). The valid range is (0, 1000].
            LogHubConfig config = new LogHubConfig(ConsumerGroup, "consumer_1", Endpoint, Project, Logstore, AccessKeyId, AccessKeySecret, LogHubConfig.ConsumePosition.BEGIN_CURSOR, 1000);
            ClientWorker worker = new ClientWorker(new SampleLogHubProcessorFactory(), config);
            Thread thread = new Thread(worker);
            // After the thread starts, the ClientWorker runs automatically. ClientWorker implements the Runnable interface.
            thread.start();
            Thread.sleep(60 * 60 * 1000);
            // Call the worker's shutdown() method to stop the consumer instance and its associated thread.
            worker.shutdown();
            // The ClientWorker creates asynchronous tasks. After shutdown(), wait for these tasks to complete gracefully. A 30-second sleep is recommended.
            Thread.sleep(30 * 1000);
        }
    }
  5. Run Main.java.

    The following is an example of the output from consuming NGINX logs:

    :    GET
    request_uri    :    /request/path-3/file-7
    status    :    200
    body_bytes_sent    :    3820
    host    :    www.example.com
    request_time    :    43
    request_length    :    1987
    http_user_agent    :    Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36
    http_referer    :    www.example.com
    http_x_forwarded_for    :    192.168.10.196
    upstream_response_time    :    0.02
    --------
    Log: 158, time: 1635629778, GetContentCount: 14
    ......
        category    :    null
        source    :    127.0.0.1
        topic    :    nginx_access_log
        machineUUID    :    null
    Tags
        __receive_time__    :    1635629815
    --------
    Log: 0, time: 1635629788, GetContentCount: 14
    ......
        category    :    null
        source    :    127.0.0.1
        topic    :    nginx_access_log
        machineUUID    :    null
    Tags
        __receive_time__    :    1635629877
    --------
    ......
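
The time-based checkpoint decision inside process() in step 2 can be isolated into a small helper. The sketch below is one possible way to structure it, not part of the SDK: the class name is hypothetical, and the current time is passed in as a parameter so that the logic can be exercised without waiting 30 seconds.

```java
// Illustrative only: mirrors the time-based decision made in process(),
// with the clock passed in so that the logic is easy to exercise.
class CheckpointThrottleSketch {
    private final long intervalMillis;
    private long lastSaveTime = 0;

    CheckpointThrottleSketch(long intervalMillis) {
        this.intervalMillis = intervalMillis;
    }

    // Returns true when the checkpoint should be flushed to the server now,
    // and false when updating the in-memory checkpoint is enough.
    boolean shouldFlush(long nowMillis) {
        if (nowMillis - lastSaveTime > intervalMillis) {
            lastSaveTime = nowMillis;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        CheckpointThrottleSketch throttle = new CheckpointThrottleSketch(30_000);
        long start = System.currentTimeMillis();
        System.out.println(throttle.shouldFlush(start));          // first batch
        System.out.println(throttle.shouldFlush(start + 10_000)); // 10 seconds later
        System.out.println(throttle.shouldFlush(start + 31_000)); // 31 seconds later
    }
}
```

The three calls print true, false, and true. Inside a processor, the result could be passed directly to checkPointTracker.saveCheckPoint(...), which takes the same boolean as in the example above.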

Example 2: SDK consumption with SPL

  1. Add Maven dependencies.

    In your pom.xml file, add the following dependencies:

    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
      <version>2.5.0</version>
    </dependency>
    <dependency>
      <groupId>com.aliyun.openservices</groupId>
      <artifactId>loghub-client-lib</artifactId>
      <version>0.6.50</version>
    </dependency>
  2. Create a class to define your log processing logic.

    SPLLogHubProcessor.java

    import com.aliyun.openservices.log.common.FastLog;
    import com.aliyun.openservices.log.common.FastLogContent;
    import com.aliyun.openservices.log.common.FastLogGroup;
    import com.aliyun.openservices.log.common.FastLogTag;
    import com.aliyun.openservices.log.common.LogGroupData;
    import com.aliyun.openservices.loghub.client.ILogHubCheckPointTracker;
    import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException;
    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
    import java.util.List;
    public class SPLLogHubProcessor implements ILogHubProcessor {
        private int shardId;
        // Tracks the last time a checkpoint was saved.
        private long mLastSaveTime = 0;
        // Called once upon processor initialization.
        public void initialize(int shardId) {
            this.shardId = shardId;
        }
        // The main logic for consuming data. All exceptions must be handled within this method. Do not throw exceptions directly.
        public String process(List<LogGroupData> logGroups, ILogHubCheckPointTracker checkPointTracker) {
            // Print the fetched data.
            for (LogGroupData logGroup : logGroups) {
                FastLogGroup fastLogGroup = logGroup.GetFastLogGroup();
                System.out.println("Tags");
                for (int i = 0; i < fastLogGroup.getLogTagsCount(); ++i) {
                    FastLogTag logTag = fastLogGroup.getLogTags(i);
                    System.out.printf("%s : %s\n", logTag.getKey(), logTag.getValue());
                }
                for (int i = 0; i < fastLogGroup.getLogsCount(); ++i) {
                    FastLog log = fastLogGroup.getLogs(i);
                    System.out.println("--------\nLog: " + i + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount());
                    for (int j = 0; j < log.getContentsCount(); ++j) {
                        FastLogContent content = log.getContents(j);
                        System.out.println(content.getKey() + "\t:\t" + content.getValue());
                    }
                }
            }
            long curTime = System.currentTimeMillis();
            // Save a checkpoint to the server every 30 seconds. If the worker terminates unexpectedly, the new worker resumes from the last checkpoint, potentially reprocessing a small amount of data.
            try {
                if (curTime - mLastSaveTime > 30 * 1000) {
                    // A 'true' parameter flushes the checkpoint to the server immediately. By default, the in-memory checkpoint is automatically flushed to the server every 60 seconds.
                    checkPointTracker.saveCheckPoint(true);
                    mLastSaveTime = curTime;
                } else {
                    // A 'false' parameter caches the checkpoint locally. It will be flushed to the server by the auto-update mechanism.
                    checkPointTracker.saveCheckPoint(false);
                }
            } catch (LogHubCheckPointException e) {
                e.printStackTrace();
            }
            return null;
        }
        // This method is called when the worker is shutting down. You can perform cleanup tasks here.
        public void shutdown(ILogHubCheckPointTracker checkPointTracker) {
            // Save the checkpoint to the server immediately.
            try {
                checkPointTracker.saveCheckPoint(true);
            } catch (LogHubCheckPointException e) {
                e.printStackTrace();
            }
        }
    }
  3. Create a factory to generate instances of your log processor.

    SPLLogHubProcessorFactory.java

    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessorFactory;
    class SPLLogHubProcessorFactory implements ILogHubProcessorFactory {
        public ILogHubProcessor generatorProcessor() {
            // Generate a consumer instance. Note: Each call to the generatorProcessor method should return a new SPLLogHubProcessor object.
            return new SPLLogHubProcessor();
        }
    }
  4. Create a main class to configure and start the worker thread.

    Main.java

    import com.aliyun.openservices.loghub.client.ClientWorker;
    import com.aliyun.openservices.loghub.client.config.LogHubConfig;
    import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException;
    public class Main {
        // The endpoint of Simple Log Service. Replace with your actual endpoint.
        private static String Endpoint = "cn-hangzhou.log.aliyuncs.com";
        // The project name. Replace with the name of an existing project.
        private static String Project = "ali-test-project";
        // The Logstore name. Replace with the name of an existing Logstore.
        private static String Logstore = "ali-test-logstore";
        // You do not need to create the consumer group beforehand; the program automatically creates it on the first run.
        private static String ConsumerGroup = "ali-test-consumergroup2";
        // This example retrieves the AccessKey ID and AccessKey Secret from environment variables.
        private static String AccessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        private static String AccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
        public static void main(String[] args) throws LogHubClientWorkerException, InterruptedException {
            // "consumer_1" is the consumer name, which must be unique within a consumer group. To balance consumption across multiple machines, you can use unique identifiers like the machine's IP address as the consumer name.
            // maxFetchLogGroupSize sets the maximum number of LogGroups to fetch per request. The default value is usually sufficient. You can adjust it by using config.setMaxFetchLogGroupSize(100). The valid range is (0, 1000].
            LogHubConfig config = new LogHubConfig(ConsumerGroup, "consumer_1", Endpoint, Project, Logstore, AccessKeyId, AccessKeySecret, LogHubConfig.ConsumePosition.BEGIN_CURSOR, 1000);
            // Use setQuery to specify an SPL statement to filter logs during consumption.
            config.setQuery("* | where cast(body_bytes_sent as bigint) > 14000");
            ClientWorker worker = new ClientWorker(new SPLLogHubProcessorFactory(), config);
            Thread thread = new Thread(worker);
            // After the thread starts, the ClientWorker runs automatically. ClientWorker implements the Runnable interface.
            thread.start();
            Thread.sleep(60 * 60 * 1000);
            // Call the worker's shutdown() method to stop the consumer instance and its associated thread.
            worker.shutdown();
            // The ClientWorker creates asynchronous tasks. After shutdown(), wait for these tasks to complete gracefully. A 30-second sleep is recommended.
            Thread.sleep(30 * 1000);
        }
    }
  5. Run Main.java.

    The following is an example of the output from consuming NGINX logs:

    :    GET
    request_uri    :    /request/path-3/file-7
    status    :    200
    body_bytes_sent    :    3820
    host    :    www.example.com
    request_time    :    43
    request_length    :    1987
    http_user_agent    :    Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36
    http_referer    :    www.example.com
    http_x_forwarded_for    :    192.168.10.196
    upstream_response_time    :    0.02
    --------
    Log: 158, time: 1635629778, GetContentCount: 14
    ......
        category    :    null
        source    :    127.0.0.1
        topic    :    nginx_access_log
        machineUUID    :    null
    Tags
        __receive_time__    :    1635629815
    --------
    Log: 0, time: 1635629788, GetContentCount: 14
    ......
        category    :    null
        source    :    127.0.0.1
        topic    :    nginx_access_log
        machineUUID    :    null
    Tags
        __receive_time__    :    1635629877
    --------
    ......
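
To make the effect of the SPL statement in step 4 concrete, the following sketch applies an equivalent predicate on the client side to logs represented as key-value maps. It is only an illustration of what `where cast(body_bytes_sent as bigint) > 14000` selects. With setQuery, the filter is applied before logs reach the processor, and the way this sketch skips values that are not integers is an assumption, not documented SPL behavior. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Illustrative only: a client-side equivalent of the predicate in the SPL statement.
class SplFilterSketch {
    // Keeps logs whose body_bytes_sent field parses as an integer greater than threshold.
    static List<Map<String, String>> filterByBodyBytes(List<Map<String, String>> logs, long threshold) {
        List<Map<String, String>> kept = new ArrayList<>();
        for (Map<String, String> log : logs) {
            String raw = log.get("body_bytes_sent");
            if (raw == null) {
                continue; // the field is missing
            }
            try {
                if (Long.parseLong(raw.trim()) > threshold) {
                    kept.add(log);
                }
            } catch (NumberFormatException e) {
                // Assumption: values that are not integers are skipped in this sketch.
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Map<String, String>> logs = List.of(
                Map.of("status", "200", "body_bytes_sent", "3820"),
                Map.of("status", "200", "body_bytes_sent", "15002"),
                Map.of("status", "404"));
        // Only the log with body_bytes_sent = 15002 passes the filter.
        System.out.println(filterByBodyBytes(logs, 14000).size());
    }
}
```

Filtering with setQuery instead of in process() means that logs that do not match never have to be handled by the consumer code.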

Step 3: View consumer group status

You can view the status of a consumer group using one of the following methods:

Java SDK

  1. View the consumption checkpoint for each shard. The following code provides an example:

    ConsumerGroupTest.java

    import java.util.List;
    import com.aliyun.openservices.log.Client;
    import com.aliyun.openservices.log.common.Consts.CursorMode;
    import com.aliyun.openservices.log.common.ConsumerGroup;
    import com.aliyun.openservices.log.common.ConsumerGroupShardCheckPoint;
    import com.aliyun.openservices.log.exception.LogException;
    public class ConsumerGroupTest {
        static String endpoint = "cn-hangzhou.log.aliyuncs.com";
        static String project = "ali-test-project";
        static String logstore = "ali-test-logstore";
        static String accesskeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        static String accesskey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
        public static void main(String[] args) throws LogException {
            Client client = new Client(endpoint, accesskeyId, accesskey);
            // Get all consumer groups in the Logstore. The list is empty if no consumer groups exist.
            List<ConsumerGroup> consumerGroups = client.ListConsumerGroup(project, logstore).GetConsumerGroups();
            for(ConsumerGroup c: consumerGroups){
                // Print the consumer group properties: name, heartbeat timeout, and ordered consumption status.
                System.out.println("Name: " + c.getConsumerGroupName());
                System.out.println("heartbeat timeout: " + c.getTimeout());
                System.out.println("ordered consumption: " + c.isInOrder());
                for(ConsumerGroupShardCheckPoint cp: client.GetCheckPoint(project, logstore, c.getConsumerGroupName()).GetCheckPoints()){
                    System.out.println("shard: " + cp.getShard());
                    // The time is a long integer, accurate to the microsecond.
                    System.out.println("Last checkpoint update time: " + cp.getUpdateTime());
                    System.out.println("Consumer name: " + cp.getConsumer());
                    String consumerPrg = "";
                    if(cp.getCheckPoint().isEmpty())
                        consumerPrg = "Consumption has not started";
                    else{
                        // A UNIX timestamp in seconds. Format the output as needed.
                        try{
                            int prg = client.GetPrevCursorTime(project, logstore, cp.getShard(), cp.getCheckPoint()).GetCursorTime();
                            consumerPrg = "" + prg;
                        }
                        catch(LogException e){
                            if("InvalidCursor".equals(e.GetErrorCode()))
                                consumerPrg = "Invalid. The consumption checkpoint is older than the data retention period.";
                            else{
                                // internal server error
                                throw e;
                            }
                        }
                    }
                    System.out.println("consumption checkpoint: " + consumerPrg);
                    String endCursor = client.GetCursor(project, logstore, cp.getShard(), CursorMode.END).GetCursor();
                    int endPrg = 0;
                    try{
                        endPrg = client.GetPrevCursorTime(project, logstore, cp.getShard(), endCursor).GetCursorTime();
                    }
                    catch(LogException e){
                        // do nothing
                    }
                    // A UNIX timestamp in seconds. Format the output as needed.
                    System.out.println("Arrival time of the last record: " + endPrg);
                }
            }
        }
    }
  2. The following is the sample output:

    Name: ali-test-consumergroup2
    heartbeat timeout: 60
    ordered consumption: false
    shard: 0
    Last checkpoint update time: 0
    Consumer name: consumer_1
    consumption checkpoint: Consumption has not started
    Arrival time of the last record: 1729583617
    shard: 1
    Last checkpoint update time: 0
    Consumer name: consumer_1
    consumption checkpoint: Consumption has not started
    Arrival time of the last record: 1729583738
    Process finished with exit code 0
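
The status output reports times as UNIX timestamps in seconds. A minimal sketch of formatting such a value and of estimating how far a consumer is behind is shown below. It uses only java.time from the standard library. The class and method names are hypothetical, and the checkpoint time in main is an invented value for illustration.

```java
import java.time.Instant;

// Illustrative only: helpers for reading the timestamps printed by the status example.
class ConsumptionLagSketch {
    // Formats a UNIX timestamp in seconds as an ISO-8601 instant in UTC.
    static String formatEpochSeconds(long epochSeconds) {
        return Instant.ofEpochSecond(epochSeconds).toString();
    }

    // Estimates the lag as the difference between the arrival time of the last
    // record and the time of the consumption checkpoint, never below zero.
    static long lagSeconds(long checkpointTime, long lastRecordTime) {
        return Math.max(0, lastRecordTime - checkpointTime);
    }

    public static void main(String[] args) {
        long lastRecordTime = 1729583617L; // value taken from the sample output
        System.out.println(formatEpochSeconds(lastRecordTime)); // 2024-10-22T07:53:37Z
        // Hypothetical checkpoint that is two minutes behind the last record.
        System.out.println(lagSeconds(lastRecordTime - 120, lastRecordTime)); // 120
    }
}
```

A lag that keeps growing across several checks suggests that the consumers cannot keep up with the write rate of the shard.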

Console

  1. Log in to the Simple Log Service console.

  2. In the Projects section, click the target project.

  3. On the Log Storage > Logstores tab, click the expand icon to the left of the target Logstore, then click the expand icon to the left of Data Consumption.

  4. In the consumer group list, click the target consumer group.

  5. On the Consumer Group Status page, view the consumption checkpoint for each shard. This page shows details for each shard, including its ID (shard), Last Consumed Time, and the consuming Client. You can also use the Refresh and Reset Checkpoint buttons.

Related operations

  • RAM user authorization

    To use a RAM user to manage consumer groups, you must grant the required permissions to the RAM user. For more information, see Create and authorize a RAM user.

    The following table describes the required Actions.

    | Action | Description | Resource |
    | --- | --- | --- |
    | log:GetCursorOrData(GetCursor) | Gets a cursor based on a specified time. | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${LogStoreName} |
    | log:CreateConsumerGroup(CreateConsumerGroup) | Creates a consumer group in a specified Logstore. | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${LogStoreName}/consumergroup/${consumerGroupName} |
    | log:ListConsumerGroup(ListConsumerGroup) | Lists all consumer groups in a specified Logstore. | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${LogStoreName}/consumergroup/* |
    | log:ConsumerGroupUpdateCheckPoint(UpdateCheckPoint) | Updates the checkpoint on a shard for a specified consumer group. | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${LogStoreName}/consumergroup/${consumerGroupName} |
    | log:ConsumerGroupHeartBeat(ConsumerGroupHeartBeat) | Sends a heartbeat from a specified consumer to the server. | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${LogStoreName}/consumergroup/${consumerGroupName} |
    | log:UpdateConsumerGroup(UpdateConsumerGroup) | Modifies the properties of a specified consumer group. | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${LogStoreName}/consumergroup/${consumerGroupName} |
    | log:GetConsumerGroupCheckPoint(GetConsumerGroupCheckPoint) | Gets the checkpoint of one or all shards for a specified consumer group. | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${LogStoreName}/consumergroup/${consumerGroupName} |

    To grant a RAM user permissions for a consumer group with the properties listed below, use the following sample policy.

    • Alibaba Cloud account ID: 174649****602745

    • Region ID: cn-hangzhou

    • Project name: project-test

    • Logstore name: logstore-test

    • Consumer group name: consumergroup-test

    Sample policy:

    {
      "Version": "1",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": [
            "log:GetCursorOrData"
          ],
          "Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test"
        },
        {
          "Effect": "Allow",
          "Action": [
            "log:CreateConsumerGroup",
            "log:ListConsumerGroup"
          ],
          "Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test/consumergroup/*"
        },
        {
          "Effect": "Allow",
          "Action": [
            "log:ConsumerGroupUpdateCheckPoint",
            "log:ConsumerGroupHeartBeat",
            "log:UpdateConsumerGroup",
            "log:GetConsumerGroupCheckPoint"
          ],
          "Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test/consumergroup/consumergroup-test"
        }
      ]
    }
  • Troubleshooting

    For easier troubleshooting, configure Log4j for your consumer application to log exceptions from the consumer group. The following is a sample log4j.properties configuration:

    log4j.rootLogger = info,stdout
    log4j.appender.stdout = org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.Target = System.out
    log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n

    After you configure Log4j, the consumer application outputs exception information similar to the following:

    [WARN ] 2018-03-14 12:01:52,747 method:com.aliyun.openservices.loghub.client.LogHubConsumer.sampleLogError(LogHubConsumer.java:159)
    com.aliyun.openservices.log.exception.LogException: Invalid loggroup count, (0,1000]
  • Consume data from a specified time

    // consumerStartTimeInSeconds indicates the time from which to start consuming data.
    public LogHubConfig(String consumerGroupName, 
                          String consumerName, 
                          String loghubEndPoint,
                          String project, String logStore,
                          String accessId, String accessKey,
                          int consumerStartTimeInSeconds);
    // position is an enumeration. LogHubConfig.ConsumePosition.BEGIN_CURSOR starts consumption from the oldest data. LogHubConfig.ConsumePosition.END_CURSOR starts consumption from the latest data.
    public LogHubConfig(String consumerGroupName, 
                          String consumerName, 
                          String loghubEndPoint,
                          String project, String logStore,
                          String accessId, String accessKey,
                          ConsumePosition position);
    Note
    • Choose a constructor based on your requirements.

    • If a checkpoint is already saved on the server, consumption resumes from that checkpoint.

    • Simple Log Service prioritizes a saved checkpoint for consumption. If you specify a start time, ensure the consumerStartTimeInSeconds value falls within the data retention period (TTL). Otherwise, the specified start time is ignored.

  • Reset a checkpoint

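    import java.sql.Timestamp;
    import com.aliyun.openservices.log.Client;
    import com.aliyun.openservices.log.common.Shard;
    import com.aliyun.openservices.log.request.ListShardRequest;
    import com.aliyun.openservices.log.response.ListShardResponse;

    // host, accessId, accessKey, project, logStore, and consumerGroup are assumed to be fields of the enclosing class.
    public static void updateCheckpoint() throws Exception {
        Client client = new Client(host, accessId, accessKey);
        // The timestamp must be a UNIX timestamp in seconds. If your timestamp is in milliseconds, divide it by 1000.
        // Timestamp.valueOf interprets the string in the default time zone of the JVM.
        long timestamp = Timestamp.valueOf("2017-11-15 00:00:00").getTime() / 1000;
        // Move the checkpoint of every shard to the cursor at the specified time.
        ListShardResponse response = client.ListShard(new ListShardRequest(project, logStore));
        for (Shard shard : response.GetShards()) {
            int shardId = shard.GetShardId();
            String cursor = client.GetCursor(project, logStore, shardId, timestamp).GetCursor();
            client.UpdateCheckPoint(project, logStore, consumerGroup, shardId, cursor);
        }
    }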
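
Because the reset timestamp must be a UNIX timestamp in seconds, the time zone used to interpret a date string affects which cursor is selected. The sketch below shows one way to make the time zone explicit with java.time instead of relying on the JVM default. The class and method names are hypothetical, and the example is independent of the Log Service SDK.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;

// Illustrative only: converts a local date-time to UNIX seconds in a chosen time zone.
class ResetTimestampSketch {
    // isoLocalDateTime uses the ISO-8601 form, for example "2017-11-15T00:00:00".
    static long toEpochSeconds(String isoLocalDateTime, String zoneId) {
        return LocalDateTime.parse(isoLocalDateTime)
                .atZone(ZoneId.of(zoneId))
                .toEpochSecond();
    }

    public static void main(String[] args) {
        // The same wall-clock time maps to different instants in different zones.
        System.out.println(toEpochSeconds("2017-11-15T00:00:00", "UTC"));
        System.out.println(toEpochSeconds("2017-11-15T00:00:00", "Asia/Shanghai"));
    }
}
```

The two printed values differ by 28800 seconds, which is the 8-hour offset between UTC and Asia/Shanghai. The resulting value can be passed wherever a timestamp in seconds is expected, such as the timestamp argument in the reset example above.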

References