Collaborative consumption

Concepts

Offset service

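  • The offset service stores consumption offsets on the server. An offset consists of a sequence and a timestamp: sequence is an increasing number that uniquely identifies a record, and timestamp is the time, in milliseconds, at which the record was written to DataHub.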

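  • Create a subscription on the topic and, after consuming a portion of the data, commit the offset to the server. The next time the job starts, it can fetch the last committed offset from the server and resume consumption from the record immediately after that offset. Storing offsets on the server is what allows consumption to resume after the last committed offset even when shards are reassigned, which makes it a prerequisite for collaborative consumption.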

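  • The Consumer does not require you to handle offsets manually. You set the commit interval in the config. Each time a record is read, all previously read records are considered processed, and if the commit interval has elapsed since the last commit, the Consumer tries to commit the offset. If a commit fails and the job is forcibly stopped at the same time, the offset may not be committed in time, and some data may be consumed again.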
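The auto-commit behavior described above can be modeled as a small policy object. This is a minimal sketch, not the library's implementation: the class `IntervalCommitPolicy`, its methods, and passing the current time in explicitly are all hypothetical, chosen so the logic can be checked without a DataHub connection. Each `onRead` call treats the previously read record as processed, and a commit happens only once the interval has elapsed since the last commit.

```java
/**
 * Hypothetical model of interval-based offset auto-commit (not the DataHub client API).
 * Reading a new record marks the previously read record as processed; if the commit
 * interval has elapsed since the last commit, the processed offset is committed.
 */
final class IntervalCommitPolicy {
    private final long commitIntervalMs;
    private long lastCommitTimeMs;
    private long lastReadSequence = -1;   // sequence of the most recently read record
    private long committedSequence = -1;  // last sequence "sent" to the simulated server

    IntervalCommitPolicy(long commitIntervalMs, long startTimeMs) {
        this.commitIntervalMs = commitIntervalMs;
        this.lastCommitTimeMs = startTimeMs;
    }

    /** Called when a record is read at nowMs; returns true if this call committed. */
    boolean onRead(long sequence, long nowMs) {
        // Everything read before this call is considered processed.
        long processed = lastReadSequence;
        lastReadSequence = sequence;
        if (processed >= 0 && nowMs - lastCommitTimeMs >= commitIntervalMs) {
            committedSequence = processed; // a real client would send this to the server
            lastCommitTimeMs = nowMs;
            return true;
        }
        return false;
    }

    long committedSequence() {
        return committedSequence;
    }

    /** After a restart, consumption resumes from the record after the committed offset. */
    long resumeSequence() {
        return committedSequence + 1;
    }
}
```

In this model, the duplicate window mentioned above is every record read after `committedSequence()`: if the process is killed before the next successful commit, those records are delivered again after a restart.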

Collaborative consumption

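Collaborative consumption solves the problem of assigning shards automatically when multiple consumers consume the same topic at the same time. It simplifies client-side logic: the consumers may run on different machines, and coordinating shard assignment among themselves would be difficult. Consumers that use the same subscription ID (Sub Id) belong to the same consumer group, and within a consumer group each shard is assigned to only one consumer.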

Figure: Collaborative consumption

Example scenario

There are three consumer instances, A, B, and C, and the topic has 10 shards.

  1. Instance A starts and is assigned all 10 shards.

  2. Instances B and C start, and the shards are distributed as 4, 3, and 3.

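  3. One shard is split. After the parent shard has been fully consumed, the client releases it proactively; once the two child shards are added, the shards are distributed as 4, 4, and 3.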

  4. After instance C stops, the shards are distributed as 6 and 5.
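The shard counts in this scenario are consistent with spreading shards as evenly as possible across the active consumers. The sketch below assumes that policy; the server's actual assignment algorithm is not described here, and `EvenShardAssigner` is a hypothetical class. It recomputes the plan from scratch (ignoring which consumer previously owned which shard) and gives every shard exactly one owner, matching the rule that a shard belongs to only one consumer within a consumer group.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical even shard assignment; not the DataHub server's algorithm. */
final class EvenShardAssigner {
    /** Assigns shards round-robin, so each shard has exactly one owner. */
    static Map<String, List<String>> assign(List<String> consumers, List<String> shards) {
        Map<String, List<String>> plan = new LinkedHashMap<>();
        for (String consumer : consumers) {
            plan.put(consumer, new ArrayList<>());
        }
        if (consumers.isEmpty()) {
            return plan; // no live consumers: every shard stays unassigned
        }
        for (int i = 0; i < shards.size(); ++i) {
            plan.get(consumers.get(i % consumers.size())).add(shards.get(i));
        }
        return plan;
    }

    /** Number of shards owned by each consumer, in consumer order. */
    static List<Integer> sizes(Map<String, List<String>> plan) {
        List<Integer> sizes = new ArrayList<>();
        for (List<String> owned : plan.values()) {
            sizes.add(owned.size());
        }
        return sizes;
    }
}
```

Replaying the scenario: 10 shards over A gives [10]; over A, B, C gives [4, 3, 3]; after one shard is replaced by its two children (11 shards) the result is [4, 4, 3]; with only A and B left it is [6, 5].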

Heartbeat

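Collaborative consumption relies on a heartbeat mechanism: each consumer instance uses heartbeats to report its state to the server, together with the shards currently assigned to it and the shards it needs to release. If the server receives no heartbeat within the timeout, it considers the consumer instance stopped. When the state of a consumer instance changes, the server reassigns shards, and the new assignment plan is also returned in heartbeat responses. As a result, a client becomes aware of shard changes only after a delay.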
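The server-side liveness check can be pictured as a map from consumer ID to the time of its last heartbeat. This is a hypothetical model for illustration only (`HeartbeatTracker` is not part of the DataHub SDK); it assumes the session timeout and the heartbeat interval of 2/3 of that timeout that the `sessionTimeoutMs` parameter documented later in this topic describes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical server-side heartbeat bookkeeping; not the DataHub implementation. */
final class HeartbeatTracker {
    private final long sessionTimeoutMs;
    private final Map<String, Long> lastHeartbeatMs = new HashMap<>();

    HeartbeatTracker(long sessionTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    /** Client-side heartbeat period: 2/3 of the session timeout. */
    static long heartbeatIntervalMs(long sessionTimeoutMs) {
        return sessionTimeoutMs * 2 / 3;
    }

    /** Records a heartbeat from a consumer at time nowMs. */
    void onHeartbeat(String consumerId, long nowMs) {
        lastHeartbeatMs.put(consumerId, nowMs);
    }

    /**
     * Drops consumers whose last heartbeat is older than the session timeout and returns
     * their IDs; a non-empty result is what would trigger a shard reassignment.
     */
    List<String> expire(long nowMs) {
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastHeartbeatMs.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMs - entry.getValue() > sessionTimeoutMs) {
                expired.add(entry.getKey());
                it.remove();
            }
        }
        return expired;
    }

    int liveCount() {
        return lastHeartbeatMs.size();
    }
}
```

In this model the remaining consumers learn about the new plan only on their next heartbeat, which is why shard changes are observed with a delay.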

Prerequisites

  • Java 1.8 or later is required to develop collaborative consumption code.

  • Add the following Maven dependencies:

    <dependency>
        <groupId>com.aliyun.datahub</groupId>
        <artifactId>aliyun-sdk-datahub</artifactId>
        <version>2.25.1</version>
    </dependency>
    <dependency>
        <groupId>com.aliyun.datahub</groupId>
        <artifactId>datahub-client-library</artifactId>
        <version>1.4.3</version>
    </dependency>
Note

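Starting with datahub-client-library 1.4, Producer and Consumer are thread-safe, so the same Producer / Consumer can be used from multiple threads. In versions earlier than 1.4, the Consumer / Producer is not thread-safe; for multi-threaded usage, see the multi-threaded read/write examples section below.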

Authentication

Background

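  • An AccessKey (AK) is the access credential Alibaba Cloud issues to its users for identity verification when they call Alibaba Cloud OpenAPI. An AccessKey consists of an AccessKey ID and an AccessKey Secret and must be kept secure. If an AK is leaked, the security of all resources under the account is at risk. Hard-coding a plaintext AK in code that calls Alibaba Cloud OpenAPI makes it easy to leak the AK through poorly managed code repository permissions.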

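  • Alibaba Cloud Credentials is a credential management tool that Alibaba Cloud provides for developers. After you configure the default Credentials provider chain, you no longer need to hard-code a plaintext AK when calling Alibaba Cloud OpenAPI, which helps keep the cloud resources under your account secure.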

Prerequisites

  • You have obtained the AccessKey ID and AccessKey Secret of a RAM user. For details, see View the AccessKey information of a RAM user.

  • Important

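    If the AccessKey of an Alibaba Cloud account (the root account) is leaked, the security of all resources under the account is at risk. To keep your account secure, we strongly recommend that you create AccessKeys for RAM users and do not create an AccessKey for the root account unless necessary.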

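    The AccessKey Secret of a RAM user is displayed only when the AccessKey is created and cannot be viewed afterward. Save the AccessKey Secret promptly and securely after you create the AccessKey.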

  • The Alibaba Cloud SDK Credentials tool is installed.

    Install it with Maven (using the latest version of Credentials is recommended):

    <dependency>
        <groupId>com.aliyun</groupId>
        <artifactId>credentials-java</artifactId>
        <version>0.2.11</version>
    </dependency>
  • The JDK version is 1.7 or later.

Configuration

This topic uses environment variables as the example. For other methods, see Configure environment variables.

Note

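If you use a configuration file, make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET do not exist on your system. Otherwise, the configuration file does not take effect.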

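The Alibaba Cloud SDK can create default access credentials from the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables. When you call an API, the program reads your AccessKey from these credentials and completes authentication automatically.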

Configure automatic authentication

Configure the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET.

Linux and macOS

Run the following commands:

export ALIBABA_CLOUD_ACCESS_KEY_ID=<access_key_id>
export ALIBABA_CLOUD_ACCESS_KEY_SECRET=<access_key_secret>

Replace <access_key_id> with the AccessKey ID you prepared and <access_key_secret> with the AccessKey Secret.

Windows

  1. Add the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET and set them to the AccessKey ID and AccessKey Secret you prepared.

  2. Restart Windows.

Sample code

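    import com.aliyun.credentials.Client;
    // Resolves the AccessKey via the default credential provider chain (e.g. the environment variables above)
    Client credentialClient = new Client();
    String accessKeyId = credentialClient.getAccessKeyId();
    String accessKeySecret = credentialClient.getAccessKeySecret();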
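Before creating the `Client`, it can help to fail fast when the variables are missing. A minimal sketch using only the JDK; the class `CredentialEnvCheck` is hypothetical, it only checks that the two variables named above are present and non-empty (it does not validate the keys), and it applies only to the environment-variable approach used in this topic, not to the configuration-file approach.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: lists the AccessKey environment variables that are missing or empty. */
final class CredentialEnvCheck {
    static final String ID_VAR = "ALIBABA_CLOUD_ACCESS_KEY_ID";
    static final String SECRET_VAR = "ALIBABA_CLOUD_ACCESS_KEY_SECRET";

    /** Takes the environment as a map so it can be tested; pass System.getenv() in real code. */
    static List<String> missing(Map<String, String> env) {
        List<String> missing = new ArrayList<>();
        for (String name : new String[] {ID_VAR, SECRET_VAR}) {
            String value = env.get(name);
            if (value == null || value.trim().isEmpty()) {
                missing.add(name);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        List<String> missing = missing(System.getenv());
        if (!missing.isEmpty()) {
            // Print only the variable names, never their values
            System.err.println("Missing environment variables: " + missing);
            System.exit(1);
        }
        System.out.println("AccessKey environment variables are set");
    }
}
```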

Code examples

Producer code examples

Synchronous write

import com.aliyun.credentials.Client;
import com.aliyun.datahub.client.exception.DatahubClientException;
import com.aliyun.datahub.client.http.HttpClient;
import com.aliyun.datahub.client.model.BlobRecordData;
import com.aliyun.datahub.client.model.RecordEntry;
import com.aliyun.datahub.client.model.RecordSchema;
import com.aliyun.datahub.client.model.TupleRecordData;
import com.aliyun.datahub.clientlibrary.common.ExceptionChecker;
import com.aliyun.datahub.clientlibrary.config.ProducerConfig;
import com.aliyun.datahub.clientlibrary.producer.DatahubProducer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SimpleProducer {

  private static final Logger LOGGER = LoggerFactory.getLogger(SimpleProducer.class);

  public static void main(String[] args) throws Exception {
    // Optionally enable in-memory metrics; once log4j logging is configured, the metrics are printed to the log
    // ClientMetrics.startMetrics();

    // The China (Hangzhou) region is used as an example
    String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
    // Read the AccessKey from the default credential provider chain instead of hard-coding it
    Client credentialClient = new Client();
    String accessKeyId = credentialClient.getAccessKeyId();
    String accessKeySecret = credentialClient.getAccessKeySecret();
    String projectName = "";
    String topicName = "";

    ////////////////////////////// STEP1. Create the DatahubProducer //////////////////////////
    ProducerConfig config = new ProducerConfig(endpoint, accessKeyId, accessKeySecret);
    DatahubProducer datahubProducer = new DatahubProducer(projectName, topicName, config);
    // The schema is only needed when building TUPLE records
    RecordSchema schema = datahubProducer.getTopicSchema();

    ///////////////////// STEP2. Build records according to the topic type (BLOB or TUPLE) ////////////
    List<RecordEntry> recordList = new ArrayList<>();
    for (int i = 0; i < 10; ++i) {
      RecordEntry record = new RecordEntry();
      // BLOB (unstructured) data
      BlobRecordData data = new BlobRecordData("HelloWorld".getBytes(StandardCharsets.UTF_8));
      // TUPLE (structured) data: for a TUPLE topic, use these lines instead
      // TupleRecordData data = new TupleRecordData(schema);
      // data.setField("f1", "f1_" + i);

      record.setRecordData(data);
      record.addAttribute("key1", "value1"); // record attribute, optional

      recordList.add(record);
    }

    ///////////////////////// STEP3. Write data in a loop /////////////////////////
    try {
      for (int i = 0; i < 10000; ++i) {
        try {
          String shardId = datahubProducer.send(recordList);
          LOGGER.info("Write shard {} ok, record count:{}", shardId, recordList.size());
        } catch (DatahubClientException e) {
          if (!ExceptionChecker.isRetryableException(e)) {
            LOGGER.error("Write data fail", e);
            break;
          }
          // Retryable error: sleep, then retry
          Thread.sleep(1000);
        }
      }
    } finally {
      // Release the producer's resources
      datahubProducer.close();
    }

    // Call the global cleanup function when the process exits
    HttpClient.close();

    // ClientMetrics.stopMetrics();
  }
}
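The loop above retries a retryable `DatahubClientException` after a one-second sleep and stops on any other error. The same policy can be factored into a small helper with a bounded number of attempts. This is a sketch using only the JDK: `Retry.call` is a hypothetical helper, and its `retryable` predicate stands in for `ExceptionChecker.isRetryableException`; in real code you might pass `e -> e instanceof DatahubClientException && ExceptionChecker.isRetryableException((DatahubClientException) e)`.

```java
import java.util.concurrent.Callable;
import java.util.function.Predicate;

/** Hypothetical retry helper mirroring the sleep-and-retry loop in the example. */
final class Retry {
    static <T> T call(Callable<T> action, Predicate<Exception> retryable,
                      int maxAttempts, long sleepMs) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        for (int attempt = 1; ; ++attempt) {
            try {
                return action.call();
            } catch (Exception e) {
                // Give up on non-retryable errors or once the attempt budget is used up
                if (!retryable.test(e) || attempt >= maxAttempts) {
                    throw e;
                }
                Thread.sleep(sleepMs);
            }
        }
    }
}
```

Unlike the example, which keeps looping until it has made 10000 calls, the helper makes the retry budget explicit, so a persistently failing batch surfaces as an exception instead of being retried silently.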

Asynchronous write

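import com.aliyun.credentials.Client;
import com.aliyun.datahub.client.exception.DatahubClientException;
import com.aliyun.datahub.client.http.HttpClient;
import com.aliyun.datahub.client.model.RecordEntry;
import com.aliyun.datahub.client.model.RecordSchema;
import com.aliyun.datahub.client.model.TupleRecordData;
import com.aliyun.datahub.clientlibrary.common.ExceptionChecker;
import com.aliyun.datahub.clientlibrary.config.ProducerConfig;
import com.aliyun.datahub.clientlibrary.producer.DatahubProducer;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// WriteCallback, RecordPartitioner and DefaultRecordPartitioner also come from
// datahub-client-library; import them from the package used by your library version.

public class SimpleProducerAsync {
    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleProducerAsync.class);

    public static void main(String[] args) throws Exception {
        // Optionally enable in-memory metrics; once log4j logging is configured, the metrics are printed to the log
        // ClientMetrics.startMetrics();
        // The China (Hangzhou) region is used as an example
        String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
        Client credentialClient = new Client();
        String accessKeyId = credentialClient.getAccessKeyId();
        String accessKeySecret = credentialClient.getAccessKeySecret();
        String projectName = "";
        String topicName = "";

        ////////////////////////////// STEP1. Create the DatahubProducer //////////////////////////
        ProducerConfig config = new ProducerConfig(endpoint, accessKeyId, accessKeySecret);
        config.setMaxAsyncBufferTimeMs(30000); // async buffering time, in ms
        DatahubProducer datahubProducer = new DatahubProducer(projectName, topicName, config);

        RecordSchema schema = datahubProducer.getTopicSchema();

        // Asynchronous writes can register a callback
        WriteCallback callback = new WriteCallback() {
            @Override
            public void onSuccess(String shardId, List<RecordEntry> records,
                                  long elapsedTimeMs, long sendTimeMs) {
                LOGGER.info("Message sent successfully");
            }

            @Override
            public void onFailure(String shardId, List<RecordEntry> records,
                                  long elapsedTimeMs, DatahubClientException e) {
                LOGGER.error("Message sent fail", e);
            }
        };

        // Optional: configure the partitioning (hashing) strategy.
        // Partition precedence: the target shardId is determined from the RecordEntry's shardId, hashKey and partitionKey, in that order
        RecordPartitioner partitioner = new DefaultRecordPartitioner();

        ///////////////////////// STEP2. Write data asynchronously in a loop /////////////////////////
        try {
            for (int i = 0; i < 1000; ++i) {
                try {
                    RecordEntry record = new RecordEntry();
                    // TUPLE (structured) data
                    TupleRecordData data = new TupleRecordData(schema);
                    data.setField("f1", "f1_" + i);
                    // BLOB (unstructured) data: for a BLOB topic, use this line instead
                    // BlobRecordData data = new BlobRecordData("HelloWorld".getBytes(StandardCharsets.UTF_8));
                    record.setRecordData(data);
                    record.addAttribute("key1", "value1"); // record attribute, optional
                    // Send a single record; whether to apply partitioning can be specified per send
                    datahubProducer.sendAsync(record, callback, partitioner);
                } catch (DatahubClientException e) {
                    if (!ExceptionChecker.isRetryableException(e)) {
                        LOGGER.error("Write data fail", e);
                        break;
                    }
                    // Retryable error: sleep, then retry
                    Thread.sleep(1000);
                }
            }

            // Block until all buffered data has been sent
            datahubProducer.flush(true);
        } catch (Exception e) {
            LOGGER.warn("Write fail", e);
        } finally {
            // Release the producer's resources
            datahubProducer.close();
        }

        // Call the global cleanup function when the process exits
        HttpClient.close();
        // ClientMetrics.stopMetrics();
    }
}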
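The comment on `DefaultRecordPartitioner` states the routing precedence: the record's shardId first, then its hashKey, then its partitionKey. The sketch below illustrates only that precedence and is entirely hypothetical: `PartitionSketch`, the `RecordKeys` holder, the use of `String.hashCode()` to map a key to a shard, and the round-robin fallback when no key is set are assumptions, not taken from the library.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical illustration of shardId > hashKey > partitionKey precedence. */
final class PartitionSketch {
    /** Minimal stand-in for a record's routing fields; null means "not set". */
    static final class RecordKeys {
        final String shardId;
        final String hashKey;
        final String partitionKey;

        RecordKeys(String shardId, String hashKey, String partitionKey) {
            this.shardId = shardId;
            this.hashKey = hashKey;
            this.partitionKey = partitionKey;
        }
    }

    private final List<String> shardIds;
    private final AtomicInteger roundRobin = new AtomicInteger();

    PartitionSketch(List<String> shardIds) {
        this.shardIds = shardIds;
    }

    String route(RecordKeys keys) {
        if (keys.shardId != null) {
            return keys.shardId;             // 1. an explicit shard wins
        }
        if (keys.hashKey != null) {
            return pick(keys.hashKey);       // 2. then the hash key
        }
        if (keys.partitionKey != null) {
            return pick(keys.partitionKey);  // 3. then the partition key
        }
        // Assumed fallback when no key is set: spread records round-robin
        return shardIds.get(Math.floorMod(roundRobin.getAndIncrement(), shardIds.size()));
    }

    private String pick(String key) {
        // Illustrative mapping only; not taken from the library
        return shardIds.get(Math.floorMod(key.hashCode(), shardIds.size()));
    }
}
```

The property that matters is that records sharing the highest-precedence key always land on the same shard, which preserves their relative order.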

Collaborative consumption code example

Configuration parameters

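  • autoCommit: Whether to commit offsets automatically. Default: true. Offsets are committed by a background thread at the configured interval. With auto-commit, each call to read treats all previously read data as processed. If set to false, every record must be acked after it is processed, and the background commit guarantees that all records before the committed offset have been acked.

  • offsetCommitTimeoutMs: Offset commit interval, in milliseconds. Default: 30000. Valid range: [3000, 300000].

  • sessionTimeoutMs: Session timeout, in milliseconds. The heartbeat interval is set to 2/3 of this value. If no heartbeat is received within this time, the client is considered stopped and the server reassigns the shards it held. Default: 60000. Valid range: [60000, 180000].

  • fetchSize: Number of records read asynchronously per shard. Up to twice this number of records are cached; when fewer than twice this number are cached, an asynchronous fetch is triggered. Default: 1000. Must be greater than 0.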
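With `autoCommit` set to false, the rule that every record before the committed offset must be acked amounts to committing the highest sequence below which nothing is still pending. A minimal, hypothetical sketch of that bookkeeping for a single shard (`AckTracker` is not a DataHub class, and it assumes records are read in increasing sequence order):

```java
import java.util.TreeSet;

/** Hypothetical per-shard ack bookkeeping for manual (non-auto) offset commits. */
final class AckTracker {
    private final TreeSet<Long> pending = new TreeSet<>(); // read but not yet acked
    private long lastRead = -1;                             // highest sequence read so far

    /** Called when a record is handed to the application (sequences increase). */
    synchronized void onRead(long sequence) {
        pending.add(sequence);
        lastRead = sequence;
    }

    /** Called when the application has finished processing a record. */
    synchronized void ack(long sequence) {
        pending.remove(sequence);
    }

    /**
     * Highest sequence that is safe to commit: the one just before the oldest unacked
     * record, or the last read sequence if everything is acked (-1 before any read).
     */
    synchronized long committableSequence() {
        return pending.isEmpty() ? lastRead : pending.first() - 1;
    }
}
```

Acking out of order is allowed: a slow record holds back the commit point, but records acked after it are not committed early.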

import com.aliyun.credentials.Client;
import com.aliyun.datahub.client.exception.DatahubClientException;
import com.aliyun.datahub.client.http.HttpClient;
import com.aliyun.datahub.client.model.*;
import com.aliyun.datahub.clientlibrary.common.ExceptionChecker;
import com.aliyun.datahub.clientlibrary.config.ConsumerConfig;
import com.aliyun.datahub.clientlibrary.consumer.DatahubConsumer;
import java.nio.charset.StandardCharsets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SimpleConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleConsumer.class);

    public static void main(String[] args) throws Exception {
        // Optionally enable in-memory metrics; once log4j logging is configured, the metrics are printed to the log
        // ClientMetrics.startMetrics();

        // The China (Hangzhou) region is used as an example
        String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
        Client credentialClient = new Client();
        String accessKeyId = credentialClient.getAccessKeyId();
        String accessKeySecret = credentialClient.getAccessKeySecret();
        String projectName = "";
        String topicName = "";
        String subId = ""; // consumers that share a subId form one consumer group

        ////////////////////////////// STEP1. Create the DatahubConsumer //////////////////////////
        ConsumerConfig config = new ConsumerConfig(endpoint, accessKeyId, accessKeySecret);
        DatahubConsumer datahubConsumer = new DatahubConsumer(projectName, topicName, subId, config);
        ///////////////////////// STEP2. Read data in a loop /////////////////////////
        try {
            while (true) {
                try {
                    RecordEntry record = datahubConsumer.read(3000);
                    if (record == null) {
                        // No record within 3 s: either (1) there is no data, or (2) the consumer is not ready yet,
                        // e.g. collaborative consumption has not assigned it a shard
                        continue;
                    }

                    RecordData recordData = record.getRecordData();
                    // Handle the data according to the topic type (BLOB or TUPLE); a topic has only one type
                    if (recordData instanceof TupleRecordData) {
                        TupleRecordData data = (TupleRecordData) recordData;
                        RecordSchema schema = data.getRecordSchema();
                        // This example simply concatenates the field values
                        StringBuilder sb = new StringBuilder();
                        for (int i = 0; i < schema.getFields().size(); ++i) {
                            sb.append(data.getField(i)).append(",");
                        }
                        LOGGER.debug("Read record. shardId:{}, seq:{}, ts:{}, batchIndex:{}, batchSize:{}, data:{}",
                                record.getShardId(), record.getSequence(), record.getSystemTime(), record.getSegmentIndexForBatch(),
                                record.getSegmentSizeForBatch(), sb);
                    } else {
                        BlobRecordData data = (BlobRecordData) recordData;
                        LOGGER.debug("Read record. shardId:{}, seq:{}, ts:{}, batchIndex:{}, batchSize:{}, data:{}",
                                record.getShardId(), record.getSequence(), record.getSystemTime(), record.getSegmentIndexForBatch(),
                                record.getSegmentSizeForBatch(), new String(data.getData(), StandardCharsets.UTF_8));
                    }
                } catch (DatahubClientException e) {
                    if (!ExceptionChecker.isRetryableException(e)) {
                        LOGGER.error("Read data fail", e);
                        break;
                    }
                    // Retryable error: sleep, then retry
                    Thread.sleep(1000);
                }
            }
        } catch (Exception e) {
            LOGGER.warn("Read data fail", e);
        } finally {
            // Release the consumer's resources
            datahubConsumer.close();
        }

        // Call the global cleanup function when the process exits
        HttpClient.close();

        // ClientMetrics.stopMetrics();
    }
}

Multi-threaded read/write examples

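  • Multiple threads sharing one Consumer/Producer (version 1.4 and later)

    package com.aliyun.datahub.clientlibrary.example;
    
    import com.aliyun.credentials.Client;
    import com.aliyun.datahub.client.common.DatahubConfig;
    import com.aliyun.datahub.client.exception.DatahubClientException;
    import com.aliyun.datahub.client.http.HttpClient;
    import com.aliyun.datahub.client.model.BlobRecordData;
    import com.aliyun.datahub.client.model.RecordEntry;
    import com.aliyun.datahub.clientlibrary.common.ExceptionChecker;
    import com.aliyun.datahub.clientlibrary.config.ConsumerConfig;
    import com.aliyun.datahub.clientlibrary.config.ProducerConfig;
    import com.aliyun.datahub.clientlibrary.consumer.DatahubConsumer;
    import com.aliyun.datahub.clientlibrary.producer.DatahubProducer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class MultiThreadReadWrite {
        private static final Logger LOGGER = LoggerFactory.getLogger(MultiThreadReadWrite.class);
    
        public static void main(String[] args) throws Exception {
            try {
    //            testProducer();
                testConsumer();
            } finally {
                // Call the global cleanup function when the process exits
                HttpClient.close();
            }
        }
    
        private static void testProducer() throws Exception {
            List<RecordEntry> records = new ArrayList<>();
            String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
            Client credentialClient = new Client();
            String accessKeyId = credentialClient.getAccessKeyId();
            String accessKeySecret = credentialClient.getAccessKeySecret();
            String projectName = "";
            String topicName = "";
            for (int i = 0; i < 2; ++i) {
                RecordEntry record = new RecordEntry();
                BlobRecordData data = new BlobRecordData(("HelloWorld-" + i).getBytes());
                record.setRecordData(data);
                record.addAttribute("key1", "value1");
                records.add(record);
            }
    
            ProducerConfig config = new ProducerConfig(endpoint, accessKeyId, accessKeySecret);
            config.getDatahubConfig().setProtocol(DatahubConfig.Protocol.BATCH); // enable batch transfer (recommended)
    
            Map<String, AtomicInteger> shardCountMap = new ConcurrentHashMap<>();
    
            // One producer shared by all threads (thread-safe since 1.4)
            DatahubProducer producer = new DatahubProducer(projectName, topicName, config);
            List<Thread> threads = new ArrayList<>();
            for (int i = 0; i < 10; ++i) {
                ProducerThread thread = new ProducerThread(producer, records, shardCountMap);
                thread.start();
                threads.add(thread);
            }
            for (Thread thread : threads) {
                thread.join();
            }
    
            producer.close();
            // print write count
            for (Map.Entry<String, AtomicInteger> entry : shardCountMap.entrySet()) {
                LOGGER.info("ShardId:{}, count:{}", entry.getKey(), entry.getValue());
            }
        }
    
        private static class ProducerThread extends Thread {
            private final DatahubProducer producer;
            private final List<RecordEntry> records;
            private final Map<String, AtomicInteger> shardCountMap;
    
            public ProducerThread(DatahubProducer producer,
                                  List<RecordEntry> records,
                                  Map<String, AtomicInteger> shardCountMap) {
                this.producer = producer;
                this.records = records;
                this.shardCountMap = shardCountMap;
            }
    
            @Override
            public void run() {
                // The shared producer is closed by testProducer() after all threads finish
                for (int i = 0; i < 100; ++i) {
                    try {
                        String shardId = producer.send(records);
                        shardCountMap.computeIfAbsent(shardId, k -> new AtomicInteger(0)).incrementAndGet();
                    } catch (DatahubClientException e) {
                        LOGGER.info("Producer send fail", e);
                        if (!ExceptionChecker.isRetryableException(e)) {
                            break;
                        }
                        try {
                            Thread.sleep(1000); // retryable error: sleep, then retry
                        } catch (InterruptedException ex) {
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }
    
        private static void testConsumer() throws Exception {
            String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
            Client credentialClient = new Client();
            String accessKeyId = credentialClient.getAccessKeyId();
            String accessKeySecret = credentialClient.getAccessKeySecret();
            String projectName = "";
            String topicName = "";
            String subId = "";
    
            ConsumerConfig config = new ConsumerConfig(endpoint, accessKeyId, accessKeySecret);
            config.getDatahubConfig().setProtocol(DatahubConfig.Protocol.BATCH); // enable batch transfer (recommended)
    
            Map<String, AtomicInteger> shardCountMap = new ConcurrentHashMap<>();
    
            // One consumer shared by all threads (thread-safe since 1.4)
            DatahubConsumer consumer = new DatahubConsumer(projectName, topicName, subId, config);
            List<Thread> threads = new ArrayList<>();
            for (int i = 0; i < 10; ++i) {
                ConsumerThread thread = new ConsumerThread(consumer, shardCountMap);
                thread.start();
                threads.add(thread);
            }
            for (Thread thread : threads) {
                thread.join();
            }
    
            consumer.close();
            // print read count
            for (Map.Entry<String, AtomicInteger> entry : shardCountMap.entrySet()) {
                LOGGER.info("ShardId:{}, count:{}", entry.getKey(), entry.getValue());
            }
        }
    
        private static class ConsumerThread extends Thread {
            private final DatahubConsumer consumer;
            private final Map<String, AtomicInteger> shardCountMap;
    
            public ConsumerThread(DatahubConsumer consumer, Map<String, AtomicInteger> shardCountMap) {
                this.consumer = consumer;
                this.shardCountMap = shardCountMap;
            }
    
            @Override
            public void run() {
                while (true) {
                    try {
                        RecordEntry record = consumer.read(30000);
                        if (record == null) {
                            // In this demo, the thread exits if no data is read within 30 seconds
                            break;
                        }
                        shardCountMap.computeIfAbsent(record.getShardId(), k -> new AtomicInteger(0)).incrementAndGet();
                    } catch (DatahubClientException e) {
                        if (!ExceptionChecker.isRetryableException(e)) {
                            LOGGER.info("Read data fail", e);
                            break;
                        }
                        try {
                            Thread.sleep(1000); // retryable error: sleep, then retry
                        } catch (InterruptedException ex) {
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }
    }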
  • One Consumer/Producer per thread (versions earlier than 1.4)

    package com.aliyun.datahub.clientlibrary.example;
    
    import com.aliyun.credentials.Client;
    import com.aliyun.datahub.client.common.DatahubConfig;
    import com.aliyun.datahub.client.exception.DatahubClientException;
    import com.aliyun.datahub.client.http.HttpClient;
    import com.aliyun.datahub.client.model.BlobRecordData;
    import com.aliyun.datahub.client.model.RecordEntry;
    import com.aliyun.datahub.clientlibrary.common.ExceptionChecker;
    import com.aliyun.datahub.clientlibrary.config.ConsumerConfig;
    import com.aliyun.datahub.clientlibrary.config.ProducerConfig;
    import com.aliyun.datahub.clientlibrary.consumer.DatahubConsumer;
    import com.aliyun.datahub.clientlibrary.producer.DatahubProducer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class MultiProducerAndConsumer {
        private static final Logger LOGGER = LoggerFactory.getLogger(MultiProducerAndConsumer.class);
    
        public static void main(String[] args) throws Exception {
            try {
    //            testProducer();
                testConsumer();
            } finally {
                // Call the global cleanup function when the process exits
                HttpClient.close();
            }
        }
    
        private static void testProducer() throws Exception {
            List<RecordEntry> records = new ArrayList<>();
            for (int i = 0; i < 2; ++i) {
                RecordEntry record = new RecordEntry();
                BlobRecordData data = new BlobRecordData(("HelloWorld-" + i).getBytes());
                record.setRecordData(data);
                record.addAttribute("key1", "value1");
    
                records.add(record);
            }
            String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
            Client credentialClient = new Client();
            String accessKeyId = credentialClient.getAccessKeyId();
            String accessKeySecret = credentialClient.getAccessKeySecret();
            ProducerConfig config = new ProducerConfig(endpoint, accessKeyId, accessKeySecret);
            config.getDatahubConfig().setProtocol(DatahubConfig.Protocol.BATCH); // enable batch transfer (recommended)
    
            Map<String, AtomicInteger> shardCountMap = new ConcurrentHashMap<>();
    
            // Each thread creates and closes its own producer
            List<Thread> threads = new ArrayList<>();
            for (int i = 0; i < 3; ++i) {
                ProducerThread thread = new ProducerThread(config, records, shardCountMap);
                thread.start();
                threads.add(thread);
            }
    
            for (int i = 0; i < 3; ++i) {
                threads.get(i).join();
            }
    
            // print write count
            for (Map.Entry<String, AtomicInteger> entry : shardCountMap.entrySet()) {
                LOGGER.info("ShardId:{}, count:{}", entry.getKey(), entry.getValue());
            }
        }
    
        private static class ProducerThread extends Thread {
            private final String projectName = "";
            private final String topicName = "";
            private final DatahubProducer producer;
            private final List<RecordEntry> records;
            private final Map<String, AtomicInteger> shardCountMap;
    
            public ProducerThread(ProducerConfig config,
                                  List<RecordEntry> records,
                                  Map<String, AtomicInteger> shardCountMap) {
                this.producer = new DatahubProducer(projectName, topicName, config);
                this.records = records;
                this.shardCountMap = shardCountMap;
            }
    
            @Override
            public void run() {
                try {
                    for (int i = 0; i < 100; ++i) {
                        try {
                            String shardId = producer.send(records);
                            shardCountMap.putIfAbsent(shardId, new AtomicInteger(0));
                            AtomicInteger cnt = shardCountMap.get(shardId);
                            cnt.incrementAndGet();
                        } catch (DatahubClientException e) {
                            LOGGER.info("Producer send fail", e);
                            if (!ExceptionChecker.isRetryableException(e)) {
                                break;
                            }
                            // Retryable error: sleep, then retry
                            try {
                                Thread.sleep(1000);
                            } catch (InterruptedException ex) {
                                Thread.currentThread().interrupt();
                                break;
                            }
                        } catch (Exception e) {
                            LOGGER.info("Producer send fail", e);
                            break;
                        }
                    }
                } finally {
                    producer.close();
                }
            }
        }
    
        private static void testConsumer() throws Exception {
            String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
            Client credentialClient = new Client();
            String accessKeyId = credentialClient.getAccessKeyId();
            String accessKeySecret = credentialClient.getAccessKeySecret();
            ConsumerConfig config = new ConsumerConfig(endpoint, accessKeyId, accessKeySecret);
            config.getDatahubConfig().setProtocol(DatahubConfig.Protocol.BATCH); // enable batch transfer (recommended)
    
            Map<String, RecordEntry> firstMap = new ConcurrentHashMap<>();
            Map<String, RecordEntry> lastMap = new ConcurrentHashMap<>();
            Map<String, AtomicInteger> shardCountMap = new ConcurrentHashMap<>();
    
            // Each thread creates and closes its own consumer
            List<Thread> threads = new ArrayList<>();
            for (int i = 0; i < 3; ++i) {
                ConsumerThread thread = new ConsumerThread(config, firstMap, lastMap, shardCountMap);
                thread.start();
                threads.add(thread);
            }
    
            for (int i = 0; i < 3; ++i) {
                threads.get(i).join();
            }
    
            // print start and end sequence
            for (RecordEntry first : firstMap.values()) {
                RecordEntry last = lastMap.get(first.getShardId());
                AtomicInteger cnt = shardCountMap.get(first.getShardId());
                LOGGER.info("ShardId:{}, startSeq:{}, endSeq:{}, cnt:{}",
                        first.getShardId(), first.getSequence(), last.getSequence(), cnt);
            }
        }
    
        private static class ConsumerThread extends Thread {
            private final String projectName = "";
            private final String topicName = "";
            private final String subId = "";
            private final DatahubConsumer consumer;
            private final Map<String, RecordEntry> firstMap;
            private final Map<String, RecordEntry> lastMap;
            private final Map<String, AtomicInteger> shardCountMap;
    
            public ConsumerThread(ConsumerConfig config,
                                  Map<String, RecordEntry> firstMap,
                                  Map<String, RecordEntry> lastMap,
                                  Map<String, AtomicInteger> shardCountMap) {
                this.consumer = new DatahubConsumer(projectName, topicName, subId, config);
                this.lastMap = lastMap;
                this.firstMap = firstMap;
                this.shardCountMap = shardCountMap;
            }
    
            @Override
            public void run() {
                try {
                    while (true) {
                        try {
                            RecordEntry record = consumer.read(30000);
                            if (record == null) {
                                // In this demo, the test ends if no data is read within 30 seconds
                                break;
                            }
                            String shardId = record.getShardId();
                            firstMap.putIfAbsent(shardId, record);
                            lastMap.put(shardId, record);
                            shardCountMap.putIfAbsent(shardId, new AtomicInteger(0));
                            AtomicInteger cnt = shardCountMap.get(shardId);
                            cnt.incrementAndGet();
                        } catch (DatahubClientException e) {
                            if (!ExceptionChecker.isRetryableException(e)) {
                                LOGGER.info("Read data fail", e);
                                break;
                            }
                            // Retryable error: sleep, then retry
                            try {
                                Thread.sleep(1000);
                            } catch (InterruptedException ex) {
                                Thread.currentThread().interrupt();
                                break;
                            }
                        } catch (Exception e) {
                            LOGGER.warn("Read fail.", e);
                            break;
                        }
                    }
                } finally {
                    consumer.close();
                }
            }
        }
    }