协同消费

协同消费

概念

点位服务

点位服务是提供将消费的点位保存在服务端的功能,点位由sequence和timestamp组成,sequence是递增的对应唯一记录的序列,timestamp是记录写入datahub的单位为ms的时间戳。

为Topic创建订阅,并在完成消费一部分数据后,将点位提交至服务端。下次启动任务时,可以从服务端获取上次提交的点位,从指定点位的下一条记录开始消费。将点位保存在服务端才能够实现shard重新分配后,能够从上次提交的点位之后消费,是协同消费功能的前提。

在Consumer中不需要手动处理点位,在config中设置点位提交的间隔,在读取记录时,认为之前的记录已经完成处理,若距离上次提交点位已经超过提交间隔,则尝试提交。在提交失败并且同时任务强制停止时,有一定可能造成点位提交不及时,重复消费一部分数据。

协同消费

协同消费是为了解决多个消费者同时消费一个topic时,自动分配shard的问题。能够简化消费的客户端处理,多个消费者可能是在不同机器上,通过自己协调分配shard是困难的。使用同一个Sub Id的Consummer在同一个Consumer Group中,同一个shard在一个Consumer Group中只会被分配给1个Consumer。

协同消费示意图

场景

现有3个消费者实例A,B,C,Topic共有10个shard

  1. 实例A启动,分配10个shard

  2. 实例B,C启动,shard分配为4,3,3

  3. 将1个shard进行split操作,在父节点消费完后,客户端主动释放,2个子节点加入后,shard分配为4,4,3

  4. 实例C停止后,shard分配为6,5

心跳

要实现协同消费的功能,需要通过心跳机制来通知让服务端消费者实例的状态,当前分配的shard和需要释放的shard,超过时间间隔没有收到心跳,则认为消费者实例已经停止。当消费者实例的状态发生改变,服务端会重新分配shard,新的分配计划也是通过心跳请求来返回,所以客户端感知shard变化是有时间间隔的。

版本

Maven依赖以及JDK:

maven pom

<dependency>
    <groupId>com.aliyun.datahub</groupId>
    <artifactId>aliyun-sdk-datahub</artifactId>
    <version>2.25.1</version>
</dependency>
<dependency>
      <groupId>com.aliyun.datahub</groupId>
      <artifactId>datahub-client-library</artifactId>
      <version>1.4.1/version>
</dependency>

jdk

jdk: >= 1.8
说明

datahub-client-library 1.4及以后版本 Producer / Consumer修改为线程安全,可以在多个线程中使用同一个Producer / Consumer,低于1.4版本,非线程安全的consumer / producer 的多线程使用方式请参考下文 "多个Consumer/Producer线程消费示例"章节

身份验证

背景信息

AccessKey(简称AK)是阿里云提供给阿里云用户的访问密钥,用于访问阿里云OpenAPI时的身份验证。AccessKey包括AccessKey ID和AccessKey Secret,需妥善保管。AK如果泄露,会威胁该账号下所有资源的安全。访问阿里云OpenAPI时,如果在代码中硬编码明文AK,容易因代码仓库权限管理不当造成AK泄露。

Alibaba Cloud Credentials是阿里云为阿里云开发者用户提供的身份凭证管理工具。配置了Credentials默认凭据链后,访问阿里云OpenAPI时,您无需在代码中硬编码明文AK,可有效保证您账号下云资源的安全。

前提条件

  • 已获取RAM用户账号的AccessKey ID和AccessKey Secret。相关操作,请参见查看RAM用户的AccessKey信息

  • 重要

    阿里云账号(即主账号)的AccessKey泄露会威胁该账号下所有资源的安全。为保证账号安全,强烈建议您为RAM用户创建AccessKey,非必要情况下请勿为阿里云主账号创建AccessKey。

    RAM用户的AccessKey Secret只能在创建AccessKey时显示,创建完成后不支持查看。请在创建好AccessKey后,及时并妥善保存AccessKey Secret。

  • 已安装阿里云SDK Credentials工具。

    Maven安装方式(推荐使用Credentials最新版本):

    <dependency>
     <groupId>com.aliyun</groupId>
     <artifactId>credentials-java</artifactId>
     <version>0.2.11</version>
    </dependency>
  • JDK版本为1.7及以上。

配置方案

本文示例的是通过配置环境变量方式,更多方式请访问配置环境变量

重要

使用配置文件的方案时,请确保您系统中不存在环境变量ALIBABA_CLOUD_ACCESS_KEY_IDALIBABA_CLOUD_ACCESS_KEY_SECRET。否则,配置文件将不生效。

阿里云SDK支持通过定义ALIBABA_CLOUD_ACCESS_KEY_IDALIBABA_CLOUD_ACCESS_KEY_SECRET环境变量来创建默认的访问凭证。调用接口时,程序直接访问凭证,读取您的访问密钥(即AccessKey)并自动完成鉴权。

配置方法

配置环境变量ALIBABA_CLOUD_ACCESS_KEY_IDALIBABA_CLOUD_ACCESS_KEY_SECRET

  • Linux和macOS系统配置方法

    执行以下命令:

    export ALIBABA_CLOUD_ACCESS_KEY_ID=<access_key_id>
    export ALIBABA_CLOUD_ACCESS_KEY_SECRET=<access_key_secret>

    <access_key_id>需替换为已准备好的AccessKey ID,<access_key_secret>替换为AccessKey Secret。

  • Windows系统配置方法

    1. 新建环境变量文件,添加环境变量ALIBABA_CLOUD_ACCESS_KEY_IDALIBABA_CLOUD_ACCESS_KEY_SECRET,并写入已准备好的AccessKey ID和AccessKey Secret。

    2. 重启Windows系统。

代码示例

    Client credentialClient = new Client();
    String accessKeyId = credentialClient.getAccessKeyId();
    String accessKeySecret = credentialClient.getAccessKeySecret();

示例

Producer 代码示例

同步写入

import com.aliyun.credentials.Client;
import com.aliyun.datahub.client.common.DatahubConfig;
import com.aliyun.datahub.client.exception.DatahubClientException;
import com.aliyun.datahub.client.http.HttpClient;
import com.aliyun.datahub.client.model.BlobRecordData;
import com.aliyun.datahub.client.model.RecordEntry;
import com.aliyun.datahub.clientlibrary.common.ExceptionChecker;
import com.aliyun.datahub.clientlibrary.config.ProducerConfig;
import com.aliyun.datahub.clientlibrary.producer.DatahubProducer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SimpleProducer {

  private static final Logger LOGGER = LoggerFactory.getLogger(SimpleProducer.class);

  public static void main(String[] args) throws Exception {
    // 是否开启内存Metric,开启后,配置log4j日志后,内存metric会打印到日志中
    // ClientMetrics.startMetrics();

    //以杭州Region为例
    String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
    Client credentialClient = new Client();
    String accessKeyId = credentialClient.getAccessKeyId();
    String accessKeySecret = credentialClient.getAccessKeySecret();
    String projectName = "";
    String topicName = "";

    ////////////////////////////// STEP1. 创建DatahubProducer //////////////////////////
    ProducerConfig config = new ProducerConfig(endpoint, accessKeyId, accessKeySecret);
    DatahubProducer datahubProducer = new DatahubProducer(projectName, topicName, config);
    RecordSchema schema = datahubProducer.getTopicSchema();
    ///////////////////// STEP2. 根据Topic是BLOB还是TUPLE类型,选择构建写入Record ////////////
    List<RecordEntry> recordList = new ArrayList<>();
    // 构建BLOB非结构化数据写入
    for (int i = 0; i < 10; ++i) {
      RecordEntry record = new RecordEntry();
      // 构建BLOB数据
      BlobRecordData data = new BlobRecordData("HelloWorld".getBytes(StandardCharsets.UTF_8));
      // 构建TUPLE数据
     //TupleRecordData data = new TupleRecordData(schema);
     //data.setField("f1", "f1_" + i);

      record.setRecordData(data);
      record.addAttribute("key1", "value1"); // 数据字段,可选

      recordList.add(record);
    }

    ///////////////////////// STEP3:循环写入数据 /////////////////////////
    try {
      for (int i = 0; i < 10000; ++i) {
        try {
          String shardId = datahubProducer.send(recordList);
          LOGGER.info("Write shard {} ok, record count:{}", shardId, recordList.size());
        } catch (DatahubClientException e) {
          if (!ExceptionChecker.isRetryableException(e)) {
            LOGGER.info("Write data fail", e);
            break;
          }
          // sleep重试
          Thread.sleep(1000);
        }
      }
    } finally {
      // 关闭producer相关资源
      datahubProducer.close();
    }

    // 进程退出时,调用全局清理函数
    HttpClient.close();

//        ClientMetrics.stopMetrics();
  }
}

异步写入

public class SimpleProducerAsync {
    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleProducerAsync.class);

    public static void main(String[] args) throws Exception {
        // 是否开启内存Metric,开启后,配置log4j日志后,内存metric会打印到日志中
        // ClientMetrics.startMetrics();
        //以杭州Region为例
        String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
        Client credentialClient = new Client();
        String accessKeyId = credentialClient.getAccessKeyId();
        String accessKeySecret = credentialClient.getAccessKeySecret();
        String projectName = "";
        String topicName = "";

        ////////////////////////////// STEP1. 创建DatahubProducer //////////////////////////
        ProducerConfig config = new ProducerConfig(endpoint, accessKeyId, accessKeySecret);
        config.setMaxAsyncBufferTimeMs(30000); // 设置缓存时间
        DatahubProducer datahubProducer = new DatahubProducer(projectName, topicName, config);

        RecordSchema schema = datahubProducer.getTopicSchema();

        // 异步写入可以注册回调函数
        WriteCallback callback  = new WriteCallback() {
            @Override
            public void onSuccess(String shardId, List<RecordEntry> records,
                                  long elapsedTimeMs, long sendTimeMs) {
                LOGGER.info("Message sent successfully");
            }

            @Override
            public void onFailure(String shardId, List<RecordEntry> records,
                                  long elapsedTimeMs, DatahubClientException e) {
                LOGGER.error("Message sent fail", e);
            }
        };

        // 可选,配置数据哈希策略
        // partition优先顺序: 依次按照RecordEntry的shardId, hashKey, partitionKey的顺序计算最终写入的shardId
        RecordPartitioner partitioner = new DefaultRecordPartitioner();

        ///////////////////////// STEP2:异步循环写入数据 /////////////////////////
        try {
            for (int i = 0; i < 1000; ++i) {
                try {
					//Tuple结构化数据写入
                    RecordEntry record = new RecordEntry();
                    TupleRecordData data = new TupleRecordData(schema);
                    data.setField("f1", "f1_" + i);
					          //BLOB非结构化数据写入
					          //BlobRecordData data = new BlobRecordData("HelloWorld".getBytes(StandardCharsets.UTF_8));
                    record.setRecordData(data);
                    record.addAttribute("key1", "value1"); // 数据字段,可选
                    // 单条发送,发送数据时可以指定是否进行partition,
                    datahubProducer.sendAsync(record, callback, partitioner);
                } catch (DatahubClientException e) {
                    if (!ExceptionChecker.isRetryableException(e)) {
                        LOGGER.info("Write data fail", e);
                        break;
                    }
                    // sleep重试
                    Thread.sleep(1000);
                }
            }

            // 阻塞到数据发送成功
            datahubProducer.flush(true);
        } catch (Exception e) {
            LOGGER.warn("Write fail", e);
        } finally {
            // 关闭producer相关资源
            datahubProducer.close();
        }

        // 进程退出时,调用全局清理函数
        HttpClient.close();
//        ClientMetrics.stopMetrics();
    }
}

协同消费代码示例

配置

名称

描述

autoCommit

是否自动提交点位,默认为true。点位的提交会在后台线程按配置的时间间隔执行,自动提交的逻辑是当read接口被调用时,认为之前读的数据已经处理完毕。如果设置为false,那么每条record处理完必须ack,后台提交点位会保证该点位之前的record全部被ack。

offsetCommitTimeoutMs

点位的提交间隔,单位毫秒,默认30000ms,范围[3000, 300000]

sessionTimeoutMs

会话超时时间,心跳间隔会设为改置的2/3,超过时间没有心跳,认为客户端已停止,服务端会重新分配被占有shard,单位毫秒,默认60000ms,范围[60000, 180000]

fetchSize

单个shard异步读取记录的大小,会缓存2倍于该值的记录,少于2倍会触发异步任务去读取,默认1000,必须大于0

import com.aliyun.credentials.Client;
import com.aliyun.datahub.client.common.DatahubConfig;
import com.aliyun.datahub.client.exception.DatahubClientException;
import com.aliyun.datahub.client.http.HttpClient;
import com.aliyun.datahub.client.model.*;
import com.aliyun.datahub.clientlibrary.common.ExceptionChecker;
import com.aliyun.datahub.clientlibrary.config.ConsumerConfig;
import com.aliyun.datahub.clientlibrary.consumer.DatahubConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SimpleConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleConsumer.class);

    public static void main(String[] args) throws Exception {
        // 是否开启内存Metric,开启后,配置log4j日志后,内存metric会打印到日志中
        // ClientMetrics.startMetrics();


        //以杭州Region为例
        String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
        Client credentialClient = new Client();
        String accessKeyId = credentialClient.getAccessKeyId();
        String accessKeySecret = credentialClient.getAccessKeySecret();
        String projectName = "";
        String topicName = "";
        String subId = "";

        ////////////////////////////// STEP1. 创建DatahubConsumer //////////////////////////
        ConsumerConfig config = new ConsumerConfig(endpoint, accessKeyId, accessKeySecret);
        DatahubConsumer datahubConsumer = new DatahubConsumer(projectName, topicName, subId, config);
        ///////////////////////// STEP2:循环读取数据 /////////////////////////
        try {
            while (true) {
                try {
                    RecordEntry record = datahubConsumer.read(3000);
                    if (record == null) {
                        continue; // 3s内未读取到数据,(1). 无数据 (2). 内部状态未Ready,比如协同消费暂时未分配到shard
                    }

                    RecordData recordData = record.getRecordData();
                    // 根据Topic为BLOB类型还是TUPLE类型进行不同的数据处理逻辑, 一种topic只有一种类型
                    if (recordData instanceof TupleRecordData) {
                        TupleRecordData data = (TupleRecordData) recordData;
                        RecordSchema schema = data.getRecordSchema();
                        // 示例中仅做简单的字符串拼接
                        StringBuilder sb = new StringBuilder();
                        for (int i = 0; i < schema.getFields().size(); ++i) {
                            sb.append(data.getField(i)).append(",");
                        }
                        LOGGER.debug("Read record. shardId:{}, seq:{}, ts:{}, batchIndex:{}, batchSize:{}, data:{}",
                                record.getShardId(), record.getSequence(), record.getSystemTime(), record.getSegmentIndexForBatch(),
                                record.getSegmentSizeForBatch(), sb);
                    } else {
                        BlobRecordData data = (BlobRecordData) recordData;
                        LOGGER.debug("Read record. shardId:{}, seq:{}, ts:{}, batchIndex:{}, batchSize:{}, data:{}",
                                record.getShardId(), record.getSequence(), record.getSystemTime(), record.getSegmentIndexForBatch(),
                                record.getSegmentSizeForBatch(), new String(data.getData()));
                    }
                } catch (DatahubClientException e) {
                    if (!ExceptionChecker.isRetryableException(e)) {
                        LOGGER.info("Read data fail", e);
                        break;
                    }
                    // sleep重试
                    Thread.sleep(1000);
                }
            }
        }  catch (Exception e) {
            LOGGER.warn("Read data fail", e);
        } finally {
            // 关闭consumer相关资源
            datahubConsumer.close();
        }

        // 进程退出时,调用全局清理函数
        HttpClient.close();

//        ClientMetrics.stopMetrics();
    }
}

多线程读写示例

多线程共用同一Consumer/Producer消费示例(适用于1.4及以上版本)

package com.aliyun.datahub.clientlibrary.example;

import com.aliyun.credentials.Client;
import com.aliyun.datahub.client.common.DatahubConfig;
import com.aliyun.datahub.client.exception.DatahubClientException;
import com.aliyun.datahub.client.http.HttpClient;
import com.aliyun.datahub.client.model.BlobRecordData;
import com.aliyun.datahub.client.model.RecordEntry;
import com.aliyun.datahub.clientlibrary.common.ExceptionChecker;
import com.aliyun.datahub.clientlibrary.config.ConsumerConfig;
import com.aliyun.datahub.clientlibrary.config.ProducerConfig;
import com.aliyun.datahub.clientlibrary.consumer.DatahubConsumer;
import com.aliyun.datahub.clientlibrary.producer.DatahubProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class MultiThreadReadWrite {
    private static final Logger LOGGER = LoggerFactory.getLogger(MultiThreadReadWrite.class);

    public static void main(String[] args) throws Exception {
        try {
//            testProducer();
            testConsumer();
        } finally {
            // 进程退出时,调用全局清理函数
            HttpClient.close();
        }
    }

    private static void testProducer() throws Exception {
        List<RecordEntry> records = new ArrayList<>();
        String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
        Client credentialClient = new Client();
        String accessKeyId = credentialClient.getAccessKeyId();
        String accessKeySecret = credentialClient.getAccessKeySecret();
        String projectName = "";
        String topicName = "";
        for (int i = 0; i < 2; ++i) {
            RecordEntry record = new RecordEntry();
            BlobRecordData data = new BlobRecordData(("HelloWorld-" + i).getBytes());
            record.setRecordData(data);
            record.addAttribute("key1", "value1");

            records.add(record);
        }

        ProducerConfig config = new ProducerConfig(endpoint, accessKeyId, accessKeySecret);
        config.getDatahubConfig().setProtocol(DatahubConfig.Protocol.BATCH); // 是否开启batch写入,建议开启

        Map<String, AtomicInteger> shardCountMap = new ConcurrentHashMap<>();

        DatahubProducer producer = new DatahubProducer(projectName, topicName,config);
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < 10; ++i) {
            ProducerThread thread = new ProducerThread(producer, records, shardCountMap);
            thread.start();
            threads.add(thread);
        }

        for (int i = 0; i < 10; ++i) {
            threads.get(i).join();
        }

        producer.close();
        // print write count
        for (Map.Entry<String, AtomicInteger> entry : shardCountMap.entrySet()) {
            LOGGER.info("ShardId:{}, count:{}", entry.getKey(), entry.getValue());
        }
    }

    private static class ProducerThread extends Thread {
        private final DatahubProducer producer;
        private final List<RecordEntry> records;
        private final Map<String, AtomicInteger> shardCountMap;

        public ProducerThread(DatahubProducer producer,
                              List<RecordEntry> records,
                              Map<String, AtomicInteger> shardCountMap) {
            this.producer = producer;
            this.records = records;
            this.shardCountMap = shardCountMap;
        }

        @Override
        public void run() {
            for (int i = 0; i < 100; ++i) {
                try {
                    String shardId = producer.send(records);
                    shardCountMap.putIfAbsent(shardId, new AtomicInteger(0));
                    AtomicInteger cnt = shardCountMap.get(shardId);
                    cnt.incrementAndGet();
                } catch (DatahubClientException e) {
                    if (!ExceptionChecker.isRetryableException(e)) {
                        LOGGER.info("Write data fail", e);
                        break;
                    }
                    // sleep重试
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException ex) {
                        // ignore
                    }
                } catch (Exception e) {
                    break;
                }
            }
        }
    }

    private static void testConsumer() throws Exception {
        String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
        Client credentialClient = new Client();
        String accessKeyId = credentialClient.getAccessKeyId();
        String accessKeySecret = credentialClient.getAccessKeySecret();
        String projectName = "";
        String topicName = "";
        String subId = "";
        ConsumerConfig config = new ConsumerConfig(endpoint, accessKeyId,accessKeySecret);
        config.getDatahubConfig().setProtocol(DatahubConfig.Protocol.BATCH); // 是否开启batch写入,建议开启

        Map<String, RecordEntry> firstMap = new ConcurrentHashMap<>();
        Map<String, RecordEntry> lastMap = new ConcurrentHashMap<>();
        Map<String, AtomicInteger> shardCountMap = new ConcurrentHashMap<>();

        DatahubConsumer consumer = new DatahubConsumer(projectName, topicName, subId, config);
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < 10; ++i) {
            ConsumerThread thread = new ConsumerThread(consumer, firstMap, lastMap, shardCountMap);
            thread.start();
            threads.add(thread);
        }

        for (int i = 0; i < 10; ++i) {
            threads.get(i).join();
        }

        // print start and end sequence
        for (RecordEntry first : firstMap.values()) {
            RecordEntry last = lastMap.get(first.getShardId());
            AtomicInteger cnt = shardCountMap.get(first.getShardId());
            LOGGER.info("ShardId:{}, startSeq:{}, endSeq:{}, cnt:{}",
                    first.getShardId(), first.getSequence(), last.getSequence(), cnt);
        }

        // 关闭consumer相关资源
        consumer.close();
    }

    private static class ConsumerThread extends Thread {
        private final DatahubConsumer consumer;
        private final Map<String, RecordEntry> firstMap;
        private final Map<String, RecordEntry> lastMap;
        private final Map<String, AtomicInteger> shardCountMap;

        public ConsumerThread(DatahubConsumer consumer,
                              Map<String, RecordEntry> firstMap,
                              Map<String, RecordEntry> lastMap,
                              Map<String, AtomicInteger> shardCountMap) {
            this.consumer = consumer;
            this.lastMap = lastMap;
            this.firstMap = firstMap;
            this.shardCountMap = shardCountMap;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    RecordEntry record = consumer.read(30000);
                    if (record == null) {
                        // 在demo中,这里30秒读取不到数据就退出测试
                        break;
                    }
                    String shardId = record.getShardId();
                    firstMap.putIfAbsent(shardId, record);
                    lastMap.put(shardId, record);
                    shardCountMap.putIfAbsent(shardId, new AtomicInteger(0));
                    AtomicInteger cnt = shardCountMap.get(shardId);
                    cnt.incrementAndGet();
                } catch (DatahubClientException e) {
                    if (!ExceptionChecker.isRetryableException(e)) {
                        LOGGER.info("Read data fail", e);
                        break;
                    }
                    // sleep重试
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException ex) {
                        // ignore
                    }
                } catch (Exception e) {
                    LOGGER.warn("Read fail.", e);
                    break;
                }
            }
        }
    }
}

多个Consumer/Producer线程消费示例(适用于低于1.4的版本)

package com.aliyun.datahub.clientlibrary.example;

import com.aliyun.credentials.Client;
import com.aliyun.datahub.client.common.DatahubConfig;
import com.aliyun.datahub.client.exception.DatahubClientException;
import com.aliyun.datahub.client.http.HttpClient;
import com.aliyun.datahub.client.model.BlobRecordData;
import com.aliyun.datahub.client.model.RecordEntry;
import com.aliyun.datahub.clientlibrary.common.ExceptionChecker;
import com.aliyun.datahub.clientlibrary.config.ConsumerConfig;
import com.aliyun.datahub.clientlibrary.config.ProducerConfig;
import com.aliyun.datahub.clientlibrary.consumer.DatahubConsumer;
import com.aliyun.datahub.clientlibrary.producer.DatahubProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class MultiProducerAndConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(MultiProducerAndConsumer.class);

    public static void main(String[] args) throws Exception {
        try {
//            testProducer();
            testConsumer();
        } finally {
            // 进程退出时,调用全局清理函数
            HttpClient.close();
        }
    }

    private static void testProducer() throws Exception {
        List<RecordEntry> records = new ArrayList<>();
        for (int i = 0; i < 2; ++i) {
            RecordEntry record = new RecordEntry();
            BlobRecordData data = new BlobRecordData(("HelloWorld-" + i).getBytes());
            record.setRecordData(data);
            record.addAttribute("key1", "value1");

            records.add(record);
        }
        String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
        Client credentialClient = new Client();
        String accessKeyId = credentialClient.getAccessKeyId();
        String accessKeySecret = credentialClient.getAccessKeySecret();
        ProducerConfig config = new ProducerConfig(endpoint,accessKeyId ,accessKeySecret);
        config.getDatahubConfig().setProtocol(DatahubConfig.Protocol.BATCH); // 是否开启batch写入,建议开启

        Map<String, AtomicInteger> shardCountMap = new ConcurrentHashMap<>();


        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < 3; ++i) {
            ProducerThread thread = new ProducerThread(config, records, shardCountMap);
            thread.start();
            threads.add(thread);
        }

        for (int i = 0; i < 3; ++i) {
            threads.get(i).join();
        }

        // print write count
        for (Map.Entry<String, AtomicInteger> entry : shardCountMap.entrySet()) {
            LOGGER.info("ShardId:{}, count:{}", entry.getKey(), entry.getValue());
        }
    }

    private static class ProducerThread extends Thread {
        private final DatahubProducer producer;
        private final List<RecordEntry> records;
        private final Map<String, AtomicInteger> shardCountMap;
        String projectName = "";
        String topicName = "";
        public ProducerThread(ProducerConfig config,
                              List<RecordEntry> records,
                              Map<String, AtomicInteger> shardCountMap) {
            this.producer = new DatahubProducer(projectName, topicName, config);;
            this.records = records;
            this.shardCountMap = shardCountMap;
        }

        @Override
        public void run() {
            try {
                for (int i = 0; i < 100; ++i) {
                    try {
                        String shardId = producer.send(records);
                        shardCountMap.putIfAbsent(shardId, new AtomicInteger(0));
                        AtomicInteger cnt = shardCountMap.get(shardId);
                        cnt.incrementAndGet();
                    } catch (DatahubClientException e) {
                        LOGGER.info("Producer send fail", e);
                        if (!ExceptionChecker.isRetryableException(e)) {
                            break;
                        }
                        // sleep重试
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException ex) {
                            // ignore
                        }
                    } catch (Exception e) {
                        LOGGER.info("Producer send fail", e);
                        break;
                    }
                }
            } finally {
                producer.close();
            }
        }
    }

    private static void testConsumer() throws Exception {

        ConsumerConfig config = new ConsumerConfig(ExampleConstants.ENDPOINT, ExampleConstants.ACCESS_ID, ExampleConstants.SUB_ACCESS_KEY);
        config.getDatahubConfig().setProtocol(DatahubConfig.Protocol.BATCH); // 是否开启batch写入,建议开启

        Map<String, RecordEntry> firstMap = new ConcurrentHashMap<>();
        Map<String, RecordEntry> lastMap = new ConcurrentHashMap<>();
        Map<String, AtomicInteger> shardCountMap = new ConcurrentHashMap<>();

        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < 3; ++i) {
            ConsumerThread thread = new ConsumerThread(config, firstMap, lastMap, shardCountMap);
            thread.start();
            threads.add(thread);
        }

        for (int i = 0; i < 3; ++i) {
            threads.get(i).join();
        }

        // print start and end sequence
        for (RecordEntry first : firstMap.values()) {
            RecordEntry last = lastMap.get(first.getShardId());
            AtomicInteger cnt = shardCountMap.get(first.getShardId());
            LOGGER.info("ShardId:{}, startSeq:{}, endSeq:{}, cnt:{}",
                    first.getShardId(), first.getSequence(), last.getSequence(), cnt);
        }
    }

    private static class ConsumerThread extends Thread {
        private final DatahubConsumer consumer;
        private final Map<String, RecordEntry> firstMap;
        private final Map<String, RecordEntry> lastMap;
        private final Map<String, AtomicInteger> shardCountMap;

        public ConsumerThread(ConsumerConfig config,
                              Map<String, RecordEntry> firstMap,
                              Map<String, RecordEntry> lastMap,
                              Map<String, AtomicInteger> shardCountMap) {
            this.consumer = new DatahubConsumer(ExampleConstants.PROJECT_NAME, ExampleConstants.TOPIC_NAME, ExampleConstants.SUB_ID, config);;
            this.lastMap = lastMap;
            this.firstMap = firstMap;
            this.shardCountMap = shardCountMap;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    try {
                        RecordEntry record = consumer.read(30000);
                        if (record == null) {
                            // 在demo中,这里30秒读取不到数据就退出测试
                            break;
                        }
                        String shardId = record.getShardId();
                        firstMap.putIfAbsent(shardId, record);
                        lastMap.put(shardId, record);
                        shardCountMap.putIfAbsent(shardId, new AtomicInteger(0));
                        AtomicInteger cnt = shardCountMap.get(shardId);
                        cnt.incrementAndGet();
                    } catch (DatahubClientException e) {
                        if (!ExceptionChecker.isRetryableException(e)) {
                            LOGGER.info("Read data fail", e);
                            break;
                        }
                        // sleep重试
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException ex) {
                            // ignore
                        }
                    } catch (Exception e) {
                        LOGGER.warn("Read fail.", e);
                        break;
                    }
                }
            } finally {
                consumer.close();
            }

        }
    }
}