全部产品
存储与CDN 数据库 安全 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网 钉钉智能硬件
日志服务

消费组-使用

更新时间:2018-01-23 13:37:43

介绍

协同消费库(Consumer Library)是对日志服务中日志进行消费的高级模式,提供了消费组(ConsumerGroup)的概念对消费端进行抽象和管理,和直接使用SDK进行数据读取的区别在于,用户无需关心日志服务的实现细节,只需要专注于业务逻辑,另外,消费者之间的负载均衡、failover等用户也都无需关心。

Spark StreamingStorm 以及即将推出的 Flink Connector都以Consuemr Library作为基础实现。

功能

使用消费库之前有两个概念需要理解,分别是消费组(ConsumerGroup)、消费者(Consumer)。

  • 消费组

    一个消费组由多个消费者构成,同一个消费组下面的消费者共同消费一个logstore中的数据,消费者之间不会重复消费数据。

  • 消费者

    消费组的构成单元,实际承担消费任务,同一个消费组下面的消费者名称必须不同。

在日志服务中,一个logstore下面会有多个shard,协同消费库的功能就是将shard分配给一个消费组下面的消费者,分配方式遵循以下原则:

  • 每个shard只会分配到一个消费者。
  • 一个消费者可以同时拥有多个shard。

新的消费者加入一个消费组,这个消费组下面的shard从属关系会调整,以达到消费负载均衡的目的,但是上面的分配原则不会变,分配过程对用户透明。

协同消费库的另一个功能是保存checkpoint,方便程序故障恢复时能接着从断点继续消费,从而保证数据不会被重复消费。

使用

  • 添加maven依赖

    1. <dependency>
    2. <groupId>com.google.protobuf</groupId>
    3. <artifactId>protobuf-java</artifactId>
    4. <version>2.5.0</version>
    5. </dependency>
    6. <dependency>
    7. <groupId>com.aliyun.openservices</groupId>
    8. <artifactId>aliyun-log</artifactId>
    9. <version>0.6.11</version>
    10. </dependency>
    11. <dependency>
    12. <groupId>com.aliyun.openservices</groupId>
    13. <artifactId>loghub-client-lib</artifactId>
    14. <version>0.6.13</version>
    15. </dependency>
  • Main.java文件

    1. public class Main {
    2. // 日志服务域名,根据实际情况填写
    3. private static String sEndpoint = "cn-hangzhou.log.aliyuncs.com";
    4. // 日志服务项目名称,根据实际情况填写
    5. private static String sProject = "ali-cn-hangzhou-sls-admin";
    6. // 日志库名称,根据实际情况填写
    7. private static String sLogstore = "sls_operation_log";
    8. // 消费组名称,根据实际情况填写
    9. private static String sConsumerGroup = "consumerGroupX";
    10. // 消费数据的ak,根据实际情况填写
    11. private static String sAccessKeyId = "";
    12. private static String sAccessKey = "";
    13. public static void main(String []args) throws LogHubClientWorkerException, InterruptedException
    14. {
    15. // 第二个参数是消费者名称,同一个消费组下面的消费者名称必须不同,可以使用相同的消费组名称,不同的消费者名称在多台机器上启动多个进程,来均衡消费一个logstore,这个时候消费组名称可以使用机器ip来区分。
    16. LogHubConfig config = new LogHubConfig(sConsumerGroup, "consumer_1", sEndpoint, sProject, sLogstore, sAccessKeyId, sAccessKey, LogHubConfig.ConsumePosition.BEGIN_CURSOR);
    17. ClientWorker worker = new ClientWorker(new SampleLogHubProcessorFactory(), config);
    18. Thread thread = new Thread(worker);
    19. //Thread运行之后,Client Worker会自动运行,ClientWorker扩展了Runnable接口。
    20. thread.start();
    21. Thread.sleep(60 * 60 * 1000);
    22. //调用worker的Shutdown函数,退出消费实例,关联的线程也会自动停止。
    23. worker.shutdown();
    24. //ClientWorker运行过程中会生成多个异步的Task,Shutdown之后最好等待还在执行的Task安全退出,建议sleep 30s。
    25. Thread.sleep(30 * 1000);
    26. }
    27. }
  • SampleLogHubProcessor.java文件
  1. public class SampleLogHubProcessor implements ILogHubProcessor
  2. {
  3. private int mShardId;
  4. // 记录上次持久化 check point 的时间
  5. private long mLastCheckTime = 0;
  6. public void initialize(int shardId)
  7. {
  8. mShardId = shardId;
  9. }
  10. // 消费数据的主逻辑,这里面的所有异常都需要捕获,不能抛出去。
  11. public String process(List<LogGroupData> logGroups,
  12. ILogHubCheckPointTracker checkPointTracker)
  13. {
  14. // 这里简单的将获取到的数据打印出来
  15. for(LogGroupData logGroup: logGroups){
  16. FastLogGroup flg = logGroup.GetFastLogGroup();
  17. System.out.println(String.format("\tcategory\t:\t%s\n\tsource\t:\t%s\n\ttopic\t:\t%s\n\tmachineUUID\t:\t%s",
  18. flg.getCategory(), flg.getSource(), flg.getTopic(), flg.getMachineUUID()));
  19. System.out.println("Tags");
  20. for (int tagIdx = 0; tagIdx < flg.getLogTagsCount(); ++tagIdx) {
  21. FastLogTag logtag = flg.getLogTags(tagIdx);
  22. System.out.println(String.format("\t%s\t:\t%s", logtag.getKey(), logtag.getValue()));
  23. }
  24. for (int lIdx = 0; lIdx < flg.getLogsCount(); ++lIdx) {
  25. FastLog log = flg.getLogs(lIdx);
  26. System.out.println("--------\nLog: " + lIdx + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount());
  27. for (int cIdx = 0; cIdx < log.getContentsCount(); ++cIdx) {
  28. FastLogContent content = log.getContents(cIdx);
  29. System.out.println(content.getKey() + "\t:\t" + content.getValue());
  30. }
  31. }
  32. }
  33. long curTime = System.currentTimeMillis();
  34. // 每隔 30 秒,写一次 check point 到服务端,如果 30 秒内,worker crash,
  35. // 新启动的 worker 会从上一个 checkpoint 其消费数据,有可能有少量的重复数据
  36. if (curTime - mLastCheckTime > 30 * 1000)
  37. {
  38. try
  39. {
  40. //参数true表示立即将checkpoint更新到服务端,为false会将checkpoint缓存在本地,后台默认隔60s会将checkpoint刷新到服务端。
  41. checkPointTracker.saveCheckPoint(true);
  42. }
  43. catch (LogHubCheckPointException e)
  44. {
  45. e.printStackTrace();
  46. }
  47. mLastCheckTime = curTime;
  48. }
  49. return null;
  50. }
  51. // 当 worker 退出的时候,会调用该函数,用户可以在此处做些清理工作。
  52. public void shutdown(ILogHubCheckPointTracker checkPointTracker)
  53. {
  54. //将消费断点保存到服务端。
  55. try {
  56. checkPointTracker.saveCheckPoint(true);
  57. } catch (LogHubCheckPointException e) {
  58. e.printStackTrace();
  59. }
  60. }
  61. }
  62. class SampleLogHubProcessorFactory implements ILogHubProcessorFactory
  63. {
  64. public ILogHubProcessor generatorProcessor()
  65. {
  66. // 生成一个消费实例
  67. return new SampleLogHubProcessor();
  68. }
  69. }

运行上面的代码,就可以将一个logstore下面的所有数据打印出来,如果需要多个消费者共同消费一个logstore,可以按程序注释中说的,修改程序,用同样的消费组名称加不同的消费者名称,启动另外的消费进程。

限制说明

每个Logstore创建消费组个数的上限为10。超出时将报错ConsumerGroupQuotaExceed

状态与报警

  1. 在控制台查看消费组状态
  2. 通过云监控查看消费组延迟,并配置报警

高级定制

对于普通用户,使用上面的程序就可以消费数据,下面要讨论的内容是一些高级主题。

  • 希望消费某个时间开始的数据

    上面代码中的LoghubConfig有两个构造函数:

    1. // consumerStartTimeInSeconds参数表示1970之后的秒数,含义是消费这个时间点之后的数据。
    2. public LogHubConfig(String consumerGroupName,
    3. String consumerName,
    4. String loghubEndPoint,
    5. String project, String logStore,
    6. String accessId, String accessKey,
    7. int consumerStartTimeInSeconds);
    8. // position是个枚举变量,LogHubConfig.ConsumePosition.BEGIN_CURSOR表示从最老的数据开始消费,LogHubConfig.ConsumePosition.END_CURSOR表示从最新的数据开始消费。
    9. public LogHubConfig(String consumerGroupName,
    10. String consumerName,
    11. String loghubEndPoint,
    12. String project, String logStore,
    13. String accessId, String accessKey,
    14. ConsumePosition position);

    可以按照消费需求,使用不同的构造方法,但是注意,如果服务端保存有checkpoint,那么开始消费位置以服务端保存的checkpoint为准。

  • 用户使用RAM子账号进行访问

    子用户需要设置消费组相关的RAM权限,设置方法参考RAM的文档,需要设置的权限如下:

Action Resource
log:GetCursorOrData acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}
log:CreateConsumerGroup acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/*
log:ListConsumerGroup acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/*
log:ConsumerGroupUpdateCheckPoint acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}
log:ConsumerGroupHeartBeat acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}
log:UpdateConsumerGroup acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}
log:GetConsumerGroupCheckPoint acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}
  • 重置checkpoint

    在一些场景中(补数据、重复计算),我们需要将某个ConsumerGroup点位设置为某一个时间点,使得当前消费组能够从新位置开始消费,有两种方法:

    1. 删除消费组。
      • 停掉消费程序,并在控制台删除消费组。
      • 修改代码,根据上面讨论的使用指定时间点消费,重新启动程序。
    2. 通过SDK将当前消费组重置到某一个时间点。
      • 停掉消费程序。
      • 使用sdk修改位点,重新启动消费程序。
    1. Client client = new Client(host, accessId, accessKey);
    2. long time_stamp = Timestamp.valueOf("2017-11-15 00:00:00").getTime() / 1000;
    3. ListShardResponse shard_res = client.ListShard(new ListShardRequest(project, logStore));
    4. ArrayList<Shard> all_shards = shard_res.GetShards();
    5. for (Shard shard: all_shards)
    6. {
    7. shardId = shard.GetShardId();
    8. long cursor_time = time_stamp;
    9. String cursor = client.GetCursor(project, logStore, shardId, cursor_time).GetCursor();
    10. client.UpdateCheckPoint(project, logStore, consumerGroup, shardId, cursor);
    11. }
本文导读目录