Read and write data
You can read and write data by calling SDK methods. Shards in either the CLOSED or ACTIVE state can be read, but only shards in the ACTIVE state accept writes.
You can also add the datahub-client-library dependency, which wraps the read and write features of the Java SDK: use Producer to write records evenly across shards, or use Consumer for collaborative consumption (recommended).
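All SDK snippets below use a datahubClient instance without showing how it is built. For reference, a minimal initialization sketch, assuming the Java SDK's DatahubClientBuilder API and placeholder credentials you must fill in:
import com.aliyun.datahub.client.DatahubClient;
import com.aliyun.datahub.client.DatahubClientBuilder;
import com.aliyun.datahub.client.auth.AliyunAccount;
import com.aliyun.datahub.client.common.DatahubConfig;
import com.aliyun.datahub.client.model.ShardEntry;

DatahubClient datahubClient = DatahubClientBuilder.newBuilder()
        .setDatahubConfig(new DatahubConfig("<yourEndpoint>",
                new AliyunAccount("<yourAccessKeyId>", "<yourAccessKeySecret>"),
                true)) // the boolean enables the binary transfer protocol
        .build();
// Since only ACTIVE shards accept writes, shard states can be inspected first:
for (ShardEntry shard : datahubClient.listShard("<yourProjectName>", "<yourTopicName>").getShards()) {
    System.out.println(shard.getShardId() + " -> " + shard.getState()); // e.g. ACTIVE or CLOSED
}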
Read data
There are two ways to read data:
Using the SDK
Using collaborative consumption
Using the SDK
Step 1: Obtain a cursor
To read data from a topic, you must specify the shard to read from, as well as a cursor marking the read position. A cursor can be obtained in four ways: OLDEST, LATEST, SEQUENCE, and SYSTEM_TIME.
OLDEST: the cursor points to the oldest valid record currently in the shard.
LATEST: the cursor points to the latest record.
SEQUENCE: the cursor points to the record with the given sequence number.
SYSTEM_TIME: the cursor points to the first record whose timestamp is greater than or equal to the given timestamp.
Interface:
GetCursorResult getCursor(String projectName, String topicName, String shardId, CursorType type);
GetCursorResult getCursor(String projectName, String topicName, String shardId, CursorType type, long param);
Parameters
projectName The name of the project.
topicName The name of the topic.
shardId The ID of the shard.
type The cursor type used to obtain the cursor.
Exceptions
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
SeekOutOfRangeException
Sample code
public static void getcursor() {
    String shardId = "5";
    try {
        /* OLDEST usage example */
        String oldestCursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.OLDEST).getCursor();
        /* LATEST usage example */
        String latestCursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.LATEST).getCursor();
        /* SEQUENCE usage example */
        // Get the sequence number of the latest record
        long seq = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.LATEST).getSequence();
        // Get a cursor for reading the latest ten records
        String seqCursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.SEQUENCE, seq - 9).getCursor();
        /* SYSTEM_TIME usage example */
        // Convert the time string to a timestamp
        String time = "2019-07-01 10:00:00";
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        long timestamp = 0L;
        try {
            Date date = simpleDateFormat.parse(time);
            timestamp = date.getTime(); // timestamp in milliseconds
        } catch (ParseException e) {
            System.exit(-1);
        }
        // Get a cursor for reading data written after the given time
        String timeCursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.SYSTEM_TIME, timestamp).getCursor();
        System.out.println("get cursor successful");
    } catch (InvalidParameterException e) {
        System.out.println("invalid parameter, please check your parameter");
        System.exit(1);
    } catch (AuthorizationFailureException e) {
        System.out.println("AK error, please check your accessId and accessKey");
        System.exit(1);
    } catch (ResourceNotFoundException e) {
        System.out.println("project or topic or shard not found");
        System.exit(1);
    } catch (SeekOutOfRangeException e) {
        System.out.println("offset invalid or has expired");
        System.exit(1);
    } catch (DatahubClientException e) {
        System.out.println("other error");
        System.out.println(e);
        System.exit(1);
    }
}
Step 2: Read data
Interface:
GetRecordsResult getRecords(String projectName, String topicName, String shardId, String cursor, int limit);
GetRecordsResult getRecords(String projectName, String topicName, String shardId, RecordSchema schema, String cursor, int limit);
Parameters
projectName The name of the project.
topicName The name of the topic.
shardId The ID of the shard.
schema Required when reading TUPLE records.
cursor The starting cursor for the read.
limit The maximum number of records to read.
Exceptions
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
ShardSealedException
LimitExceededException
Examples
1) Read from a TUPLE topic
public static void example() {
// Maximum number of records to read per call
int recordLimit = 1000;
String shardId = "7";
// Obtain a cursor; here we start from the oldest valid record
// Note: normally getCursor is called only once at startup; subsequent reads use the nextCursor returned by getRecords
String cursor = "";
try {
cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.OLDEST).getCursor();
} catch (InvalidParameterException e) {
System.out.println("invalid parameter, please check your parameter");
System.exit(1);
} catch (AuthorizationFailureException e) {
System.out.println("AK error, please check your accessId and accessKey");
System.exit(1);
} catch (ResourceNotFoundException e) {
System.out.println("project or topic or shard not found");
System.exit(1);
} catch (SeekOutOfRangeException e) {
System.out.println("offset invalid or has expired");
System.exit(1);
} catch (DatahubClientException e) {
System.out.println("other error");
System.out.println(e);
System.exit(1);
}
while (true) {
try {
GetRecordsResult result = datahubClient.getRecords(Constant.projectName, Constant.topicName, shardId, recordSchema, cursor, recordLimit);
if (result.getRecordCount() <= 0) {
// No data; sleep and then read again
Thread.sleep(10000);
continue;
}
for (RecordEntry entry : result.getRecords()) {
TupleRecordData data = (TupleRecordData) entry.getRecordData();
System.out.println("field1:" + data.getField("field1") + "\t"
+ "field2:" + data.getField("field2"));
}
// Get the next cursor
cursor = result.getNextCursor();
} catch (InvalidCursorException ex) {
// Invalid or expired cursor; re-seek and then resume consumption
cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.OLDEST).getCursor();
} catch (SeekOutOfRangeException e) {
System.out.println("offset invalid");
System.exit(1);
} catch (ResourceNotFoundException e) {
System.out.println("project or topic or shard not found");
System.exit(1);
} catch (ShardSealedException e) {
System.out.println("shard is closed, all data has been read");
System.exit(1);
} catch (LimitExceededException e) {
System.out.println("maybe exceed limit, retry");
} catch (DatahubClientException e) {
System.out.println("other error");
System.out.println(e);
System.exit(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
2) Read from a BLOB topic
public static void example() {
// Maximum number of records to read per call
int recordLimit = 1000;
String shardId = "7";
// Obtain a cursor; here we start from the oldest valid record
// Note: normally getCursor is called only once at startup; subsequent reads use the nextCursor returned by getRecords
String cursor = "";
try {
cursor = datahubClient.getCursor(Constant.projectName, Constant.blobTopicName, shardId, CursorType.OLDEST).getCursor();
} catch (InvalidParameterException e) {
System.out.println("invalid parameter, please check your parameter");
System.exit(1);
} catch (AuthorizationFailureException e) {
System.out.println("AK error, please check your accessId and accessKey");
System.exit(1);
} catch (ResourceNotFoundException e) {
System.out.println("project or topic or shard not found");
System.exit(1);
} catch (SeekOutOfRangeException e) {
System.out.println("offset invalid or has expired");
System.exit(1);
} catch (DatahubClientException e) {
System.out.println("other error");
System.out.println(e);
System.exit(1);
}
while (true) {
try {
GetRecordsResult result = datahubClient.getRecords(Constant.projectName, Constant.blobTopicName, shardId, cursor, recordLimit);
if (result.getRecordCount() <= 0) {
// No data; sleep and then read again
Thread.sleep(10000);
continue;
}
/* Consume the records */
for (RecordEntry record: result.getRecords()){
BlobRecordData data = (BlobRecordData) record.getRecordData();
System.out.println(new String(data.getData()));
}
// Get the next cursor
cursor = result.getNextCursor();
} catch (InvalidCursorException ex) {
// Invalid or expired cursor; re-seek and then resume consumption
cursor = datahubClient.getCursor(Constant.projectName, Constant.blobTopicName, shardId, CursorType.OLDEST).getCursor();
} catch (SeekOutOfRangeException e) {
System.out.println("offset invalid");
System.exit(1);
} catch (ResourceNotFoundException e) {
System.out.println("project or topic or shard not found");
System.exit(1);
} catch (ShardSealedException e) {
System.out.println("shard is closed, all data has been read");
System.exit(1);
} catch (LimitExceededException e) {
System.out.println("maybe exceed limit, retry");
} catch (DatahubClientException e) {
System.out.println("other error");
System.out.println(e);
System.exit(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Using collaborative consumption
Step 1: Initialize a Consumer
Configuration
Name | Description |
autoCommit | Whether to commit offsets automatically; defaults to true. Offsets are committed by a background thread at the configured interval. With auto commit, a call to read is taken to mean that all previously read records have been processed. If set to false, every record must be acked after processing, and the background commit only advances to offsets whose preceding records have all been acked. |
offsetCommitTimeoutMs | Offset commit interval in milliseconds; defaults to 30000, valid range [3000, 300000]. |
sessionTimeoutMs | Session timeout in milliseconds; the heartbeat interval is set to 2/3 of this value. If no heartbeat arrives within the timeout, the client is considered stopped and the server reassigns the shards it held. Defaults to 60000, valid range [60000, 180000]. |
fetchSize | Number of records fetched asynchronously per shard. Up to twice this number of records is cached; when the cache falls below twice this value, an asynchronous fetch is triggered. Defaults to 1000; must be greater than 0. |
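A sketch of how these settings might be applied in code. It is illustrative only: the JavaBean-style setter names below are an assumption to verify against your datahub-client-library version.
ConsumerConfig config = new ConsumerConfig(endpoint, accessId, accessKey);
config.setAutoCommit(false);            // assumed setter: ack each record manually instead of auto commit
config.setOffsetCommitTimeoutMs(30000); // assumed setter: commit offsets every 30s, range [3000, 300000]
config.setSessionTimeoutMs(60000);      // assumed setter: heartbeat interval becomes 2/3 of this value
config.setFetchSize(1000);              // assumed setter: per-shard async fetch size; 2x this value is cached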
You also need to configure the AccessKey ID and AccessKey Secret in your project; referencing environment variables from a configuration file is recommended.
datahub.endpoint=<yourEndpoint>
datahub.accessId=<yourAccessKeyId>
datahub.accessKey=<yourAccessKeySecret>
An Alibaba Cloud account's AccessKey grants access to all APIs; we recommend using a RAM user for API access and routine operations.
Never store the AccessKey ID and AccessKey Secret in project code; a leaked AccessKey endangers all resources under your account.
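For instance, credentials can be read from the environment instead of being hard-coded; the variable names below are purely illustrative:
// Hypothetical environment variable names; adapt to your deployment
String accessId = System.getenv("DATAHUB_ACCESS_ID");
String accessKey = System.getenv("DATAHUB_ACCESS_KEY");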
{
@Value("${datahub.endpoint}")
String endpoint ;
@Value("${datahub.accessId}")
String accessId;
@Value("${datahub.accessKey}")
String accessKey;
// The endpoint here uses Region China East 1 as an example; use the endpoint of your actual region
String projectName = "<YourProjectName>";
String topicName = "<YourTopicName>";
String subId = "<YourSubscriptionId>";
/**
 * 1. Collaborative consumption with the offset service.
 * Collaborative consumption:
 *   If two machines consume a topic (with 5 shards) using the same subId, there is no need to manually
 *   assign shards to machines; the server assigns them automatically and rebalances when a third machine joins.
 *
 * Offset service:
 *   Consumption starts from the offset stored on the server for the subId (a newly created subscription
 *   has no offset yet and starts from the beginning). To start from a specific point in time, reset the
 *   subId's offset in the console.
 */
ConsumerConfig config = new ConsumerConfig(endpoint, accessId, accessKey);
Consumer consumer = new Consumer(projectName, topicName, subId, config);
/**
 * 2. No collaborative consumption, with the offset service: provide the subId and the list of shards this Consumer reads.
 * Without collaborative consumption:
 *   If two machines consume a topic (with 5 shards) using the same subId, you must manually assign shards
 *   (e.g. client A consumes shards 0, 1, 2 and client B consumes shards 3, 4), and reassign them yourself
 *   when a third machine joins.
 */
// Example: client A consumes shards 0, 1, 2
List<String> shardlists = Arrays.asList("0", "1", "2");
ConsumerConfig config = new ConsumerConfig(endpoint, accessId, accessKey);
Consumer consumer = new Consumer(projectName, topicName, subId, shardlists, config);
// Example: client B consumes shards 3, 4
// List<String> shardlists = Arrays.asList("3", "4");
// ConsumerConfig config = new ConsumerConfig(endpoint, accessId, accessKey);
// Consumer consumer = new Consumer(projectName, topicName, subId, shardlists, config);
/**
 * 3. No collaborative consumption, no offset service.
 * Without the offset service:
 *   You need your own storage (db/redis, etc.) to record the time/sequence each shard has been consumed to,
 *   and you must initialize from those recorded offsets on every read.
 */
Map<String, Offset> offsetMap = new HashMap<>();
// Provide both a sequence and a timestamp; if the sequence is out of range, the timestamp is used to get the cursor
offsetMap.put("0", new Offset(100, 1548573440756L));
// Provide only a sequence; the cursor is obtained by sequence
offsetMap.put("1", new Offset().setSequence(1));
// Provide only a timestamp; the cursor is obtained by timestamp
offsetMap.put("2", new Offset().setTimestamp(1548573440756L));
ConsumerConfig config = new ConsumerConfig(endpoint, accessId, accessKey);
Consumer consumer = new Consumer(projectName, topicName, subId, offsetMap, config);
}
Step 2: Collaborative consumption sample code
import com.aliyun.datahub.client.exception.AuthorizationFailureException;
import com.aliyun.datahub.client.exception.DatahubClientException;
import com.aliyun.datahub.client.exception.InvalidParameterException;
import com.aliyun.datahub.client.exception.NoPermissionException;
import com.aliyun.datahub.client.exception.SubscriptionOfflineException;
import com.aliyun.datahub.client.exception.SubscriptionOffsetResetException;
import com.aliyun.datahub.client.exception.SubscriptionSessionInvalidException;
import com.aliyun.datahub.client.model.RecordEntry;
import com.aliyun.datahub.client.model.TupleRecordData;
import com.aliyun.datahub.clientlibrary.config.ConsumerConfig;
import com.aliyun.datahub.clientlibrary.consumer.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
public class DatahubReader {
private static final Logger LOG = LoggerFactory.getLogger(DatahubReader.class);
private static void sleep(long milliSeconds) {
try {
TimeUnit.MILLISECONDS.sleep(milliSeconds);
} catch (InterruptedException e) {
// TODO: handle the exception as appropriate
}
}
public static Consumer createConsumer(ConsumerConfig config, String project, String topic, String subId)
{
return new Consumer(project, topic, subId, config);
}
public static void main(String[] args) {
String projectName = "<YourProjectName>";
String topicName = "<YourTopicName>";
String subId = "<YourSubscriptionId>";
ConsumerConfig config = new ConsumerConfig(endpoint, accessId, accessKey);
Consumer consumer = createConsumer(config, projectName, topicName, subId);
int maxRetry = 3;
boolean stop = false;
try {
while (!stop) {
try {
while (true) {
// Right after initialization, collaborative consumption waits roughly 40 seconds for the server to assign shards; read can only return null during that period
// In auto-commit mode, each call to read treats all previously read records as processed and acks them
RecordEntry record = consumer.read(maxRetry);
// Process the record
if (record != null) {
TupleRecordData data = (TupleRecordData) record.getRecordData();
// Process the data according to your own schema; here we print the first column
LOG.info("field1: {}", data.getField(0));
// Read a field by column name
// LOG.info("field2: {}", data.getField("field2"));
// In non-auto-commit mode, every record must be acked after it is processed
// In auto-commit mode, ack is a no-op
// Requires client library version 1.1.7 or later
record.getKey().ack();
} else {
LOG.info("read null");
}
}
} catch (SubscriptionOffsetResetException e) {
// The offset was reset; re-initialize the consumer
try {
consumer.close();
consumer = createConsumer(config, projectName, topicName, subId);
} catch (DatahubClientException e1) {
// Initialization failed; retry or rethrow
LOG.error("create consumer failed", e);
throw e;
}
} catch (InvalidParameterException |
SubscriptionOfflineException |
SubscriptionSessionInvalidException |
AuthorizationFailureException |
NoPermissionException e) {
// Invalid request parameters
// The subscription is offline
// The same shard under the subscription is occupied by another client
// Invalid signature
// No permission
LOG.error("read failed", e);
throw e;
} catch (DatahubClientException e) {
// Base-class exception covering e.g. network errors; retrying is an option
LOG.error("read failed, retry", e);
sleep(1000);
}
}
} catch (Throwable e) {
LOG.error("read failed", e);
} finally {
// Make sure resources are released
// Closing commits the offsets that have been acked
consumer.close();
}
}
}
Write data
Using the SDK
Server versions 2.12 and later support the putRecordsByShard interface; earlier versions only provide putRecords. When calling putRecordsByShard you must specify the target shard; otherwise the write defaults to the first shard in the ACTIVE state. In both methods the records parameter is a List in which each element is one record, and all records must be of the same type, TUPLE or BLOB. DataHub thus supports writing by shard (server version >= 2.12) via putRecordsByShard, and mixed writing via putRecords. With putRecords you must inspect the returned PutRecordsResult to confirm whether the data was written successfully (see the sketch after the exception list below), while putRecordsByShard reports failures directly through exceptions. If your server supports it, putRecordsByShard is recommended.
PutRecordsResult putRecords(String projectName, String topicName, List<RecordEntry> records);
PutRecordsByShardResult putRecordsByShard(String projectName, String topicName, String shardId, List<RecordEntry> records);
Parameters
projectName The name of the project.
topicName The name of the topic.
shardId The ID of the shard.
records The list of records to write.
Exceptions
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
ShardSealedException
LimitExceededException
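As noted above, putRecords reports per-record failures through its return value rather than through exceptions, so the PutRecordsResult should be checked and failed records resent. A minimal sketch; the single resend here is illustrative, and production code should bound and back off retries:
PutRecordsResult result = datahubClient.putRecords(Constant.projectName, Constant.topicName, recordEntries);
if (result.getFailedRecordCount() > 0) {
    // resend only the records that failed
    List<RecordEntry> failed = result.getFailedRecords();
    datahubClient.putRecords(Constant.projectName, Constant.topicName, failed);
}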
1) Write to a TUPLE topic
// Write TUPLE records
public static void tupleExample() {
String shardId = "9";
// Get the topic schema
recordSchema = datahubClient.getTopic(Constant.projectName, Constant.topicName).getRecordSchema();
// Generate ten records
List<RecordEntry> recordEntries = new ArrayList<>();
for (int i = 0; i < 10; ++i) {
RecordEntry recordEntry = new RecordEntry();
// Set optional attributes on each record, e.g. IP or hostname; records are written fine without attributes
recordEntry.addAttribute("key1", "value1");
TupleRecordData data = new TupleRecordData(recordSchema);
data.setField("field1", "HelloWorld");
data.setField("field2", 1234567);
recordEntry.setRecordData(data);
recordEntry.setShardId(shardId);
recordEntries.add(recordEntry);
}
try {
// Supported by server versions 2.12 and later; use putRecords for earlier versions
//datahubClient.putRecordsByShard(Constant.projectName, Constant.topicName, shardId, recordEntries);
datahubClient.putRecords(Constant.projectName, Constant.topicName, recordEntries);
System.out.println("write data successful");
} catch (InvalidParameterException e) {
System.out.println("invalid parameter, please check your parameter");
System.exit(1);
} catch (AuthorizationFailureException e) {
System.out.println("AK error, please check your accessId and accessKey");
System.exit(1);
} catch (ResourceNotFoundException e) {
System.out.println("project or topic or shard not found");
System.exit(1);
} catch (ShardSealedException e) {
System.out.println("shard status is CLOSED, can not write");
System.exit(1);
} catch (DatahubClientException e) {
System.out.println("other error");
System.out.println(e);
System.exit(1);
}
}
2) Write to a BLOB topic
// Write BLOB records
public static void blobExample() {
// Generate ten records
List<RecordEntry> recordEntries = new ArrayList<>();
String shardId = "4";
for (int i = 0; i < 10; ++i) {
RecordEntry recordEntry = new RecordEntry();
// Set optional attributes on each record
recordEntry.addAttribute("key1", "value1");
BlobRecordData data = new BlobRecordData("123456".getBytes(Charsets.UTF_8));
recordEntry.setRecordData(data);
recordEntry.setShardId(shardId);
recordEntries.add(recordEntry);
recordEntry.setShardId("0");
}
while (true) {
try {
// Supported by server versions 2.12 and later; use putRecords for earlier versions
//datahubClient.putRecordsByShard(Constant.projectName, Constant.topicName, shardId, recordEntries);
datahubClient.putRecords(Constant.projectName, Constant.topicName, recordEntries);
System.out.println("write data successful");
break;
} catch (InvalidParameterException e) {
System.out.println("invalid parameter, please check your parameter");
System.exit(1);
} catch (AuthorizationFailureException e) {
System.out.println("AK error, please check your accessId and accessKey");
System.exit(1);
} catch (ResourceNotFoundException e) {
System.out.println("project or topic or shard not found");
System.exit(1);
} catch (ShardSealedException e) {
System.out.println("shard status is CLOSED, can not write");
System.exit(1);
} catch (LimitExceededException e) {
System.out.println("maybe qps exceed limit, retry");
} catch (DatahubClientException e) {
System.out.println("other error");
System.out.println(e);
System.exit(1);
}
}
}
Using Producer
Step 1: Add the dependencies
<dependency>
<groupId>com.aliyun.datahub</groupId>
<artifactId>aliyun-sdk-datahub</artifactId>
<version>2.19.0-public</version>
</dependency>
<dependency>
<groupId>com.aliyun.datahub</groupId>
<artifactId>datahub-client-library</artifactId>
<version>1.1.12-public</version>
</dependency>
Step 2: Sample code
import com.aliyun.datahub.client.exception.AuthorizationFailureException;
import com.aliyun.datahub.client.exception.DatahubClientException;
import com.aliyun.datahub.client.exception.InvalidParameterException;
import com.aliyun.datahub.client.exception.MalformedRecordException;
import com.aliyun.datahub.client.exception.NoPermissionException;
import com.aliyun.datahub.client.exception.ShardNotFoundException;
import com.aliyun.datahub.client.model.Field;
import com.aliyun.datahub.client.model.FieldType;
import com.aliyun.datahub.client.model.RecordEntry;
import com.aliyun.datahub.client.model.RecordSchema;
import com.aliyun.datahub.client.model.TupleRecordData;
import com.aliyun.datahub.clientlibrary.config.ProducerConfig;
import com.aliyun.datahub.clientlibrary.producer.Producer;
import com.aliyun.datahub.client.exception.ResourceNotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class DatahubWriter {
private static final Logger LOG = LoggerFactory.getLogger(DatahubWriter.class);
private static void sleep(long milliSeconds) {
try {
TimeUnit.MILLISECONDS.sleep(milliSeconds);
} catch (InterruptedException e) {
// TODO: handle the exception as appropriate
}
}
private static List<RecordEntry> genRecords(RecordSchema schema) {
List<RecordEntry> recordEntries = new ArrayList<>();
for (int cnt = 0; cnt < 10; ++cnt) {
RecordEntry entry = new RecordEntry();
entry.addAttribute("key1", "value1");
entry.addAttribute("key2", "value2");
TupleRecordData data = new TupleRecordData(schema);
data.setField("field1", "testValue");
data.setField("field2", 1);
entry.setRecordData(data);
recordEntries.add(entry);
}
return recordEntries;
}
private static void sendRecords(Producer producer, List<RecordEntry> recordEntries) {
int maxRetry = 3;
while (true) {
try {
// Write to an automatically selected shard
producer.send(recordEntries, maxRetry);
// Write to the specified shard "0"
// producer.send(recordEntries, "0", maxRetry);
LOG.error("send records: {}", recordEntries.size());
break;
} catch (MalformedRecordException e) {
// Malformed record; depending on your scenario, ignore it or rethrow
LOG.error("write fail", e);
throw e;
} catch (InvalidParameterException |
AuthorizationFailureException |
NoPermissionException e) {
// Invalid request parameters
// Invalid signature
// No permission
LOG.error("write fail", e);
throw e;
} catch (ShardNotFoundException e) {
// Shard not found; unless you are writing to a shard you chose yourself, this can be ignored
LOG.error("write fail", e);
sleep(1000);
} catch (ResourceNotFoundException e) {
// The project, topic, or shard does not exist
LOG.error("write fail", e);
throw e;
} catch (DatahubClientException e) {
// Base-class exception covering e.g. network errors; retrying is an option
LOG.error("write fail", e);
sleep(1000);
}
}
}
public static void main(String[] args) {
String projectName = "<YourProjectName>";
String topicName = "<YourTopicName>";
RecordSchema schema = datahubClient.getTopic(projectName, topicName).getRecordSchema();
ProducerConfig config = new ProducerConfig(endpoint, accessId, accessKey);
Producer producer = new Producer(projectName, topicName, config);
// Control the loop according to your scenario
boolean stop = false;
try {
while (!stop) {
List<RecordEntry> recordEntries = genRecords(schema);
sendRecords(producer, recordEntries);
}
} finally {
// Make sure resources are released
producer.close();
}
}
}
Multiple write modes
Before version 2.12, DataHub only supports the putRecords interface. The RecordEntry class carries three routing properties, shardId, partitionKey, and hashKey, and the property you set determines which shard a record is written to. Note: when shard extend mode is enabled, writing by HashKey or PartitionKey is not supported.
For server version 2.12 and later, putRecordsByShard is recommended, which avoids the performance cost of server-side partitioning.
1) Write by ShardID (recommended). Example:
RecordEntry entry = new RecordEntry();
entry.setShardId("0");
2) Write by HashKey. Specify a 128-bit MD5 value; the record is routed to a shard according to each shard's hash key range. Example:
RecordEntry entry = new RecordEntry();
entry.setHashKey("7FFFFFFFFFFFFFFD7FFFFFFFFFFFFFFD");
3) Write by PartitionKey. Specify a String as the partition key; the record is routed based on the MD5 hash of that string and each shard's hash key range. Example:
RecordEntry entry = new RecordEntry();
entry.setPartitionKey("TestPartitionKey");
Notes
Neither Consumer nor Producer supports concurrent access from multiple threads. If you need multiple threads, give each thread its own Consumer or Producer object, as sketched below.
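A minimal sketch of the one-object-per-thread pattern, reusing the genRecords helper from the Producer example above; the thread count and structure are illustrative, and java.util.concurrent imports are assumed:
ExecutorService pool = Executors.newFixedThreadPool(4);
for (int i = 0; i < 4; ++i) {
    pool.submit(() -> {
        // each thread builds and owns its own Producer; never share one across threads
        Producer producer = new Producer(projectName, topicName, new ProducerConfig(endpoint, accessId, accessKey));
        try {
            producer.send(genRecords(schema), 3);
        } finally {
            producer.close();
        }
    });
}
pool.shutdown();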