与使用SDK消费数据相比,使用消费组(ConsumerGroup)消费日志数据的优点在于,用户无需关心日志服务的实现细节和消费者之间的负载均衡、failover等,只需要专注于业务逻辑即可。

基本概念

使用消费组消费日志数据之前,请阅读以下日志服务消费组基本概念,包括消费组(ConsumerGroup)和消费者(Consumer)。
概念 说明
消费组 一个消费组由多个消费者构成,同一个消费组下面的消费者共同消费一个Logstore中的数据,消费者之间不会重复消费数据。
消费者 消费组的构成单元,实际承担消费任务,同一个消费组下面的消费者名称必须不同。
在日志服务中,一个Logstore下面会有多个Shard,协同消费库的功能就是将Shard分配给一个消费组下面的消费者,分配方式遵循以下原则:
  • 每个Shard只会分配到一个消费者。
  • 一个消费者可以同时拥有多个Shard。

新的消费者加入一个消费组,这个消费组下面的Shard从属关系会调整,以达到消费负载均衡的目的,但是仍遵循分配原则,分配过程对用户透明。

协同消费库的另一个功能是保存Checkpoint,方便程序故障恢复时能接着从断点继续消费,从而保证数据不会被重复消费。

操作步骤

日志服务消费组通过JAVA语言和Python语言实现,本文档以JAVA为例。更多内容,请参考GitHub:JAVAPythonGolang

  1. 添加maven 依赖
    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
      <version>2.5.0</version>
    </dependency>
    <dependency>
      <groupId>com.aliyun.openservices</groupId>
      <artifactId>loghub-client-lib</artifactId>
      <version>0.6.16</version>
    </dependency>
  2. 创建main .java文件
    import com.aliyun.openservices.loghub.client.ClientWorker;
    import com.aliyun.openservices.loghub.client.config.LogHubConfig;
    import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException;
    
    public class Main {
        // 日志服务域名,根据实际情况填写
        private static String sEndpoint = "cn-hangzhou.log.aliyuncs.com";
        // 日志服务项目名称,根据实际情况填写
        private static String sProject = "ali-cn-hangzhou-sls-admin";
        // 日志库名称,根据实际情况填写
        private static String sLogstore = "sls_operation_log";
        // 消费组名称,根据实际情况填写
        private static String sConsumerGroup = "consumerGroupX";
        // 消费数据的ak,根据实际情况填写
        private static String sAccessKeyId = "";
        private static String sAccessKey = "";
    
        public static void main(String[] args) throws LogHubClientWorkerException, InterruptedException {
            // 第二个参数是消费者名称,同一个消费组下面的消费者名称必须不同,可以使用相同的消费组名称,不同的消费者名称在多台机器上启动多个进程,来均衡消费一个Logstore,这个时候消费者名称可以使用机器ip来区分。第9个参数(maxFetchLogGroupSize)是每次从服务端获取的LogGroup数目,使用默认值即可,如有调整请注意取值范围(0,1000]
            LogHubConfig config = new LogHubConfig(sConsumerGroup, "consumer_1", sEndpoint, sProject, sLogstore, sAccessKeyId, sAccessKey, LogHubConfig.ConsumePosition.BEGIN_CURSOR);
            ClientWorker worker = new ClientWorker(new SampleLogHubProcessorFactory(), config);
            Thread thread = new Thread(worker);
            //Thread运行之后,Client Worker会自动运行,ClientWorker扩展了Runnable接口。
            thread.start();
            Thread.sleep(60 * 60 * 1000);
            //调用worker的Shutdown函数,退出消费实例,关联的线程也会自动停止。
            worker.shutdown();
            //ClientWorker运行过程中会生成多个异步的Task,Shutdown之后最好等待还在执行的Task安全退出,建议sleep 30s。
            Thread.sleep(30 * 1000);
        }
    }
  3. 创建SampleLogHubProcessor.java文件
    import com.aliyun.openservices.log.common.FastLog;
    import com.aliyun.openservices.log.common.FastLogContent;
    import com.aliyun.openservices.log.common.FastLogGroup;
    import com.aliyun.openservices.log.common.FastLogTag;
    import com.aliyun.openservices.log.common.LogGroupData;
    import com.aliyun.openservices.loghub.client.ILogHubCheckPointTracker;
    import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException;
    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessorFactory;
    
    import java.util.List;
    
    public class SampleLogHubProcessor implements ILogHubProcessor {
        private int shardId;
        // 记录上次持久化 checkpoint 的时间
        private long mLastCheckTime = 0;
    
        public void initialize(int shardId) {
            this.shardId = shardId;
        }
    
        // 消费数据的主逻辑,这里面的所有异常都需要捕获,不能抛出去。
        public String process(List<LogGroupData> logGroups,
                              ILogHubCheckPointTracker checkPointTracker) {
            // 这里简单的将获取到的数据打印出来
            for (LogGroupData logGroup : logGroups) {
                FastLogGroup flg = logGroup.GetFastLogGroup();
                System.out.println(String.format("\tcategory\t:\t%s\n\tsource\t:\t%s\n\ttopic\t:\t%s\n\tmachineUUID\t:\t%s",
                        flg.getCategory(), flg.getSource(), flg.getTopic(), flg.getMachineUUID()));
                System.out.println("Tags");
                for (int tagIdx = 0; tagIdx < flg.getLogTagsCount(); ++tagIdx) {
                    FastLogTag logtag = flg.getLogTags(tagIdx);
                    System.out.println(String.format("\t%s\t:\t%s", logtag.getKey(), logtag.getValue()));
                }
                for (int lIdx = 0; lIdx < flg.getLogsCount(); ++lIdx) {
                    FastLog log = flg.getLogs(lIdx);
                    System.out.println("--------\nLog: " + lIdx + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount());
                    for (int cIdx = 0; cIdx < log.getContentsCount(); ++cIdx) {
                        FastLogContent content = log.getContents(cIdx);
                        System.out.println(content.getKey() + "\t:\t" + content.getValue());
                    }
                }
            }
            long curTime = System.currentTimeMillis();
            // 每隔 30 秒,写一次 checkpoint 到服务端,如果 30 秒内,worker crash,
            // 新启动的 worker 会从上一个 checkpoint 取消费数据,有可能有少量的重复数据
            if (curTime - mLastCheckTime > 30 * 1000) {
                try {
                    //参数true表示立即将checkpoint更新到服务端,为false会将checkpoint缓存在本地,后台默认隔60s会将checkpoint刷新到服务端。
                    checkPointTracker.saveCheckPoint(true);
                } catch (LogHubCheckPointException e) {
                    e.printStackTrace();
                }
                mLastCheckTime = curTime;
            }
            return null;
        }
    
        // 当 worker 退出的时候,会调用该函数,用户可以在此处做些清理工作。
        public void shutdown(ILogHubCheckPointTracker checkPointTracker) {
            //将消费断点保存到服务端。
            try {
                checkPointTracker.saveCheckPoint(true);
            } catch (LogHubCheckPointException e) {
                e.printStackTrace();
            }
        }
    }
    
    class SampleLogHubProcessorFactory implements ILogHubProcessorFactory {
        public ILogHubProcessor generatorProcessor() {
            // 生成一个消费实例
            return new SampleLogHubProcessor();
        }
    }
    说明 运行以上代码,可以将一个Logstore下的所有数据打印出来,如果需要多个消费者共同消费一个Logstore,可以按程序注释修改程序,用同样的消费组名称加不同的消费者名称,启动另外的消费进程。

限制与异常诊断

每个Logstore创建消费组个数的上限为10。超出时将报错ConsumerGroupQuotaExceed

建议为消费者程序配置log4j,将消费组内部遇到的错误信息打印出来,方便定位异常。例如,放置log4j.properties文件到resources目录下执行程序可以看到类似如下异常:
[WARN ] 2018-03-14 12:01:52,747 method:com.aliyun.openservices.loghub.client.LogHubConsumer.sampleLogError(LogHubConsumer.java:159)
com.aliyun.openservices.log.exception.LogException: Invalid loggroup count, (0,1000]
以下配置为log4j.properties典型配置:
log4j.rootLogger = info,stdout
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n

高阶操作

普通用户使用上述程序即可消费数据,高阶用户还可以参考以下内容完成高阶操作。

  • 消费某个时间开始的数据

    上面代码中的LoghubConfig有两个构造函数:
    // consumerStartTimeInSeconds参数表示1970之后的秒数,含义是消费这个时间点之后的数据。
    public LogHubConfig(String consumerGroupName, 
                          String consumerName, 
                          String loghubEndPoint,
                          String project, String logStore,
                          String accessId, String accessKey,
                          int consumerStartTimeInSeconds);
    // position是个枚举变量,LogHubConfig.ConsumePosition.BEGIN_CURSOR表示从最老的数据开始消费,LogHubConfig.ConsumePosition.END_CURSOR表示从最新的数据开始消费。
    public LogHubConfig(String consumerGroupName, 
                          String consumerName, 
                          String loghubEndPoint,
                          String project, String logStore,
                          String accessId, String accessKey,
                          ConsumePosition position);

    可以按照消费需求,使用不同的构造方法,但是注意,如果服务端保存有checkpoint,那么开始消费位置以服务端保存的checkpoint为准。

  • 重置Checkpoint

    在补数据或重复计算等场景中,需要将某个ConsumerGroup点位设置为某一个时间点,使当前消费组能够从新位置开始消费。可通过以下两种方式完成。

    • 删除消费组。
      • 停止消费程序,并在控制台删除消费组。
      • 修改代码,使用指定时间点消费,重新启动程序。
    • 通过SDK将当前消费组重置到某一个时间点。
      • 停止消费程序。
      • 使用SDK修改位点,重新启动消费程序。
    public static void updateCheckpoint() throws Exception {
            Client client = new Client(host, accessId, accessKey);
            long timestamp = Timestamp.valueOf("2017-11-15 00:00:00").getTime() / 1000;
            ListShardResponse response = client.ListShard(new ListShardRequest(project, logStore));
            for (Shard shard : response.GetShards()) {
                int shardId = shard.GetShardId();
                String cursor = client.GetCursor(project, logStore, shardId, timestamp).GetCursor();
                client.UpdateCheckPoint(project, logStore, consumerGroup, shardId, cursor);
            }
        }

状态与报警

使用RAM子账号进行访问

子账号访问需要设置消费组相关的RAM权限,设置方法请参考RAM文档

可以设置的相关Action如下:
Action Resource
log:GetCursorOrData acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}
log:CreateConsumerGroup acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/*
log:ListConsumerGroup acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/*
log:ConsumerGroupUpdateCheckPoint acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}
log:ConsumerGroupHeartBeat acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}
log:UpdateConsumerGroup acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}
log:GetConsumerGroupCheckPoint acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}
例如,假设Project所在地域为cn-hangzhou,Project名称为project-test,消费的Logstore名称为logstore-test,Project的owner的aliuid 为1234567,消费组名称为consumergroup-test,则需要为RAM子账号设置以下权限。
{
  "Version": "1",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "log:GetCursorOrData"
      ],
      "Resource": "acs:log:cn-hangzhou:1234567:project/project-test/logstore/logstore-test"
    },
    {
      "Effect": "Allow",
      "Action": [
        "log:CreateConsumerGroup",
        "log:ListConsumerGroup"
      ],
      "Resource": "acs:log:cn-hangzhou:1234567:project/project-test/logstore/logstore-test/consumergroup/*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "log:ConsumerGroupUpdateCheckPoint",
        "log:ConsumerGroupHeartBeat",
        "log:UpdateConsumerGroup",
        "log:GetConsumerGroupCheckPoint"
      ],
      "Resource": "acs:log:cn-hangzhou:1234567:project/project-test/logstore/logstore-test/consumergroup/consumergroup-test"
    }
  ]
}