全部产品

协同消费

更新时间:2020-09-15 17:58:40

协同消费

datahub-client-library是Java-SDK读写功能的上层封装,支持协同消费的功能的Consumer,以及对shard均匀写入的Producer。

概念

点位服务

点位服务是提供将消费的点位保存在服务端的功能,点位由sequence和timestamp组成,sequence是递增的对应唯一记录的序列,timestamp是记录写入datahub的单位为ms的时间戳。

为Topic创建订阅,并在完成消费一部分数据后,将点位提交至服务端。下次启动任务时,可以从服务端获取上次提交的点位,从指定点位的下一条记录开始消费。将点位保存在服务端才能够实现shard重新分配后,能够从上次提交的点位之后消费,是协同消费功能的前提。

在Consumer中不需要手动处理点位,在config中设置点位提交的间隔,在读取记录时,认为之前的记录已经完成处理,若距离上次提交点位已经超过提交间隔,则尝试提交。在提交失败并且同时任务强制停止时,有一定可能造成点位提交不及时,重复消费一部分数据。

协同消费

协同消费是为了解决多个消费者同时消费一个topic时,自动分配shard的问题。能够简化消费的客户端处理,多个消费者可能是在不同机器上,通过自己协调分配shard是困难的。使用同一个Sub Id的Consummer在同一个Consumer Group中,同一个shard在一个Consumer Group中只会被分配给1个Consumer。

协同消费示意图

场景

现有3个消费者实例A,B,C,Topic共有10个shard

  1. 实例A启动,分配10个shard
  2. 实例B,C启动,shard分配为4,3,3
  3. 将1个shard进行split操作,在父节点消费完后,客户端主动释放,2个子节点加入后,shard分配为4,4,3
  4. 实例C停止后,shard分配为6,5
心跳

要实现协同消费的功能,需要通过心跳机制来通知让服务端消费者实例的状态,当前分配的shard和需要释放的shard,超过时间间隔没有收到心跳,则认为消费者实例已经停止。当消费者实例的状态发生改变,服务端会重新分配shard,新的分配计划也是通过心跳请求来返回,所以客户端感知shard变化是有时间间隔的。

版本

Maven依赖以及JDK:

maven pom

  1. <dependency>
  2. <groupId>com.aliyun.datahub</groupId>
  3. <artifactId>aliyun-sdk-datahub</artifactId>
  4. <version>2.17.1-public</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>com.aliyun.datahub</groupId>
  8. <artifactId>datahub-client-library</artifactId>
  9. <version>1.1.12-public</version>
  10. </dependency>

jdk

  1. jdk: >= 1.8

示例

Producer 代码示例

  1. import com.aliyun.datahub.client.exception.AuthorizationFailureException;
  2. import com.aliyun.datahub.client.exception.DatahubClientException;
  3. import com.aliyun.datahub.client.exception.InvalidParameterException;
  4. import com.aliyun.datahub.client.exception.MalformedRecordException;
  5. import com.aliyun.datahub.client.exception.NoPermissionException;
  6. import com.aliyun.datahub.client.exception.ShardNotFoundException;
  7. import com.aliyun.datahub.client.model.Field;
  8. import com.aliyun.datahub.client.model.FieldType;
  9. import com.aliyun.datahub.client.model.RecordEntry;
  10. import com.aliyun.datahub.client.model.RecordSchema;
  11. import com.aliyun.datahub.client.model.TupleRecordData;
  12. import com.aliyun.datahub.clientlibrary.config.ProducerConfig;
  13. import com.aliyun.datahub.clientlibrary.producer.Producer;
  14. import com.aliyun.datahub.exception.ResourceNotFoundException;
  15. import org.slf4j.Logger;
  16. import org.slf4j.LoggerFactory;
  17. import java.util.ArrayList;
  18. import java.util.List;
  19. import java.util.concurrent.TimeUnit;
  20. public class DatahubWriter {
  21. private static final Logger LOG = LoggerFactory.getLogger(DatahubWriter.class);
  22. private static void sleep(long milliSeconds) {
  23. try {
  24. TimeUnit.MILLISECONDS.sleep(milliSeconds);
  25. } catch (InterruptedException e) {
  26. // TODO:自行处理异常
  27. }
  28. }
  29. private static List<RecordEntry> genRecords(RecordSchema schema) {
  30. List<RecordEntry> recordEntries = new ArrayList<>();
  31. for (int cnt = 0; cnt < 10; ++cnt) {
  32. RecordEntry entry = new RecordEntry();
  33. entry.addAttribute("key1", "value1");
  34. entry.addAttribute("key2", "value2");
  35. TupleRecordData data = new TupleRecordData(schema);
  36. data.setField("field1", "testValue");
  37. data.setField("field2", 1);
  38. entry.setRecordData(data);
  39. recordEntries.add(entry);
  40. }
  41. return recordEntries;
  42. }
  43. private static void sendRecords(Producer producer, List<RecordEntry> recordEntries) {
  44. int maxRetry = 3;
  45. while (true) {
  46. try {
  47. // 自动选择shard写入
  48. producer.send(recordEntries, maxRetry);
  49. // 指定写入shard "0"
  50. // producer.send(recordEntries, "0", maxRetry);
  51. LOG.error("send records: {}", recordEntries.size());
  52. break;
  53. } catch (MalformedRecordException e) {
  54. // record 格式非法,根据业务场景选择忽略或直接抛异常
  55. LOG.error("write fail", e);
  56. throw e;
  57. } catch (InvalidParameterException |
  58. AuthorizationFailureException |
  59. NoPermissionException e) {
  60. // 请求参数非法
  61. // 签名不正确
  62. // 没有权限
  63. LOG.error("write fail", e);
  64. throw e;
  65. } catch (ShardNotFoundException e) {
  66. // shard 不存在, 如果不是写入自己指定的shard,可以不用处理
  67. LOG.error("write fail", e);
  68. sleep(1000);
  69. } catch (ResourceNotFoundException e) {
  70. // project, topic 或 shard 不存在
  71. LOG.error("write fail", e);
  72. throw e;
  73. } catch (DatahubClientException e) {
  74. // 基类异常,包含网络问题等,可以选择重试
  75. LOG.error("write fail", e);
  76. sleep(1000);
  77. }
  78. }
  79. }
  80. public static void main(String[] args) {
  81. // Endpoint以Region: 华东1为例,其他Region请按实际情况填写
  82. String endpoint = "http://dh-cn-hangzhou.aliyuncs.com";
  83. String accessId = "<YourAccessKeyId>";
  84. String accessKey = "<YourAccessKeySecret>";
  85. String projectName = "<YourProjectName>";
  86. String topicName = "<YourTopicName>";
  87. RecordSchema schema = new RecordSchema();
  88. schema.addField(new Field("field1", FieldType.STRING));
  89. schema.addField(new Field("field2", FieldType.BIGINT));
  90. ProducerConfig config = new ProducerConfig(endpoint, accessId, accessKey);
  91. Producer producer = new Producer(projectName, topicName, config);
  92. // 根据场景控制循环
  93. boolean stop = false;
  94. try {
  95. while (!stop) {
  96. List<RecordEntry> recordEntries = genRecords(schema);
  97. sendRecords(producer, recordEntries);
  98. }
  99. } finally {
  100. // 确保资源正确释放
  101. producer.close();
  102. }
  103. }
  104. }

初始化Consumer

配置
名称 描述
autoCommit 是否自动提交点位,默认为true。点位的提交会在后台线程按配置的时间间隔执行,自动提交的逻辑是当read接口被调用时,认为之前读的数据已经处理完毕。如果设置为false,那么每条record处理完必须ack,后台提交点位会保证该点位之前的record全部被ack。
offsetCommitTimeoutMs 点位的提交间隔,单位毫秒,默认30000ms,范围[3000, 300000]
sessionTimeoutMs 会话超时时间,心跳间隔会设为改置的2/3,超过时间没有心跳,认为客户端已停止,服务端会重新分配被占有shard,单位毫秒,默认60000ms,范围[60000, 180000]
fetchSize 单个shard异步读取记录的大小,会缓存2倍于该值的记录,少于2倍会触发异步任务去读取,默认1000,必须大于0
  1. // Endpoint以Region: 华东1为例,其他Region请按实际情况填写
  2. String endpoint = "http://dh-cn-hangzhou.aliyuncs.com";
  3. String accessId = "<YourAccessKeyId>";
  4. String accessKey = "<YourAccessKeySecret>";
  5. String projectName = "<YourProjectName>";
  6. String topicName = "<YourTopicName>";
  7. String subId = "<YourSubscriptionId>";
  8. // 1. 使用协同消费,subId
  9. ConsumerConfig config = new ConsumerConfig(endpoint, accessId, accessKey);
  10. Consumer consumer = new Consumer(projectName, topicName, subId, config);
  11. // 2. 不使用协同消费,使用点位服务,提供subId和Consumer读取的shard列表
  12. List<String> assignment = Arrays.asList("0", "1", "2");
  13. ConsumerConfig config = new ConsumerConfig(endpoint, accessId, accessKey);
  14. Consumer consumer = new Consumer(projectName, topicName, subId, assignment, config);
  15. // 3. 不使用协同消费,不使用点位服务记录的点位,提供subId,Consumer读取的shard和初始点位
  16. Map<String, Offset> offsetMap = new HashMap<>();
  17. // 提供sequence和timestamp,若sequence超出范围则使用timestamp获取Cursor
  18. offsetMap.put("0", new Offset(100, 1548573440756L));
  19. // 只提供sequence,按照sequence获取Cursor
  20. offsetMap.put("1", new Offset().setSequence(1));
  21. // 只提供timestamp,按照timestamp获取Cursor
  22. offsetMap.put("2", new Offset().setTimestamp(1548573440756L));
  23. ConsumerConfig config = new ConsumerConfig(endpoint, accessId, accessKey);
  24. Consumer consumer = new Consumer(projectName, topicName, subId, offsetMap, config);

协同代码示例

  1. import com.aliyun.datahub.client.exception.AuthorizationFailureException;
  2. import com.aliyun.datahub.client.exception.DatahubClientException;
  3. import com.aliyun.datahub.client.exception.InvalidParameterException;
  4. import com.aliyun.datahub.client.exception.NoPermissionException;
  5. import com.aliyun.datahub.client.exception.SubscriptionOfflineException;
  6. import com.aliyun.datahub.client.exception.SubscriptionOffsetResetException;
  7. import com.aliyun.datahub.client.exception.SubscriptionSessionInvalidException;
  8. import com.aliyun.datahub.client.model.RecordEntry;
  9. import com.aliyun.datahub.client.model.TupleRecordData;
  10. import com.aliyun.datahub.clientlibrary.config.ConsumerConfig;
  11. import com.aliyun.datahub.clientlibrary.consumer.Consumer;
  12. import org.slf4j.Logger;
  13. import org.slf4j.LoggerFactory;
  14. import java.util.concurrent.TimeUnit;
  15. public class DatahubReader {
  16. private static final Logger LOG = LoggerFactory.getLogger(DatahubReader.class);
  17. private static void sleep(long milliSeconds) {
  18. try {
  19. TimeUnit.MILLISECONDS.sleep(milliSeconds);
  20. } catch (InterruptedException e) {
  21. // TODO:自行处理异常
  22. }
  23. }
  24. public static Consumer createConsumer(ConsumerConfig config, String project, String topic, String subId)
  25. {
  26. return new Consumer(project, topic, subId, config);
  27. }
  28. public static void main(String[] args) {
  29. String endpoint = "http://dh-cn-hangzhou.aliyuncs.com";
  30. String accessId = "<YourAccessKeyId>";
  31. String accessKey = "<YourAccessKeySecret>";
  32. String projectName = "<YourProjectName>";
  33. String topicName = "<YourTopicName>";
  34. String subId = "<YourSubscriptionId>";
  35. ConsumerConfig config = new ConsumerConfig(endpoint, accessId, accessKey);
  36. Consumer consumer = createConsumer(config, projectName, topicName, subId);
  37. int maxRetry = 3;
  38. boolean stop = false;
  39. try {
  40. while (!stop) {
  41. try {
  42. while (true) {
  43. // 协同消费刚初始化,需要等待服务端分配shard,约40秒,期间只能返回null
  44. // 自动提交模式,每次调用read,认为之前读的数据都已处理完成,自动ack
  45. RecordEntry record = consumer.read(maxRetry);
  46. // 处理数据
  47. if (record != null) {
  48. TupleRecordData data = (TupleRecordData) record.getRecordData();
  49. // 根据自己的schema来处理数据,此处打印第一列的内容
  50. LOG.info("field1: {}", data.getField(0));
  51. // 根据列名取数据
  52. // LOG.info("field2: {}", data.getField("field2"));
  53. // 非自动提交模式,每条record处理完后都需要ack
  54. // 自动提交模式,ack不会做任何操作
  55. // 1.1.7版本及以上
  56. record.getKey().ack();
  57. } else {
  58. LOG.info("read null");
  59. }
  60. }
  61. } catch (SubscriptionOffsetResetException e) {
  62. // 点位被重置,重新初始化consumer
  63. try {
  64. consumer.close();
  65. consumer = createConsumer(config, projectName, topicName, subId);
  66. } catch (DatahubClientException e1) {
  67. // 初始化失败,重试或直接抛异常
  68. LOG.error("create consumer failed", e);
  69. throw e;
  70. }
  71. } catch (InvalidParameterException |
  72. SubscriptionOfflineException |
  73. SubscriptionSessionInvalidException |
  74. AuthorizationFailureException |
  75. NoPermissionException e) {
  76. // 请求参数非法
  77. // 订阅被下线
  78. // 订阅下相同shard被其他客户端占用
  79. // 签名不正确
  80. // 没有权限
  81. LOG.error("read failed", e);
  82. throw e;
  83. } catch (DatahubClientException e) {
  84. // 基类异常,包含网络问题等,可以选择重试
  85. LOG.error("read failed, retry", e);
  86. sleep(1000);
  87. }
  88. }
  89. } catch (Throwable e) {
  90. LOG.error("read failed", e);
  91. } finally {
  92. // 确保资源正确释放
  93. // 会提交已ack的点位
  94. consumer.close();
  95. }
  96. }
  97. }

注意事项

Consumer和Producer都不支持多线程访问,如果需要使用多线程,则在每个线程都使用不同的Consumer或Producer对象。