SDK实践指南

本文为您介绍使用JavaSDK时应注意的内容以及异常参照。

初始化

用户可以使用阿里云认证账号访问DataHub,并需要提供云账号AccessIdAccessKey,同时需要提供访问DataHub的服务地址。以下代码用于使用DataHub域名列表新建DataHubClient:

sdk 2.25.1及以上版本(推荐)

//使用新的Batch传输协议创建DataHubClient实例
DatahubConfig.Protocol protocol = DatahubConfig.Protocol.BATCH;
DatahubClient datahubClient = DatahubClientBuilder.newBuilder().setDatahubConfig(
  		         //Protocol可不设置,不设置默认使用PROTOBUF传输协议
                new DatahubConfig(endpoint, new AliyunAccount(accessId, accessKey), protocol)
        ).setHttpConfig(new HttpConfig().setCompressType(CompressType.ZSTD)).build();
  • 配置描述

    • DatahubConfig

      名称

      描述

      endpoint

      DataHub服务地址。

      account

      阿里云账号信息。

      protocol

      传输协议,目前支持 PROTOBUF 和 BATCH

      说明

      使用BATCH传输协议需要开启多version,您可在控制台或者SDK中开启,具体实现方式请参照:DataHub成本节省攻略

    • HttpConfig

      名称

      描述

      readTimeout

      Socket读写超时时间,默认10s。

      connTimeout

      TCP连接超时时间,默认10s。

      maxRetryCount

      请求失败重试,默认1,不建议修改,重试由上层业务层处理。

      debugRequest

      是否打印请求日志信息,默认false。

      compressType

      数据传输压缩方式,默认lz4压缩,支持lz4, deflate,ztsd压缩。

      proxyUri

      代理服务器主机地址。

      proxyUsername

      代理服务器验证的用户名。

      proxyPassword

      代理服务器验证的密码。

  • SDK统计信息

    SDK支持针对put/get等请求进行QPS等统计,开启方式:

    ClientMetrics.startMetrics();

    metric统计信息默认打印到日志文件中,需要配置slf4j,其中metric package为:com.aliyun.datahub.client.metrics

写入数据到DataHub

Tuple类型Topic为例。

  public void writeTupleTopic(int maxRetry) {
    String shardId = "9";
    // 生成十条数据
    List<RecordEntry> recordEntries = new ArrayList<>();
    for (int i = 0; i < 10; ++i) {
      RecordEntry recordEntry = new RecordEntry();
      // 对每条数据设置额外属性
      recordEntry.addAttribute("key1", "value11");
      TupleRecordData data = new TupleRecordData(this.recordSchema);
      data.setField("field1", "Hello World");
      data.setField("field2", 1234567);
      recordEntry.setRecordData(data);
      recordEntry.setShardId(shardId);
      recordEntries.add(recordEntry);
    }

    int retryNum = 0;
    while (retryNum < maxRetry) {
      try {
        // 服务端从2.12版本开始支持,之前版本请使用putRecords接口
        //datahubClient.putRecordsByShard(Constant.projectName, Constant.topicName, shardId, recordEntries);
        PutRecordsResult putRecordsResult = this.datahubClient.putRecords(Constant.projectName,
            Constant.topicName, recordEntries);
        System.out.println("write tuple data successful");
        System.out.println(putRecordsResult.getPutErrorEntries());
        break;
      } catch (InvalidParameterException e) {
        // invalid parameter
        e.printStackTrace();
        throw e;
      } catch (AuthorizationFailureException e) {
        // AK error
        e.printStackTrace();
        throw e;
      } catch (ResourceNotFoundException e) {
        // project or topic not found
        e.printStackTrace();
        throw e;
      } catch (ShardSealedException e) {
        // shard status is CLOSED, read only
        e.printStackTrace();
        throw e;
      } catch (LimitExceededException e) {
        // limit exceed, retry
        e.printStackTrace();
        retryNum++;
      } catch (DatahubClientException e) {
        // other error
        e.printStackTrace();
        retryNum++;
      }
    }
  }

创建订阅消费DataHub数据

//点位消费示例,并在消费过程中进行点位的提交
public static void example() {
  String shardId = "0";
  List<String> shardIds = Arrays.asList("0", "1");
  OpenSubscriptionSessionResult openSubscriptionSessionResult = datahubClient.openSubscriptionSession(Constant.projectName, Constant.topicName, subId, shardIds);
  SubscriptionOffset subscriptionOffset = openSubscriptionSessionResult.getOffsets().get(shardId);
  // 1、获取当前点位的cursor,如果当前点位已过期则获取生命周期内第一条record的cursor,未消费同样获取生命周期内第一条record的cursor
  String cursor = null;
  //sequence < 0说明未消费
  if (subscriptionOffset.getSequence() < 0) {
      // 获取生命周期内第一条record的cursor
      cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.OLDEST).getCursor();
  } else {
      // 获取下一条记录的Cursor
      long nextSequence = subscriptionOffset.getSequence() + 1;
      try {
          //按照SEQUENCE getCursor可能报SeekOutOfRange错误,表示当前cursor的数据已过期
          cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.SEQUENCE, nextSequence).getCursor();
      } catch (SeekOutOfRangeException e) {
          // 获取生命周期内第一条record的cursor
          cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.OLDEST).getCursor();
      }
  }
  // 2、读取并保存点位,这里以读取Tuple数据为例,并且每1000条记录保存一次点位
  long recordCount = 0L;
  // 每次读取10条record
  int fetchNum = 10;
  while (true) {
      try {
          GetRecordsResult getRecordsResult = datahubClient.getRecords(Constant.projectName, Constant.topicName, shardId, schema, cursor, fetchNum);
          if (getRecordsResult.getRecordCount() <= 0) {
              // 无数据,sleep后读取
              Thread.sleep(1000);
              continue;
          }
          for (RecordEntry recordEntry : getRecordsResult.getRecords()) {
              //消费数据
              TupleRecordData data = (TupleRecordData) recordEntry.getRecordData();
              System.out.println("field1:" + data.getField("field1") + "\t"
                      + "field2:" + data.getField("field2"));
              // 处理数据完成后,设置点位
              ++recordCount;
              subscriptionOffset.setSequence(recordEntry.getSequence());
              subscriptionOffset.setTimestamp(recordEntry.getSystemTime());
              if (recordCount % 1000 == 0) {
        //提交点位
                  Map<String, SubscriptionOffset> offsetMap = new HashMap<>();
                  offsetMap.put(shardId, subscriptionOffset);
                  datahubClient.commitSubscriptionOffset(Constant.projectName, Constant.topicName, subId, offsetMap);
                  System.out.println("commit offset successful");
              }
          }
          cursor = getRecordsResult.getNextCursor();
      } catch (SubscriptionOfflineException | SubscriptionSessionInvalidException e) {
          // 退出. Offline: 订阅下线; SubscriptionSessionInvalid: 表示订阅被其他客户端同时消费
          break;
      } catch (SubscriptionOffsetResetException e) {
          // 表示点位被重置,重新获取SubscriptionOffset信息,这里以Sequence重置为例
          // 如果以Timestamp重置,需要通过CursorType.SYSTEM_TIME获取cursor
          subscriptionOffset = datahubClient.getSubscriptionOffset(Constant.projectName, Constant.topicName, subId, shardIds).getOffsets().get(shardId);
          long nextSequence = subscriptionOffset.getSequence() + 1;
          cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.SEQUENCE, nextSequence).getCursor();
      } catch (DatahubClientException e) {
          // TODO: 针对不同异常决定是否退出
      } catch (Exception e) {
          break;
      }
  }
}

异常类型

Java SDK(>= 2.12)对datahub的异常类型进行了整理,用户try catch机制对异常类型进行捕获并进行相应处理。

其中异常类型中,除DatahubClientExceptionLimitExceededException之外,其余均属于不可重试错误,而DatahubClientException中包含部分可重试错误,例如server busy,server unavailable等,因此建议遇到DatahubClientExceptionLimitExceededException时,可以在代码逻辑中添加重试逻辑,但应严格限制重试次数。

以下为使用2.12以上版本的各类异常,包路径为com.aliyun.datahub.client.exception

异常类名

错误码

异常说明

InvalidParameterException

InvalidParameter

InvalidCursor

非法参数。

ResourceNotFoundException

ResourceNotFound

NoSuchProject

NoSuchTopic

NoSuchShard

NoSuchSubscription

NoSuchConnector

NoSuchMeteringInfo

访问的资源不存在(注:进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常 )。

ResourceAlreadyExistException

ResourceAlreadyExist

ProjectAlreadyExist

TopicAlreadyExist

ConnectorAlreadyExist

资源已存在(创建时如果资源已存在,就会抛出这个异常)。

SeekOutOfRangeException

SeekOutOfRange

getCursor时,给定的sequence不在有效范围内(通常数据已过期),或给定的timestamp大于当前时间。

AuthorizationFailureException

Unauthorized

Authorization 签名解析异常,检查AK是否填写正确。

NoPermissionException

NoPermission

OperationDenied

没有权限,通常是RAM配置不正确,或没有正确授权子账号。

ShardSealedException

InvalidShardOperation

shard 处于CLOSED状态可读不可写,继续往CLOSEDshard 写数据,或读到最后一条数据后继续读取,会抛出该异常。

LimitExceededException

LimitExceeded

接口使用超限,参考限制描述

SubscriptionOfflineException

SubscriptionOffline

订阅处于下线状态不可用。

SubscriptionSessionInvalidException

OffsetSessionChanged

OffsetSessionClosed

订阅会话异常,使用订阅时会建立一个session,用于提交点位,如果有其他客户端使用该订阅,会得到该异常。

SubscriptionOffsetResetException

OffsetReseted

订阅点位被重置。

MalformedRecordException

MalformedRecord

ShardNotReady

非法的 Record 格式,可能的情况有:schema 不正确、包含非UTF-8字符、客户端使用pb而服务端不支持等等。

DatahubClientException

其他所有,并且是所有异常的基类

如排除以上异常情况,通常重试即可,但应限制重试次数。