协同消费库(Consumer Library)是对日志服务中日志进行消费的高级模式,提供了消费组(ConsumerGroup)的概念对消费端进行抽象和管理,和直接使用SDK进行数据读取的区别在于,用户无需关心日志服务的实现细节,只需要专注于业务逻辑,另外,消费者之间的负载均衡、failover等用户也都无需关心。

Spark StreamingStorm 以及Flink Connector都以Consumer Library作为基础实现。

基本概念

使用消费库之前有两个概念需要理解,分别是消费组(ConsumerGroup)、消费者(Consumer)。
  • 消费组

    一个消费组由多个消费者构成,同一个消费组下面的消费者共同消费一个logstore中的数据,消费者之间不会重复消费数据。

  • 消费者

    消费组的构成单元,实际承担消费任务,同一个消费组下面的消费者名称必须不同。

在日志服务中,一个logstore下面会有多个shard,协同消费库的功能就是将shard分配给一个消费组下面的消费者,分配方式遵循以下原则:
  • 每个shard只会分配到一个消费者。
  • 一个消费者可以同时拥有多个shard。

新的消费者加入一个消费组,这个消费组下面的shard从属关系会调整,以达到消费负载均衡的目的,但是上面的分配原则不会变,分配过程对用户透明。

协同消费库的另一个功能是保存checkpoint,方便程序故障恢复时能接着从断点继续消费,从而保证数据不会被重复消费。

使用说明

maven 依赖

<dependency>
  <groupId>com.google.protobuf</groupId>
  <artifactId>protobuf-java</artifactId>
  <version>2.5.0</version>
</dependency>
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>loghub-client-lib</artifactId>
<version>0.6.15</version>
</dependency>
main .java文件
public class Main {
  // 日志服务域名,根据实际情况填写
  private static String sEndpoint = "cn-hangzhou.log.aliyuncs.com";
  // 日志服务项目名称,根据实际情况填写
  private static String sProject = "ali-cn-hangzhou-sls-admin";
  // 日志库名称,根据实际情况填写
  private static String sLogstore = "sls_operation_log";
  // 消费组名称,根据实际情况填写
  private static String sConsumerGroup = "consumerGroupX";
  // 消费数据的ak,根据实际情况填写
  private static String sAccessKeyId = "";
  private static String sAccessKey = "";
  public static void main(String []args) throws LogHubClientWorkerException, InterruptedException
  {
       // 第二个参数是消费者名称,同一个消费组下面的消费者名称必须不同,可以使用相同的消费组名称,不同的消费者名称在多台机器上启动多个进程,来均衡消费一个Logstore,这个时候消费组名称可以使用机器ip来区分。第9个参数(maxFetchLogGroupSize)是每次从服务端获取的LogGroup数目,使用默认值即可,如有调整请注意取值范围(0,1000]
      LogHubConfig config = new LogHubConfig(sConsumerGroup, "consumer_1", sEndpoint, sProject, sLogstore, sAccessKeyId, sAccessKey, LogHubConfig.ConsumePosition.BEGIN_CURSOR);
      ClientWorker worker = new ClientWorker(new SampleLogHubProcessorFactory(), config);
        Thread thread = new Thread(worker);
        //Thread运行之后,Client Worker会自动运行,ClientWorker扩展了Runnable接口。
       thread.start();
       Thread.sleep(60 * 60 * 1000);
       //调用worker的Shutdown函数,退出消费实例,关联的线程也会自动停止。
       worker.shutdown();
       //ClientWorker运行过程中会生成多个异步的Task,Shutdown之后最好等待还在执行的Task安全退出,建议sleep 30s。
       Thread.sleep(30 * 1000);
  }
}
SampleLogHubProcessor.java文件
public class SampleLogHubProcessor implements ILogHubProcessor 
{
  private int mShardId;
  // 记录上次持久化 check point 的时间
  private long mLastCheckTime = 0; 
  public void initialize(int shardId) 
  {
      mShardId = shardId;
  }
  // 消费数据的主逻辑,这里面的所有异常都需要捕获,不能抛出去。
  public String process(List<LogGroupData> logGroups,
          ILogHubCheckPointTracker checkPointTracker) 
  {
      // 这里简单的将获取到的数据打印出来
      for(LogGroupData logGroup: logGroups){
          FastLogGroup flg = logGroup.GetFastLogGroup();
          System.out.println(String.format("\tcategory\t:\t%s\n\tsource\t:\t%s\n\ttopic\t:\t%s\n\tmachineUUID\t:\t%s",
                  flg.getCategory(), flg.getSource(), flg.getTopic(), flg.getMachineUUID()));
          System.out.println("Tags");
          for (int tagIdx = 0; tagIdx < flg.getLogTagsCount(); ++tagIdx) {
              FastLogTag logtag = flg.getLogTags(tagIdx);
              System.out.println(String.format("\t%s\t:\t%s", logtag.getKey(), logtag.getValue()));
          }
          for (int lIdx = 0; lIdx < flg.getLogsCount(); ++lIdx) {
              FastLog log = flg.getLogs(lIdx);
              System.out.println("--------\nLog: " + lIdx + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount());
              for (int cIdx = 0; cIdx < log.getContentsCount(); ++cIdx) {
                  FastLogContent content = log.getContents(cIdx);
                  System.out.println(content.getKey() + "\t:\t" + content.getValue());
              }
          }
      }
      long curTime = System.currentTimeMillis();
      // 每隔 30 秒,写一次 check point 到服务端,如果 30 秒内,worker crash,
      // 新启动的 worker 会从上一个 checkpoint 其消费数据,有可能有少量的重复数据
      if (curTime - mLastCheckTime >  30 * 1000) 
      {
          try 
          {
              //参数true表示立即将checkpoint更新到服务端,为false会将checkpoint缓存在本地,后台默认隔60s会将checkpoint刷新到服务端。
              checkPointTracker.saveCheckPoint(true);
          } 
          catch (LogHubCheckPointException e) 
          {
              e.printStackTrace();
          }
          mLastCheckTime = curTime;
      } 
      return null;  
  }
  // 当 worker 退出的时候,会调用该函数,用户可以在此处做些清理工作。
  public void shutdown(ILogHubCheckPointTracker checkPointTracker) 
  {
      //将消费断点保存到服务端。
      try {
          checkPointTracker.saveCheckPoint(true);
      } catch (LogHubCheckPointException e) {
          e.printStackTrace();
      }
  }
}
class SampleLogHubProcessorFactory implements ILogHubProcessorFactory 
{
  public ILogHubProcessor generatorProcessor()
  {   
      // 生成一个消费实例
      return new SampleLogHubProcessor();
  }
}

运行上面的代码,就可以将一个Logstore下面的所有数据打印出来,如果需要多个消费者共同消费一个Logstore,可以按程序注释中说的,修改程序,用同样的消费组名称加不同的消费者名称,启动另外的消费进程。

限制与异常诊断

每个Logstore创建消费组个数的上限为10。超出时将报错ConsumerGroupQuotaExceed

建议为消费者程序配置log4j,可以帮助您将消费组内部遇到的错误信息抛出来,方便定位异常。放置log4j.properties文件到resources目录下执行程序可以看到类似如下异常:
[WARN ] 2018-03-14 12:01:52,747 method:com.aliyun.openservices.loghub.client.LogHubConsumer.sampleLogError(LogHubConsumer.java:159)
com.aliyun.openservices.log.exception.LogException: Invalid loggroup count, (0,1000]
一个简单的log4j.properties配置供您参考:
log4j.rootLogger = info,stdout
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n

状态与报警

高级定制

对于普通用户,使用上面的程序就可以消费数据,下面要讨论的内容是一些高级主题。

  • 希望消费某个时间开始的数据

    上面代码中的LoghubConfig有两个构造函数:
    // consumerStartTimeInSeconds参数表示1970之后的秒数,含义是消费这个时间点之后的数据。
    public LogHubConfig(String consumerGroupName, 
                          String consumerName, 
                          String loghubEndPoint,
                          String project, String logStore,
                          String accessId, String accessKey,
                          int consumerStartTimeInSeconds);
    // position是个枚举变量,LogHubConfig.ConsumePosition.BEGIN_CURSOR表示从最老的数据开始消费,LogHubConfig.ConsumePosition.END_CURSOR表示从最新的数据开始消费。
    public LogHubConfig(String consumerGroupName, 
                          String consumerName, 
                          String loghubEndPoint,
                          String project, String logStore,
                          String accessId, String accessKey,
                          ConsumePosition position);

    可以按照消费需求,使用不同的构造方法,但是注意,如果服务端保存有checkpoint,那么开始消费位置以服务端保存的checkpoint为准。

  • 用户使用RAM子账号进行访问

    子用户需要设置消费组相关的RAM权限,设置方法参考RAM的文档,需要设置的权限如下:

Action Resource
log:GetCursorOrData acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}
log:CreateConsumerGroup acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/*
log:ListConsumerGroup acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/*
log:ConsumerGroupUpdateCheckPoint acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}
log:ConsumerGroupHeartBeat acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}
log:UpdateConsumerGroup acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}
log:GetConsumerGroupCheckPoint acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}
  • 重置checkpoint

    在一些场景中(补数据、重复计算),我们需要将某个ConsumerGroup点位设置为某一个时间点,使得当前消费组能够从新位置开始消费,有两种方法:

    1. 删除消费组。
      • 停掉消费程序,并在控制台删除消费组。
      • 修改代码,根据上面讨论的使用指定时间点消费,重新启动程序。
    2. 通过SDK将当前消费组重置到某一个时间点。
      • 停掉消费程序。
      • 使用sdk修改位点,重新启动消费程序。
    Client client = new Client(host, accessId, accessKey);
    long time_stamp = Timestamp.valueOf("2017-11-15 00:00:00").getTime() / 1000;
    ListShardResponse shard_res = client.ListShard(new ListShardRequest(project, logStore));
    ArrayList<Shard> all_shards = shard_res.GetShards();
    for (Shard shard: all_shards)
    {
      shardId = shard.GetShardId();
      long cursor_time = time_stamp;
      String cursor = client.GetCursor(project, logStore, shardId,     cursor_time).GetCursor();
      client.UpdateCheckPoint(project, logStore, consumerGroup, shardId, cursor);
    }