Java SDK Overview

Last updated: 2019-09-23 18:41:33

I. Maven Dependency and JDK

1. Maven POM

    <dependency>
        <groupId>com.aliyun.datahub</groupId>
        <artifactId>aliyun-sdk-datahub</artifactId>
        <version>2.13.0-public</version>
    </dependency>

2. JDK

JDK 1.8 or later is required.

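II. FAQ

  1. API calls

    Under normal conditions, only putRecords / putRecordsByShard and getRecords need to be called frequently to write and read data. Other APIs, such as getTopic, getCursor, and listShard, usually only need to be called during initialization.

  2. Client initialization

    A project can have one or more DatahubClient instances, and a DatahubClient instance can be used concurrently.

  3. Classes with the same name in different packages

    When classes share a name but live in different packages, version 2.12 uses the classes in the com.aliyun.datahub.client package. Classes in other packages exist for compatibility with code written against versions earlier than 2.12. For example:

    1. com.aliyun.datahub.client.model.RecordSchema (version 2.12)
    2. com.aliyun.datahub.common.data.RecordSchema (user code written against an SDK earlier than 2.12 keeps using this type if the code is not changed after the SDK upgrade)

  4. The error 'Parse body failed, Offset: 0'

    Try setting the enableBinary parameter to false.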

III. SDK Examples

Initialization

You can access DataHub with an Alibaba Cloud account. You need to provide the account's AccessId and AccessKey, as well as the DataHub service endpoint.

The following code creates a DatahubClient from a DataHub endpoint:

    // The endpoint below is for the China East 1 (Hangzhou) region; use the endpoint of your own region.
    String endpoint = "http://dh-cn-hangzhou.aliyuncs.com";
    String accessId = "<YourAccessKeyId>";
    String accessKey = "<YourAccessKeySecret>";
    // Create the DatahubClient instance.
    DatahubClient datahubClient = DatahubClientBuilder.newBuilder()
        .setDatahubConfig(
            new DatahubConfig(endpoint,
                new AliyunAccount(accessId, accessKey),
                // enableBinary: whether to use binary transport (supported by the server from version 2.12).
                // On Apsara Stack (private cloud), try false if requests fail.
                true))
        // HttpConfig is optional; default values are used when it is not set.
        .setHttpConfig(new HttpConfig().setConnTimeout(10000))
        .build();

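Configuration:

1). DatahubConfig

Name | Description
endpoint | DataHub service endpoint
account | Alibaba Cloud account information
enableBinary | Whether to use binary transport. Supported by the server from version 2.12; set it to false for earlier versions. On Apsara Stack (private cloud), if the error 'Parse body failed, Offset: 0' occurs, try setting it to false.

2). HttpConfig

Name | Description
readTimeout | Socket read/write timeout. Default: 10s
connTimeout | TCP connection timeout. Default: 10s
maxRetryCount | Number of retries for failed requests. Default: 1. Changing it is not recommended; retries should be handled by the business layer above.
debugRequest | Whether to log request details. Default: false
compressType | Compression used for data transfer. Default: none. lz4 and deflate are supported.
proxyUri | Proxy server host address
proxyUsername | User name for proxy server authentication
proxyPassword | Password for proxy server authentication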

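3). SDK metrics

The SDK can collect statistics such as QPS for put/get requests. To enable them:

    ClientMetrics.startMetrics();

By default, metric statistics are written to the log file, so slf4j must be configured. The metric package is com.aliyun.datahub.client.metrics.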

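Exception Types

The Java SDK (>= 2.12) consolidates the DataHub exception types. Use try/catch to catch each exception type and handle it accordingly. All exception types except DatahubClientException and LimitExceededException are non-retryable. DatahubClientException covers some retryable errors, such as server busy and server unavailable. It is therefore recommended to add retry logic for DatahubClientException and LimitExceededException, with a strict cap on the number of retries.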
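The retry advice above can be sketched as a small bounded-retry helper. This is only an illustration: BaseClientException, LimitException, and InvalidParamException are hypothetical stand-ins that mirror the hierarchy described here (a base class plus subclasses), not the SDK classes, and the linear backoff is an assumption rather than a documented recommendation.

```java
import java.util.function.Supplier;

// Hypothetical stand-ins that mirror the hierarchy described above; NOT the SDK classes.
class BaseClientException extends RuntimeException {
    BaseClientException(String msg) { super(msg); }
}

class LimitException extends BaseClientException {
    LimitException(String msg) { super(msg); }
}

class InvalidParamException extends BaseClientException {
    InvalidParamException(String msg) { super(msg); }
}

final class BoundedRetry {
    private BoundedRetry() {}

    /** Retryable only if it is the base type itself or the limit-exceeded subtype. */
    static boolean isRetryable(RuntimeException e) {
        return e.getClass() == BaseClientException.class || e instanceof LimitException;
    }

    /** Runs the action, making at most maxAttempts attempts in total. */
    static <T> T call(Supplier<T> action, int maxAttempts, long backoffMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        for (int attempt = 1; ; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                // Give up on non-retryable errors or when the attempt budget is spent.
                if (!isRetryable(e) || attempt >= maxAttempts) {
                    throw e;
                }
                try {
                    Thread.sleep(backoffMillis * attempt); // linear backoff (assumption)
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw e;
                }
            }
        }
    }
}
```

With the real SDK, the same shape would wrap a call such as putRecords and test for DatahubClientException and LimitExceededException instead of the stand-ins.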

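The exceptions for version 2.12 and later are listed below. The package is com.aliyun.datahub.client.exception.

Class | Error codes | Description
InvalidParameterException | InvalidParameter, InvalidCursor | Invalid parameter.
ResourceNotFoundException | ResourceNotFound, NoSuchProject, NoSuchTopic, NoSuchShard, NoSuchSubscription, NoSuchConnector, NoSuchMeteringInfo | The requested resource does not exist. (Note: this exception may be thrown if other requests are sent immediately after a Split/Merge operation.)
ResourceAlreadyExistException | ResourceAlreadyExist, ProjectAlreadyExist, TopicAlreadyExist, ConnectorAlreadyExist | The resource already exists (thrown when creating a resource that already exists).
SeekOutOfRangeException | SeekOutOfRange | In getCursor, the given sequence is outside the valid range (usually because the data has expired), or the given timestamp is later than the current time.
AuthorizationFailureException | Unauthorized | The Authorization signature could not be parsed; check that the AK is correct.
NoPermissionException | NoPermission, OperationDenied | No permission, usually because RAM is misconfigured or the sub-account has not been authorized correctly.
ShardSealedException | InvalidShardOperation | A shard in the CLOSED state is readable but not writable. Writing to a CLOSED shard, or continuing to read after the last record has been read, throws this exception.
LimitExceededException | LimitExceeded | API usage limit exceeded; see the limits description.
SubscriptionOfflineException | SubscriptionOffline | The subscription is offline and unavailable.
SubscriptionSessionInvalidException | OffsetSessionChanged, OffsetSessionClosed | Subscription session error. Using a subscription opens a session for committing offsets; if another client uses the same subscription, this exception is thrown.
SubscriptionOffsetResetException | OffsetReseted | The subscription offset was reset.
MalformedRecordException | MalformedRecord | Invalid record format. Possible causes include an incorrect schema, non-UTF-8 characters, the client using pb (protobuf) while the server does not support it, and so on.
DatahubClientException | All other errors; also the base class of all exceptions | If none of the cases above apply, retrying usually resolves the problem, but limit the number of retries.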
  • Example: creating a project

    public static void createProject() {
        try {
            datahubClient.createProject(Constant.projectName, Constant.projectComment);
            System.out.println("create project successful");
        } catch (InvalidParameterException e) {
            System.out.println("invalid parameter");
        } catch (AuthorizationFailureException e) {
            System.out.println("AK error, please check your accessId and accessKey");
        } catch (ResourceAlreadyExistException e) {
            System.out.println("project already exists");
        } catch (DatahubClientException e) {
            System.out.println("other error");
        }
    }
  • Example: consuming with subscription offsets

    // Consume from a subscription offset and commit the offset while consuming.
    // schema and subId are assumed to be defined elsewhere.
    public static void example() {
        String shardId = "0";
        List<String> shardIds = Arrays.asList("0", "1");
        OpenSubscriptionSessionResult openSubscriptionSessionResult = datahubClient.openSubscriptionSession(Constant.projectName, Constant.topicName, subId, shardIds);
        SubscriptionOffset subscriptionOffset = openSubscriptionSessionResult.getOffsets().get(shardId);
        // 1. Get the cursor for the current offset. If the offset has expired, or nothing has been
        //    consumed yet, use the cursor of the first record still within the lifecycle.
        String cursor = null;
        // sequence < 0 means nothing has been consumed yet
        if (subscriptionOffset.getSequence() < 0) {
            // Cursor of the first record within the lifecycle
            cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.OLDEST).getCursor();
        } else {
            // Cursor of the next record
            long nextSequence = subscriptionOffset.getSequence() + 1;
            try {
                // getCursor by SEQUENCE may fail with SeekOutOfRange, meaning the data at that cursor has expired
                cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.SEQUENCE, nextSequence).getCursor();
            } catch (SeekOutOfRangeException e) {
                // Cursor of the first record within the lifecycle
                cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.OLDEST).getCursor();
            }
        }
        // 2. Read records and save the offset. This example reads Tuple data and commits the offset every 1000 records.
        long recordCount = 0L;
        // Read 10 records per call
        int fetchNum = 10;
        while (true) {
            try {
                GetRecordsResult getRecordsResult = datahubClient.getRecords(Constant.projectName, Constant.topicName, shardId, schema, cursor, fetchNum);
                if (getRecordsResult.getRecordCount() <= 0) {
                    // No data; sleep, then read again
                    Thread.sleep(1000);
                    continue;
                }
                for (RecordEntry recordEntry : getRecordsResult.getRecords()) {
                    // Consume the record
                    TupleRecordData data = (TupleRecordData) recordEntry.getRecordData();
                    System.out.println("field1:" + data.getField("field1") + "\t"
                        + "field2:" + data.getField("field2"));
                    // After processing the record, update the offset
                    ++recordCount;
                    subscriptionOffset.setSequence(recordEntry.getSequence());
                    subscriptionOffset.setTimestamp(recordEntry.getSystemTime());
                    if (recordCount % 1000 == 0) {
                        // Commit the offset
                        Map<String, SubscriptionOffset> offsetMap = new HashMap<>();
                        offsetMap.put(shardId, subscriptionOffset);
                        datahubClient.commitSubscriptionOffset(Constant.projectName, Constant.topicName, subId, offsetMap);
                        System.out.println("commit offset successful");
                    }
                }
                cursor = getRecordsResult.getNextCursor();
            } catch (SubscriptionOfflineException | SubscriptionSessionInvalidException e) {
                // Exit. Offline: the subscription is offline. Session invalid: the subscription is being consumed by another client.
                break;
            } catch (SubscriptionOffsetResetException e) {
                // The offset was reset; fetch the SubscriptionOffset again. This example assumes a reset by sequence.
                // For a reset by timestamp, get the cursor with CursorType.SYSTEM_TIME instead.
                subscriptionOffset = datahubClient.getSubscriptionOffset(Constant.projectName, Constant.topicName, subId, shardIds).getOffsets().get(shardId);
                long nextSequence = subscriptionOffset.getSequence() + 1;
                cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.SEQUENCE, nextSequence).getCursor();
            } catch (DatahubClientException e) {
                // TODO: decide whether to exit depending on the exception
            } catch (Exception e) {
                break;
            }
        }
    }

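Project Operations

A project is the basic organizational unit of DataHub data and contains multiple topics. Note that DataHub projects are independent of MaxCompute projects: a project created in MaxCompute cannot be reused in DataHub and must be created separately.

Create a Project

CreateProjectResult createProject(String projectName, String comment);

Creating a project requires a project name and a description. The name must be 3 to 32 characters long, start with an English letter, and contain only English letters, digits, and "_". It is case-insensitive.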
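As a rough client-side pre-check of the naming rule above, a regular expression can reject obviously invalid names before calling createProject. This is a sketch only: ProjectNames and isValid are hypothetical helper names, and the service remains the authority on what it accepts.

```java
import java.util.regex.Pattern;

final class ProjectNames {
    // One leading letter plus 2 to 31 letters, digits, or underscores: 3 to 32 characters in total.
    private static final Pattern VALID = Pattern.compile("[A-Za-z][A-Za-z0-9_]{2,31}");

    private ProjectNames() {}

    /** Returns true if the name satisfies the documented length and character rules. */
    static boolean isValid(String name) {
        return name != null && VALID.matcher(name).matches();
    }
}
```

Because names are case-insensitive, a caller that compares names locally may also want to normalize the case first.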

  • Parameters

    • projectName The project name.
    • comment The project comment.
  • Exceptions

    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceAlreadyExistException
  • Example

    public static void createProject() {
        try {
            datahubClient.createProject(Constant.projectName, Constant.projectComment);
            System.out.println("create project successful");
        } catch (InvalidParameterException e) {
            System.out.println("invalid parameter, please check your parameter");
            System.exit(1);
        } catch (AuthorizationFailureException e) {
            System.out.println("AK error, please check your accessId and accessKey");
            System.exit(1);
        } catch (ResourceAlreadyExistException e) {
            System.out.println("project already exists, create project successful");
        } catch (DatahubClientException e) {
            System.out.println("other error");
            System.exit(1);
        }
    }

Delete a Project

DeleteProjectResult deleteProject(String projectName);

Before deleting a project, make sure it no longer contains any topics.

  • Parameters
    • projectName The project name.
  • Exceptions

    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • NoPermissionException (thrown when the project still contains topics)
    • ResourceNotFoundException
  • Example

    public static void deleteProject() {
        try {
            datahubClient.deleteProject(Constant.projectName);
            System.out.println("delete project successful");
        } catch (InvalidParameterException e) {
            System.out.println("invalid parameter, please check your parameter");
            System.exit(1);
        } catch (AuthorizationFailureException e) {
            System.out.println("AK error, please check your accessId and accessKey");
            System.exit(1);
        } catch (ResourceNotFoundException e) {
            System.out.println("project not found, delete successful");
        } catch (NoPermissionException e) {
            System.out.println("project has topic, can not delete, please delete topic first");
            System.exit(1);
        } catch (DatahubClientException e) {
            System.out.println("other error");
            System.exit(1);
        }
    }

Update a Project

UpdateProjectResult updateProject(String projectName, String comment);

Updates the project information. Currently only the comment can be updated.

  • Parameters
    • projectName The project name.
    • comment The project comment.
  • Exceptions

    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceNotFoundException
  • Example

    public static void updateProject() {
        try {
            String newComment = "new project comment";
            datahubClient.updateProject(Constant.projectName, newComment);
            System.out.println("update project successful");
        } catch (InvalidParameterException e) {
            System.out.println("invalid parameter, please check your parameter");
            System.exit(1);
        } catch (AuthorizationFailureException e) {
            System.out.println("AK error, please check your accessId and accessKey");
            System.exit(1);
        } catch (ResourceNotFoundException e) {
            System.out.println("project not found");
            System.exit(1);
        } catch (DatahubClientException e) {
            System.out.println("other error");
            System.exit(1);
        }
    }

List Projects

ListProjectResult listProject();

listProject returns a ListProjectResult object. Its projectNames member is a list of project names.

  • Parameters: none

  • Exceptions

    • DatahubClientException
    • AuthorizationFailureException
  • Example

    public static void listProject() {
        try {
            ListProjectResult listProjectResult = datahubClient.listProject();
            for (String pName : listProjectResult.getProjectNames()) {
                System.out.println(pName);
            }
        } catch (AuthorizationFailureException e) {
            System.out.println("AK error, please check your accessId and accessKey");
            System.exit(1);
        } catch (DatahubClientException e) {
            System.out.println("other error");
            System.exit(1);
        }
    }

Get a Project

GetProjectResult getProject(String projectName);

  • Parameters
    • projectName The project name.
  • Exceptions

    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceNotFoundException
  • Example

    public static void getProject() {
        try {
            GetProjectResult getProjectResult = datahubClient.getProject(Constant.projectName);
            System.out.println(getProjectResult.getCreateTime() + "\t"
                + getProjectResult.getLastModifyTime() + "\t"
                + getProjectResult.getComment());
        } catch (InvalidParameterException e) {
            System.out.println("invalid parameter, please check your parameter");
            System.exit(1);
        } catch (AuthorizationFailureException e) {
            System.out.println("AK error, please check your accessId and accessKey");
            System.exit(1);
        } catch (ResourceNotFoundException e) {
            System.out.println("project not found, get project failed");
        } catch (DatahubClientException e) {
            System.out.println("other error");
            System.exit(1);
        }
    }

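Topic Operations

A topic is the smallest unit of subscription and publication in DataHub. You can use a topic to represent one kind of streaming data. Two types are currently supported, Tuple and Blob:

  1. A Blob topic accepts a block of binary data as one record.
  2. A Tuple topic holds database-like records, each with multiple columns. A record schema is required: data is sent over the network as strings, and the schema is needed to convert it to the corresponding types. The following data types are currently supported:

Type | Meaning | Value range
BIGINT | 8-byte signed integer | -9223372036854775807 to 9223372036854775807
DOUBLE | 8-byte double-precision floating point | -1.0 × 10^308 to 1.0 × 10^308
BOOLEAN | Boolean | True/False, true/false, or 0/1
TIMESTAMP | Timestamp | Timestamp with microsecond precision
STRING | String, UTF-8 only | A single STRING column can be at most 1 MB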
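Since Java clocks usually hand out milliseconds, a value destined for a TIMESTAMP column may need scaling. The sketch below assumes the column is filled with microseconds since the Unix epoch, which the table suggests but does not state outright; TimestampMicros and its methods are hypothetical helper names.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;

final class TimestampMicros {
    private TimestampMicros() {}

    /** Microseconds since the Unix epoch for the given instant. */
    static long toEpochMicros(Instant instant) {
        return ChronoUnit.MICROS.between(Instant.EPOCH, instant);
    }

    /** Scales a millisecond timestamp (e.g. System.currentTimeMillis()) to microseconds. */
    static long millisToMicros(long epochMillis) {
        return Math.multiplyExact(epochMillis, 1000L);
    }
}
```

Math.multiplyExact is used so that an out-of-range input fails loudly instead of silently overflowing.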

Create a Tuple Topic

CreateTopicResult createTopic(String projectName, String topicName, int shardCount, int lifeCycle, RecordType recordType, RecordSchema recordSchema, String comment);

  • Parameters

    • projectName The name of the project in which to create the topic.
    • topicName The name of the topic.
    • shardCount The initial number of shards in the topic.
    • lifeCycle The data retention period, in days. Data written before that period is not accessible.
    • recordType The type of records to write. TUPLE and BLOB are currently supported.
    • recordSchema The record schema of the topic.
    • comment The comment of the topic.
  • Exceptions

    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceNotFoundException
    • ResourceAlreadyExistException
  • Example

    public static void createTupleTopic() {
        RecordSchema recordSchema = new RecordSchema();
        recordSchema.addField(new Field("bigint_field", FieldType.BIGINT));
        recordSchema.addField(new Field("double_field", FieldType.DOUBLE));
        recordSchema.addField(new Field("boolean_field", FieldType.BOOLEAN));
        recordSchema.addField(new Field("timestamp_field", FieldType.TIMESTAMP));
        recordSchema.addField(new Field("string_field", FieldType.STRING));
        try {
            datahubClient.createTopic(Constant.projectName, Constant.topicName, Constant.shardCount, Constant.lifeCycle, RecordType.TUPLE, recordSchema, Constant.topicComment);
            System.out.println("create topic successful");
        } catch (InvalidParameterException e) {
            System.out.println("invalid parameter, please check your parameter");
            System.exit(1);
        } catch (AuthorizationFailureException e) {
            System.out.println("AK error, please check your accessId and accessKey");
            System.exit(1);
        } catch (ResourceNotFoundException e) {
            System.out.println("project not found, please create project first");
            System.exit(1);
        } catch (ResourceAlreadyExistException e) {
            System.out.println("topic already exists, create topic successful");
        } catch (DatahubClientException e) {
            System.out.println("other error");
            System.exit(1);
        }
    }

Create a Blob Topic

CreateTopicResult createTopic(String projectName, String topicName, int shardCount, int lifeCycle, RecordType recordType, String comment);

  • Parameters

    • projectName The name of the project in which to create the topic.
    • topicName The name of the topic.
    • shardCount The initial number of shards in the topic.
    • lifeCycle The data retention period, in days. Data written before that period is not accessible.
    • recordType The type of records to write. TUPLE and BLOB are currently supported.
    • comment The comment of the topic.
  • Exceptions

    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceNotFoundException
    • ResourceAlreadyExistException
  • Example

    public static void createBlobTopic() {
        try {
            datahubClient.createTopic(Constant.projectName, Constant.blobTopicName, Constant.shardCount, Constant.lifeCycle, RecordType.BLOB, Constant.topicComment);
            System.out.println("create topic successful");
        } catch (InvalidParameterException e) {
            System.out.println("invalid parameter, please check your parameter");
            System.exit(1);
        } catch (AuthorizationFailureException e) {
            System.out.println("AK error, please check your accessId and accessKey");
            System.exit(1);
        } catch (ResourceNotFoundException e) {
            System.out.println("project not found, please create project first");
            System.exit(1);
        } catch (ResourceAlreadyExistException e) {
            System.out.println("topic already exists, create topic successful");
        } catch (DatahubClientException e) {
            System.out.println("other error");
            System.exit(1);
        }
    }

Delete a Topic

Before deleting a topic, make sure it has no subscriptions or connectors.

DeleteTopicResult deleteTopic(String projectName, String topicName);

  • Parameters
    • projectName The name of the project that contains the topic.
    • topicName The name of the topic.
  • Exceptions

    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceNotFoundException
  • Example

    public static void deleteTopic() {
        try {
            datahubClient.deleteTopic(Constant.projectName, Constant.topicName);
            System.out.println("delete topic successful");
        } catch (InvalidParameterException e) {
            System.out.println("invalid parameter, please check your parameter");
            System.exit(1);
        } catch (AuthorizationFailureException e) {
            System.out.println("AK error, please check your accessId and accessKey");
            System.exit(1);
        } catch (ResourceNotFoundException e) {
            System.out.println("project or topic not found");
            System.exit(1);
        } catch (DatahubClientException e) {
            System.out.println("other error");
            System.exit(1);
        }
    }

List Topics

ListTopicResult listTopic(String projectName);

  • Parameters

    • projectName The name of the project whose topics are listed.
  • Example

    public static void listTopic() {
        try {
            ListTopicResult listTopicResult = datahubClient.listTopic(Constant.projectName);
            for (String tName : listTopicResult.getTopicNames()) {
                System.out.println(tName);
            }
        } catch (InvalidParameterException e) {
            System.out.println("invalid parameter, please check your parameter");
            System.exit(1);
        } catch (AuthorizationFailureException e) {
            System.out.println("AK error, please check your accessId and accessKey");
            System.exit(1);
        } catch (ResourceNotFoundException e) {
            System.out.println("project not found");
            System.exit(1);
        } catch (DatahubClientException e) {
            System.out.println("other error");
            System.exit(1);
        }
    }

Update a Topic

Updates the topic information. Currently only the comment can be updated.

UpdateTopicResult updateTopic(String projectName, String topicName, String comment);

  • Parameters

    • projectName The name of the project that contains the topic.
    • topicName The name of the topic.
    • comment The new comment.
  • Exceptions

    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceNotFoundException
  • Example

    public static void updateTopic() {
        try {
            String newComment = "new topic comment";
            datahubClient.updateTopic(Constant.projectName, Constant.topicName, newComment);
            System.out.println("update topic successful");
            // Check the result of the update
            GetTopicResult getTopicResult = datahubClient.getTopic(Constant.projectName, Constant.topicName);
            System.out.println(getTopicResult.getComment());
        } catch (InvalidParameterException e) {
            System.out.println("invalid parameter, please check your parameter");
            System.exit(1);
        } catch (AuthorizationFailureException e) {
            System.out.println("AK error, please check your accessId and accessKey");
            System.exit(1);
        } catch (ResourceNotFoundException e) {
            System.out.println("project or topic not found");
            System.exit(1);
        } catch (DatahubClientException e) {
            System.out.println("other error");
            System.exit(1);
        }
    }

Get a Topic

GetTopicResult getTopic(String projectName, String topicName);

  • Parameters

    • projectName The name of the project that contains the topic.
    • topicName The name of the topic.
  • Exceptions

    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceNotFoundException
  • Example

    public static void getTopic() {
        try {
            GetTopicResult getTopicResult = datahubClient.getTopic(Constant.projectName, Constant.topicName);
            System.out.println(getTopicResult.getShardCount() + "\t"
                + getTopicResult.getLifeCycle() + "\t"
                + getTopicResult.getRecordType() + "\t"
                + getTopicResult.getComment());
        } catch (InvalidParameterException e) {
            System.out.println("invalid parameter, please check your parameter");
            System.exit(1);
        } catch (AuthorizationFailureException e) {
            System.out.println("AK error, please check your accessId and accessKey");
            System.exit(1);
        } catch (ResourceNotFoundException e) {
            System.out.println("project or topic not found");
            System.exit(1);
        } catch (DatahubClientException e) {
            System.out.println("other error");
            System.exit(1);
        }
    }

Append a Field to a Tuple Topic

AppendFieldResult appendField(String projectName, String topicName, Field field);

  • Parameters

    • projectName The name of the project that contains the topic.
    • topicName The name of the topic.
    • field The field to append.
  • Exceptions

    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceNotFoundException
  • Example

    public static void appendNewField() {
        try {
            // The third argument marks the new field as nullable
            Field newField = new Field("newField", FieldType.STRING, true);
            datahubClient.appendField(Constant.projectName, Constant.topicName, newField);
            System.out.println("append field successful");
        } catch (InvalidParameterException e) {
            System.out.println("invalid parameter, please check your parameter");
            System.exit(1);
        } catch (AuthorizationFailureException e) {
            System.out.println("AK error, please check your accessId and accessKey");
            System.exit(1);
        } catch (ResourceNotFoundException e) {
            System.out.println("project or topic not found");
            System.exit(1);
        } catch (DatahubClientException e) {
            System.out.println("other error");
            System.exit(1);
        }
    }

Shard Operations

A shard is a concurrent channel for transferring data within a topic, and each shard has an ID. A shard can be in one of several states, for example Opening (starting up) and Active (started and able to serve requests). Each shard consumes server resources once enabled, so request only as many shards as you need.

List Shards

ListShardResult listShard(String projectName, String topicName);

  • Parameters

    • projectName The name of the project.
    • topicName The name of the topic.
  • Exceptions

    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceNotFoundException
  • Example

    public static void listShard() {
        try {
            ListShardResult listShardResult = datahubClient.listShard(Constant.projectName, Constant.topicName);
            for (ShardEntry entry : listShardResult.getShards()) {
                System.out.println(entry.getShardId() + "\t"
                    + entry.getState() + "\t"
                    + entry.getLeftShardId() + "\t"
                    + entry.getRightShardId());
            }
        } catch (InvalidParameterException e) {
            System.out.println("invalid parameter, please check your parameter");
            System.exit(1);
        } catch (AuthorizationFailureException e) {
            System.out.println("AK error, please check your accessId and accessKey");
            System.exit(1);
        } catch (ResourceNotFoundException e) {
            System.out.println("project or topic not found");
            System.exit(1);
        } catch (DatahubClientException e) {
            System.out.println("other error");
            System.exit(1);
        }
    }

Split a Shard

Splits one ACTIVE shard of a topic into two shards. The new shards are in the ACTIVE state, and the original shard becomes CLOSED. You can split with the default splitKey or specify a splitKey.

SplitShardResult splitShard(String projectName, String topicName, String shardId);

SplitShardResult splitShard(String projectName, String topicName, String shardId, String splitKey);

  • Parameters

    • projectName The name of the project.
    • topicName The name of the topic.
    • shardId The ID of the shard to split.
    • splitKey The split key used to split the shard.
  • Exceptions

    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceNotFoundException
  • Example

    public static void splitShard() {
        try {
            String shardId = "0";
            SplitShardResult splitShardResult = datahubClient.splitShard(Constant.projectName, Constant.topicName, shardId);
            for (ShardEntry entry : splitShardResult.getNewShards()) {
                System.out.println(entry.getShardId());
            }
        } catch (InvalidParameterException e) {
            System.out.println("invalid parameter, please check your parameter");
            System.exit(1);
        } catch (AuthorizationFailureException e) {
            System.out.println("AK error, please check your accessId and accessKey");
            System.exit(1);
        } catch (ResourceNotFoundException e) {
            System.out.println("project or topic not found");
            System.exit(1);
        } catch (DatahubClientException e) {
            System.out.println("other error");
            System.out.println(e);
            System.exit(1);
        }
    }

Merge Shards

Merges two ACTIVE shards of a topic. The two shards must be adjacent. The neighbors of each shard can be found in the result of listShard.

MergeShardResult mergeShard(String projectName, String topicName, String shardId, String adjacentShardId);

  • Parameters

    • projectName The name of the project.
    • topicName The name of the topic.
    • shardId The ID of the shard to merge.
    • adjacentShardId The ID of the shard adjacent to the specified shard.
  • Exceptions

    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceNotFoundException
  • Example

    public static void mergeShard() {
        try {
            String shardId = "7";
            // adjacentShardId must be adjacent to shardId; adjacency is shown in the listShard result
            String adjacentShardId = "8";
            MergeShardResult mergeShardResult = datahubClient.mergeShard(Constant.projectName, Constant.topicName, shardId, adjacentShardId);
            System.out.println("merge successful");
            System.out.println(mergeShardResult.getShardId());
        } catch (InvalidParameterException e) {
            System.out.println("invalid parameter, please check your parameter");
            System.exit(1);
        } catch (AuthorizationFailureException e) {
            System.out.println("AK error, please check your accessId and accessKey");
            System.exit(1);
        } catch (ResourceNotFoundException e) {
            System.out.println("project or topic not found");
            System.exit(1);
        } catch (DatahubClientException e) {
            System.out.println("other error");
            System.out.println(e);
            System.exit(1);
        }
    }

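Reading and Writing Data

Data can be read from shards in both the CLOSED and ACTIVE states, but only ACTIVE shards can be written to.

Reading Data

Getting a Cursor

To read data from a topic, specify the shard and the cursor position at which to start reading. A cursor can be obtained in four ways: OLDEST, LATEST, SEQUENCE, and SYSTEM_TIME.

  • OLDEST: the cursor points to the oldest record among the currently valid data.
  • LATEST: the cursor points to the latest record.
  • SEQUENCE: the cursor points to the record with the given sequence.
  • SYSTEM_TIME: the cursor points to the first record whose timestamp is greater than or equal to the given timestamp.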
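The four cursor types can be pictured with a toy in-memory model of one shard. This is not how the service is implemented: CursorModel, Rec, and resolve are hypothetical names, and where the model returns an empty result the real getCursor may instead throw an exception such as SeekOutOfRangeException.

```java
import java.util.List;
import java.util.OptionalLong;

final class CursorModel {
    enum Kind { OLDEST, LATEST, SEQUENCE, SYSTEM_TIME }

    /** A record in the toy shard: its sequence number and system time in milliseconds. */
    static final class Rec {
        final long sequence;
        final long systemTime;

        Rec(long sequence, long systemTime) {
            this.sequence = sequence;
            this.systemTime = systemTime;
        }
    }

    private CursorModel() {}

    /**
     * Returns the sequence the cursor would point to, or empty when no record qualifies.
     * records holds the non-expired records, ordered by ascending sequence and time.
     * param is ignored for OLDEST and LATEST.
     */
    static OptionalLong resolve(List<Rec> records, Kind kind, long param) {
        if (records.isEmpty()) {
            return OptionalLong.empty();
        }
        switch (kind) {
            case OLDEST:
                return OptionalLong.of(records.get(0).sequence);
            case LATEST:
                return OptionalLong.of(records.get(records.size() - 1).sequence);
            case SEQUENCE:
                for (Rec r : records) {
                    if (r.sequence == param) return OptionalLong.of(r.sequence);
                }
                return OptionalLong.empty();
            case SYSTEM_TIME:
                // First record whose time is greater than or equal to the given timestamp.
                for (Rec r : records) {
                    if (r.systemTime >= param) return OptionalLong.of(r.sequence);
                }
                return OptionalLong.empty();
            default:
                throw new IllegalArgumentException("unknown kind: " + kind);
        }
    }
}
```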

GetCursorResult getCursor(String projectName, String topicName, String shardId, CursorType type);

GetCursorResult getCursor(String projectName, String topicName, String shardId, CursorType type, long param);

  • Parameters

    • projectName The name of the project.
    • topicName The name of the topic.
    • shardId The ID of the shard.
    • type The cursor type used to get the cursor.
    • param The sequence (for SEQUENCE) or the timestamp in milliseconds (for SYSTEM_TIME), as used in the example below.
  • Exceptions

    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceNotFoundException
    • SeekOutOfRangeException
  • Example

    public static void getCursor() {
        String shardId = "5";
        try {
            /* OLDEST */
            String oldestCursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.OLDEST).getCursor();
            /* LATEST */
            String latestCursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.LATEST).getCursor();
            /* SEQUENCE */
            // Get the sequence of the latest record
            long seq = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.LATEST).getSequence();
            // Get the read position of the latest ten records
            String seqCursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.SEQUENCE, seq - 9).getCursor();
            /* SYSTEM_TIME */
            // Convert the time string to a timestamp
            String time = "2019-07-01 10:00:00";
            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            long timestamp = 0L;
            try {
                Date date = simpleDateFormat.parse(time);
                timestamp = date.getTime(); // timestamp in milliseconds
            } catch (ParseException e) {
                System.exit(-1);
            }
            // Get the read position of the data at or after the given time
            String timeCursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.SYSTEM_TIME, timestamp).getCursor();
            System.out.println("get cursor successful");
        } catch (InvalidParameterException e) {
            System.out.println("invalid parameter, please check your parameter");
            System.exit(1);
        } catch (AuthorizationFailureException e) {
            System.out.println("AK error, please check your accessId and accessKey");
            System.exit(1);
        } catch (ResourceNotFoundException e) {
            System.out.println("project or topic or shard not found");
            System.exit(1);
        } catch (SeekOutOfRangeException e) {
            System.out.println("offset invalid or has expired");
            System.exit(1);
        } catch (DatahubClientException e) {
            System.out.println("other error");
            System.out.println(e);
            System.exit(1);
        }
    }
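The SYSTEM_TIME example parses the time string with SimpleDateFormat, which uses the JVM default time zone and is not thread-safe. A possible alternative, sketched here with java.time and an explicit zone, is shown below. TimeToMillis is a hypothetical helper name, and the choice of zone is an assumption to adapt to where the timestamps originate.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

final class TimeToMillis {
    private static final DateTimeFormatter FORMAT =
        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    private TimeToMillis() {}

    /** Parses a local date-time string in the given zone and returns epoch milliseconds. */
    static long toEpochMillis(String text, ZoneId zone) {
        return LocalDateTime.parse(text, FORMAT).atZone(zone).toInstant().toEpochMilli();
    }
}
```

For example, TimeToMillis.toEpochMillis("2019-07-01 10:00:00", ZoneId.of("Asia/Shanghai")) yields 1561946400000, which could then be passed as the param argument with CursorType.SYSTEM_TIME.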

Reading records:

GetRecordsResult getRecords(String projectName, String topicName, String shardId, String cursor, int limit);

GetRecordsResult getRecords(String projectName, String topicName, String shardId, RecordSchema schema, String cursor, int limit);

  • Parameters

    • projectName The name of the project.
    • topicName The name of the topic.
    • shardId The ID of the shard.
    • schema The record schema; required when reading TUPLE records.
    • cursor The cursor at which to start reading.
    • limit The maximum number of records to read.
  • Exceptions

    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceNotFoundException
    • ShardSealedException
    • LimitExceededException
  • Example

1). Reading data from a Tuple topic

    public static void example() {
        // Maximum number of records to read per request
        int recordLimit = 1000;
        String shardId = "7";
        // Get a cursor; here it points at the oldest record that is still valid
        // Note: normally getCursor is called once at initialization; later reads use the nextCursor returned by getRecords
        String cursor = "";
        try {
            cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.OLDEST).getCursor();
        } catch (InvalidParameterException e) {
            System.out.println("invalid parameter, please check your parameter");
            System.exit(1);
        } catch (AuthorizationFailureException e) {
            System.out.println("AK error, please check your accessId and accessKey");
            System.exit(1);
        } catch (ResourceNotFoundException e) {
            System.out.println("project or topic or shard not found");
            System.exit(1);
        } catch (SeekOutOfRangeException e) {
            System.out.println("offset invalid or has expired");
            System.exit(1);
        } catch (DatahubClientException e) {
            System.out.println("other error");
            System.out.println(e);
            System.exit(1);
        }
        while (true) {
            try {
                // recordSchema is the topic's schema and is assumed to have been obtained beforehand
                GetRecordsResult result = datahubClient.getRecords(Constant.projectName, Constant.topicName, shardId, recordSchema, cursor, recordLimit);
                if (result.getRecordCount() <= 0) {
                    // No data yet; sleep before reading again
                    Thread.sleep(10000);
                    continue;
                }
                for (RecordEntry entry : result.getRecords()) {
                    TupleRecordData data = (TupleRecordData) entry.getRecordData();
                    System.out.println("field1:" + data.getField("field1") + "\t"
                            + "field2:" + data.getField("field2"));
                }
                // Advance to the next cursor
                cursor = result.getNextCursor();
            } catch (InvalidCursorException ex) {
                // The cursor is invalid or has expired; reposition before consuming again
                cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.OLDEST).getCursor();
            } catch (SeekOutOfRangeException e) {
                System.out.println("offset invalid");
                System.exit(1);
            } catch (ResourceNotFoundException e) {
                System.out.println("project or topic or shard not found");
                System.exit(1);
            } catch (ShardSealedException e) {
                System.out.println("shard is closed, all data has been read");
                System.exit(1);
            } catch (LimitExceededException e) {
                System.out.println("maybe exceed limit, retry");
            } catch (DatahubClientException e) {
                System.out.println("other error");
                System.out.println(e);
                System.exit(1);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop reading instead of looping on
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

2). Read data from a Blob topic

    public static void example() {
        // Maximum number of records to read per request
        int recordLimit = 1000;
        String shardId = "7";
        // Get a cursor; here it points at the oldest record that is still valid
        // Note: normally getCursor is called once at initialization; later reads use the nextCursor returned by getRecords
        String cursor = "";
        try {
            cursor = datahubClient.getCursor(Constant.projectName, Constant.blobTopicName, shardId, CursorType.OLDEST).getCursor();
        } catch (InvalidParameterException e) {
            System.out.println("invalid parameter, please check your parameter");
            System.exit(1);
        } catch (AuthorizationFailureException e) {
            System.out.println("AK error, please check your accessId and accessKey");
            System.exit(1);
        } catch (ResourceNotFoundException e) {
            System.out.println("project or topic or shard not found");
            System.exit(1);
        } catch (SeekOutOfRangeException e) {
            System.out.println("offset invalid or has expired");
            System.exit(1);
        } catch (DatahubClientException e) {
            System.out.println("other error");
            System.out.println(e);
            System.exit(1);
        }
        while (true) {
            try {
                // Blob topics have no schema, so the overload without a RecordSchema is used
                GetRecordsResult result = datahubClient.getRecords(Constant.projectName, Constant.blobTopicName, shardId, cursor, recordLimit);
                if (result.getRecordCount() <= 0) {
                    // No data yet; sleep before reading again
                    Thread.sleep(10000);
                    continue;
                }
                /* consume the data */
                // Advance to the next cursor
                cursor = result.getNextCursor();
            } catch (InvalidCursorException ex) {
                // The cursor is invalid or has expired; reposition before consuming again
                cursor = datahubClient.getCursor(Constant.projectName, Constant.blobTopicName, shardId, CursorType.OLDEST).getCursor();
            } catch (SeekOutOfRangeException e) {
                System.out.println("offset invalid");
                System.exit(1);
            } catch (ResourceNotFoundException e) {
                System.out.println("project or topic or shard not found");
                System.exit(1);
            } catch (ShardSealedException e) {
                System.out.println("shard is closed, all data has been read");
                System.exit(1);
            } catch (LimitExceededException e) {
                System.out.println("maybe exceed limit, retry");
            } catch (DatahubClientException e) {
                System.out.println("other error");
                System.out.println(e);
                System.exit(1);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop reading instead of looping on
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
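
Both read examples follow one pattern: obtain a cursor once, then feed each result's nextCursor into the next call. The sketch below models that pattern against an in-memory list. Batch, read, and drain are hypothetical stand-ins for GetRecordsResult and getRecords, not SDK types, and the integer cursor is an assumption made purely for illustration (real DataHub cursors are opaque strings).

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model only; none of these names exist in the DataHub SDK.
final class CursorLoopSketch {
    // Stand-in for GetRecordsResult: one batch plus the cursor for the next call.
    static final class Batch {
        final List<String> records;
        final int nextCursor;

        Batch(List<String> records, int nextCursor) {
            this.records = records;
            this.nextCursor = nextCursor;
        }
    }

    // Stand-in for getRecords: returns up to `limit` records starting at `cursor`.
    static Batch read(List<String> shard, int cursor, int limit) {
        int start = Math.min(cursor, shard.size());
        int end = Math.min(shard.size(), start + limit);
        return new Batch(new ArrayList<>(shard.subList(start, end)), end);
    }

    // Reads until an empty batch is returned, always continuing from nextCursor.
    static List<String> drain(List<String> shard, int startCursor, int limit) {
        List<String> consumed = new ArrayList<>();
        int cursor = startCursor;
        while (true) {
            Batch batch = read(shard, cursor, limit);
            if (batch.records.isEmpty()) {
                break; // a long-running consumer would sleep and poll again here
            }
            consumed.addAll(batch.records);
            cursor = batch.nextCursor;
        }
        return consumed;
    }
}
```

The point of the model is that the starting cursor is looked up once; every later position comes from the previous result.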

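Write data

The putRecordsByShard interface is supported by server version 2.12 and later; earlier versions must use putRecords. When using putRecords, specify the target shard on each record; otherwise the record is written to the first shard in ACTIVE state by default.

Both methods take a records parameter, a List in which each element is one record. All records in the list must be of the same type, either Tuple or Blob. DataHub currently supports writing by shard (server >= 2.12) and mixed writing, which correspond to putRecordsByShard and putRecords respectively. With putRecords, check the returned PutRecordsResult to confirm whether the data was written successfully; putRecordsByShard instead reports failure by throwing an exception.

If the server supports it, putRecordsByShard is recommended.

PutRecordsResult putRecords(String projectName, String topicName, List<RecordEntry> records);

PutRecordsByShardResult putRecordsByShard(String projectName, String topicName, String shardId, List<RecordEntry> records);

  • Parameters

    • projectName The name of the project.
    • topicName The name of the topic.
    • shardId The id of the shard.
    • records The list of records to write.
  • Exception

    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceNotFoundException
    • ShardSealedException
    • LimitExceededException

1). Write to a Tuple topic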

    // Write Tuple records
    public static void tupleExample() {
        String shardId = "9";
        // Fetch the topic schema
        recordSchema = datahubClient.getTopic(Constant.projectName, Constant.topicName).getRecordSchema();
        // Build ten records
        List<RecordEntry> recordEntries = new ArrayList<>();
        for (int i = 0; i < 10; ++i) {
            RecordEntry recordEntry = new RecordEntry();
            // Attach extra attributes to each record
            recordEntry.addAttribute("key1", "value1");
            TupleRecordData data = new TupleRecordData(recordSchema);
            data.setField("field1", "HelloWorld");
            data.setField("field2", 1234567);
            recordEntry.setRecordData(data);
            recordEntry.setShardId(shardId);
            recordEntries.add(recordEntry);
        }
        try {
            // putRecordsByShard is supported by server 2.12 and later; use putRecords on earlier versions
            //datahubClient.putRecordsByShard(Constant.projectName, Constant.topicName, shardId, recordEntries);
            PutRecordsResult result = datahubClient.putRecords(Constant.projectName, Constant.topicName, recordEntries);
            // putRecords reports rejected records in its result instead of throwing
            if (result.getFailedRecordCount() > 0) {
                System.out.println("failed to write " + result.getFailedRecordCount() + " records");
            } else {
                System.out.println("write data successful");
            }
        } catch (InvalidParameterException e) {
            System.out.println("invalid parameter, please check your parameter");
            System.exit(1);
        } catch (AuthorizationFailureException e) {
            System.out.println("AK error, please check your accessId and accessKey");
            System.exit(1);
        } catch (ResourceNotFoundException e) {
            System.out.println("project or topic or shard not found");
            System.exit(1);
        } catch (ShardSealedException e) {
            System.out.println("shard status is CLOSED, can not write");
            System.exit(1);
        } catch (DatahubClientException e) {
            System.out.println("other error");
            System.out.println(e);
            System.exit(1);
        }
    }

2). Write to a Blob topic

    // Write Blob records
    public static void blobExample() {
        // Build ten records
        List<RecordEntry> recordEntries = new ArrayList<>();
        String shardId = "4";
        for (int i = 0; i < 10; ++i) {
            RecordEntry recordEntry = new RecordEntry();
            // Attach extra attributes to each record
            recordEntry.addAttribute("key1", "value1");
            BlobRecordData data = new BlobRecordData("123456".getBytes(Charsets.UTF_8));
            recordEntry.setRecordData(data);
            recordEntry.setShardId(shardId);
            recordEntries.add(recordEntry);
        }
        while (true) {
            try {
                // putRecordsByShard is supported by server 2.12 and later; use putRecords on earlier versions
                //datahubClient.putRecordsByShard(Constant.projectName, Constant.blobTopicName, shardId, recordEntries);
                PutRecordsResult result = datahubClient.putRecords(Constant.projectName, Constant.blobTopicName, recordEntries);
                // putRecords reports rejected records in its result instead of throwing
                if (result.getFailedRecordCount() > 0) {
                    System.out.println("failed to write " + result.getFailedRecordCount() + " records");
                } else {
                    System.out.println("write data successful");
                }
                break;
            } catch (InvalidParameterException e) {
                System.out.println("invalid parameter, please check your parameter");
                System.exit(1);
            } catch (AuthorizationFailureException e) {
                System.out.println("AK error, please check your accessId and accessKey");
                System.exit(1);
            } catch (ResourceNotFoundException e) {
                System.out.println("project or topic or shard not found");
                System.exit(1);
            } catch (ShardSealedException e) {
                System.out.println("shard status is CLOSED, can not write");
                System.exit(1);
            } catch (LimitExceededException e) {
                System.out.println("maybe qps exceed limit, retry");
            } catch (DatahubClientException e) {
                System.out.println("other error");
                System.out.println(e);
                System.exit(1);
            }
        }
    }
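
Because putRecords reports rejected records through its result rather than by throwing, callers typically resubmit only the records that failed. The following is a minimal sketch of such a bounded retry. It assumes the write call is wrapped in a function that returns the rejected records; the put parameter and the PutRetrySketch class are hypothetical and not SDK APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Illustrative helper; not part of the DataHub SDK.
final class PutRetrySketch {
    // `put` is a hypothetical wrapper around a write call that returns the records
    // the server rejected. At most `maxAttempts` calls are made.
    static <T> List<T> putWithRetry(List<T> records, Function<List<T>, List<T>> put, int maxAttempts) {
        List<T> pending = new ArrayList<>(records);
        for (int attempt = 0; attempt < maxAttempts && !pending.isEmpty(); attempt++) {
            // Only the records that are still pending are sent again.
            pending = new ArrayList<>(put.apply(pending));
        }
        return pending; // empty when every record was eventually accepted
    }
}
```

Returning the leftover records lets the caller decide what to do once the attempts are used up, for example logging them or writing them to a fallback store. A production version would normally also wait between attempts.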

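Multiple write modes

Before DataHub 2.12, only the putRecords interface was supported. The RecordEntry class has three properties, shardId, partitionKey, and hashKey, and the value set on one of them determines which shard a record is written to.

For 2.12 and later, putRecordsByShard is recommended because it avoids the performance cost of server-side partitioning.

1). Write by ShardID

This is the recommended approach. Example:

    RecordEntry entry = new RecordEntry();
    entry.setShardId("0");

2). Write by HashKey

Specify a 128-bit MD5 value. The record is routed to a shard according to each shard's BeginHashKey and EndHashKey.

Example:

    RecordEntry entry = new RecordEntry();
    entry.setHashKey("7FFFFFFFFFFFFFFD7FFFFFFFFFFFFFFD");

3). Write by PartitionKey

Specify a String as the PartitionKey. The system routes the record using the MD5 value of that string together with each shard's BeginHashKey and EndHashKey.

Example:

    RecordEntry entry = new RecordEntry();
    entry.setPartitionKey("TestPartitionKey");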
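
To make the HashKey and PartitionKey routing more concrete, here is a rough sketch of how a 128-bit key can be matched against shard hash ranges. It is not the server's implementation. The Shard class, the half-open range [BeginHashKey, EndHashKey), the UTF-8 encoding of the partition key, and the two-shard layout used in main are all assumptions made for illustration.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Illustrative model only; none of these names exist in the DataHub SDK.
final class ShardRouterSketch {
    // Hypothetical shard description with a hash range given as 32-digit hex strings.
    static final class Shard {
        final String id;
        final BigInteger begin;
        final BigInteger end;

        Shard(String id, String beginHex, String endHex) {
            this.id = id;
            this.begin = new BigInteger(beginHex, 16);
            this.end = new BigInteger(endHex, 16);
        }
    }

    // MD5 of the key as 32 uppercase hex digits.
    static String md5Hex(String partitionKey) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            return String.format("%032X", new BigInteger(1, digest));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is not available", e);
        }
    }

    // Returns the id of the shard whose range contains the key, or null if none does.
    static String shardForHashKey(String hashKeyHex, Shard[] shards) {
        BigInteger key = new BigInteger(hashKeyHex, 16);
        for (Shard shard : shards) {
            if (key.compareTo(shard.begin) >= 0 && key.compareTo(shard.end) < 0) {
                return shard.id;
            }
        }
        return null;
    }

    // A partition key is hashed first and then routed like a hash key.
    static String shardForPartitionKey(String partitionKey, Shard[] shards) {
        return shardForHashKey(md5Hex(partitionKey), shards);
    }

    public static void main(String[] args) {
        Shard[] shards = {
            new Shard("0", "00000000000000000000000000000000", "80000000000000000000000000000000"),
            new Shard("1", "80000000000000000000000000000000", "FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF")
        };
        System.out.println(shardForHashKey("7FFFFFFFFFFFFFFD7FFFFFFFFFFFFFFD", shards));
        System.out.println(shardForPartitionKey("TestPartitionKey", shards));
    }
}
```

Under this model, all records that share a PartitionKey land on the same shard for as long as the shard ranges stay unchanged.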

Meter operations

Get Meter

GetMeterInfoResult getMeterInfo(String projectName, String topicName, String shardId);

  • Parameters

    • projectName The name of the project.
    • topicName The name of the topic.
    • shardId The id of the shard.
  • Exception

    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceNotFoundException
  • Example

    public static void getMeter() {
        String shardId = "5";
        try {
            GetMeterInfoResult getMeterInfoResult = datahubClient.getMeterInfo(Constant.projectName, Constant.topicName, shardId);
            System.out.println("get meter successful");
            System.out.println(getMeterInfoResult.getActiveTime() + "\t" + getMeterInfoResult.getStorage());
        } catch (InvalidParameterException e) {
            System.out.println("invalid parameter, please check your parameter");
            System.exit(1);
        } catch (AuthorizationFailureException e) {
            System.out.println("AK error, please check your accessId and accessKey");
            System.exit(1);
        } catch (ResourceNotFoundException e) {
            System.out.println("project or topic or shard not found");
            System.exit(1);
        } catch (DatahubClientException e) {
            System.out.println("other error");
            System.out.println(e);
            System.exit(1);
        }
    }

Subscription operations

The subscription service stores consumer offsets on the server side. With a small amount of configuration and handling, it provides a highly available offset store.

Create Subscription

CreateSubscriptionResult createSubscription(String projectName, String topicName, String comment);

  • Parameters

    • projectName The name of the project.
    • topicName The name of the topic.
    • comment The comment of the subscription.
  • Exception

    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceNotFoundException
  • Example

    public static void createSubscription() {
        try {
            CreateSubscriptionResult createSubscriptionResult = datahubClient.createSubscription(Constant.projectName, Constant.topicName, Constant.subscribtionComment);
            System.out.println("create subscription successful");
            System.out.println(createSubscriptionResult.getSubId());
        } catch (InvalidParameterException e) {
            System.out.println("invalid parameter, please check your parameter");
            System.exit(1);
        } catch (AuthorizationFailureException e) {
            System.out.println("AK error, please check your accessId and accessKey");
            System.exit(1);
        } catch (ResourceNotFoundException e) {
            System.out.println("project or topic not found, please create them first");
            System.exit(1);
        } catch (DatahubClientException e) {
            System.out.println("other error");
            System.out.println(e);
            System.exit(1);
        }
    }

Delete Subscription

DeleteSubscriptionResult deleteSubscription(String projectName, String topicName, String subId);

  • Parameters

    • projectName The name of the project.
    • topicName The name of the topic.
    • subId The id of the subscription.
  • Exception

    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceNotFoundException
  • Example

    public static void deleteSubscription() {
        try {
            datahubClient.deleteSubscription(Constant.projectName, Constant.topicName, subId);
            System.out.println("delete subscription successful");
        } catch (InvalidParameterException e) {
            System.out.println("invalid parameter, please check your parameter");
            System.exit(1);
        } catch (AuthorizationFailureException e) {
            System.out.println("AK error, please check your accessId and accessKey");
            System.exit(1);
        } catch (ResourceNotFoundException e) {
            // Nothing left to delete, so this is treated as success
            System.out.println("project or topic or subId not found, delete successful");
        } catch (DatahubClientException e) {
            System.out.println("other error");
            System.out.println(e);
            System.exit(1);
        }
    }

Update Subscription

Updates an existing Subscription. Currently only the comment can be updated.

UpdateSubscriptionResult updateSubscription(String projectName, String topicName, String subId, String comment);

  • Parameters

    • projectName The name of the project.
    • topicName The name of the topic.
    • subId The id of the subscription.
    • comment The new comment.
  • Exception

    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceNotFoundException
  • Example

    public static void updateSubscription() {
        try {
            String newComment = "new subscription comment";
            datahubClient.updateSubscription(Constant.projectName, Constant.topicName, subId, newComment);
            System.out.println("update subscription successful");
        } catch (InvalidParameterException e) {
            System.out.println("invalid parameter, please check your parameter");
            System.exit(1);
        } catch (AuthorizationFailureException e) {
            System.out.println("AK error, please check your accessId and accessKey");
            System.exit(1);
        } catch (ResourceNotFoundException e) {
            System.out.println("project or topic or subId not found");
            System.exit(1);
        } catch (DatahubClientException e) {
            System.out.println("other error");
            System.out.println(e);
            System.exit(1);
        }
    }

List Subscriptions

The pageNum and pageSize parameters of listSubscription select a range of subscriptions. For example, pageNum = 1 with pageSize = 10 returns subscriptions 1-10, and pageNum = 2 with pageSize = 5 returns subscriptions 6-10.

ListSubscriptionResult listSubscription(String projectName, String topicName, int pageNum, int pageSize);

  • Parameters

    • projectName The name of the project.
    • topicName The name of the topic.
    • pageNum The page number used to list subscriptions.
    • pageSize The page size used to list subscriptions.
  • Exception

    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceNotFoundException
  • Example

    public static void listSubscription() {
        try {
            ListSubscriptionResult listSubscriptionResult = datahubClient.listSubscription(Constant.projectName, Constant.topicName, Constant.pageNum, Constant.pageSize);
            if (listSubscriptionResult.getSubscriptions().size() > 0) {
                System.out.println(listSubscriptionResult.getTotalCount());
                System.out.println(listSubscriptionResult.getSubscriptions().size());
                for (SubscriptionEntry entry : listSubscriptionResult.getSubscriptions()) {
                    System.out.println(entry.getSubId() + "\t"
                            + entry.getState() + "\t"
                            + entry.getType() + "\t"
                            + entry.getComment());
                }
            }
        } catch (InvalidParameterException e) {
            System.out.println("invalid parameter, please check your parameter");
            System.exit(1);
        } catch (AuthorizationFailureException e) {
            System.out.println("AK error, please check your accessId and accessKey");
            System.exit(1);
        } catch (ResourceNotFoundException e) {
            System.out.println("project or topic not found");
            System.exit(1);
        } catch (DatahubClientException e) {
            System.out.println("other error");
            System.out.println(e);
            System.exit(1);
        }
    }
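
The pageNum/pageSize arithmetic described for listSubscription can be written out as a small helper. This is only a sketch of the 1-based ranges given in the description above; PageRangeSketch and its methods are illustrative names, not SDK APIs.

```java
// Illustrative helper; not part of the DataHub SDK.
final class PageRangeSketch {
    // 1-based position of the first subscription on the page.
    static int firstIndex(int pageNum, int pageSize) {
        return (pageNum - 1) * pageSize + 1;
    }

    // 1-based position of the last subscription on a full page.
    static int lastIndex(int pageNum, int pageSize) {
        return pageNum * pageSize;
    }

    public static void main(String[] args) {
        // pageNum = 2, pageSize = 5 covers subscriptions 6-10.
        System.out.println(firstIndex(2, 5) + "-" + lastIndex(2, 5));
    }
}
```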

Get Subscription

GetSubscriptionResult getSubscription(String projectName, String topicName, String subId);

  • Parameters

    • projectName The name of the project.
    • topicName The name of the topic.
    • subId The id of the subscription.
  • Exception

    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceNotFoundException
  • Example

    public static void getSubscription() {
        try {
            GetSubscriptionResult getSubscriptionResult = datahubClient.getSubscription(Constant.projectName, Constant.topicName, subId);
            System.out.println(getSubscriptionResult.getSubId() + "\t"
                    + getSubscriptionResult.getState() + "\t"
                    + getSubscriptionResult.getType() + "\t"
                    + getSubscriptionResult.getComment());
        } catch (InvalidParameterException e) {
            System.out.println("invalid parameter, please check your parameter");
            System.exit(1);
        } catch (AuthorizationFailureException e) {
            System.out.println("AK error, please check your accessId and accessKey");
            System.exit(1);
        } catch (ResourceNotFoundException e) {
            System.out.println("project or topic or subscription not found");
            System.exit(1);
        } catch (DatahubClientException e) {
            System.out.println("other error");
            System.out.println(e);
            System.exit(1);
        }
    }

Update Subscription state

A Subscription has two states, OFFLINE and ONLINE.

UpdateSubscriptionStateResult updateSubscriptionState(String projectName, String topicName, String subId, SubscriptionState state);

  • Parameters

    • projectName The name of the project.
    • topicName The name of the topic.
    • subId The id of the subscription.
    • state The state to switch to.
  • Exception

    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceNotFoundException
  • Example

    public static void updateSubscriptionState() {
        try {
            datahubClient.updateSubscriptionState(Constant.projectName, Constant.topicName, subId, SubscriptionState.ONLINE);
            System.out.println("update subscription state successful");
        } catch (InvalidParameterException e) {
            System.out.println("invalid parameter, please check your parameter");
            System.exit(1);
        } catch (AuthorizationFailureException e) {
            System.out.println("AK error, please check your accessId and accessKey");
            System.exit(1);
        } catch (ResourceNotFoundException e) {
            System.out.println("project or topic or subscription not found");
            System.exit(1);
        } catch (DatahubClientException e) {
            System.out.println("other error");
            System.out.println(e);
            System.exit(1);
        }
    }

Offset operations

A newly created subscription starts in an unconsumed state. To use the offset storage provided by the subscription service, some offset operations are required.

Initialize offset

openSubscriptionSession needs to be called only once, at initialization. Calling it again generates a new consumer sessionId and invalidates the previous session, which can then no longer commit offsets.

OpenSubscriptionSessionResult openSubscriptionSession(String projectName, String topicName, String subId, List<String> shardIds);

  • Parameters

    • projectName The name of the project.
    • topicName The name of the topic.
    • subId The id of the subscription.
    • shardIds The id list of the shards.
  • Exception

    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceNotFoundException
  • Example

    public static void openSubscriptionSession() {
        shardId = "4";
        shardIds = new ArrayList<String>();
        shardIds.add("0");
        shardIds.add("4");
        try {
            OpenSubscriptionSessionResult openSubscriptionSessionResult = datahubClient.openSubscriptionSession(Constant.projectName, Constant.topicName, subId, shardIds);
            SubscriptionOffset subscriptionOffset = openSubscriptionSessionResult.getOffsets().get(shardId);
            System.out.println(subscriptionOffset.getSessionId() + "\t"
                    + subscriptionOffset.getVersionId() + "\t"
                    + subscriptionOffset.getSequence());
        } catch (InvalidParameterException e) {
            System.out.println("invalid parameter, please check your parameter");
            System.exit(1);
        } catch (AuthorizationFailureException e) {
            System.out.println("AK error, please check your accessId and accessKey");
            System.exit(1);
        } catch (ResourceNotFoundException e) {
            System.out.println("project or topic or subscription not found");
            System.exit(1);
        } catch (DatahubClientException e) {
            System.out.println("other error");
            System.out.println(e);
            System.exit(1);
        }
    }

Get offset

GetSubscriptionOffsetResult getSubscriptionOffset(String projectName, String topicName, String subId, List<String> shardIds);

getSubscriptionOffset returns a GetSubscriptionOffsetResult, which is largely the same as the result of openSubscriptionSession, except that its offsets carry no sessionId. It is intended as a read-only method.

  • Parameters

    • projectName The name of the project.
    • topicName The name of the topic.
    • subId The id of the subscription.
    • shardIds The id list of the shards.
  • Exception

    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceNotFoundException
  • Example

    // Get offsets
    public static void getSubscriptionOffset() {
        shardId = "4";
        shardIds = new ArrayList<String>();
        shardIds.add("0");
        shardIds.add("4");
        try {
            GetSubscriptionOffsetResult getSubscriptionOffsetResult = datahubClient.getSubscriptionOffset(Constant.projectName, Constant.topicName, subId, shardIds);
            SubscriptionOffset subscriptionOffset = getSubscriptionOffsetResult.getOffsets().get(shardId);
            System.out.println(subscriptionOffset.getVersionId() + "\t"
                    + subscriptionOffset.getSequence());
        } catch (InvalidParameterException e) {
            System.out.println("invalid parameter, please check your parameter");
            System.exit(1);
        } catch (AuthorizationFailureException e) {
            System.out.println("AK error, please check your accessId and accessKey");
            System.exit(1);
        } catch (ResourceNotFoundException e) {
            System.out.println("project or topic or subscription not found");
            System.exit(1);
        } catch (DatahubClientException e) {
            System.out.println("other error");
            System.out.println(e);
            System.exit(1);
        }
    }

Commit offset

CommitSubscriptionOffsetResult commitSubscriptionOffset(String projectName, String topicName, String subId, Map<String, SubscriptionOffset> offsets);

Committing an offset validates versionId and sessionId, both of which must match the current values. The committed offset itself is not strictly validated; it is recommended to fill in the actual sequence and timestamp of the record.

  • Parameters

    • projectName The name of the project.
    • topicName The name of the topic.
    • subId The id of the subscription.
    • offsets The offset map of shards.
  • Exception

    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceNotFoundException
    • SubscriptionOffsetResetException
    • SubscriptionSessionInvalidException
    • SubscriptionOfflineException
  • Example

    // Commit offsets
    public static void commitSubscriptionOffset() {
        while (true) {
            try {
                OpenSubscriptionSessionResult openSubscriptionSessionResult = datahubClient.openSubscriptionSession(Constant.projectName, Constant.topicName, subId, shardIds);
                SubscriptionOffset subscriptionOffset = openSubscriptionSessionResult.getOffsets().get(shardId);
                // This only tests a commit; for the full flow, see the offset consumption example
                subscriptionOffset.setSequence(10);
                subscriptionOffset.setTimestamp(100);
                Map<String, SubscriptionOffset> offsets = new HashMap<String, SubscriptionOffset>();
                offsets.put(shardId, subscriptionOffset);
                // Commit the offsets
                datahubClient.commitSubscriptionOffset(Constant.projectName, Constant.topicName, subId, offsets);
                // Committed successfully; leave the retry loop
                break;
            } catch (InvalidParameterException e) {
                System.out.println("invalid parameter, please check your parameter");
                System.exit(1);
            } catch (AuthorizationFailureException e) {
                System.out.println("AK error, please check your accessId and accessKey");
                System.exit(1);
            } catch (ResourceNotFoundException e) {
                System.out.println("project or topic or subscription not found");
                System.exit(1);
            } catch (SubscriptionOffsetResetException e) {
                System.out.println("offset is reset elsewhere, retry");
            } catch (SubscriptionSessionInvalidException e) {
                System.out.println("offset is open elsewhere, retry");
            } catch (SubscriptionOfflineException e) {
                System.out.println("subscription is offline");
                System.exit(1);
            } catch (DatahubClientException e) {
                System.out.println("other error");
                System.out.println(e);
                System.exit(1);
            }
        }
    }

Reset offset

ResetSubscriptionOffsetResult resetSubscriptionOffset(String projectName, String topicName, String subId, Map<String, SubscriptionOffset> offsets);

Resetting an offset moves the consumer position to a given point in time. If several records share that time, the offset is set to the first of them. A reset updates the versionId together with the offset, so a running task that commits with the old versionId receives a SubscriptionOffsetResetException. The new versionId can be obtained through getSubscriptionOffset.

  • Parameters

    • projectName The name of the project.
    • topicName The name of the topic.
    • subId The id of the subscription.
    • offsets The offset map of shards.
  • Exception

    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceNotFoundException
  • Example

    // Reset offsets
    public static void resetSubscriptionOffset() {
        // Choose the time to reset to and convert it to a timestamp
        String time = "2019-07-09 10:00:00";
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        long timestamp = 0L;
        try {
            Date date = simpleDateFormat.parse(time);
            timestamp = date.getTime(); // epoch milliseconds of the parsed time
            //System.out.println(timestamp);
        } catch (ParseException e) {
            System.exit(1);
        }
        SubscriptionOffset offset = new SubscriptionOffset();
        offset.setTimestamp(timestamp);
        Map<String, SubscriptionOffset> offsets = new HashMap<String, SubscriptionOffset>();
        for (String shardId : shardIds) {
            offsets.put(shardId, offset);
        }
        try {
            datahubClient.resetSubscriptionOffset(Constant.projectName, Constant.topicName, subId, offsets);
            System.out.println("reset successful");
        } catch (InvalidParameterException e) {
            System.out.println("invalid parameter, please check your parameter");
            System.exit(1);
        } catch (AuthorizationFailureException e) {
            System.out.println("AK error, please check your accessId and accessKey");
            System.exit(1);
        } catch (ResourceNotFoundException e) {
            System.out.println("project or topic or subscription not found");
            System.exit(1);
        } catch (DatahubClientException e) {
            System.out.println("other error");
            System.out.println(e);
            System.exit(1);
        }
    }
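
The interplay of sessionId and versionId across the open, commit, and reset operations can be easier to follow as a small in-memory model. This is not how the server is implemented. OffsetCheckSketch, its Outcome values, and the order of the two checks are assumptions chosen only to mirror the behavior described in this section.

```java
// Illustrative model only; none of these names exist in the DataHub SDK.
final class OffsetCheckSketch {
    enum Outcome { COMMITTED, OFFSET_RESET, SESSION_INVALID }

    private long versionId = 0;
    private long sessionId = 0;
    private long sequence = -1;

    // Models openSubscriptionSession: a new session replaces the previous one.
    // Returns {versionId, sessionId} for the caller to use when committing.
    long[] openSession() {
        sessionId++;
        return new long[] {versionId, sessionId};
    }

    // Models resetSubscriptionOffset: the offset changes and versionId is bumped.
    void reset(long newSequence) {
        versionId++;
        sequence = newSequence;
    }

    // Models commitSubscriptionOffset: both ids must match the current values.
    Outcome commit(long clientVersionId, long clientSessionId, long newSequence) {
        if (clientVersionId != versionId) {
            return Outcome.OFFSET_RESET;      // cf. SubscriptionOffsetResetException
        }
        if (clientSessionId != sessionId) {
            return Outcome.SESSION_INVALID;   // cf. SubscriptionSessionInvalidException
        }
        sequence = newSequence;
        return Outcome.COMMITTED;
    }

    // Models the read-only getSubscriptionOffset lookup of the current versionId.
    long currentVersionId() {
        return versionId;
    }

    long currentSequence() {
        return sequence;
    }
}
```

In this model a consumer that sees OFFSET_RESET refreshes its versionId and continues, while one that sees SESSION_INVALID has been superseded by a newer session.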

Connector operations

A DataHub Connector synchronizes streaming data in DataHub to other cloud products. It currently supports real-time or near-real-time synchronization of Topic data to MaxCompute (ODPS). Users only need to write data to DataHub once and configure synchronization in the DataHub service; the data can then be used in other cloud products.

Create Connector

CreateConnectorResult createConnector(String projectName, String topicName, ConnectorType connectorType, List<String> columnFields, SinkConfig config);

CreateConnectorResult createConnector(String projectName, String topicName, ConnectorType connectorType, long sinkStartTime, List<String> columnFields, SinkConfig config);

  • Parameters

    • projectName The name of the project.
    • topicName The name of the topic.
    • connectorType The type of connector to create.
    • sinkStartTime The start time for sinking data from DataHub, in milliseconds.
    • columnFields The fields to synchronize.
    • config The detailed configuration for the specified connector type.
  • Exception

    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceNotFoundException
  • Example

    public static void createConnector() {
        List<String> columnFields = Arrays.asList("field1", "field2");
        SinkOdpsConfig config = new SinkOdpsConfig() {{
            setEndpoint(Constant.odps_endpoint);
            setProject(Constant.odps_project);
            setTable(Constant.odps_table);
            setAccessId(Constant.odps_accessId);
            setAccessKey(Constant.odps_accessKey);
            setPartitionMode(PartitionMode.SYSTEM_TIME);
            setTimeRange(60);
        }};
        // Set the partition format
        SinkOdpsConfig.PartitionConfig partitionConfig = new SinkOdpsConfig.PartitionConfig() {{
            addConfig("ds", "%Y%m%d");
            addConfig("hh", "%H");
            addConfig("mm", "%M");
        }};
        config.setPartitionConfig(partitionConfig);
        try {
            // Create the Connector
            datahubClient.createConnector(Constant.projectName, Constant.topicName, ConnectorType.SINK_ODPS, columnFields, config);
            System.out.println("create connector successful");
        } catch (InvalidParameterException e) {
            System.out.println("invalid parameter, please check your parameter");
            System.exit(1);
        } catch (AuthorizationFailureException e) {
            System.out.println("AK error, please check your accessId and accessKey");
            System.exit(1);
        } catch (ResourceNotFoundException e) {
            System.out.println("project or topic not found");
            System.exit(1);
        } catch (DatahubClientException e) {
            System.out.println("other error");
            System.out.println(e);
            System.exit(1);
        }
    }

删除 Connector

DeleteConnectorResult deleteConnector(String projectName, String topicName, ConnectorType connectorType);

  • 参数

    • projectName The name of the project.
    • topicName The name of the topic.
    • ConnectorType The type of connector which you want delete.
    • columnFields Which fields you want synchronize.
    • sinkStartTime Start time to sink from datahub. Unit: Ms
    • config Detail config of specified connector type.
  • Exception

    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceNotFoundException
  • Example

    public static void deleteConnector() {
        try {
            datahubClient.deleteConnector(Constant.projectName, Constant.topicName, ConnectorType.SINK_ODPS);
            System.out.println("delete connector successful");
        } catch (InvalidParameterException e) {
            System.out.println("invalid parameter, please check your parameter");
            System.exit(1);
        } catch (AuthorizationFailureException e) {
            System.out.println("AK error, please check your accessId and accessKey");
            System.exit(1);
        } catch (ResourceNotFoundException e) {
            // Nothing is left to delete, so this is treated as a successful delete
            System.out.println("project or topic or connector not found, delete successful");
        } catch (DatahubClientException e) {
            System.out.println("other error");
            System.out.println(e);
            System.exit(1);
        }
    }
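
The delete example treats a "not found" error as a successful delete, which makes the operation safe to repeat. A minimal sketch of that pattern follows. `SketchNotFoundException` and `ConnectorDeleter` are hypothetical stand-ins, not SDK types, so that the block compiles without the SDK; in real code the call would be `datahubClient.deleteConnector(...)` and the exception `ResourceNotFoundException`.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the SDK's ResourceNotFoundException.
class SketchNotFoundException extends RuntimeException {
    SketchNotFoundException(String message) {
        super(message);
    }
}

// Hypothetical stand-in for the single client call this sketch needs.
interface ConnectorDeleter {
    void delete(String connectorType);
}

class IdempotentDelete {
    // Returns true if this call deleted the connector and false if it was
    // already gone. Any other exception propagates to the caller.
    static boolean deleteIfExists(ConnectorDeleter deleter, String connectorType) {
        try {
            deleter.delete(connectorType);
            return true;
        } catch (SketchNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Set<String> existing = new HashSet<>();
        existing.add("SINK_ODPS");
        // Fake client: deleting a missing connector throws "not found".
        ConnectorDeleter fake = type -> {
            if (!existing.remove(type)) {
                throw new SketchNotFoundException(type + " not found");
            }
        };
        System.out.println(deleteIfExists(fake, "SINK_ODPS")); // true
        System.out.println(deleteIfExists(fake, "SINK_ODPS")); // false
    }
}
```

Returning a boolean instead of printing lets the caller decide whether an already-missing connector is worth logging.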

Get Connector

GetConnectorResult getConnector(String projectName, String topicName, ConnectorType connectorType);

  • Parameters

    • projectName The name of the project.
    • topicName The name of the topic.
    • connectorType The type of the connector to get.
  • Exception

    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceNotFoundException
  • Example

    public static void getConnector() {
        try {
            GetConnectorResult getConnectorResult = datahubClient.getConnector(Constant.projectName, Constant.topicName, ConnectorType.SINK_ODPS);
            System.out.println(getConnectorResult.getState() + "\t" + getConnectorResult.getSubId());
            for (String fieldName : getConnectorResult.getColumnFields()) {
                System.out.println(fieldName);
            }
        } catch (InvalidParameterException e) {
            System.out.println("invalid parameter, please check your parameter");
            System.exit(1);
        } catch (AuthorizationFailureException e) {
            System.out.println("AK error, please check your accessId and accessKey");
            System.exit(1);
        } catch (ResourceNotFoundException e) {
            System.out.println("project or topic or connector not found");
            System.exit(1);
        } catch (DatahubClientException e) {
            System.out.println("other error");
            System.out.println(e);
            System.exit(1);
        }
    }
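
Every example in this section repeats the same chain of catch blocks, with the specific exceptions first and `DatahubClientException` last. The sketch below shows one way to centralize that mapping. It assumes, as the catch order suggests, that the specific exceptions are subclasses of the general one. All `Sketch...` classes and `ErrorMessages` are hypothetical stand-ins introduced for illustration, not SDK types.

```java
// Hypothetical stand-ins for the SDK exception types; only the hierarchy matters.
class SketchClientException extends RuntimeException {
    SketchClientException(String message) {
        super(message);
    }
}

class SketchInvalidParameterException extends SketchClientException {
    SketchInvalidParameterException(String message) {
        super(message);
    }
}

class SketchAuthorizationFailureException extends SketchClientException {
    SketchAuthorizationFailureException(String message) {
        super(message);
    }
}

class SketchResourceNotFoundException extends SketchClientException {
    SketchResourceNotFoundException(String message) {
        super(message);
    }
}

class ErrorMessages {
    // Maps an exception to the message printed by the examples. Subclasses are
    // tested before the base type, mirroring the order of the catch blocks.
    static String describe(RuntimeException e) {
        if (e instanceof SketchInvalidParameterException) {
            return "invalid parameter, please check your parameter";
        }
        if (e instanceof SketchAuthorizationFailureException) {
            return "AK error, please check your accessId and accessKey";
        }
        if (e instanceof SketchResourceNotFoundException) {
            return "project or topic or connector not found";
        }
        if (e instanceof SketchClientException) {
            return "other error: " + e.getMessage();
        }
        return "unexpected error: " + e;
    }

    // Runs a call and returns null on success or a message on failure.
    static String run(Runnable call) {
        try {
            call.run();
            return null;
        } catch (RuntimeException e) {
            return describe(e);
        }
    }

    public static void main(String[] args) {
        String message = run(() -> {
            throw new SketchResourceNotFoundException("no such connector");
        });
        System.out.println(message); // project or topic or connector not found
    }
}
```

If the base type were tested first, it would match every subclass and the specific messages would never be reached, which is why the order matters.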

Update Connector

Updates the configuration of a Connector.

UpdateConnectorResult updateConnector(String projectName, String topicName, ConnectorType connectorType, SinkConfig config);

  • Parameters

    • projectName The name of the project.
    • topicName The name of the topic.
    • connectorType The type of the connector to update.
    • config The detailed configuration for the specified connector type.
  • Exception

    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceNotFoundException
  • Example

    public static void updateConnector() {
        SinkOdpsConfig config = (SinkOdpsConfig) datahubClient.getConnector(Constant.projectName, Constant.topicName, ConnectorType.SINK_ODPS).getConfig();
        // Modify the configuration
        config.setTimeRange(100);
        config.setAccessId(Constant.accessId);
        config.setAccessKey(Constant.accessKey);
        try {
            datahubClient.updateConnector(Constant.projectName, Constant.topicName, ConnectorType.SINK_ODPS, config);
            System.out.println("update connector successful");
        } catch (InvalidParameterException e) {
            System.out.println("invalid parameter, please check your parameter");
            System.exit(1);
        } catch (AuthorizationFailureException e) {
            System.out.println("AK error, please check your accessId and accessKey");
            System.exit(1);
        } catch (ResourceNotFoundException e) {
            System.out.println("project or topic or connector not found");
            System.exit(1);
        } catch (DatahubClientException e) {
            System.out.println("other error");
            System.out.println(e);
            System.exit(1);
        }
    }

Update Connector State

UpdateConnectorStateResult updateConnectorState(String projectName, String topicName, ConnectorType connectorType, ConnectorState connectorState);

  • Parameters

    • projectName The name of the project.
    • topicName The name of the topic.
    • connectorType The type of the connector.
    • connectorState The target state of the connector. Supported values: ConnectorState.STOPPED, ConnectorState.RUNNING.
  • Exception

    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceNotFoundException
  • Example

    public static void updateConnectorState() {
        try {
            // Stop the connector, then start it again
            datahubClient.updateConnectorState(Constant.projectName, Constant.topicName, ConnectorType.SINK_ODPS, ConnectorState.STOPPED);
            datahubClient.updateConnectorState(Constant.projectName, Constant.topicName, ConnectorType.SINK_ODPS, ConnectorState.RUNNING);
            System.out.println("update connector state successful");
        } catch (InvalidParameterException e) {
            System.out.println(e);
            System.out.println("invalid parameter, please check your parameter");
            System.exit(1);
        } catch (AuthorizationFailureException e) {
            System.out.println("AK error, please check your accessId and accessKey");
            System.exit(1);
        } catch (ResourceNotFoundException e) {
            System.out.println("project or topic or connector not found");
            System.exit(1);
        } catch (DatahubClientException e) {
            System.out.println("other error");
            System.out.println(e);
            System.exit(1);
        }
    }

Update Connector Offset

UpdateConnectorOffsetResult updateConnectorOffset(String projectName, String topicName, ConnectorType connectorType, String shardId, ConnectorOffset offset);

  • Parameters

    • projectName The name of the project.
    • topicName The name of the topic.
    • connectorType The type of the connector.
    • shardId The ID of the shard. If shardId is null, the offset of every shard is updated.
    • offset The connector offset.
  • Exception

    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceNotFoundException
  • Example

    public static void updateConnectorOffset() {
        ConnectorOffset offset = new ConnectorOffset();
        offset.setSequence(10);
        offset.setTimestamp(1000);
        try {
            // The Connector must be stopped before its offset is updated.
            // shardId is assumed to be defined elsewhere, for example as a class field.
            datahubClient.updateConnectorState(Constant.projectName, Constant.topicName, ConnectorType.SINK_ODPS, ConnectorState.STOPPED);
            datahubClient.updateConnectorOffset(Constant.projectName, Constant.topicName, ConnectorType.SINK_ODPS, shardId, offset);
            datahubClient.updateConnectorState(Constant.projectName, Constant.topicName, ConnectorType.SINK_ODPS, ConnectorState.RUNNING);
            System.out.println("update connector offset successful");
        } catch (InvalidParameterException e) {
            System.out.println(e);
            System.out.println("invalid parameter, please check your parameter");
            System.exit(1);
        } catch (AuthorizationFailureException e) {
            System.out.println("AK error, please check your accessId and accessKey");
            System.exit(1);
        } catch (ResourceNotFoundException e) {
            System.out.println("project or topic or connector not found");
            System.exit(1);
        } catch (DatahubClientException e) {
            System.out.println("other error");
            System.out.println(e);
            System.exit(1);
        }
    }
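
Because the offset can only be updated while the Connector is stopped, a failed update in the example would leave the Connector stopped. One possible way to guard against that is to restart it in a `finally` block, sketched below. `ConnectorControl` and `OffsetReset` are hypothetical stand-ins for the three client calls, not SDK types, and whether restarting after a failed update is desirable depends on the application.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the three client calls used in the example.
interface ConnectorControl {
    void stop();
    void updateOffset(long sequence, long timestamp);
    void start();
}

class OffsetReset {
    // Stops the connector, updates the offset, and always attempts a restart,
    // even when the offset update throws. The exception still propagates.
    static void resetOffset(ConnectorControl control, long sequence, long timestamp) {
        control.stop();
        try {
            control.updateOffset(sequence, timestamp);
        } finally {
            control.start();
        }
    }

    public static void main(String[] args) {
        List<String> calls = new ArrayList<>();
        // Fake client that only records the order of the calls.
        ConnectorControl fake = new ConnectorControl() {
            public void stop() { calls.add("stop"); }
            public void updateOffset(long sequence, long timestamp) { calls.add("update"); }
            public void start() { calls.add("start"); }
        };
        resetOffset(fake, 10L, 1000L);
        System.out.println(calls); // [stop, update, start]
    }
}
```

If `stop()` itself fails, nothing else runs, so the connector is left in whatever state it was in before the call.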

List Connectors

ListConnectorResult listConnector(String projectName, String topicName);

  • Parameters

    • projectName The name of the project.
    • topicName The name of the topic.
  • Exception

    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceNotFoundException
  • Example

    public static void listConnector() {
        try {
            ListConnectorResult listConnectorResult = datahubClient.listConnector(Constant.projectName, Constant.topicName);
            for (String cName : listConnectorResult.getConnectorNames()) {
                System.out.println(cName);
            }
        } catch (InvalidParameterException e) {
            System.out.println(e);
            System.out.println("invalid parameter, please check your parameter");
            System.exit(1);
        } catch (AuthorizationFailureException e) {
            System.out.println("AK error, please check your accessId and accessKey");
            System.exit(1);
        } catch (ResourceNotFoundException e) {
            System.out.println("project or topic not found");
            System.exit(1);
        } catch (DatahubClientException e) {
            System.out.println("other error");
            System.out.println(e);
            System.exit(1);
        }
    }

Get Connector Shard Status

GetConnectorShardStatusResult getConnectorShardStatus(String projectName, String topicName, ConnectorType connectorType);

ConnectorShardStatusEntry getConnectorShardStatus(String projectName, String topicName, ConnectorType connectorType, String shardId);

  • Parameters

    • projectName The name of the project.
    • topicName The name of the topic.
    • connectorType The type of the connector.
    • shardId The ID of the shard.
  • Exception

    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceNotFoundException
  • Example

    // Query the status of a single shard.
    // shardId is assumed to be defined elsewhere, for example as a class field.
    public static void getConnectorShardStatusByShard() {
        try {
            ConnectorShardStatusEntry connectorShardStatusEntry = datahubClient.getConnectorShardStatus(Constant.projectName, Constant.topicName, ConnectorType.SINK_ODPS, shardId);
            System.out.println(connectorShardStatusEntry.getState() + "\t"
                    + connectorShardStatusEntry.getCurrSequence() + "\t"
                    + connectorShardStatusEntry.getDiscardCount() + "\t"
                    + connectorShardStatusEntry.getUpdateTime());
        } catch (InvalidParameterException e) {
            System.out.println("invalid parameter, please check your parameter");
            System.exit(1);
        } catch (AuthorizationFailureException e) {
            System.out.println("AK error, please check your accessId and accessKey");
            System.exit(1);
        } catch (ResourceNotFoundException e) {
            System.out.println("project or topic or connector not found");
            System.exit(1);
        } catch (DatahubClientException e) {
            System.out.println("other error");
            System.out.println(e);
            System.exit(1);
        }
    }

    // Query the status of all shards. Requires: import java.util.Map;
    public static void getConnectorShardStatus() {
        try {
            GetConnectorShardStatusResult getConnectorShardStatusResult = datahubClient.getConnectorShardStatus(Constant.projectName, Constant.topicName, ConnectorType.SINK_ODPS);
            for (Map.Entry<String, ConnectorShardStatusEntry> entry : getConnectorShardStatusResult.getStatusEntryMap().entrySet()) {
                System.out.println(entry.getKey() + " : " + entry.getValue().getState() + "\t"
                        + entry.getValue().getCurrSequence() + "\t"
                        + entry.getValue().getDiscardCount() + "\t"
                        + entry.getValue().getUpdateTime());
            }
        } catch (InvalidParameterException e) {
            System.out.println("invalid parameter, please check your parameter");
            System.exit(1);
        } catch (AuthorizationFailureException e) {
            System.out.println("AK error, please check your accessId and accessKey");
            System.exit(1);
        } catch (ResourceNotFoundException e) {
            System.out.println("project or topic or connector not found");
            System.exit(1);
        } catch (DatahubClientException e) {
            System.out.println("other error");
            System.out.println(e);
            System.exit(1);
        }
    }
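
The per-shard status map is often easier to act on once it is summarized. The sketch below totals the discard counts and lists the shards that have discarded records. `ShardStatusSketch` is a hypothetical stand-in for `ConnectorShardStatusEntry` that models only two of the values read in the example, and the `long` field types are an assumption.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for ConnectorShardStatusEntry; only two of the values
// used in the example are modeled, and their long type is an assumption.
class ShardStatusSketch {
    final long currSequence;
    final long discardCount;

    ShardStatusSketch(long currSequence, long discardCount) {
        this.currSequence = currSequence;
        this.discardCount = discardCount;
    }
}

class ShardStatusSummary {
    // Sums the discard counts of all shards.
    static long totalDiscarded(Map<String, ShardStatusSketch> statusByShard) {
        long total = 0;
        for (ShardStatusSketch status : statusByShard.values()) {
            total += status.discardCount;
        }
        return total;
    }

    // Returns the ids of shards that have discarded at least one record,
    // in the iteration order of the map.
    static List<String> shardsWithDiscards(Map<String, ShardStatusSketch> statusByShard) {
        List<String> ids = new ArrayList<>();
        for (Map.Entry<String, ShardStatusSketch> entry : statusByShard.entrySet()) {
            if (entry.getValue().discardCount > 0) {
                ids.add(entry.getKey());
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        Map<String, ShardStatusSketch> statusByShard = new LinkedHashMap<>();
        statusByShard.put("0", new ShardStatusSketch(120, 0));
        statusByShard.put("1", new ShardStatusSketch(95, 3));
        System.out.println(totalDiscarded(statusByShard));     // 3
        System.out.println(shardsWithDiscards(statusByShard)); // [1]
    }
}
```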

Restart Connector

ReloadConnectorResult reloadConnector(String projectName, String topicName, ConnectorType connectorType);

ReloadConnectorResult reloadConnector(String projectName, String topicName, ConnectorType connectorType, String shardId);

  • Parameters

    • projectName The name of the project.
    • topicName The name of the topic.
    • connectorType The type of the connector.
    • shardId The ID of the shard.
  • Exception

    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceNotFoundException
  • Example

    // Reload all shards of the connector.
    public static void reloadConnector() {
        try {
            datahubClient.reloadConnector(Constant.projectName, Constant.topicName, ConnectorType.SINK_ODPS);
            System.out.println("reload connector successful");
        } catch (InvalidParameterException e) {
            System.out.println("invalid parameter, please check your parameter");
            System.exit(1);
        } catch (AuthorizationFailureException e) {
            System.out.println("AK error, please check your accessId and accessKey");
            System.exit(1);
        } catch (ResourceNotFoundException e) {
            System.out.println("project or topic or connector not found");
            System.exit(1);
        } catch (DatahubClientException e) {
            System.out.println("other error");
            System.out.println(e);
            System.exit(1);
        }
    }

    // Reload a single shard.
    // shardId is assumed to be defined elsewhere, for example as a class field.
    public static void reloadConnectorByShard() {
        try {
            datahubClient.reloadConnector(Constant.projectName, Constant.topicName, ConnectorType.SINK_ODPS, shardId);
            System.out.println("reload connector successful");
        } catch (InvalidParameterException e) {
            System.out.println("invalid parameter, please check your parameter");
            System.exit(1);
        } catch (AuthorizationFailureException e) {
            System.out.println("AK error, please check your accessId and accessKey");
            System.exit(1);
        } catch (ResourceNotFoundException e) {
            System.out.println("project or topic or connector not found");
            System.exit(1);
        } catch (DatahubClientException e) {
            System.out.println("other error");
            System.out.println(e);
            System.exit(1);
        }
    }
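
When only some shards need a reload, the per-shard overload can be called in a loop. The sketch below continues past individual failures and reports which shards failed. `ReloadShards` is a hypothetical helper, and the `Consumer<String>` stands in for a call such as `datahubClient.reloadConnector(projectName, topicName, connectorType, shardId)` so that the block compiles without the SDK.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

class ReloadShards {
    // Calls reloadOne for every shard id and returns the ids whose reload threw.
    // A failure on one shard does not stop the remaining shards from being tried.
    static List<String> reloadAll(List<String> shardIds, Consumer<String> reloadOne) {
        List<String> failed = new ArrayList<>();
        for (String shardId : shardIds) {
            try {
                reloadOne.accept(shardId);
            } catch (RuntimeException e) {
                failed.add(shardId);
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        List<String> shardIds = Arrays.asList("0", "1", "2");
        // Fake reload call that fails for shard "1" only.
        List<String> failed = reloadAll(shardIds, shardId -> {
            if (shardId.equals("1")) {
                throw new IllegalStateException("reload failed for shard " + shardId);
            }
        });
        System.out.println(failed); // [1]
    }
}
```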

Get Connector Done Time

GetConnectorDoneTimeResult getConnectorDoneTime(String projectName, String topicName, ConnectorType connectorType);

  • Parameters

    • projectName The name of the project.
    • topicName The name of the topic.
    • connectorType The type of the connector.
  • Exception

    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceNotFoundException
  • Example

    public static void getDoneTime() {
        try {
            GetConnectorDoneTimeResult getConnectorDoneTimeResult = datahubClient.getConnectorDoneTime(Constant.projectName, Constant.topicName, ConnectorType.SINK_ODPS);
            System.out.println(getConnectorDoneTimeResult.getDoneTime());
        } catch (InvalidParameterException e) {
            System.out.println("invalid parameter, please check your parameter");
            System.exit(1);
        } catch (AuthorizationFailureException e) {
            System.out.println("AK error, please check your accessId and accessKey");
            System.exit(1);
        } catch (ResourceNotFoundException e) {
            System.out.println("project or topic or connector not found");
            System.exit(1);
        } catch (DatahubClientException e) {
            System.out.println("other error");
            System.out.println(e);
            System.exit(1);
        }
    }

Append a New Field

AppendConnectorFieldResult appendConnectorField(String projectName, String topicName, ConnectorType connectorType, String fieldName);

You can append a new field to a Connector, but the MaxCompute table must already contain a column that corresponds to the field in DataHub.

  • Parameters

    • projectName The name of the project.
    • topicName The name of the topic.
    • connectorType The type of the connector.
    • fieldName The name of the field to append. The field must allow null values.
  • Exception

    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceNotFoundException
  • Example

    public static void appendConnectorField() {
        String newField = "newfield";
        try {
            // Both the topic schema and the MaxCompute table must contain the column newfield,
            // and the two structures must match exactly
            datahubClient.appendConnectorField(Constant.projectName, Constant.topicName, ConnectorType.SINK_ODPS, newField);
        } catch (InvalidParameterException e) {
            System.out.println("invalid parameter, please check your parameter");
            System.exit(1);
        } catch (AuthorizationFailureException e) {
            System.out.println("AK error, please check your accessId and accessKey");
            System.exit(1);
        } catch (ResourceNotFoundException e) {
            System.out.println("project or topic or connector not found");
            System.exit(1);
        } catch (DatahubClientException e) {
            System.out.println("other error");
            System.out.println(e);
            System.exit(1);
        }
    }
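
Since the field must exist on both sides before it can be appended, a local check can catch an obvious mismatch before the request is sent. The sketch below assumes the caller has already collected the topic's field names and the table's column names into sets. `AppendFieldCheck` is a hypothetical helper, it compares names exactly as given, and it checks only presence, not that the two structures match in full.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

class AppendFieldCheck {
    // Returns null when the field is present on both sides, otherwise a short
    // reason. Names are compared exactly as given (an assumption of this sketch).
    static String whyNot(String fieldName, Set<String> topicFields, Set<String> tableColumns) {
        if (!topicFields.contains(fieldName)) {
            return "missing in topic schema";
        }
        if (!tableColumns.contains(fieldName)) {
            return "missing in MaxCompute table";
        }
        return null;
    }

    public static void main(String[] args) {
        Set<String> topicFields = new HashSet<>(Arrays.asList("id", "name", "newfield"));
        Set<String> tableColumns = new HashSet<>(Arrays.asList("id", "name"));
        System.out.println(whyNot("newfield", topicFields, tableColumns)); // missing in MaxCompute table
        tableColumns.add("newfield");
        System.out.println(whyNot("newfield", topicFields, tableColumns)); // null
    }
}
```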