offset 操作

本文为您展示DataHub的 Java SDKoffset操作。

初始化offset

openSubscriptionSession只需要初始化一次,再次调用会重新生成一个消费sessionId,之前的session会失效,无法commit点位。

参数说明

参数名

参数类型

参数说明

projectName

String

项目名称。

topicName

String

Topic名称。

subId

String

The id of the subscription.

shardIds

List

The id list of the shards.

异常说明

异常类名

错误码

异常说明

DatahubClientException

-

并且是所有异常的基类

InvalidParameterException

InvalidParameter

InvalidCursor

非法参数。

AuthorizationFailureException

Unauthorized

Authorization 签名解析异常,检查AK是否填写正确。

ResourceNotFoundException

ResourceNotFound

NoSuchProject

NoSuchTopic

NoSuchShard

NoSuchSubscription

NoSuchConnector

NoSuchMeteringInfo

访问的资源不存在。

说明

进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常

代码示例

public static void openSubscriptionSession(String projectName, String topicName) {
    shardId = "4";
    shardIds = new ArrayList<String>();
    shardIds.add("0");
    shardIds.add("4");
    try {
        OpenSubscriptionSessionResult openSubscriptionSessionResult = datahubClient.openSubscriptionSession(projectName, topicName, subId, shardIds);
        SubscriptionOffset subscriptionOffset = openSubscriptionSessionResult.getOffsets().get(shardId);
        System.out.println(subscriptionOffset.getSessionId() + "\t"
                + subscriptionOffset.getVersionId() + "\t"
                + subscriptionOffset.getSequence());
    }  catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());      
    }
}

获取offset

getSubscriptionOffset返回结果是GetSubscriptionOffsetResult对象,与openSubscriptionSession返回结果基本上相同,但是GetSubscriptionOffsetResult中的offset没有sessionId的,是作为只读的方法来使用。

参数说明

参数名

参数类型

参数说明

projectName

String

项目名称。

topicName

String

Topic名称。

subId

String

The id of the subscription.

shardIds

List

The id list of the shards.

异常说明

异常类名

错误码

异常说明

DatahubClientException

-

并且是所有异常的基类

InvalidParameterException

InvalidParameter

InvalidCursor

非法参数。

AuthorizationFailureException

Unauthorized

Authorization 签名解析异常,检查AK是否填写正确。

ResourceNotFoundException

ResourceNotFound

NoSuchProject

NoSuchTopic

NoSuchShard

NoSuchSubscription

NoSuchConnector

NoSuchMeteringInfo

访问的资源不存在。

说明

进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常

代码示例

//获取点位
public static void getSubscriptionOffset(String projectName, String topicName,String subId) {
    shardId = "4";
    shardIds = new ArrayList<String>();
    shardIds.add("0");
    shardIds.add("4");
    try {
        GetSubscriptionOffsetResult getSubscriptionOffsetResult = datahubClient.getSubscriptionOffset(projectName, topicName, subId, shardIds);
        SubscriptionOffset subscriptionOffset = getSubscriptionOffsetResult.getOffsets().get(shardId);
        System.out.println(subscriptionOffset.getVersionId() + "\t"
                + subscriptionOffset.getSequence());
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());

    }
}

提交offset

提交点位会验证versionIdsessionId,必须与当前的一致;提交的点位信息没有严格限制,建议按照record中的sequencetimestamp来填写。

参数说明

参数名

参数类型

参数说明

projectName

String

项目名称。

topicName

String

Topic名称。

subId

String

The id of the subscription.

offsets

Map

The offset map of shards.

异常说明

异常类名

错误码

异常说明

DatahubClientException

-

并且是所有异常的基类

InvalidParameterException

InvalidParameter

InvalidCursor

非法参数。

AuthorizationFailureException

Unauthorized

Authorization 签名解析异常,检查AK是否填写正确。

ResourceNotFoundException

ResourceNotFound

NoSuchProject

NoSuchTopic

NoSuchShard

NoSuchSubscription

NoSuchConnector

NoSuchMeteringInfo

访问的资源不存在。

说明

进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常

SubscriptionOffsetResetException

OffsetReseted

订阅点位被重置。

SubscriptionSessionInvalidException

OffsetSessionChanged

OffsetSessionClosed

订阅会话异常,使用订阅时会建立一个session,用于提交点位,如果有其他客户端使用该订阅,会得到该异常。

SubscriptionOfflineException

SubscriptionOffline

订阅处于下线状态不可用。

代码示例

//提交点位
public static void commitSubscriptionOffset(String projectName, String topicName,String subId) {
  while (true) {
      try {
          OpenSubscriptionSessionResult openSubscriptionSessionResult = datahubClient.openSubscriptionSession(projectName, topicName, subId, shardIds);
          SubscriptionOffset subscriptionOffset = openSubscriptionSessionResult.getOffsets().get(shardId);
          //这里仅仅测试提交,完整过程请参考点位消费样例
          subscriptionOffset.setSequence(10);
          subscriptionOffset.setTimestamp(100);
          Map<String, SubscriptionOffset> offsets = new HashMap<String, SubscriptionOffset>();
          offsets.put(shardId, subscriptionOffset);
          // 提交点位
          datahubClient.commitSubscriptionOffset(Constant.projectName, Constant.topicName, subId, offsets);
      } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());


      }
  }
}

重置offset

重置点位可以将消费点位设置到某个时间点,如果在这个时间点有多个record,那么点位会设置到该时间点的第一条record的位置。重置点位在修改点位信息的同时更新versionId,运行中的任务在使用旧的versionId来提交点位时会收到SubscriptionOffsetResetException,通过getSubscriptionOffset接口可以获取到新的versionId。

参数说明

参数名

参数类型

参数说明

projectName

String

项目名称。

topicName

String

Topic名称。

subId

String

The id of the subscription.

offsets

Map

The offset map of shards.

异常说明

异常类名

错误码

异常说明

DatahubClientException

-

并且是所有异常的基类

InvalidParameterException

InvalidParameter

InvalidCursor

非法参数。

AuthorizationFailureException

Unauthorized

Authorization 签名解析异常,检查AK是否填写正确。

ResourceNotFoundException

ResourceNotFound

NoSuchProject

NoSuchTopic

NoSuchShard

NoSuchSubscription

NoSuchConnector

NoSuchMeteringInfo

访问的资源不存在。

说明

进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常

代码示例

//重置点位
public static void resetSubscriptionOffset(String projectName, String topicName) throws ParseException {
    List<String> shardIds = Arrays.asList("0");
    //选择想要重置点位到的时间,并转换为时间戳
    String time = "2019-07-09 10:00:00";
    SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    Date date = simpleDateFormat.parse(time);

    long timestamp = date.getTime();//获取时间的时间戳
    long sequence = client.getCursor(projectName, topicName, subId, CursorType.SYSTEM_TIME, timestamp).getSequence();
    SubscriptionOffset offset = new SubscriptionOffset();
    offset.setTimestamp(timestamp);
    offset.setSequence(sequence);
    Map<String, SubscriptionOffset> offsets = new HashMap<String, SubscriptionOffset>();
    for (String shardId : shardIds) {
        offsets.put(shardId, offset);
    }

    try {
        datahubClient.resetSubscriptionOffset(projectName, topicName, subId, offsets);
        System.out.println("reset successful");
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}

关联订阅消费DataHub数据

同读取DataHub数据类似,使用订阅进行消费的不同之处在于订阅存储了消费的点位,用户可自由选择消费点位。

说明
  • 首先调用openSubscriptionSession初始化offset,获取version + session信息,全局只初始化一次,多次调用此方法,会造成原有session失效,无法提交点位。

  • 调用getCursor获取订阅的点位进行消费,消费完一条数据后,调用getNextCursor获取下一条数据点位,继续消费。

  • 提交点位时,调用commitSubscriptionOffset提交点位,commit操作会检查versionsession信息,必须完全一致才能提交成功。

//点位消费示例,并在消费过程中进行点位的提交
public static void example(String projectName, String topicName,String subId) {
  String shardId = "0";
  List<String> shardIds = Arrays.asList("0", "1");
  OpenSubscriptionSessionResult openSubscriptionSessionResult = datahubClient.openSubscriptionSession(projectName, topicName, subId, shardIds);
  SubscriptionOffset subscriptionOffset = openSubscriptionSessionResult.getOffsets().get(shardId);
  // 1、获取当前点位的cursor,如果当前点位已过期则获取生命周期内第一条record的cursor,未消费同样获取生命周期内第一条record的cursor
  String cursor = null;
  //sequence < 0说明未消费
  if (subscriptionOffset.getSequence() < 0) {
      // 获取生命周期内第一条record的cursor
      cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.OLDEST).getCursor();
  } else {
      // 获取下一条记录的Cursor
      long nextSequence = subscriptionOffset.getSequence() + 1;
      try {
          //按照SEQUENCE getCursor可能报SeekOutOfRange错误,表示当前cursor的数据已过期
          cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.SEQUENCE, nextSequence).getCursor();
      } catch (SeekOutOfRangeException e) {
          // 获取生命周期内第一条record的cursor
          cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.OLDEST).getCursor();
      }
  }
  // 2、读取并保存点位,这里以读取Tuple数据为例,并且每1000条记录保存一次点位
  long recordCount = 0L;
  // 每次读取10条record
  int fetchNum = 10;
  while (true) {
      try {
          GetRecordsResult getRecordsResult = datahubClient.getRecords(projectName, topicName, shardId, schema, cursor, fetchNum);
          if (getRecordsResult.getRecordCount() <= 0) {
              // 无数据,sleep后读取
              Thread.sleep(1000);
              continue;
          }
          for (RecordEntry recordEntry : getRecordsResult.getRecords()) {
              //消费数据
              TupleRecordData data = (TupleRecordData) recordEntry.getRecordData();
              System.out.println("field1:" + data.getField("field1") + "\t"
                      + "field2:" + data.getField("field2"));
              // 处理数据完成后,设置点位
              ++recordCount;
              subscriptionOffset.setSequence(recordEntry.getSequence());
              subscriptionOffset.setTimestamp(recordEntry.getSystemTime());
              if (recordCount % 1000 == 0) {
    点位//提交点位
                  Map<String, SubscriptionOffset> offsetMap = new HashMap<>();
                  offsetMap.put(shardId, subscriptionOffset);
                  datahubClient.commitSubscriptionOffset(projectName, topicName, subId, offsetMap);
                  System.out.println("commit offset successful");
              }
          }
          cursor = getRecordsResult.getNextCursor();
      } catch (SubscriptionOfflineException | OffsetSessionChangedException e) {
          // 退出. Offline: 订阅下线; SessionChange: 表示订阅被其他客户端同时消费
          break;
      } catch (OffsetResetedException e) {
          // 表示点位被重置,重新获取SubscriptionOffset信息,这里以Sequence重置为例
          // 如果以Timestamp重置,需要通过CursorType.SYSTEM_TIME获取cursor
          subscriptionOffset = datahubClient.getSubscriptionOffset(projectName, topicName, subId, shardIds).getOffsets().get(shardId);
          long nextSequence = subscriptionOffset.getSequence() + 1;
          cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.SEQUENCE, nextSequence).getCursor();
      } catch (DatahubClientException e) {
          // TODO: 针对不同异常决定是否退出
      } catch (Exception e) {
          break;
      }
  }
}