全部产品
云市场

协同消费功能介绍

更新时间:2019-07-25 15:44:50

协同消费功能介绍

datahub-client-library是Java-SDK读写功能的上层封装,支持协同消费的功能的Consumer,以及对shard均匀写入的Producer。

概念

点位服务

点位服务是提供将消费的点位保存在服务端的功能,点位由sequence和timestamp组成,sequence是递增的对应唯一记录的序列,timestamp是记录写入datahub的单位为ms的时间戳。

为Topic创建订阅,并在完成消费一部分数据后,将点位提交至服务端。下次启动任务时,可以从服务端获取上次提交的点位,从指定点位的下一条记录开始消费。将点位保存在服务端才能够实现shard重新分配后,能够从上次提交的点位之后消费,是协同消费功能的前提。

在Consumer中不需要手动处理点位,在config中设置点位提交的间隔,在读取记录时,认为之前的记录已经完成处理,若距离上次提交点位已经超过提交间隔,则尝试提交。在提交失败并且同时任务强制停止时,有一定可能造成点位提交不及时,重复消费一部分数据。

协同消费

协同消费是为了解决多个消费者同时消费一个topic时,自动分配shard的问题。能够简化消费的客户端处理,多个消费者可能是在不同机器上,通过自己协调分配shard是困难的。使用同一个Sub Id的Consummer在同一个Consumer Group中,同一个shard在一个Consumer Group中只会被分配给1个Consumer。

协同消费示意图

场景

现有3个消费者实例A,B,C,Topic共有10个shard

  1. 实例A启动,分配10个shard
  2. 实例B,C启动,shard分配为4,3,3
  3. 将1个shard进行split操作,在父节点消费完后,客户端主动释放,2个子节点加入后,shard分配为4,4,3
  4. 实例C停止后,shard分配为6,5
心跳

要实现协同消费的功能,需要通过心跳机制来通知让服务端消费者实例的状态,当前分配的shard和需要释放的shard,超过时间间隔没有收到心跳,则认为消费者实例已经停止。当消费者实例的状态发生改变,服务端会重新分配shard,新的分配计划也是通过心跳请求来返回,所以客户端感知shard变化是有时间间隔的。

版本

Maven依赖以及JDK:

maven pom

  1. <dependency>
  2. <groupId>com.aliyun.datahub</groupId>
  3. <artifactId>datahub-client-library</artifactId>
  4. <version>1.0.6-public</version>
  5. </dependency>

jdk

  1. jdk: >= 1.7

示例

初始化Producer

  1. // Endpoint以Region: 华东1为例,其他Region请按实际情况填写
  2. String endpoint = "http://dh-cn-hangzhou.aliyuncs.com";
  3. String accessId = "<YourAccessKeyId>";
  4. String accessKey = "<YourAccessKeySecret>";
  5. String projectName = "<YourProjectName>";
  6. String topicName = "<YourTopicName>";
  7. ProducerConfig config = new ProducerConfig(endpoint, accessId, accessKey);
  8. Producer producer = new Producer(projectName, topicName, config);

写数据

  1. RecordSchema schema = new RecordSchema();
  2. schema.addField(new Field("field1", FieldType.STRING));
  3. schema.addField(new Field("field2", FieldType.BIGINT));
  4. List<RecordEntry> recordEntries = new ArrayList<>();
  5. for (int cnt = 0; cnt < 10; ++cnt) {
  6. RecordEntry entry = new RecordEntry();
  7. entry.addAttribute("key1", "value1");
  8. entry.addAttribute("key2", "value2");
  9. TupleRecordData data = new TupleRecordData(schema);
  10. data.setField("field1", "testValue");
  11. data.setField("field2", 1);
  12. entry.setRecordData(data);
  13. recordEntries.add(entry);
  14. }
  15. int maxRetry = 3;
  16. while (true) {
  17. try {
  18. producer.send(records, maxRetry);
  19. break;
  20. } catch (MalformedRecordException e) {
  21. // malformed RecordEntry
  22. } catch (InvalidParameterException e) {
  23. // invalid param
  24. } catch (ResourceNotFoundException e) {
  25. // project, topic or shard not found, sometimes caused by split/merge shard
  26. } catch (DatahubClientException e) {
  27. // network or other exceptions exceeded retry limit
  28. }
  29. }
  30. // close before exit
  31. producer.close();

初始化Consumer

配置
名称 描述
autoCommit 是否自动提交点位,默认为true
offsetCommitTimeoutMs 点位的提交间隔,单位毫秒,默认30000ms,范围[3000, 300000]
sessionTimeoutMs 会话超时时间,心跳间隔会设为改置的2/3,超过时间没有心跳,认为客户端已停止,服务端会重新分配被占有shard,单位毫秒,默认60000ms,范围[60000, 180000]
fetchSize 单个shard异步读取记录的大小,会缓存2倍于该值的记录,少于2倍会触发异步任务去读取,默认1000,必须大于0
  1. // Endpoint以Region: 华东1为例,其他Region请按实际情况填写
  2. String endpoint = "http://dh-cn-hangzhou.aliyuncs.com";
  3. String accessId = "<YourAccessKeyId>";
  4. String accessKey = "<YourAccessKeySecret>";
  5. String projectName = "<YourProjectName>";
  6. String topicName = "<YourTopicName>";
  7. String SubId = "<YourSubscriptionId>";
  8. // 1. 使用协同消费,提供SubId
  9. ConsumerConfig config = new ConsumerConfig(endpoint, accessId, accessKey);
  10. Consumer consumer = new Consumer(projectName, topicName, SubId, config);
  11. // 2. 不使用协同消费,使用点位服务,提供SubId和Consumer读取的shard列表
  12. List<String> assignment = Arrays.asList("0", "1", "2");
  13. ConsumerConfig config = new ConsumerConfig(endpoint, accessId, accessKey);
  14. Consumer consumer = new Consumer(projectName, topicName, SubId, assignment, config);
  15. // 3. 不使用协同消费,不使用点位服务记录的点位,提供SubId,Consumer读取的shard和初始点位
  16. Map<String, Offset> offsetMap = new HashMap<>();
  17. // 提供sequence和timestamp,若sequence超出范围则使用timestamp获取Cursor
  18. offsetMap.put("0", new Offset(100, 1548573440756L));
  19. // 只提供sequence,按照sequence获取Cursor
  20. offsetMap.put("1", new Offset().setSequence(1));
  21. // 只提供timestamp,按照timestamp获取Cursor
  22. offsetMap.put("2", new Offset().setTimestamp(1548573440756L));
  23. ConsumerConfig config = new ConsumerConfig(endpoint, accessId, accessKey);
  24. Consumer consumer = new Consumer(projectName, topicName, SubId, offsetMap, config);

读数据

  1. int maxRetry = 3;
  2. boolean stop = false;
  3. while (!stop) {
  4. try {
  5. while (true) {
  6. RecordEntry record = consumer.read(maxRetry);
  7. if (record != null) {
  8. TupleRecordData data = (TupleRecordData) record.getRecordData();
  9. System.out.println("field1:" + data.getField(0) + ", field2:" + data.getField("field2"));
  10. }
  11. }
  12. } catch (SubscriptionSessionInvalidException | SubscriptionOffsetResetException e) {
  13. // subscription exception, will not recover
  14. // print some log or just use a new consumer
  15. consumer.close();
  16. consumer = new Consumer(TEST_PROJECT, TEST_TOPIC, TEST_SUB_ID, config);
  17. } catch (ResourceNotFoundException | InvalidParameterException e) {
  18. // - project, topic, shard, subscription not found
  19. // - seek out of range
  20. // - sometimes shard operation cause ResourceNotFoundException
  21. // should make sure if resource exists, print some log or just exit
  22. } catch (DatahubClientException e) {
  23. // - network or other exception exceed retry limit
  24. // can just sleep and retry
  25. }
  26. }
  27. // close before exit
  28. consumer.close();

注意事项

Consumer和Producer都不支持多线程访问,如果需要使用多线程,则在每个线程都使用不同的Consumer或Producer对象。