订阅功能使用介绍
目前用户在DataHub上消费Topic数据,为了做到“断点续消费”的功能,即消费者failover重启后可以继续从failover时间点继续获取数据,需要用户自己保存当前消费的点位信息,同时用户还需要关心自己的点位存储服务的高可用,这无疑增加了用户开发应用程序的复杂度。基于此,DataHub新上线的订阅服务提供了服务端保存用户消费点位的功能,用户只需要通过简单的几步配置,然后在自己的应用逻辑里添加几行简单的处理逻辑,就可以拥有一个对自己“透明”的高可用的点位存储服务。另外,订阅服务还提供了灵活的点位重置功能,从而保证用了“At Least Once”的消费语义,比如用户发现自己应用程序有个时间段消费的数据处理上存在问题,想重新消费,此时只需要将点位重置到对应的时间点,并且无须重启自己的应用程序,可以做到应用程序自动感知。
创建订阅
需要确保自己账号有权限对特定project下的topic有订阅权限,具体授权参见权限控制。创建步骤如下:
打开Topic页面,点击右上角。
填写订阅详情,点击创建。
订阅应用:用来说明当前订阅的应用名称。
描述:当前订阅的详情描述。
单击消费点位下方搜索按钮,即可查看所有shard消费情况。
示例代码
订阅功能为用户提供了点位存储的能力,与DataHub读写功能(参见Java SDK)并无必然联系,不过二者可以配合使用,即数据读取后用户需要将消费点位进行存储的场景。
前置条件
您需要在工程中配置相应的Access Key和Secret Key,推荐使用环境变量的形式在配置文件中配置,详情可参见:管理访问凭据。
datahub.endpoint=<yourEndpoint>
datahub.accessId=<yourAccessKeyId>
datahub.accessKey=<yourAccessKeySecret>
阿里云账号AccessKey拥有所有API的访问权限,建议您使用RAM用户进行API访问或日常运维。
强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。
参考代码
//点位消费示例,并在消费过程中进行点位的提交
@Value("${datahub.endpoint}")
String endpoint ;
@Value("${datahub.accessId}")
String accessId;
@Value("${datahub.accessKey}")
String accessKey;
public void offset_consumption(int maxRetry) {
String projectName = "<YourProjectName>";
String topicName = "<YourTopicName>";
String subId = "<YourSubId>";
String shardId = "0";
List<String> shardIds = Arrays.asList(shardId);
// 创建DataHubClient实例
DatahubClient datahubClient = DatahubClientBuilder.newBuilder()
.setDatahubConfig(
new DatahubConfig(endpoint,
// 是否开启二进制传输,服务端2.12版本开始支持
new AliyunAccount(accessId, accessKey), true))
.build();
RecordSchema schema = datahubClient.getTopic(projectName, topicName).getRecordSchema();
OpenSubscriptionSessionResult openSubscriptionSessionResult = datahubClient.openSubscriptionSession(projectName, topicName, subId, shardIds);
SubscriptionOffset subscriptionOffset = openSubscriptionSessionResult.getOffsets().get(shardId);
// 1、获取当前点位的cursor,如果当前点位已过期则获取生命周期内第一条record的cursor,未消费同样获取生命周期内第一条record的cursor
String cursor = "";
//sequence < 0说明未消费
if (subscriptionOffset.getSequence() < 0) {
// 获取生命周期内第一条record的cursor
cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.OLDEST).getCursor();
} else {
// 获取下一条记录的Cursor
long nextSequence = subscriptionOffset.getSequence() + 1;
try {
//按照SEQUENCE getCursor可能报SeekOutOfRange错误,表示当前cursor的数据已过期
cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.SEQUENCE, nextSequence).getCursor();
} catch (SeekOutOfRangeException e) {
// 获取生命周期内第一条record的cursor
cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.OLDEST).getCursor();
}
}
// 2、读取并保存点位,这里以读取Tuple数据为例,并且每1000条记录保存一次点位
long recordCount = 0L;
// 每次读取1000条record
int fetchNum = 1000;
int retryNum = 0;
int commitNum = 1000;
while (retryNum < maxRetry) {
try {
GetRecordsResult getRecordsResult = datahubClient.getRecords(projectName, topicName, shardId, schema, cursor, fetchNum);
if (getRecordsResult.getRecordCount() <= 0) {
// 无数据,sleep后读取
System.out.println("no data, sleep 1 second");
Thread.sleep(1000);
continue;
}
for (RecordEntry recordEntry : getRecordsResult.getRecords()) {
//消费数据
TupleRecordData data = (TupleRecordData) recordEntry.getRecordData();
System.out.println("field1:" + data.getField("field1") + "\t"
+ "field2:" + data.getField("field2"));
// 处理数据完成后,设置点位
recordCount++;
subscriptionOffset.setSequence(recordEntry.getSequence());
subscriptionOffset.setTimestamp(recordEntry.getSystemTime());
// commit offset every 1000 records
if (recordCount % commitNum == 0) {
//提交点位的点位
Map<String, SubscriptionOffset> offsetMap = new HashMap<>();
offsetMap.put(shardId, subscriptionOffset);
datahubClient.commitSubscriptionOffset(projectName, topicName, subId, offsetMap);
System.out.println("commit offset successful");
}
}
cursor = getRecordsResult.getNextCursor();
} catch (SubscriptionOfflineException | SubscriptionSessionInvalidException e) {
// 退出. Offline: 订阅下线; SessionChange: 表示订阅被其他客户端同时消费
e.printStackTrace();
throw e;
} catch (SubscriptionOffsetResetException e) {
// 点位被重置,需要重新获取SubscriptionOffset版本信息
SubscriptionOffset offset = datahubClient.getSubscriptionOffset(projectName, topicName, subId, shardIds).getOffsets().get(shardId);
subscriptionOffset.setVersionId(offset.getVersionId());
// 点位被重置之后,需要重新获取点位,获取点位的方法应该与重置点位时一致,
// 如果重置点位时,同时设置了sequence和timestamp,那么既可以用SEQUENCE获取,也可以用SYSTEM_TIME获取
// 如果重置点位时,只设置了sequence,那么只能用sequence获取,
// 如果重置点位时,只设置了timestamp,那么只能用SYSTEM_TIME获取点位
// 一般情况下,优先使用SEQUENCE,其次是SYSTEM_TIME,如果都失败,则采用OLDEST获取
cursor = null;
if (cursor == null) {
try {
long nextSequence = offset.getSequence() + 1;
cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.SEQUENCE, nextSequence).getCursor();
System.out.println("get cursor successful");
} catch (DatahubClientException exception) {
System.out.println("get cursor by SEQUENCE failed, try to get cursor by SYSTEM_TIME");
}
}
if (cursor == null) {
try {
cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.SYSTEM_TIME, offset.getTimestamp()).getCursor();
System.out.println("get cursor successful");
} catch (DatahubClientException exception) {
System.out.println("get cursor by SYSTEM_TIME failed, try to get cursor by OLDEST");
}
}
if (cursor == null) {
try {
cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.OLDEST).getCursor();
System.out.println("get cursor successful");
} catch (DatahubClientException exception) {
System.out.println("get cursor by OLDEST failed");
System.out.println("get cursor failed!!");
throw e;
}
}
} catch (LimitExceededException e) {
// limit exceed, retry
e.printStackTrace();
retryNum++;
} catch (DatahubClientException e) {
// other error, retry
e.printStackTrace();
retryNum++;
} catch (Exception e) {
e.printStackTrace();
System.exit(-1);
}
}
}
查看运行结果
第一次启动时会从最早的数据开始消费,运行过程中可以刷新webconsole上的订阅页面,shard 的消费点位都在往前移动。
如果在消费过程中,通过webconsole上的重置点位功能来手动调整点位的话,我们的消费程序会自动感知到点位变化从新调整后的点位开始消费,这时客户端通过捕获
OffsetResetedException
异常后调用getSubscriptionOffset
接口从服务端获取到最新的SubscriptionOffset
对象,然后继续从新点位开始消费。
同一个订阅下的相同shard不要同时被多个消费线程/进程消费,否则点位会被交替覆盖,也就是最终服务端存储的点位是未定义的,这种情况下服务端会抛OffsetSessionChangedException
异常,建议客户端对这类异常进行捕获后做exit处理,检查是否存在重复消费的设计。