本文为您介绍使用JavaSDK时应注意的内容以及异常参照。
初始化
用户可以使用阿里云认证账号访问DataHub,并需要提供云账号AccessId和AccessKey,同时需要提供访问DataHub的服务地址。以下代码用于使用DataHub域名列表新建DataHubClient:
sdk 2.25.1及以上版本(推荐)
//使用新的Batch传输协议创建DataHubClient实例
DatahubConfig.Protocol protocol = DatahubConfig.Protocol.BATCH;
DatahubClient datahubClient = DatahubClientBuilder.newBuilder().setDatahubConfig(
//Protocol可不设置,不设置默认使用PROTOBUF传输协议
new DatahubConfig(endpoint, new AliyunAccount(accessId, accessKey), protocol)
).setHttpConfig(new HttpConfig().setCompressType(CompressType.ZSTD)).build();
配置描述
DatahubConfig
名称
描述
endpoint
DataHub服务地址。
account
阿里云账号信息。
protocol
传输协议,目前支持 PROTOBUF 和 BATCH。
说明使用BATCH传输协议需要开启多version,您可在控制台或者SDK中开启,具体实现方式请参照:DataHub成本节省攻略。
HttpConfig
名称
描述
readTimeout
Socket读写超时时间,默认10s。
connTimeout
TCP连接超时时间,默认10s。
maxRetryCount
请求失败重试,默认1,不建议修改,重试由上层业务层处理。
debugRequest
是否打印请求日志信息,默认false。
compressType
数据传输压缩方式,默认lz4压缩,支持lz4, deflate,ztsd压缩。
proxyUri
代理服务器主机地址。
proxyUsername
代理服务器验证的用户名。
proxyPassword
代理服务器验证的密码。
SDK统计信息
SDK支持针对put/get等请求进行QPS等统计,开启方式:
ClientMetrics.startMetrics();
metric统计信息默认打印到日志文件中,需要配置slf4j,其中metric package为:
com.aliyun.datahub.client.metrics
。
写入数据到DataHub
以Tuple类型Topic为例。
public void writeTupleTopic(int maxRetry) {
String shardId = "9";
// 生成十条数据
List<RecordEntry> recordEntries = new ArrayList<>();
for (int i = 0; i < 10; ++i) {
RecordEntry recordEntry = new RecordEntry();
// 对每条数据设置额外属性
recordEntry.addAttribute("key1", "value11");
TupleRecordData data = new TupleRecordData(this.recordSchema);
data.setField("field1", "Hello World");
data.setField("field2", 1234567);
recordEntry.setRecordData(data);
recordEntry.setShardId(shardId);
recordEntries.add(recordEntry);
}
int retryNum = 0;
while (retryNum < maxRetry) {
try {
// 服务端从2.12版本开始支持,之前版本请使用putRecords接口
//datahubClient.putRecordsByShard(Constant.projectName, Constant.topicName, shardId, recordEntries);
PutRecordsResult putRecordsResult = this.datahubClient.putRecords(Constant.projectName,
Constant.topicName, recordEntries);
System.out.println("write tuple data successful");
System.out.println(putRecordsResult.getPutErrorEntries());
break;
} catch (InvalidParameterException e) {
// invalid parameter
e.printStackTrace();
throw e;
} catch (AuthorizationFailureException e) {
// AK error
e.printStackTrace();
throw e;
} catch (ResourceNotFoundException e) {
// project or topic not found
e.printStackTrace();
throw e;
} catch (ShardSealedException e) {
// shard status is CLOSED, read only
e.printStackTrace();
throw e;
} catch (LimitExceededException e) {
// limit exceed, retry
e.printStackTrace();
retryNum++;
} catch (DatahubClientException e) {
// other error
e.printStackTrace();
retryNum++;
}
}
}
创建订阅消费DataHub数据
//点位消费示例,并在消费过程中进行点位的提交
public static void example() {
String shardId = "0";
List<String> shardIds = Arrays.asList("0", "1");
OpenSubscriptionSessionResult openSubscriptionSessionResult = datahubClient.openSubscriptionSession(Constant.projectName, Constant.topicName, subId, shardIds);
SubscriptionOffset subscriptionOffset = openSubscriptionSessionResult.getOffsets().get(shardId);
// 1、获取当前点位的cursor,如果当前点位已过期则获取生命周期内第一条record的cursor,未消费同样获取生命周期内第一条record的cursor
String cursor = null;
//sequence < 0说明未消费
if (subscriptionOffset.getSequence() < 0) {
// 获取生命周期内第一条record的cursor
cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.OLDEST).getCursor();
} else {
// 获取下一条记录的Cursor
long nextSequence = subscriptionOffset.getSequence() + 1;
try {
//按照SEQUENCE getCursor可能报SeekOutOfRange错误,表示当前cursor的数据已过期
cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.SEQUENCE, nextSequence).getCursor();
} catch (SeekOutOfRangeException e) {
// 获取生命周期内第一条record的cursor
cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.OLDEST).getCursor();
}
}
// 2、读取并保存点位,这里以读取Tuple数据为例,并且每1000条记录保存一次点位
long recordCount = 0L;
// 每次读取10条record
int fetchNum = 10;
while (true) {
try {
GetRecordsResult getRecordsResult = datahubClient.getRecords(Constant.projectName, Constant.topicName, shardId, schema, cursor, fetchNum);
if (getRecordsResult.getRecordCount() <= 0) {
// 无数据,sleep后读取
Thread.sleep(1000);
continue;
}
for (RecordEntry recordEntry : getRecordsResult.getRecords()) {
//消费数据
TupleRecordData data = (TupleRecordData) recordEntry.getRecordData();
System.out.println("field1:" + data.getField("field1") + "\t"
+ "field2:" + data.getField("field2"));
// 处理数据完成后,设置点位
++recordCount;
subscriptionOffset.setSequence(recordEntry.getSequence());
subscriptionOffset.setTimestamp(recordEntry.getSystemTime());
if (recordCount % 1000 == 0) {
//提交点位
Map<String, SubscriptionOffset> offsetMap = new HashMap<>();
offsetMap.put(shardId, subscriptionOffset);
datahubClient.commitSubscriptionOffset(Constant.projectName, Constant.topicName, subId, offsetMap);
System.out.println("commit offset successful");
}
}
cursor = getRecordsResult.getNextCursor();
} catch (SubscriptionOfflineException | SubscriptionSessionInvalidException e) {
// 退出. Offline: 订阅下线; SubscriptionSessionInvalid: 表示订阅被其他客户端同时消费
break;
} catch (SubscriptionOffsetResetException e) {
// 表示点位被重置,重新获取SubscriptionOffset信息,这里以Sequence重置为例
// 如果以Timestamp重置,需要通过CursorType.SYSTEM_TIME获取cursor
subscriptionOffset = datahubClient.getSubscriptionOffset(Constant.projectName, Constant.topicName, subId, shardIds).getOffsets().get(shardId);
long nextSequence = subscriptionOffset.getSequence() + 1;
cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.SEQUENCE, nextSequence).getCursor();
} catch (DatahubClientException e) {
// TODO: 针对不同异常决定是否退出
} catch (Exception e) {
break;
}
}
}
异常类型
Java SDK(>= 2.12)对datahub的异常类型进行了整理,用户try catch机制对异常类型进行捕获并进行相应处理。
其中异常类型中,除DatahubClientException和LimitExceededException之外,其余均属于不可重试错误,而DatahubClientException中包含部分可重试错误,例如server busy,server unavailable等,因此建议遇到DatahubClientException和LimitExceededException时,可以在代码逻辑中添加重试逻辑,但应严格限制重试次数。
以下为使用2.12以上版本的各类异常,包路径为com.aliyun.datahub.client.exception
。
异常类名 | 错误码 | 异常说明 |
InvalidParameterException |
| 非法参数。 |
ResourceNotFoundException |
| 访问的资源不存在(注:进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常 )。 |
ResourceAlreadyExistException |
| 资源已存在(创建时如果资源已存在,就会抛出这个异常)。 |
SeekOutOfRangeException |
| getCursor时,给定的sequence不在有效范围内(通常数据已过期),或给定的timestamp大于当前时间。 |
AuthorizationFailureException |
| Authorization 签名解析异常,检查AK是否填写正确。 |
NoPermissionException |
| 没有权限,通常是RAM配置不正确,或没有正确授权子账号。 |
ShardSealedException |
| shard 处于CLOSED状态可读不可写,继续往CLOSED的shard 写数据,或读到最后一条数据后继续读取,会抛出该异常。 |
LimitExceededException |
| 接口使用超限,参考限制描述。 |
SubscriptionOfflineException |
| 订阅处于下线状态不可用。 |
SubscriptionSessionInvalidException |
| 订阅会话异常,使用订阅时会建立一个session,用于提交点位,如果有其他客户端使用该订阅,会得到该异常。 |
SubscriptionOffsetResetException |
| 订阅点位被重置。 |
MalformedRecordException |
| 非法的 Record 格式,可能的情况有:schema 不正确、包含非UTF-8字符、客户端使用pb而服务端不支持等等。 |
DatahubClientException | 其他所有,并且是所有异常的基类 | 如排除以上异常情况,通常重试即可,但应限制重试次数。 |