Collaborative Consumption
Concepts
Offset Service
The offset service stores consumption offsets on the server side. An offset consists of a sequence and a timestamp: the sequence is a monotonically increasing number that identifies a unique record, and the timestamp is the time, in ms, at which the record was written to DataHub.
Create a subscription for a Topic and, after consuming part of the data, commit the offset to the server. The next time the task starts, it can fetch the last committed offset from the server and resume consumption from the record after it. Keeping offsets on the server is what allows consumption to resume from the last committed offset after shards are reassigned, and is therefore a prerequisite for collaborative consumption.
With the Consumer you do not handle offsets manually: you set the commit interval in the config, and when records are read, the earlier records are considered processed; if more than the commit interval has passed since the last commit, a commit is attempted. If a commit fails and the task is force-stopped at the same time, the offset may not be committed in time, and some data will be consumed again.
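For example, the commit behavior is tuned on the consumer config. A minimal sketch, assuming the ConsumerConfig setters mirror the option names listed in the configuration table later in this section:
ConsumerConfig config = new ConsumerConfig(endpoint, accessKeyId, accessKeySecret);
config.setAutoCommit(true);             // commit offsets from a background thread (the default)
config.setOffsetCommitTimeoutMs(10000); // attempt a commit every 10s instead of the default 30s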
Collaborative Consumption
Collaborative consumption solves the problem of automatically assigning shards when multiple consumers consume one topic at the same time. It simplifies client-side handling: the consumers may run on different machines, where coordinating shard assignment by hand is difficult. Consumers that use the same Sub Id belong to the same Consumer Group, and within a Consumer Group each shard is assigned to only 1 Consumer.
Scenario
Suppose there are 3 consumer instances A, B and C, and the Topic has 10 shards:
Instance A starts and is assigned all 10 shards.
Instances B and C start; the shards are redistributed as 4, 3, 3.
One shard is split; after the parent shard has been fully consumed, the client releases it, and once the 2 child shards join, the distribution becomes 4, 4, 3.
Instance C stops; the distribution becomes 6, 5.
Heartbeat
To implement collaborative consumption, a heartbeat mechanism tells the server the state of each consumer instance: the shards currently assigned to it and the shards it needs to release. If no heartbeat is received within the timeout, the consumer instance is considered stopped. When the state of a consumer instance changes, the server reassigns the shards, and the new assignment plan is also returned through heartbeat responses, so the client perceives shard changes with some delay.
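A sketch of tuning this timeout, again assuming the setter name mirrors the sessionTimeoutMs option described in the configuration table below:
ConsumerConfig config = new ConsumerConfig(endpoint, accessKeyId, accessKeySecret);
config.setSessionTimeoutMs(60000); // heartbeats are sent at ~2/3 of this value (about every 40s here);
                                   // if none arrives within 60s, the server reassigns this instance's shards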
Versions
Maven dependencies and JDK:
Maven pom
<dependency>
<groupId>com.aliyun.datahub</groupId>
<artifactId>aliyun-sdk-datahub</artifactId>
<version>2.25.1</version>
</dependency>
<dependency>
<groupId>com.aliyun.datahub</groupId>
<artifactId>datahub-client-library</artifactId>
<version>1.4.1</version>
</dependency>
JDK
JDK: >= 1.8
In datahub-client-library 1.4 and later, Producer / Consumer are thread-safe, and the same Producer / Consumer can be used from multiple threads. In versions below 1.4, Consumer / Producer are not thread-safe; for how to use them from multiple threads, see the section "Example: multiple Consumer/Producer threads (for versions below 1.4)" below.
Authentication
Background
An AccessKey (AK) is the access key Alibaba Cloud issues to its users for authenticating calls to Alibaba Cloud OpenAPI. An AccessKey consists of an AccessKey ID and an AccessKey Secret, and must be kept secure: a leaked AK threatens all resources under the account. Hard-coding a plaintext AK in code that calls Alibaba Cloud OpenAPI makes it easy to leak the AK through poorly managed repository permissions.
Alibaba Cloud Credentials is a credential management tool Alibaba Cloud provides for developers. With the Credentials default credential chain configured, you do not need to hard-code a plaintext AK when calling Alibaba Cloud OpenAPI, which keeps the cloud resources under your account safe.
Prerequisites
You have obtained the AccessKey ID and AccessKey Secret of a RAM user. For details, see View the AccessKey information of a RAM user.
- Important
A leaked AccessKey of an Alibaba Cloud account (the root account) threatens all resources under that account. To keep your account safe, it is strongly recommended that you create AccessKeys for RAM users and avoid creating AccessKeys for the root account unless absolutely necessary.
A RAM user's AccessKey Secret is displayed only when the AccessKey is created and cannot be viewed afterwards. Save the AccessKey Secret promptly and securely after creating the AccessKey.
The Alibaba Cloud SDK Credentials tool is installed.
Maven installation (the latest Credentials version is recommended):
<dependency>
    <groupId>com.aliyun</groupId>
    <artifactId>credentials-java</artifactId>
    <version>0.2.11</version>
</dependency>
The JDK version is 1.7 or later.
Configuration
This topic configures credentials through environment variables; for other methods, see Configure environment variables.
When you use a configuration file instead, make sure the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET do not exist on your system; otherwise the configuration file does not take effect.
The Alibaba Cloud SDK can create default access credentials from the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables. When you call an API, the program reads your access key (AccessKey) from these credentials and completes authentication automatically.
Procedure
Set the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET.
Linux and macOS
Run the following commands:
export ALIBABA_CLOUD_ACCESS_KEY_ID=<access_key_id>
export ALIBABA_CLOUD_ACCESS_KEY_SECRET=<access_key_secret>
Replace <access_key_id> with your AccessKey ID and <access_key_secret> with your AccessKey Secret.
Windows
Create the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET, set them to your AccessKey ID and AccessKey Secret, and then restart Windows.
Code example
Client credentialClient = new Client(); // reads the default credential chain, e.g. the environment variables above
String accessKeyId = credentialClient.getAccessKeyId();
String accessKeySecret = credentialClient.getAccessKeySecret();
Examples
Producer code example
Synchronous write
import com.aliyun.credentials.Client;
import com.aliyun.datahub.client.common.DatahubConfig;
import com.aliyun.datahub.client.exception.DatahubClientException;
import com.aliyun.datahub.client.http.HttpClient;
import com.aliyun.datahub.client.model.BlobRecordData;
import com.aliyun.datahub.client.model.RecordEntry;
import com.aliyun.datahub.client.model.RecordSchema;
import com.aliyun.datahub.clientlibrary.common.ExceptionChecker;
import com.aliyun.datahub.clientlibrary.config.ProducerConfig;
import com.aliyun.datahub.clientlibrary.producer.DatahubProducer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SimpleProducer {
private static final Logger LOGGER = LoggerFactory.getLogger(SimpleProducer.class);
public static void main(String[] args) throws Exception {
// Optionally enable in-memory metrics; once enabled (and log4j is configured), the metrics are printed to the log
// ClientMetrics.startMetrics();
// Use the Hangzhou region as an example
String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
Client credentialClient = new Client();
String accessKeyId = credentialClient.getAccessKeyId();
String accessKeySecret = credentialClient.getAccessKeySecret();
String projectName = "";
String topicName = "";
////////////////////////////// STEP1. Create the DatahubProducer //////////////////////////
ProducerConfig config = new ProducerConfig(endpoint, accessKeyId, accessKeySecret);
DatahubProducer datahubProducer = new DatahubProducer(projectName, topicName, config);
RecordSchema schema = datahubProducer.getTopicSchema();
///////////////////// STEP2. Build the records to write, BLOB or TUPLE depending on the Topic type ////////////
List<RecordEntry> recordList = new ArrayList<>();
// Write BLOB (unstructured) data
for (int i = 0; i < 10; ++i) {
RecordEntry record = new RecordEntry();
// Build the BLOB data
BlobRecordData data = new BlobRecordData("HelloWorld".getBytes(StandardCharsets.UTF_8));
// Or build TUPLE data instead:
//TupleRecordData data = new TupleRecordData(schema);
//data.setField("f1", "f1_" + i);
record.setRecordData(data);
record.addAttribute("key1", "value1"); // record attribute, optional
recordList.add(record);
}
///////////////////////// STEP3: Write data in a loop /////////////////////////
try {
for (int i = 0; i < 10000; ++i) {
try {
String shardId = datahubProducer.send(recordList);
LOGGER.info("Write shard {} ok, record count:{}", shardId, recordList.size());
} catch (DatahubClientException e) {
if (!ExceptionChecker.isRetryableException(e)) {
LOGGER.info("Write data fail", e);
break;
}
// sleep and retry
Thread.sleep(1000);
}
}
} finally {
// release producer resources
datahubProducer.close();
}
// call the global cleanup functions when the process exits
HttpClient.close();
// ClientMetrics.stopMetrics();
}
}
Asynchronous write
// Imports omitted; same as the synchronous example above, plus the async-specific classes used below
// (WriteCallback, RecordPartitioner, DefaultRecordPartitioner, TupleRecordData).
public class SimpleProducerAsync {
private static final Logger LOGGER = LoggerFactory.getLogger(SimpleProducerAsync.class);
public static void main(String[] args) throws Exception {
// Optionally enable in-memory metrics; once enabled (and log4j is configured), the metrics are printed to the log
// ClientMetrics.startMetrics();
// Use the Hangzhou region as an example
String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
Client credentialClient = new Client();
String accessKeyId = credentialClient.getAccessKeyId();
String accessKeySecret = credentialClient.getAccessKeySecret();
String projectName = "";
String topicName = "";
////////////////////////////// STEP1. Create the DatahubProducer //////////////////////////
ProducerConfig config = new ProducerConfig(endpoint, accessKeyId, accessKeySecret);
config.setMaxAsyncBufferTimeMs(30000); // set the maximum time records may sit in the async buffer
DatahubProducer datahubProducer = new DatahubProducer(projectName, topicName, config);
RecordSchema schema = datahubProducer.getTopicSchema();
// a callback can be registered for asynchronous writes
WriteCallback callback = new WriteCallback() {
@Override
public void onSuccess(String shardId, List<RecordEntry> records,
long elapsedTimeMs, long sendTimeMs) {
LOGGER.info("Message sent successfully");
}
@Override
public void onFailure(String shardId, List<RecordEntry> records,
long elapsedTimeMs, DatahubClientException e) {
LOGGER.error("Message sent fail", e);
}
};
// Optional: configure the record partitioning strategy
// partition priority: the RecordEntry's shardId, then hashKey, then partitionKey, in that order, determine the shard written to
RecordPartitioner partitioner = new DefaultRecordPartitioner();
///////////////////////// STEP2: Write data asynchronously in a loop /////////////////////////
try {
for (int i = 0; i < 1000; ++i) {
try {
//Write TUPLE (structured) data
RecordEntry record = new RecordEntry();
TupleRecordData data = new TupleRecordData(schema);
data.setField("f1", "f1_" + i);
//Or write BLOB (unstructured) data:
//BlobRecordData data = new BlobRecordData("HelloWorld".getBytes(StandardCharsets.UTF_8));
record.setRecordData(data);
record.addAttribute("key1", "value1"); // record attribute, optional
// send one record at a time; a partitioner can be supplied to control how records map to shards
datahubProducer.sendAsync(record, callback, partitioner);
} catch (DatahubClientException e) {
if (!ExceptionChecker.isRetryableException(e)) {
LOGGER.info("Write data fail", e);
break;
}
// sleep and retry
Thread.sleep(1000);
}
}
// block until all buffered data has been sent
datahubProducer.flush(true);
} catch (Exception e) {
LOGGER.warn("Write fail", e);
} finally {
// release producer resources
datahubProducer.close();
}
// call the global cleanup functions when the process exits
HttpClient.close();
// ClientMetrics.stopMetrics();
}
}
Collaborative consumption code example
Configuration
| Name | Description |
| --- | --- |
| autoCommit | Whether to commit offsets automatically; default true. Offsets are committed by a background thread at the configured interval; under auto commit, calling the read interface is taken to mean that the previously read records have been fully processed. If set to false, every record must be acked after it is processed, and the background commit only advances past records that have all been acked (see the sketch after this table). |
| offsetCommitTimeoutMs | Offset commit interval, in milliseconds. Default 30000 ms, range [3000, 300000]. |
| sessionTimeoutMs | Session timeout, in milliseconds; the heartbeat interval is set to 2/3 of this value. If no heartbeat is received within the timeout, the client is considered stopped and the server reassigns the shards it held. Default 60000 ms, range [60000, 180000]. |
| fetchSize | Number of records fetched asynchronously per shard; up to 2x this value is cached per shard, and dropping below 2x triggers an asynchronous fetch. Default 1000; must be greater than 0. |
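When autoCommit is false, each record has to be acknowledged once it is processed, and the committed offset only advances past fully acked records. A minimal sketch of that mode, assuming (as in earlier examples shipped with this library) that the record key obtained from the consumer exposes an ack() method:
ConsumerConfig config = new ConsumerConfig(endpoint, accessKeyId, accessKeySecret);
config.setAutoCommit(false);  // disable auto commit; every record must be acked
config.setFetchSize(500);     // cache at most 2 x 500 records per shard
DatahubConsumer consumer = new DatahubConsumer(projectName, topicName, subId, config);
RecordEntry record = consumer.read(3000);
if (record != null) {
    // ... process the record ...
    record.getKey().ack();    // assumption: marks this record processed so the background
                              // committer can advance the offset past it
}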
import com.aliyun.credentials.Client;
import com.aliyun.datahub.client.common.DatahubConfig;
import com.aliyun.datahub.client.exception.DatahubClientException;
import com.aliyun.datahub.client.http.HttpClient;
import com.aliyun.datahub.client.model.*;
import com.aliyun.datahub.clientlibrary.common.ExceptionChecker;
import com.aliyun.datahub.clientlibrary.config.ConsumerConfig;
import com.aliyun.datahub.clientlibrary.consumer.DatahubConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SimpleConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(SimpleConsumer.class);
public static void main(String[] args) throws Exception {
// Optionally enable in-memory metrics; once enabled (and log4j is configured), the metrics are printed to the log
// ClientMetrics.startMetrics();
// Use the Hangzhou region as an example
String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
Client credentialClient = new Client();
String accessKeyId = credentialClient.getAccessKeyId();
String accessKeySecret = credentialClient.getAccessKeySecret();
String projectName = "";
String topicName = "";
String subId = "";
////////////////////////////// STEP1. Create the DatahubConsumer //////////////////////////
ConsumerConfig config = new ConsumerConfig(endpoint, accessKeyId, accessKeySecret);
DatahubConsumer datahubConsumer = new DatahubConsumer(projectName, topicName, subId, config);
///////////////////////// STEP2: Read data in a loop /////////////////////////
try {
while (true) {
try {
RecordEntry record = datahubConsumer.read(3000);
if (record == null) {
continue; // nothing read within 3s: (1) no data available, or (2) internal state not Ready, e.g. no shard assigned yet by collaborative consumption
}
RecordData recordData = record.getRecordData();
// Process the data according to whether the Topic is BLOB or TUPLE; a topic has exactly one type
if (recordData instanceof TupleRecordData) {
TupleRecordData data = (TupleRecordData) recordData;
RecordSchema schema = data.getRecordSchema();
// this example simply concatenates the fields into a string
StringBuilder sb = new StringBuilder();
for (int i = 0; i < schema.getFields().size(); ++i) {
sb.append(data.getField(i)).append(",");
}
LOGGER.debug("Read record. shardId:{}, seq:{}, ts:{}, batchIndex:{}, batchSize:{}, data:{}",
record.getShardId(), record.getSequence(), record.getSystemTime(), record.getSegmentIndexForBatch(),
record.getSegmentSizeForBatch(), sb);
} else {
BlobRecordData data = (BlobRecordData) recordData;
LOGGER.debug("Read record. shardId:{}, seq:{}, ts:{}, batchIndex:{}, batchSize:{}, data:{}",
record.getShardId(), record.getSequence(), record.getSystemTime(), record.getSegmentIndexForBatch(),
record.getSegmentSizeForBatch(), new String(data.getData()));
}
} catch (DatahubClientException e) {
if (!ExceptionChecker.isRetryableException(e)) {
LOGGER.info("Read data fail", e);
break;
}
// sleep and retry
Thread.sleep(1000);
}
}
} catch (Exception e) {
LOGGER.warn("Read data fail", e);
} finally {
// release consumer resources
datahubConsumer.close();
}
// call the global cleanup functions when the process exits
HttpClient.close();
// ClientMetrics.stopMetrics();
}
}
Multi-threaded read/write examples
Example: sharing one Consumer/Producer across multiple threads (for version 1.4 and later)
package com.aliyun.datahub.clientlibrary.example;
import com.aliyun.credentials.Client;
import com.aliyun.datahub.client.common.DatahubConfig;
import com.aliyun.datahub.client.exception.DatahubClientException;
import com.aliyun.datahub.client.http.HttpClient;
import com.aliyun.datahub.client.model.BlobRecordData;
import com.aliyun.datahub.client.model.RecordEntry;
import com.aliyun.datahub.clientlibrary.common.ExceptionChecker;
import com.aliyun.datahub.clientlibrary.config.ConsumerConfig;
import com.aliyun.datahub.clientlibrary.config.ProducerConfig;
import com.aliyun.datahub.clientlibrary.consumer.DatahubConsumer;
import com.aliyun.datahub.clientlibrary.producer.DatahubProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
public class MultiThreadReadWrite {
private static final Logger LOGGER = LoggerFactory.getLogger(MultiThreadReadWrite.class);
public static void main(String[] args) throws Exception {
try {
// testProducer();
testConsumer();
} finally {
// call the global cleanup functions when the process exits
HttpClient.close();
}
}
private static void testProducer() throws Exception {
List<RecordEntry> records = new ArrayList<>();
String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
Client credentialClient = new Client();
String accessKeyId = credentialClient.getAccessKeyId();
String accessKeySecret = credentialClient.getAccessKeySecret();
String projectName = "";
String topicName = "";
for (int i = 0; i < 2; ++i) {
RecordEntry record = new RecordEntry();
BlobRecordData data = new BlobRecordData(("HelloWorld-" + i).getBytes());
record.setRecordData(data);
record.addAttribute("key1", "value1");
records.add(record);
}
ProducerConfig config = new ProducerConfig(endpoint, accessKeyId, accessKeySecret);
config.getDatahubConfig().setProtocol(DatahubConfig.Protocol.BATCH); // enable the batch protocol for writing; recommended
Map<String, AtomicInteger> shardCountMap = new ConcurrentHashMap<>();
DatahubProducer producer = new DatahubProducer(projectName, topicName, config);
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < 10; ++i) {
ProducerThread thread = new ProducerThread(producer, records, shardCountMap);
thread.start();
threads.add(thread);
}
for (int i = 0; i < 10; ++i) {
threads.get(i).join();
}
producer.close();
// print write count
for (Map.Entry<String, AtomicInteger> entry : shardCountMap.entrySet()) {
LOGGER.info("ShardId:{}, count:{}", entry.getKey(), entry.getValue());
}
}
private static class ProducerThread extends Thread {
private final DatahubProducer producer;
private final List<RecordEntry> records;
private final Map<String, AtomicInteger> shardCountMap;
public ProducerThread(DatahubProducer producer,
List<RecordEntry> records,
Map<String, AtomicInteger> shardCountMap) {
this.producer = producer;
this.records = records;
this.shardCountMap = shardCountMap;
}
@Override
public void run() {
for (int i = 0; i < 100; ++i) {
try {
String shardId = producer.send(records);
shardCountMap.putIfAbsent(shardId, new AtomicInteger(0));
AtomicInteger cnt = shardCountMap.get(shardId);
cnt.incrementAndGet();
} catch (DatahubClientException e) {
if (!ExceptionChecker.isRetryableException(e)) {
LOGGER.info("Write data fail", e);
break;
}
// sleep and retry
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
// ignore
}
} catch (Exception e) {
break;
}
}
}
}
private static void testConsumer() throws Exception {
String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
Client credentialClient = new Client();
String accessKeyId = credentialClient.getAccessKeyId();
String accessKeySecret = credentialClient.getAccessKeySecret();
String projectName = "";
String topicName = "";
String subId = "";
ConsumerConfig config = new ConsumerConfig(endpoint, accessKeyId,accessKeySecret);
config.getDatahubConfig().setProtocol(DatahubConfig.Protocol.BATCH); // enable the batch protocol; recommended
Map<String, RecordEntry> firstMap = new ConcurrentHashMap<>();
Map<String, RecordEntry> lastMap = new ConcurrentHashMap<>();
Map<String, AtomicInteger> shardCountMap = new ConcurrentHashMap<>();
DatahubConsumer consumer = new DatahubConsumer(projectName, topicName, subId, config);
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < 10; ++i) {
ConsumerThread thread = new ConsumerThread(consumer, firstMap, lastMap, shardCountMap);
thread.start();
threads.add(thread);
}
for (int i = 0; i < 10; ++i) {
threads.get(i).join();
}
// print start and end sequence
for (RecordEntry first : firstMap.values()) {
RecordEntry last = lastMap.get(first.getShardId());
AtomicInteger cnt = shardCountMap.get(first.getShardId());
LOGGER.info("ShardId:{}, startSeq:{}, endSeq:{}, cnt:{}",
first.getShardId(), first.getSequence(), last.getSequence(), cnt);
}
// release consumer resources
consumer.close();
}
private static class ConsumerThread extends Thread {
private final DatahubConsumer consumer;
private final Map<String, RecordEntry> firstMap;
private final Map<String, RecordEntry> lastMap;
private final Map<String, AtomicInteger> shardCountMap;
public ConsumerThread(DatahubConsumer consumer,
Map<String, RecordEntry> firstMap,
Map<String, RecordEntry> lastMap,
Map<String, AtomicInteger> shardCountMap) {
this.consumer = consumer;
this.lastMap = lastMap;
this.firstMap = firstMap;
this.shardCountMap = shardCountMap;
}
@Override
public void run() {
while (true) {
try {
RecordEntry record = consumer.read(30000);
if (record == null) {
// in this demo, exit the test if no data is read for 30 seconds
break;
}
String shardId = record.getShardId();
firstMap.putIfAbsent(shardId, record);
lastMap.put(shardId, record);
shardCountMap.putIfAbsent(shardId, new AtomicInteger(0));
AtomicInteger cnt = shardCountMap.get(shardId);
cnt.incrementAndGet();
} catch (DatahubClientException e) {
if (!ExceptionChecker.isRetryableException(e)) {
LOGGER.info("Read data fail", e);
break;
}
// sleep and retry
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
// ignore
}
} catch (Exception e) {
LOGGER.warn("Read fail.", e);
break;
}
}
}
}
}
Example: multiple Consumer/Producer threads (for versions below 1.4)
package com.aliyun.datahub.clientlibrary.example;
import com.aliyun.credentials.Client;
import com.aliyun.datahub.client.common.DatahubConfig;
import com.aliyun.datahub.client.exception.DatahubClientException;
import com.aliyun.datahub.client.http.HttpClient;
import com.aliyun.datahub.client.model.BlobRecordData;
import com.aliyun.datahub.client.model.RecordEntry;
import com.aliyun.datahub.clientlibrary.common.ExceptionChecker;
import com.aliyun.datahub.clientlibrary.config.ConsumerConfig;
import com.aliyun.datahub.clientlibrary.config.ProducerConfig;
import com.aliyun.datahub.clientlibrary.consumer.DatahubConsumer;
import com.aliyun.datahub.clientlibrary.producer.DatahubProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
public class MultiProducerAndConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(MultiProducerAndConsumer.class);
public static void main(String[] args) throws Exception {
try {
// testProducer();
testConsumer();
} finally {
// call the global cleanup functions when the process exits
HttpClient.close();
}
}
private static void testProducer() throws Exception {
List<RecordEntry> records = new ArrayList<>();
for (int i = 0; i < 2; ++i) {
RecordEntry record = new RecordEntry();
BlobRecordData data = new BlobRecordData(("HelloWorld-" + i).getBytes());
record.setRecordData(data);
record.addAttribute("key1", "value1");
records.add(record);
}
String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
Client credentialClient = new Client();
String accessKeyId = credentialClient.getAccessKeyId();
String accessKeySecret = credentialClient.getAccessKeySecret();
ProducerConfig config = new ProducerConfig(endpoint, accessKeyId, accessKeySecret);
config.getDatahubConfig().setProtocol(DatahubConfig.Protocol.BATCH); // enable the batch protocol for writing; recommended
Map<String, AtomicInteger> shardCountMap = new ConcurrentHashMap<>();
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < 3; ++i) {
ProducerThread thread = new ProducerThread(config, records, shardCountMap);
thread.start();
threads.add(thread);
}
for (int i = 0; i < 3; ++i) {
threads.get(i).join();
}
// print write count
for (Map.Entry<String, AtomicInteger> entry : shardCountMap.entrySet()) {
LOGGER.info("ShardId:{}, count:{}", entry.getKey(), entry.getValue());
}
}
private static class ProducerThread extends Thread {
private final DatahubProducer producer;
private final List<RecordEntry> records;
private final Map<String, AtomicInteger> shardCountMap;
String projectName = "";
String topicName = "";
public ProducerThread(ProducerConfig config,
List<RecordEntry> records,
Map<String, AtomicInteger> shardCountMap) {
this.producer = new DatahubProducer(projectName, topicName, config);
this.records = records;
this.shardCountMap = shardCountMap;
}
@Override
public void run() {
try {
for (int i = 0; i < 100; ++i) {
try {
String shardId = producer.send(records);
shardCountMap.putIfAbsent(shardId, new AtomicInteger(0));
AtomicInteger cnt = shardCountMap.get(shardId);
cnt.incrementAndGet();
} catch (DatahubClientException e) {
LOGGER.info("Producer send fail", e);
if (!ExceptionChecker.isRetryableException(e)) {
break;
}
// sleep and retry
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
// ignore
}
} catch (Exception e) {
LOGGER.info("Producer send fail", e);
break;
}
}
} finally {
producer.close();
}
}
}
private static void testConsumer() throws Exception {
// ExampleConstants (not shown here) holds the endpoint, AccessKey, project, topic and subscription id used by this example
ConsumerConfig config = new ConsumerConfig(ExampleConstants.ENDPOINT, ExampleConstants.ACCESS_ID, ExampleConstants.SUB_ACCESS_KEY);
config.getDatahubConfig().setProtocol(DatahubConfig.Protocol.BATCH); // enable the batch protocol; recommended
Map<String, RecordEntry> firstMap = new ConcurrentHashMap<>();
Map<String, RecordEntry> lastMap = new ConcurrentHashMap<>();
Map<String, AtomicInteger> shardCountMap = new ConcurrentHashMap<>();
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < 3; ++i) {
ConsumerThread thread = new ConsumerThread(config, firstMap, lastMap, shardCountMap);
thread.start();
threads.add(thread);
}
for (int i = 0; i < 3; ++i) {
threads.get(i).join();
}
// print start and end sequence
for (RecordEntry first : firstMap.values()) {
RecordEntry last = lastMap.get(first.getShardId());
AtomicInteger cnt = shardCountMap.get(first.getShardId());
LOGGER.info("ShardId:{}, startSeq:{}, endSeq:{}, cnt:{}",
first.getShardId(), first.getSequence(), last.getSequence(), cnt);
}
}
private static class ConsumerThread extends Thread {
private final DatahubConsumer consumer;
private final Map<String, RecordEntry> firstMap;
private final Map<String, RecordEntry> lastMap;
private final Map<String, AtomicInteger> shardCountMap;
public ConsumerThread(ConsumerConfig config,
Map<String, RecordEntry> firstMap,
Map<String, RecordEntry> lastMap,
Map<String, AtomicInteger> shardCountMap) {
this.consumer = new DatahubConsumer(ExampleConstants.PROJECT_NAME, ExampleConstants.TOPIC_NAME, ExampleConstants.SUB_ID, config);
this.lastMap = lastMap;
this.firstMap = firstMap;
this.shardCountMap = shardCountMap;
}
@Override
public void run() {
try {
while (true) {
try {
RecordEntry record = consumer.read(30000);
if (record == null) {
// in this demo, exit the test if no data is read for 30 seconds
break;
}
String shardId = record.getShardId();
firstMap.putIfAbsent(shardId, record);
lastMap.put(shardId, record);
shardCountMap.putIfAbsent(shardId, new AtomicInteger(0));
AtomicInteger cnt = shardCountMap.get(shardId);
cnt.incrementAndGet();
} catch (DatahubClientException e) {
if (!ExceptionChecker.isRetryableException(e)) {
LOGGER.info("Read data fail", e);
break;
}
// sleep and retry
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
// ignore
}
} catch (Exception e) {
LOGGER.warn("Read fail.", e);
break;
}
}
} finally {
consumer.close();
}
}
}
}