本文为您展示DataHub的 Java SDK的offset操作。
初始化offset
openSubscriptionSession只需要初始化一次,再次调用会重新生成一个消费sessionId,之前的session会失效,无法commit点位。
参数说明
参数名 | 参数类型 | 参数说明 |
projectName | String | 项目名称。 |
topicName | String | Topic名称。 |
subId | String | The id of the subscription. |
shardIds | List | The id list of the shards. |
异常说明
异常类名 | 错误码 | 异常说明 |
DatahubClientException | - | 并且是所有异常的基类 |
InvalidParameterException |
| 非法参数。 |
AuthorizationFailureException |
| Authorization 签名解析异常,检查AK是否填写正确。 |
ResourceNotFoundException |
| 访问的资源不存在。 说明 进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常 |
代码示例
public static void openSubscriptionSession(String projectName, String topicName) {
shardId = "4";
shardIds = new ArrayList<String>();
shardIds.add("0");
shardIds.add("4");
try {
OpenSubscriptionSessionResult openSubscriptionSessionResult = datahubClient.openSubscriptionSession(projectName, topicName, subId, shardIds);
SubscriptionOffset subscriptionOffset = openSubscriptionSessionResult.getOffsets().get(shardId);
System.out.println(subscriptionOffset.getSessionId() + "\t"
+ subscriptionOffset.getVersionId() + "\t"
+ subscriptionOffset.getSequence());
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
获取offset
getSubscriptionOffset返回结果是GetSubscriptionOffsetResult对象,与openSubscriptionSession返回结果基本上相同,但是GetSubscriptionOffsetResult中的offset没有sessionId的,是作为只读的方法来使用。
参数说明
参数名 | 参数类型 | 参数说明 |
projectName | String | 项目名称。 |
topicName | String | Topic名称。 |
subId | String | The id of the subscription. |
shardIds | List | The id list of the shards. |
异常说明
异常类名 | 错误码 | 异常说明 |
DatahubClientException | - | 并且是所有异常的基类 |
InvalidParameterException |
| 非法参数。 |
AuthorizationFailureException |
| Authorization 签名解析异常,检查AK是否填写正确。 |
ResourceNotFoundException |
| 访问的资源不存在。 说明 进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常 |
代码示例
//获取点位
public static void getSubscriptionOffset(String projectName, String topicName,String subId) {
shardId = "4";
shardIds = new ArrayList<String>();
shardIds.add("0");
shardIds.add("4");
try {
GetSubscriptionOffsetResult getSubscriptionOffsetResult = datahubClient.getSubscriptionOffset(projectName, topicName, subId, shardIds);
SubscriptionOffset subscriptionOffset = getSubscriptionOffsetResult.getOffsets().get(shardId);
System.out.println(subscriptionOffset.getVersionId() + "\t"
+ subscriptionOffset.getSequence());
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
提交offset
提交点位会验证versionId和sessionId,必须与当前的一致;提交的点位信息没有严格限制,建议按照record中的sequence和timestamp来填写。
参数说明
参数名 | 参数类型 | 参数说明 |
projectName | String | 项目名称。 |
topicName | String | Topic名称。 |
subId | String | The id of the subscription. |
offsets | Map | The offset map of shards. |
异常说明
异常类名 | 错误码 | 异常说明 |
DatahubClientException | - | 并且是所有异常的基类 |
InvalidParameterException |
| 非法参数。 |
AuthorizationFailureException |
| Authorization 签名解析异常,检查AK是否填写正确。 |
ResourceNotFoundException |
| 访问的资源不存在。 说明 进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常 |
SubscriptionOffsetResetException |
| 订阅点位被重置。 |
SubscriptionSessionInvalidException |
| 订阅会话异常,使用订阅时会建立一个session,用于提交点位,如果有其他客户端使用该订阅,会得到该异常。 |
SubscriptionOfflineException |
| 订阅处于下线状态不可用。 |
代码示例
//提交点位
public static void commitSubscriptionOffset(String projectName, String topicName,String subId) {
while (true) {
try {
OpenSubscriptionSessionResult openSubscriptionSessionResult = datahubClient.openSubscriptionSession(projectName, topicName, subId, shardIds);
SubscriptionOffset subscriptionOffset = openSubscriptionSessionResult.getOffsets().get(shardId);
//这里仅仅测试提交,完整过程请参考点位消费样例
subscriptionOffset.setSequence(10);
subscriptionOffset.setTimestamp(100);
Map<String, SubscriptionOffset> offsets = new HashMap<String, SubscriptionOffset>();
offsets.put(shardId, subscriptionOffset);
// 提交点位
datahubClient.commitSubscriptionOffset(Constant.projectName, Constant.topicName, subId, offsets);
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
}
重置offset
重置点位可以将消费点位设置到某个时间点,如果在这个时间点有多个record,那么点位会设置到该时间点的第一条record的位置。重置点位在修改点位信息的同时更新versionId,运行中的任务在使用旧的versionId来提交点位时会收到SubscriptionOffsetResetException,通过getSubscriptionOffset接口可以获取到新的versionId。
参数说明
参数名 | 参数类型 | 参数说明 |
projectName | String | 项目名称。 |
topicName | String | Topic名称。 |
subId | String | The id of the subscription. |
offsets | Map | The offset map of shards. |
异常说明
异常类名 | 错误码 | 异常说明 |
DatahubClientException | - | 并且是所有异常的基类 |
InvalidParameterException |
| 非法参数。 |
AuthorizationFailureException |
| Authorization 签名解析异常,检查AK是否填写正确。 |
ResourceNotFoundException |
| 访问的资源不存在。 说明 进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常 |
代码示例
//重置点位
public static void resetSubscriptionOffset(String projectName, String topicName) throws ParseException {
List<String> shardIds = Arrays.asList("0");
//选择想要重置点位到的时间,并转换为时间戳
String time = "2019-07-09 10:00:00";
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date date = simpleDateFormat.parse(time);
long timestamp = date.getTime();//获取时间的时间戳
long sequence = client.getCursor(projectName, topicName, subId, CursorType.SYSTEM_TIME, timestamp).getSequence();
SubscriptionOffset offset = new SubscriptionOffset();
offset.setTimestamp(timestamp);
offset.setSequence(sequence);
Map<String, SubscriptionOffset> offsets = new HashMap<String, SubscriptionOffset>();
for (String shardId : shardIds) {
offsets.put(shardId, offset);
}
try {
datahubClient.resetSubscriptionOffset(projectName, topicName, subId, offsets);
System.out.println("reset successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
关联订阅消费DataHub数据
同读取DataHub数据类似,使用订阅进行消费的不同之处在于订阅存储了消费的点位,用户可自由选择消费点位。
首先调用openSubscriptionSession初始化offset,获取version + session信息,全局只初始化一次,多次调用此方法,会造成原有session失效,无法提交点位。
调用getCursor获取订阅的点位进行消费,消费完一条数据后,调用getNextCursor获取下一条数据点位,继续消费。
提交点位时,调用commitSubscriptionOffset提交点位,commit操作会检查version和session信息,必须完全一致才能提交成功。
//点位消费示例,并在消费过程中进行点位的提交
public static void example(String projectName, String topicName,String subId) {
String shardId = "0";
List<String> shardIds = Arrays.asList("0", "1");
OpenSubscriptionSessionResult openSubscriptionSessionResult = datahubClient.openSubscriptionSession(projectName, topicName, subId, shardIds);
SubscriptionOffset subscriptionOffset = openSubscriptionSessionResult.getOffsets().get(shardId);
// 1、获取当前点位的cursor,如果当前点位已过期则获取生命周期内第一条record的cursor,未消费同样获取生命周期内第一条record的cursor
String cursor = null;
//sequence < 0说明未消费
if (subscriptionOffset.getSequence() < 0) {
// 获取生命周期内第一条record的cursor
cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.OLDEST).getCursor();
} else {
// 获取下一条记录的Cursor
long nextSequence = subscriptionOffset.getSequence() + 1;
try {
//按照SEQUENCE getCursor可能报SeekOutOfRange错误,表示当前cursor的数据已过期
cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.SEQUENCE, nextSequence).getCursor();
} catch (SeekOutOfRangeException e) {
// 获取生命周期内第一条record的cursor
cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.OLDEST).getCursor();
}
}
// 2、读取并保存点位,这里以读取Tuple数据为例,并且每1000条记录保存一次点位
long recordCount = 0L;
// 每次读取10条record
int fetchNum = 10;
while (true) {
try {
GetRecordsResult getRecordsResult = datahubClient.getRecords(projectName, topicName, shardId, schema, cursor, fetchNum);
if (getRecordsResult.getRecordCount() <= 0) {
// 无数据,sleep后读取
Thread.sleep(1000);
continue;
}
for (RecordEntry recordEntry : getRecordsResult.getRecords()) {
//消费数据
TupleRecordData data = (TupleRecordData) recordEntry.getRecordData();
System.out.println("field1:" + data.getField("field1") + "\t"
+ "field2:" + data.getField("field2"));
// 处理数据完成后,设置点位
++recordCount;
subscriptionOffset.setSequence(recordEntry.getSequence());
subscriptionOffset.setTimestamp(recordEntry.getSystemTime());
if (recordCount % 1000 == 0) {
点位//提交点位
Map<String, SubscriptionOffset> offsetMap = new HashMap<>();
offsetMap.put(shardId, subscriptionOffset);
datahubClient.commitSubscriptionOffset(projectName, topicName, subId, offsetMap);
System.out.println("commit offset successful");
}
}
cursor = getRecordsResult.getNextCursor();
} catch (SubscriptionOfflineException | OffsetSessionChangedException e) {
// 退出. Offline: 订阅下线; SessionChange: 表示订阅被其他客户端同时消费
break;
} catch (OffsetResetedException e) {
// 表示点位被重置,重新获取SubscriptionOffset信息,这里以Sequence重置为例
// 如果以Timestamp重置,需要通过CursorType.SYSTEM_TIME获取cursor
subscriptionOffset = datahubClient.getSubscriptionOffset(projectName, topicName, subId, shardIds).getOffsets().get(shardId);
long nextSequence = subscriptionOffset.getSequence() + 1;
cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.SEQUENCE, nextSequence).getCursor();
} catch (DatahubClientException e) {
// TODO: 针对不同异常决定是否退出
} catch (Exception e) {
break;
}
}
}