全部产品
云市场

订阅功能介绍

更新时间:2019-10-16 15:40:07

订阅功能使用介绍

目前用户在DataHub上消费Topic数据,为了做到“断点续消费”的功能,即消费者failover重启后可以继续从failover时间点继续获取数据,需要用户自己保存当前消费的点位信息,同时用户还需要关心自己的点位存储服务的高可用,这无疑增加了用户开发应用程序的复杂度。基于此,DataHub新上线的订阅服务提供了服务端保存用户消费点位的功能,用户只需要通过简单的几步配置,然后在自己的应用逻辑里添加几行简单的处理逻辑,就可以拥有一个对自己“透明”的高可用的点位存储服务。

另外,订阅服务还提供了灵活的点位重置功能,从而保证用了“At Least Once”的消费语义,比如用户发现自己应用程序有个时间段消费的数据处理上存在问题,想重新消费,此时只需要将点位重置到对应的时间点,并且无须重启自己的应用程序,可以做到应用程序自动感知。

那我们如何使用新的订阅功能呢?非常简单,只需要按照下面几个步骤即可。

1. 创建订阅

创建订阅目前只能通过Webconsole创建,另外需要确保自己账号有权限对特定project下的topic有订阅权限,具体授权参见访问控制说明文档。创建步骤如下:

  • 打开topic页面,点击右上角+Subscription按钮

create_subscription1.png

  • 填写订阅详情,点击创建

    • 应用名称:用来说明当前订阅的应用名称
    • 接口人:应用接口人
    • 接口人联系方式:email或联系电话
    • 订阅描述:当前订阅的详情描述

create_subscription2.png

订阅详情信息的各项字段会被用来索引,在订阅搜索页面可以根据关键字进行匹配,方便后面订阅查询,尤其在某些Topic订阅量很多的时候这个功能非常有用。

search_subscription.png

完成订阅创建后,可以在Topic页面,切到SubscriptionTab上,可以看到当前Topic下的所有订阅列表

create_subscription3.png

此时,你会发现在每条订阅记录的左侧操作列下都会有查看更新重置点位上/下线删除几个功能键,这里作下说明:

  1. 查看:查看订阅详情信息
  2. 更新:更新订阅详情信息
  3. 重置点位:将当前订阅的消费点位重置到自己想要开始消费的时间点,格式为yyyy-mm-DD HH:MM:SS,这里是重置会将所有shard的消费点位统一设置,如果需要单独针对某个shard设置点位,则可以在订阅详情页进行按shard级别进行设置
  4. 上/下线:订阅会有上线和下线两个状态,只有处于上线状态的订阅才可以使用
  5. 删除:将订阅永久删除,包括之前存储的点位都会从物理上删除

比如我们可以点击查看刚刚创建的订阅,订阅ID为1511863813739LPaZY

detail_subscription.png

在该页面可以详细查看当前订阅下的各个shard消费的点位,而且还可以按shard级别重置点位,目前该订阅还没进行过任何消费行为,所以所有shard的当前消费点位都是未消费

2. 使用示例

订阅功能为用户提供了点位存储的能力,与DataHub读写功能(参见JavaSDK说明文档)并无必然联系,不过二者可以配合使用,即数据读取后用户需要将消费点位进行存储的场景。

  • 参考代码
  1. //点位消费示例,并在消费过程中进行点位的提交
  2. public void offset_consumption(int maxRetry) {
  3. String endpoint = "<YourEndPoint>";
  4. String accessId = "<YourAccessId>";
  5. String accessKey = "<YourAccessKey>";
  6. String projectName = "<YourProjectName>";
  7. String topicName = "<YourTopicName>";
  8. String subId = "<YourSubId>";
  9. String shardId = "0";
  10. List<String> shardIds = Arrays.asList(shardId);
  11. // 创建DataHubClient实例
  12. DatahubClient datahubClient = DatahubClientBuilder.newBuilder()
  13. .setDatahubConfig(
  14. new DatahubConfig(endpoint,
  15. // 是否开启二进制传输,服务端2.12版本开始支持
  16. new AliyunAccount(accessId, accessKey), true))
  17. .build();
  18. RecordSchema schema = datahubClient.getTopic(projectName, topicName).getRecordSchema();
  19. OpenSubscriptionSessionResult openSubscriptionSessionResult = datahubClient.openSubscriptionSession(projectName, topicName, subId, shardIds);
  20. SubscriptionOffset subscriptionOffset = openSubscriptionSessionResult.getOffsets().get(shardId);
  21. // 1、获取当前点位的cursor,如果当前点位已过期则获取生命周期内第一条record的cursor,未消费同样获取生命周期内第一条record的cursor
  22. String cursor = "";
  23. //sequence < 0说明未消费
  24. if (subscriptionOffset.getSequence() < 0) {
  25. // 获取生命周期内第一条record的cursor
  26. cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.OLDEST).getCursor();
  27. } else {
  28. // 获取下一条记录的Cursor
  29. long nextSequence = subscriptionOffset.getSequence() + 1;
  30. try {
  31. //按照SEQUENCE getCursor可能报SeekOutOfRange错误,表示当前cursor的数据已过期
  32. cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.SEQUENCE, nextSequence).getCursor();
  33. } catch (SeekOutOfRangeException e) {
  34. // 获取生命周期内第一条record的cursor
  35. cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.OLDEST).getCursor();
  36. }
  37. }
  38. // 2、读取并保存点位,这里以读取Tuple数据为例,并且每1000条记录保存一次点位
  39. long recordCount = 0L;
  40. // 每次读取1000条record
  41. int fetchNum = 1000;
  42. int retryNum = 0;
  43. int commitNum = 1000;
  44. while (retryNum < maxRetry) {
  45. try {
  46. GetRecordsResult getRecordsResult = datahubClient.getRecords(projectName, topicName, shardId, schema, cursor, fetchNum);
  47. if (getRecordsResult.getRecordCount() <= 0) {
  48. // 无数据,sleep后读取
  49. System.out.println("no data, sleep 1 second");
  50. Thread.sleep(1000);
  51. continue;
  52. }
  53. for (RecordEntry recordEntry : getRecordsResult.getRecords()) {
  54. //消费数据
  55. TupleRecordData data = (TupleRecordData) recordEntry.getRecordData();
  56. System.out.println("field1:" + data.getField("field1") + "\t"
  57. + "field2:" + data.getField("field2"));
  58. // 处理数据完成后,设置点位
  59. recordCount++;
  60. subscriptionOffset.setSequence(recordEntry.getSequence());
  61. subscriptionOffset.setTimestamp(recordEntry.getSystemTime());
  62. // commit offset every 1000 records
  63. if (recordCount % commitNum == 0) {
  64. //提交点位点位
  65. Map<String, SubscriptionOffset> offsetMap = new HashMap<>();
  66. offsetMap.put(shardId, subscriptionOffset);
  67. datahubClient.commitSubscriptionOffset(projectName, topicName, subId, offsetMap);
  68. System.out.println("commit offset successful");
  69. }
  70. }
  71. cursor = getRecordsResult.getNextCursor();
  72. } catch (SubscriptionOfflineException | SubscriptionSessionInvalidException e) {
  73. // 退出. Offline: 订阅下线; SessionChange: 表示订阅被其他客户端同时消费
  74. e.printStackTrace();
  75. throw e;
  76. } catch (SubscriptionOffsetResetException e) {
  77. // 点位被重置,需要重新获取SubscriptionOffset版本信息
  78. SubscriptionOffset offset = datahubClient.getSubscriptionOffset(projectName, topicName, subId, shardIds).getOffsets().get(shardId);
  79. subscriptionOffset.setVersionId(offset.getVersionId());
  80. // 点位被重置之后,需要重新获取点位,获取点位的方法应该与重置点位时一致,
  81. // 如果重置点位时,同时设置了sequence和timestamp,那么既可以用SEQUENCE获取,也可以用SYSTEM_TIME获取
  82. // 如果重置点位时,只设置了sequence,那么只能用sequence获取,
  83. // 如果重置点位时,只设置了timestamp,那么只能用SYSTEM_TIME获取点位
  84. // 一般情况下,优先使用SEQUENCE,其次是SYSTEM_TIME,如果都失败,则采用OLDEST获取
  85. cursor = null;
  86. if (cursor == null) {
  87. try {
  88. long nextSequence = offset.getSequence() + 1;
  89. cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.SEQUENCE, nextSequence).getCursor();
  90. System.out.println("get cursor successful");
  91. } catch (DatahubClientException exception) {
  92. System.out.println("get cursor by SEQUENCE failed, try to get cursor by SYSTEM_TIME");
  93. }
  94. }
  95. if (cursor == null) {
  96. try {
  97. cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.SYSTEM_TIME, offset.getTimestamp()).getCursor();
  98. System.out.println("get cursor successful");
  99. } catch (DatahubClientException exception) {
  100. System.out.println("get cursor by SYSTEM_TIME failed, try to get cursor by OLDEST");
  101. }
  102. }
  103. if (cursor == null) {
  104. try {
  105. cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.OLDEST).getCursor();
  106. System.out.println("get cursor successful");
  107. } catch (DatahubClientException exception) {
  108. System.out.println("get cursor by OLDEST failed");
  109. System.out.println("get cursor failed!!");
  110. throw e;
  111. }
  112. }
  113. } catch (LimitExceededException e) {
  114. // limit exceed, retry
  115. e.printStackTrace();
  116. retryNum++;
  117. } catch (DatahubClientException e) {
  118. // other error, retry
  119. e.printStackTrace();
  120. retryNum++;
  121. } catch (Exception e) {
  122. e.printStackTrace();
  123. System.exit(-1);
  124. }
  125. }
  126. }
  • 运行结果

    1、第一次启动时会从最早的数据开始消费,运行过程中可以刷新webconsole上的订阅页面,shard 0的消费点位都在往前移动。

    subscription_result1.png

    2、如果在消费过程中,通过webconsole上的重置点位功能来手动调整点位的话,我们的消费程序会自动感知到点位变化从新调整后的点位开始消费,这是客户端通过捕获OffsetResetedException异常后调用getSubscriptionOffset接口从服务端获取到最新的SubscriptionOffset对象,然后继续从新点位开始消费。

    3、注意:同一个订阅下的相同shard不要同时被多个消费线程/进程消费,否则点位会被交替覆盖,也就是最终服务端存储的点位是未定义的,这种情况下服务端会抛OffsetSessionChangedException异常,建议客户端对这类异常进行捕获后做exit处理,检查是否存在重复消费的设计。