
Java SDK

Last updated: 2020-11-23 16:05:30



I. Maven Dependency and JDK


1. Maven Pom

  <dependency>
      <groupId>com.aliyun.datahub</groupId>
      <artifactId>aliyun-sdk-datahub</artifactId>
      <version>2.17.1-public</version>
  </dependency>


2. JDK
jdk: >= 1.8

II. FAQ

  1. API calls
     Normally only the putRecords / putRecordsByShard and getRecords interfaces need to be called frequently for reading and writing data; other interfaces such as getTopic, getCursor and listShard usually only need to be called once during initialization.
  2. Client initialization
     A project can hold one or more DatahubClient instances, and a DatahubClient instance can be used concurrently.
  3. Classes with the same name in different packages
     Where the same class name exists under different package paths, version 2.12 and later always uses the classes in the com.aliyun.datahub.client package; the classes in other packages exist only for compatibility with code written against SDK versions earlier than 2.12. For example:
     • com.aliyun.datahub.client.model.RecordSchema (version 2.12)
     • com.aliyun.datahub.common.data.RecordSchema (user code written against an SDK earlier than 2.12 keeps using this type if it is not changed after the SDK upgrade)
  4. If the error 'Parse body failed, Offset: 0' occurs, try setting the enableBinary parameter to false.


III. SDK Usage Guide

Initialization


Users access DataHub with an Alibaba Cloud account and must provide the account's AccessId and AccessKey as well as the DataHub service endpoint.
The following code creates a DatahubClient using a DataHub endpoint:

// The endpoint below uses region China East 1 (Hangzhou) as an example; use the actual endpoint of your region
String endpoint = "http://dh-cn-hangzhou.aliyuncs.com";
String accessId = "<YourAccessKeyId>";
String accessKey = "<YourAccessKeySecret>";
// Create a DatahubClient instance
DatahubClient datahubClient = DatahubClientBuilder.newBuilder()
        .setDatahubConfig(
                new DatahubConfig(endpoint,
                        // Whether to enable binary transfer; supported by server version 2.12 and later
                        new AliyunAccount(accessId, accessKey), true))
                        // If errors occur on Apsara Stack (private cloud), try setting this parameter to false
        // HttpConfig is optional; default values are used when it is not set
        .setHttpConfig(new HttpConfig()
                .setCompressType(HttpConfig.CompressType.LZ4) // LZ4 network compression is recommended for reading and writing data
                .setConnTimeout(10000))
        .build();


Configuration description:
1). DatahubConfig

  • endpoint: DataHub service endpoint
  • account: Alibaba Cloud account information
  • enableBinary: whether to use binary transfer. Supported by server version 2.12 and later; set it to false for earlier versions. If the error 'Parse body failed, Offset: 0' occurs on Apsara Stack (private cloud), try setting it to false.


2). HttpConfig

  • readTimeout: socket read/write timeout, default 10s
  • connTimeout: TCP connection timeout, default 10s
  • maxRetryCount: retry count on request failure, default 1; changing it is not recommended, retries should be handled by the business layer
  • debugRequest: whether to print request logs, default false
  • compressType: compression for data transfer, default none; lz4 and deflate are supported
  • proxyUri: proxy server address
  • proxyUsername: user name for proxy authentication
  • proxyPassword: password for proxy authentication
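
As a minimal sketch of these options, the snippet below builds a client whose HttpConfig enables request logging and routes traffic through a proxy. It assumes the setter names (setReadTimeout, setDebugRequest, setProxyUri, setProxyUsername, setProxyPassword) mirror the property names in the table above; the proxy address and credentials are placeholders.

// Sketch only: setter names are assumed to follow the HttpConfig properties listed above
DatahubClient proxiedClient = DatahubClientBuilder.newBuilder()
        .setDatahubConfig(new DatahubConfig(endpoint, new AliyunAccount(accessId, accessKey), true))
        .setHttpConfig(new HttpConfig()
                .setReadTimeout(10000)                          // socket read/write timeout in ms
                .setDebugRequest(true)                          // print request logs while debugging
                .setProxyUri("http://proxy.example.com:8080")   // placeholder proxy address
                .setProxyUsername("<proxyUser>")
                .setProxyPassword("<proxyPassword>"))
        .build();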


3). SDK metrics

The SDK can collect statistics such as QPS for put/get requests. Enable it with:

  ClientMetrics.startMetrics();

Metric information is written to the log file by default, so slf4j needs to be configured; the metric package is com.aliyun.datahub.client.metrics.
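
A minimal logging-configuration sketch for the metric output is shown below. It assumes logback is used as the slf4j binding; the appender name, file paths and pattern are placeholders, and only the logger name com.aliyun.datahub.client.metrics comes from the text above.

<!-- Sketch only: assumes logback as the slf4j binding; paths and pattern are placeholders -->
<configuration>
  <appender name="METRICS" class="ch.qos.logback.core.rolling.RollingFileAppender">
    <file>logs/datahub-metrics.log</file>
    <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
      <fileNamePattern>logs/datahub-metrics.%d{yyyy-MM-dd}.log</fileNamePattern>
      <maxHistory>7</maxHistory>
    </rollingPolicy>
    <encoder>
      <pattern>%d{HH:mm:ss.SSS} %-5level %logger - %msg%n</pattern>
    </encoder>
  </appender>
  <!-- Route the SDK metric logger (named after its package) to the dedicated appender -->
  <logger name="com.aliyun.datahub.client.metrics" level="INFO" additivity="false">
    <appender-ref ref="METRICS"/>
  </logger>
</configuration>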

Write Data to DataHub

The following uses a TUPLE topic as an example.

// Write TUPLE records
public static void tupleExample() {
    String shardId = "9";
    // Get the topic schema
    RecordSchema recordSchema = datahubClient.getTopic(Constant.projectName, Constant.topicName).getRecordSchema();
    // Generate ten records
    List<RecordEntry> recordEntries = new ArrayList<>();
    for (int i = 0; i < 10; ++i) {
        RecordEntry recordEntry = new RecordEntry();
        // Optionally set extra attributes on each record, e.g. IP or host name; omitting them does not affect the write
        recordEntry.addAttribute("key1", "value1");
        TupleRecordData data = new TupleRecordData(recordSchema);
        data.setField("field1", "HelloWorld");
        data.setField("field2", 1234567);
        recordEntry.setRecordData(data);
        recordEntry.setShardId(shardId);
        recordEntries.add(recordEntry);
    }
    try {
        int failedCount = datahubClient.putRecords(Constant.projectName, Constant.topicName, recordEntries).getFailedRecordCount();
        if (failedCount > 0) {
            retry(datahubClient, recordEntries, 1, Constant.projectName, Constant.topicName);
        }
    } catch (DatahubClientException e) {
        System.out.println(e.getMessage());
    }
}

// Retry logic
public static void retry(DatahubClient client, List<com.aliyun.datahub.client.model.RecordEntry> records, int retryTimes, String project, String topic) {
    boolean success = false;
    while (retryTimes != 0) {
        retryTimes = retryTimes - 1;
        int failedNum = client.putRecords(project, topic, records).getFailedRecordCount();
        if (failedNum > 0) {
            continue;
        }
        success = true;
        break;
    }
    if (!success) {
        System.out.println("retryFailure");
    }
}

Create a Subscription and Consume DataHub Data

// Offset-based consumption example, committing the offset while consuming
public static void example() {
    String shardId = "0";
    List<String> shardIds = Arrays.asList("0", "1");
    OpenSubscriptionSessionResult openSubscriptionSessionResult = datahubClient.openSubscriptionSession(Constant.projectName, Constant.topicName, subId, shardIds);
    SubscriptionOffset subscriptionOffset = openSubscriptionSessionResult.getOffsets().get(shardId);
    // 1. Get the cursor for the current offset; if the offset has expired, or nothing has been consumed yet,
    //    get the cursor of the first record within the data lifecycle instead
    String cursor = null;
    // sequence < 0 means nothing has been consumed yet
    if (subscriptionOffset.getSequence() < 0) {
        // Get the cursor of the first record within the lifecycle
        cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.OLDEST).getCursor();
    } else {
        // Get the cursor of the next record
        long nextSequence = subscriptionOffset.getSequence() + 1;
        try {
            // getCursor by SEQUENCE may throw SeekOutOfRange, meaning the data at this cursor has expired
            cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.SEQUENCE, nextSequence).getCursor();
        } catch (SeekOutOfRangeException e) {
            // Get the cursor of the first record within the lifecycle
            cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.OLDEST).getCursor();
        }
    }
    // 2. Read data and save the offset; this example reads TUPLE records and commits the offset every 1000 records
    long recordCount = 0L;
    // Read 10 records at a time
    int fetchNum = 10;
    while (true) {
        try {
            GetRecordsResult getRecordsResult = datahubClient.getRecords(Constant.projectName, Constant.topicName, shardId, schema, cursor, fetchNum);
            if (getRecordsResult.getRecordCount() <= 0) {
                // No data; sleep and read again
                Thread.sleep(1000);
                continue;
            }
            for (RecordEntry recordEntry : getRecordsResult.getRecords()) {
                // Consume the data
                TupleRecordData data = (TupleRecordData) recordEntry.getRecordData();
                System.out.println("field1:" + data.getField("field1") + "\t"
                        + "field2:" + data.getField("field2"));
                // After processing the record, update the offset
                ++recordCount;
                subscriptionOffset.setSequence(recordEntry.getSequence());
                subscriptionOffset.setTimestamp(recordEntry.getSystemTime());
                if (recordCount % 1000 == 0) {
                    // Commit the offset
                    Map<String, SubscriptionOffset> offsetMap = new HashMap<>();
                    offsetMap.put(shardId, subscriptionOffset);
                    datahubClient.commitSubscriptionOffset(Constant.projectName, Constant.topicName, subId, offsetMap);
                    System.out.println("commit offset successful");
                }
            }
            cursor = getRecordsResult.getNextCursor();
        } catch (SubscriptionOfflineException | OffsetSessionChangedException e) {
            // Exit. Offline: the subscription is offline; SessionChange: the subscription is being consumed by another client
            break;
        } catch (OffsetResetedException e) {
            // The offset was reset; fetch the SubscriptionOffset again. This example assumes a reset by sequence;
            // if the offset was reset by timestamp, get the cursor with CursorType.SYSTEM_TIME instead
            subscriptionOffset = datahubClient.getSubscriptionOffset(Constant.projectName, Constant.topicName, subId, shardIds).getOffsets().get(shardId);
            long nextSequence = subscriptionOffset.getSequence() + 1;
            cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.SEQUENCE, nextSequence).getCursor();
        } catch (DatahubClientException e) {
            // TODO: decide whether to exit depending on the exception
        } catch (Exception e) {
            break;
        }
    }
}

IV. Exception Types


The Java SDK (>= 2.12) organizes the DataHub exception types so that users can catch them with try/catch and handle them accordingly. All of these types are non-retryable except DatahubClientException and LimitExceededException; DatahubClientException also covers some retryable errors such as "server busy" and "server unavailable". It is therefore recommended to add retry logic when DatahubClientException or LimitExceededException is encountered, but to strictly limit the number of retries (see the sketch after the exception list below).
The exception classes for version 2.12 and later are listed below; the package path is com.aliyun.datahub.client.exception.

  • InvalidParameterException (InvalidParameter, InvalidCursor): invalid parameter.
  • ResourceNotFoundException (ResourceNotFound, NoSuchProject, NoSuchTopic, NoSuchShard, NoSuchSubscription, NoSuchConnector, NoSuchMeteringInfo): the requested resource does not exist. (Note: sending other requests immediately after a Split/Merge operation may throw this exception.)
  • ResourceAlreadyExistException (ResourceAlreadyExist, ProjectAlreadyExist, TopicAlreadyExist, ConnectorAlreadyExist): the resource already exists (thrown when creating a resource that already exists).
  • SeekOutOfRangeException (SeekOutOfRange): when calling getCursor, the given sequence is out of the valid range (usually because the data has expired), or the given timestamp is later than the current time.
  • AuthorizationFailureException (Unauthorized): Authorization signature parsing failed; check whether the AccessKey is correct.
  • NoPermissionException (NoPermission, OperationDenied): no permission, usually because RAM is misconfigured or the sub-account is not properly authorized.
  • ShardSealedException (InvalidShardOperation): the shard is in CLOSED state, which is readable but not writable; writing to a CLOSED shard, or continuing to read after the last record, throws this exception.
  • LimitExceededException (LimitExceeded): an interface usage limit was exceeded; see the limits documentation.
  • SubscriptionOfflineException (SubscriptionOffline): the subscription is offline and unavailable.
  • SubscriptionSessionInvalidException (OffsetSessionChanged, OffsetSessionClosed): subscription session error. A session is established when a subscription is used to commit offsets; if another client uses the same subscription, this exception is thrown.
  • SubscriptionOffsetResetException (OffsetReseted): the subscription offset was reset.
  • MalformedRecordException (MalformedRecord, ShardNotReady): invalid record format, for example an incorrect schema, non-UTF-8 characters, or the client using protobuf while the server does not support it.
  • DatahubClientException (all other error codes; also the base class of all the exceptions above): if none of the cases above applies, retrying usually suffices, but the number of retries should be limited.
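
The sketch below illustrates one way to wrap a write call with bounded retries for the retryable cases described above; it is a minimal example rather than part of the SDK, and maxRetries and the backoff interval are arbitrary choices.

// Minimal bounded-retry sketch for the retryable cases described above.
// LimitExceededException extends DatahubClientException, so catching the base class covers both.
public static void putWithRetry(DatahubClient client, String project, String topic,
                                List<RecordEntry> records, int maxRetries) {
    for (int attempt = 0; ; ++attempt) {
        try {
            client.putRecords(project, topic, records);
            return; // success
        } catch (DatahubClientException e) {
            // Possibly retryable (limit exceeded, server busy/unavailable); strictly bound the retries
            if (attempt >= maxRetries) {
                throw e;
            }
            try {
                Thread.sleep(1000L * (attempt + 1)); // simple linear backoff
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw e;
            }
        }
    }
}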


V. API Reference

Project Operations

A project is the basic organizational unit of DataHub data and contains multiple topics. Note that DataHub projects are independent of MaxCompute projects: projects created in MaxCompute cannot be reused by DataHub and must be created separately.

Create Project

CreateProjectResult createProject(String projectName, String comment);


Creating a project requires the project name and a comment. The project name must be 3 to 32 characters long, start with an English letter, contain only letters, digits and underscores ("_"), and is case-insensitive.

  • Parameters
    • projectName: the project name
    • comment: the project comment
  • Exception
    • DatahubClientException
  • Example

public static void createProject() {
    try {
        datahubClient.createProject(Constant.projectName, Constant.projectComment);
        System.out.println("create project successful");
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}


Delete Project

DeleteProjectResult deleteProject(String projectName);

A project can only be deleted once it no longer contains any topic.

  • Parameters
    • projectName: the project name
  • Exception
    • DatahubClientException
    • NoPermissionException: thrown when the project still contains topics
  • Example

public static void deleteProject() {
    try {
        datahubClient.deleteProject(Constant.projectName);
        System.out.println("delete project successful");
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}


Update Project

UpdateProjectResult updateProject(String projectName, String comment);

Updates the project information; currently only the comment can be updated.

  • Parameters
    • projectName: the project name
    • comment: the project comment
  • Exception
    • DatahubClientException
  • Example

public static void updateProject() {
    try {
        String newComment = "new project comment";
        datahubClient.updateProject(Constant.projectName, newComment);
        System.out.println("update project successful");
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}


List Projects

ListProjectResult listProject();

listProject returns a ListProjectResult object whose member projectNames is a list of project names.

  • Parameters: none
  • Exception
    • DatahubClientException
  • Example

public static void listProject() {
    try {
        ListProjectResult listProjectResult = datahubClient.listProject();
        if (listProjectResult.getProjectNames().size() > 0) {
            for (String pName : listProjectResult.getProjectNames()) {
                System.out.println(pName);
            }
        }
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}


Get Project

GetProjectResult getProject(String projectName);

Returns attribute information of the specified project.

  • Parameters
    • projectName: the project name
  • Exception
    • DatahubClientException
  • Example

public static void getProject() {
    try {
        GetProjectResult getProjectResult = datahubClient.getProject(Constant.projectName);
        System.out.println(getProjectResult.getCreateTime() + "\t"
                + getProjectResult.getLastModifyTime() + "\t"
                + getProjectResult.getComment());
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}


Topic Operations


A topic is the smallest unit of subscription and publication in DataHub; a topic represents one class or kind of streaming data. Two topic types are supported, TUPLE and BLOB:

  1. A BLOB topic accepts a block of binary data as a record.
  2. A TUPLE topic accepts database-like records, where each record contains multiple columns. A record schema must be specified, because data is transferred as strings over the network and the schema is needed to convert values back to their types. The following data types are supported:

  • BIGINT: 8-byte signed integer, range -9223372036854775807 ~ 9223372036854775807
  • DOUBLE: 8-byte double-precision floating point, range -1.0 * 10^308 ~ 1.0 * 10^308
  • BOOLEAN: boolean, True/False, true/false or 0/1
  • TIMESTAMP: timestamp with microsecond precision
  • STRING: string, UTF-8 encoding only; a single STRING column may be at most 2MB
  • TINYINT: 1-byte integer, range -128 ~ 127
  • SMALLINT: 2-byte integer, range -32768 ~ 32767
  • INTEGER: 4-byte integer, range -2147483648 ~ 2147483647
  • FLOAT: 4-byte single-precision floating point, range -3.40292347 * 10^38 ~ 3.40292347 * 10^38


The TINYINT, SMALLINT, INTEGER and FLOAT types are supported by the Java SDK starting from version 2.16.1-public.

Create Tuple Topic

CreateTopicResult createTopic(String projectName, String topicName, int shardCount, int lifeCycle, RecordType recordType, RecordSchema recordSchema, String comment);

  • Parameters
    • projectName The name of the project in which you create.
    • topicName The name of the topic.
    • shardCount The initial shard count of the topic.
    • lifeCycle The expire time of the data (Unit: DAY). The data written before that time is not accessible.
    • recordType The type of the record you want to write. Now support TUPLE and BLOB.
    • recordSchema The records schema of this topic.
    • comment The comment of the topic.
  • Exception
    • DatahubClientException
  • Example

public static void createTupleTopic() {
    RecordSchema recordSchema = new RecordSchema();
    recordSchema.addField(new Field("bigint_field", FieldType.BIGINT));
    recordSchema.addField(new Field("double_field", FieldType.DOUBLE));
    recordSchema.addField(new Field("boolean_field", FieldType.BOOLEAN));
    recordSchema.addField(new Field("timestamp_field", FieldType.TIMESTAMP));
    recordSchema.addField(new Field("tinyint_field", FieldType.TINYINT));
    recordSchema.addField(new Field("smallint_field", FieldType.SMALLINT));
    recordSchema.addField(new Field("integer_field", FieldType.INTEGER));
    recordSchema.addField(new Field("float_field", FieldType.FLOAT));
    recordSchema.addField(new Field("decimal_field", FieldType.DECIMAL));
    recordSchema.addField(new Field("string_field", FieldType.STRING));
    try {
        datahubClient.createTopic(Constant.projectName, Constant.topicName, Constant.shardCount, Constant.lifeCycle, RecordType.TUPLE, recordSchema, Constant.topicComment);
        System.out.println("create topic successful");
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}


Create Blob Topic

CreateTopicResult createTopic(String projectName, String topicName, int shardCount, int lifeCycle, RecordType recordType, String comment);

  • Parameters
    • projectName The name of the project in which you create.
    • topicName The name of the topic.
    • shardCount The initial shard count of the topic.
    • lifeCycle The expire time of the data (Unit: DAY). The data written before that time is not accessible.
    • recordType The type of the record you want to write. Now support TUPLE and BLOB.
    • comment The comment of the topic.
  • Exception
    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceNotFoundException
    • ResourceAlreadyExistException
  • Example

public static void createBlobTopic() {
    try {
        datahubClient.createTopic(Constant.projectName, Constant.blobTopicName, Constant.shardCount, Constant.lifeCycle, RecordType.BLOB, Constant.topicComment);
        System.out.println("create topic successful");
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}

Delete Topic


Before deleting a topic, make sure the topic no longer has any subscription or connector; otherwise a NoPermission error is reported.

DeleteTopicResult deleteTopic(String projectName, String topicName);

  • Parameters
    • projectName The name of the project in which you delete.
    • topicName The name of the topic.
  • Exception
    • DatahubClientException
    • NoPermissionException: thrown when the topic still has subscriptions or connectors
  • Example

public static void deleteTopic() {
    try {
        datahubClient.deleteTopic(Constant.projectName, Constant.topicName);
        System.out.println("delete topic successful");
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}


List Topics

ListTopicResult listTopic(String projectName);

  • Parameters
    • projectName The name of the project in which you list.
  • Example

public static void listTopic() {
    try {
        ListTopicResult listTopicResult = datahubClient.listTopic(Constant.projectName);
        if (listTopicResult.getTopicNames().size() > 0) {
            for (String tName : listTopicResult.getTopicNames()) {
                System.out.println(tName);
            }
        }
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}


Update Topic


Updates the topic information; currently the comment and lifeCycle can be updated.

UpdateTopicResult updateTopic(String projectName, String topicName, int lifeCycle, String comment);

  • Parameters
    • projectName The name of the project in which you list.
    • topicName The name of the topic.
    • comment The comment to modify.
    • lifeCycle The lifeCycle of the topic.
  • Exception
    • DatahubClientException
  • Example

public static void updateTopic() {
    try {
        String newComment = "new topic comment";
        int lifeCycle = 1;
        datahubClient.updateTopic(Constant.projectName, Constant.topicName, lifeCycle, newComment);
        System.out.println("update topic successful");
        // Check the result of the update
        GetTopicResult getTopicResult = datahubClient.getTopic(Constant.projectName, Constant.topicName);
        System.out.println(getTopicResult.getComment());
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}


Get Topic

GetTopicResult getTopic(String projectName, String topicName);

Returns the attributes of the specified topic.

  • Parameters
    • projectName The name of the project in which you get.
    • topicName The name of the topic.
  • Exception
    • DatahubClientException
  • Example

public static void getTopic() {
    try {
        GetTopicResult getTopicResult = datahubClient.getTopic(Constant.projectName, Constant.topicName);
        System.out.println(getTopicResult.getShardCount() + "\t"
                + getTopicResult.getLifeCycle() + "\t"
                + getTopicResult.getRecordType() + "\t"
                + getTopicResult.getComment());
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}


Append Field to a Tuple Topic

AppendFieldResult appendField(String projectName, String topicName, Field field);

  • Parameters
    • projectName The name of the project in which you get.
    • topicName The name of the topic.
    • field The field to append.
  • Exception
    • DatahubClientException
  • Example

public static void appendNewField() {
    try {
        Field newField = new Field("newField", FieldType.STRING, true);
        datahubClient.appendField(Constant.projectName, Constant.topicName, newField);
        System.out.println("append field successful");
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}


Shard Operations


A shard is a concurrent channel for transferring data within a topic; each shard has an ID. A shard can be in several states: OPENING (starting up) and ACTIVE (started and serving). Each active shard consumes server resources, so it is recommended to create only as many shards as needed.

List Shards

ListShardResult listShard(String projectName, String topicName);

  • Parameters
    • projectName The name of the project.
    • topicName The name of the topic.
  • Exception
    • DatahubClientException
  • Example

public static void listShard() {
    try {
        ListShardResult listShardResult = datahubClient.listShard(Constant.projectName, Constant.topicName);
        if (listShardResult.getShards().size() > 0) {
            for (ShardEntry entry : listShardResult.getShards()) {
                System.out.println(entry.getShardId() + "\t"
                        + entry.getState() + "\t"
                        + entry.getLeftShardId() + "\t"
                        + entry.getRightShardId());
            }
        }
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}


Split Shard


Splits an ACTIVE shard of a topic into two new shards. The new shards are ACTIVE and the original shard becomes CLOSED; a CLOSED shard can be read but not written. The split can use the default splitKey, or a specific splitKey can be provided.

SplitShardResult splitShard(String projectName, String topicName, String shardId);

SplitShardResult splitShard(String projectName, String topicName, String shardId, String splitKey);

  • Parameters
    • projectName The name of the project.
    • topicName The name of the topic.
    • shardId The shard which to split.
    • splitKey The split key which is used to split shard.
  • Exception
    • DatahubClientException
  • Example

public static void splitShard() {
    try {
        String shardId = "0";
        SplitShardResult splitShardResult = datahubClient.splitShard(Constant.projectName, Constant.topicName, shardId);
        for (ShardEntry entry : splitShardResult.getNewShards()) {
            System.out.println(entry.getShardId());
        }
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}


Merge Shards


Merges two ACTIVE shards of a topic; the two shards must be adjacent. The adjacent shards of each shard can be found in the listShard result.

MergeShardResult mergeShard(String projectName, String topicName, String shardId, String adjacentShardId);

  • Parameters
    • projectName The name of the project.
    • topicName The name of the topic.
    • shardId The shard which will be merged.
    • adjacentShardId The adjacent shard of the specified shard.
  • Exception
    • DatahubClientException
  • Example

public static void mergeShard() {
    try {
        String shardId = "7";
        // adjacentShardId must be adjacent to shardId; adjacency information is available in the listShard result
        String adjacentShardId = "8";
        MergeShardResult mergeShardResult = datahubClient.mergeShard(Constant.projectName, Constant.topicName, shardId, adjacentShardId);
        System.out.println("merge successful");
        System.out.println(mergeShardResult.getShardId());
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}


Read and Write Data

Shards in both the CLOSED and ACTIVE states can be read, but only ACTIVE shards can be written.

1. Reading data

Reading data takes two steps: first get a cursor, then pass the cursor to the getRecords method. Note that DataHub already provides a subscription feature: users can consume data through a subscription and have the server store the consumption offsets automatically, so reading data directly is mainly useful for sampling and checking data quality.

2. Getting a cursor

To read data from a topic you must specify the shard and the cursor from which reading starts. A cursor can be obtained in four ways: OLDEST, LATEST, SEQUENCE and SYSTEM_TIME.

  • OLDEST: the cursor points to the earliest record among the currently valid data.
  • LATEST: the cursor points to the latest record.
  • SEQUENCE: the cursor points to the record with the given sequence number.
  • SYSTEM_TIME: the cursor points to the first record whose timestamp is greater than or equal to the given timestamp.

Which one should be used?

First of all, the data to be read must still be within its lifecycle (retention period); otherwise reading fails with an error.

  • To read from the beginning, OLDEST is the right choice; if all data is within the lifecycle, reading starts from the first record.
  • For sampling, i.e. checking whether the data after a certain point in time is correct, use SYSTEM_TIME; reading starts from the position of the obtained sequence + 1.
  • To look at the most recent data, use LATEST. With LATEST you can keep reading the newest written records; you can also read the last N written records by first obtaining the latest sequence and then starting from sequence - N.

GetCursorResult getCursor(String projectName, String topicName, String shardId, CursorType type);

GetCursorResult getCursor(String projectName, String topicName, String shardId, CursorType type, long param);

  • Parameters
    • projectName The name of the project.
    • topicName The name of the topic.
    • shardId The id of the shard.
    • CursorType Which type used to get cursor.
  • Exception
    • DatahubClientException
    • SeekOutOfRangeException
  • Example code

    • Sampling scenario: first convert a Date to a timestamp, then get the cursor

public static void getcursor() {
    String shardId = "5";
    try {
        // Convert the time to a timestamp
        String time = "2019-07-01 10:00:00";
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        long timestamp = 0L;
        Date date = simpleDateFormat.parse(time);
        timestamp = date.getTime(); // timestamp of the given time
        //System.out.println(timestamp);
        // Get the read position for data written after the given time
        String timeCursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.SYSTEM_TIME, timestamp).getCursor();
        System.out.println("get cursor successful");
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    } catch (ParseException e) {
        System.out.println(e.getErrorOffset());
    }
}

    • Reading from the beginning

public static void getcursor() {
    String shardId = "5";
    try {
        /* OLDEST example */
        String oldestCursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.OLDEST).getCursor();
        System.out.println("get cursor successful");
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}

    • Reading the most recently written data

      • Case 1: the latest written record
      • Case 2: the last N written records
        • First get the sequence of the latest written record, then get the cursor

public static void getcursor() {
    String shardId = "5";
    try {
        /* LATEST example */
        String latestCursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.LATEST).getCursor();
        /* SEQUENCE example */
        // Get the sequence of the latest record
        long seq = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.LATEST).getSequence();
        // Get the read position of the latest ten records
        String seqCursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.SEQUENCE, seq - 9).getCursor();
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}


Read data interface:

GetRecordsResult getRecords(String projectName, String topicName, String shardId, String cursor, int limit);

GetRecordsResult getRecords(String projectName, String topicName, String shardId, RecordSchema schema, String cursor, int limit);

  • Parameters
    • projectName The name of the project.
    • topicName The name of the topic.
    • shardId The id of the shard.
    • schema If you read TUPLE records, you need this parameter.
    • cursor The start cursor used to read data.
    • limit Max record size to read.
  • Exception
    • DatahubClientException
  • Example


1). Reading data from a TUPLE topic

public static void example() {
    // Maximum number of records to read per call
    int recordLimit = 1000;
    String shardId = "7";
    // Get a cursor; here we start from the earliest record among the valid data
    // Note: normally getCursor only needs to be called once during initialization; afterwards use the nextCursor returned by getRecords
    String cursor = "";
    try {
        cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.OLDEST).getCursor();
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
    while (true) {
        try {
            GetRecordsResult result = datahubClient.getRecords(Constant.projectName, Constant.topicName, shardId, recordSchema, cursor, recordLimit);
            if (result.getRecordCount() <= 0) {
                // No data; sleep and read again
                Thread.sleep(10000);
                continue;
            }
            for (RecordEntry entry : result.getRecords()) {
                TupleRecordData data = (TupleRecordData) entry.getRecordData();
                System.out.println("field1:" + data.getField("field1") + "\t"
                        + "field2:" + data.getField("field2"));
            }
            // Move to the next cursor
            cursor = result.getNextCursor();
        } catch (InvalidCursorException ex) {
            // The cursor is invalid or expired; re-seek and continue consuming
            cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.OLDEST).getCursor();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            break;
        } catch (DatahubClientException e) {
            System.out.println(e.getErrorMessage());
        }
    }
}


2). Reading data from a BLOB topic

public static void example() {
    // Maximum number of records to read per call
    int recordLimit = 1000;
    String shardId = "7";
    // Get a cursor; here we start from the earliest record among the valid data
    // Note: normally getCursor only needs to be called once during initialization; afterwards use the nextCursor returned by getRecords
    String cursor = "";
    try {
        cursor = datahubClient.getCursor(Constant.projectName, Constant.blobTopicName, shardId, CursorType.OLDEST).getCursor();
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
    while (true) {
        try {
            // For a BLOB topic use the getRecords overload without a schema
            GetRecordsResult result = datahubClient.getRecords(Constant.projectName, Constant.blobTopicName, shardId, cursor, recordLimit);
            if (result.getRecordCount() <= 0) {
                // No data; sleep and read again
                Thread.sleep(10000);
                continue;
            }
            /* Consume the data */
            for (RecordEntry record : result.getRecords()) {
                BlobRecordData data = (BlobRecordData) record.getRecordData();
                System.out.println(new String(data.getData()));
            }
            // Move to the next cursor
            cursor = result.getNextCursor();
        } catch (InvalidCursorException ex) {
            // The cursor is invalid or expired; re-seek and continue consuming
            cursor = datahubClient.getCursor(Constant.projectName, Constant.blobTopicName, shardId, CursorType.OLDEST).getCursor();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            break;
        } catch (DatahubClientException e) {
            System.out.println(e.getErrorMessage());
        }
    }
}


Write Data


Server versions 2.12 and later support the putRecordsByShard interface; earlier versions only provide putRecords. When using putRecordsByShard you must specify the shard to write to, otherwise the data is written to the first shard in ACTIVE state by default.
The records parameter of both methods is a List in which each element is one record, and all records must be of the same type, i.e. either TUPLE or BLOB. DataHub supports writing by shard (server version >= 2.12) as well as mixed writing, corresponding to the putRecordsByShard and putRecords interfaces respectively. With putRecords the caller must inspect the PutRecordsResult to determine whether the data was written successfully, whereas putRecordsByShard reports failure directly through an exception.
If the server supports it, putRecordsByShard is recommended.

PutRecordsResult putRecords(String projectName, String topicName, List records);

PutRecordsByShardResult putRecordsByShard(String projectName, String topicName, String shardId, List records);

  • Parameters
    • projectName The name of the project.
    • topicName The name of the topic.
    • shardId The id of the shard.
    • records The list of records to be written.
  • Exception
    • DatahubClientException


1). Writing to a TUPLE topic (see the sketch below)
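
The original page gives no inline example at this point; the sketch below is adapted from the tupleExample in section III (same Constant placeholders and putRecords call), and the method name writeTupleExample is just a placeholder.

// Sketch adapted from the earlier tupleExample; Constant.* are the same placeholders used throughout this page
public static void writeTupleExample() {
    String shardId = "0";
    // Get the topic schema
    RecordSchema recordSchema = datahubClient.getTopic(Constant.projectName, Constant.topicName).getRecordSchema();
    List<RecordEntry> recordEntries = new ArrayList<>();
    for (int i = 0; i < 10; ++i) {
        RecordEntry recordEntry = new RecordEntry();
        TupleRecordData data = new TupleRecordData(recordSchema);
        data.setField("field1", "HelloWorld");
        data.setField("field2", 1234567);
        recordEntry.setRecordData(data);
        recordEntry.setShardId(shardId);
        recordEntries.add(recordEntry);
    }
    try {
        // Server >= 2.12 can use putRecordsByShard instead:
        // datahubClient.putRecordsByShard(Constant.projectName, Constant.topicName, shardId, recordEntries);
        int failedCount = datahubClient.putRecords(Constant.projectName, Constant.topicName, recordEntries).getFailedRecordCount();
        if (failedCount > 0) {
            System.out.println("failed records: " + failedCount);
        }
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}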
2). Writing to a BLOB topic

// Write BLOB records
public static void blobExample() {
    // Generate ten records
    List<RecordEntry> recordEntries = new ArrayList<>();
    String shardId = "4";
    for (int i = 0; i < 10; ++i) {
        RecordEntry recordEntry = new RecordEntry();
        // Optionally set extra attributes on each record
        recordEntry.addAttribute("key1", "value1");
        BlobRecordData data = new BlobRecordData("123456".getBytes(Charsets.UTF_8));
        recordEntry.setRecordData(data);
        recordEntry.setShardId(shardId);
        recordEntries.add(recordEntry);
    }
    while (true) {
        try {
            // Supported by server version 2.12 and later; for earlier versions use the putRecords interface
            // datahubClient.putRecordsByShard(Constant.projectName, Constant.topicName, shardId, recordEntries);
            datahubClient.putRecords(Constant.projectName, Constant.topicName, recordEntries);
            System.out.println("write data successful");
            break;
        } catch (DatahubClientException e) {
            System.out.println(e.getErrorMessage());
        }
    }
}


Write Modes


Before DataHub 2.12, only the putRecords interface was available. The RecordEntry class has three properties, shardId, partitionKey and hashKey; the value set on one of them determines which shard the data is written to.

For version 2.12 and later, the putRecordsByShard interface is recommended to avoid the performance cost of partitioning on the server side.


1). Writing by shard ID
The recommended way. Example:

RecordEntry entry = new RecordEntry();
entry.setShardId("0");


2). Writing by hash key
Specify a 128-bit MD5 value. When writing by hash key, the target shard is chosen based on each shard's BeginHashKey and EndHashKey.
Example:

RecordEntry entry = new RecordEntry();
entry.setHashKey("7FFFFFFFFFFFFFFD7FFFFFFFFFFFFFFD");


3). Writing by partition key
Specify a String as the partition key; the target shard is chosen based on the MD5 of that string and each shard's BeginHashKey and EndHashKey.
Example:

RecordEntry entry = new RecordEntry();
entry.setPartitionKey("TestPartitionKey");


Meter Operations


Get Meter Info

GetMeterInfoResult getMeterInfo(String projectName, String topicName, String shardId);

  • Parameters
    • projectName The name of the project.
    • topicName The name of the topic.
    • shardId The id of the shard.
  • Exception
    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceNotFoundException
  • Example

public static void getMeter() {
    String shardId = "5";
    try {
        GetMeterInfoResult getMeterInfoResult = datahubClient.getMeterInfo(Constant.projectName, Constant.topicName, shardId);
        System.out.println("get meter successful");
        System.out.println(getMeterInfoResult.getActiveTime() + "\t" + getMeterInfoResult.getStorage());
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}


Subscription Operations


The subscription service stores users' consumption offsets on the server side; with a small amount of configuration and handling it provides a highly available offset storage service.

Create Subscription

CreateSubscriptionResult createSubscription(String projectName, String topicName, String comment);

The comment format is: {"application":"<application>","description":"<description>"}

  • Parameters
    • projectName The name of the project.
    • topicName The name of the topic.
    • comment The comment of the subscription.
  • Exception
    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceNotFoundException
  • Example

public static void createSubscription() {
    try {
        CreateSubscriptionResult createSubscriptionResult = datahubClient.createSubscription(Constant.projectName, Constant.topicName, Constant.subscribtionComment);
        System.out.println("create subscription successful");
        System.out.println(createSubscriptionResult.getSubId());
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}


Delete Subscription

DeleteSubscriptionResult deleteSubscription(String projectName, String topicName, String subId);

  • Parameters
    • projectName The name of the project.
    • topicName The name of the topic.
    • subId The id of the subscription.
  • Exception
    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceNotFoundException
  • Example

public static void deleteSubscription() {
    try {
        datahubClient.deleteSubscription(Constant.projectName, Constant.topicName, subId);
        System.out.println("delete subscription successful");
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}


Update Subscription


Updates an existing subscription; currently only the comment can be updated.

UpdateSubscriptionResult updateSubscription(String projectName, String topicName, String subId, String comment);

  • Parameters
    • projectName The name of the project.
    • topicName The name of the topic.
    • subId The id of the subscription.
    • comment The comment you want to update.
  • Exception
    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceNotFoundException
  • Example
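
The original page leaves this example empty; the following is a minimal sketch based only on the updateSubscription signature above and the Constant placeholders used in the other examples (subId is assumed to hold an existing subscription id, and the new comment value is arbitrary).

// Minimal sketch, assuming subId refers to an existing subscription
public static void updateSubscription() {
    try {
        String newComment = "{\"application\":\"app\",\"description\":\"new description\"}";
        datahubClient.updateSubscription(Constant.projectName, Constant.topicName, subId, newComment);
        System.out.println("update subscription successful");
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}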


List Subscriptions


The pageNum and pageSize parameters of listSubscription select a range of subscriptions: for example pageNum = 1, pageSize = 10 returns subscriptions 1-10, while pageNum = 2, pageSize = 5 returns subscriptions 6-10.

ListSubscriptionResult listSubscription(String projectName, String topicName, int pageNum, int pageSize);

  • Parameters
    • projectName The name of the project.
    • topicName The name of the topic.
    • pageNum The page number used to list subscriptions.
    • pageSize The page size used to list subscriptions.
  • Exception
    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceNotFoundException
  • Example code

public static void listSubscription() {
    try {
        ListSubscriptionResult listSubscriptionResult = datahubClient.listSubscription(Constant.projectName, Constant.topicName, Constant.pageNum, Constant.pageSize);
        if (listSubscriptionResult.getSubscriptions().size() > 0) {
            System.out.println(listSubscriptionResult.getTotalCount());
            System.out.println(listSubscriptionResult.getSubscriptions().size());
            for (SubscriptionEntry entry : listSubscriptionResult.getSubscriptions()) {
                System.out.println(entry.getSubId() + "\t"
                        + entry.getState() + "\t"
                        + entry.getType() + "\t"
                        + entry.getComment());
            }
        }
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}


Get Subscription

GetSubscriptionResult getSubscription(String projectName, String topicName, String subId);

  • Parameters
    • projectName The name of the project.
    • topicName The name of the topic.
    • subId The id of the subscription.
  • Exception
    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceNotFoundException
public static void getSubscription() {
    try {
        GetSubscriptionResult getSubscriptionResult = datahubClient.getSubscription(Constant.projectName, Constant.topicName, subId);
        System.out.println(getSubscriptionResult.getSubId() + "\t"
                + getSubscriptionResult.getState() + "\t"
                + getSubscriptionResult.getType() + "\t"
                + getSubscriptionResult.getComment());
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}


Update Subscription State


A subscription has two states, OFFLINE and ONLINE, meaning offline and online respectively.

UpdateSubscriptionStateResult updateSubscriptionState(String projectName, String topicName, String subId, SubscriptionState state);

  • Parameters
    • projectName The name of the project.
    • topicName The name of the topic.
    • subId The id of the subscription.
    • state The state you want to change.
  • Exception
    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceNotFoundException
  • Example

public static void updateSubscriptionState() {
    try {
        datahubClient.updateSubscriptionState(Constant.projectName, Constant.topicName, subId, SubscriptionState.ONLINE);
        System.out.println("update subscription state successful");
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}


Offset Operations


After a subscription is created its initial state is "not consumed". To use the offset storage provided by the subscription service, a few offset operations are needed.

Initialize Offset


openSubscriptionSession only needs to be called once for initialization. Calling it again generates a new consumption sessionId and invalidates the previous session, which then can no longer commit offsets.

OpenSubscriptionSessionResult openSubscriptionSession(String projectName, String topicName, String subId, List shardIds);

  • Parameters
    • projectName The name of the project.
    • topicName The name of the topic.
    • subId The id of the subscription.
    • shardIds The id list of the shards.
  • Exception
    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceNotFoundException
  • Example

public static void openSubscriptionSession() {
    String shardId = "4";
    List<String> shardIds = new ArrayList<String>();
    shardIds.add("0");
    shardIds.add("4");
    try {
        OpenSubscriptionSessionResult openSubscriptionSessionResult = datahubClient.openSubscriptionSession(Constant.projectName, Constant.topicName, subId, shardIds);
        SubscriptionOffset subscriptionOffset = openSubscriptionSessionResult.getOffsets().get(shardId);
        System.out.println(subscriptionOffset.getSessionId() + "\t"
                + subscriptionOffset.getVersionId() + "\t"
                + subscriptionOffset.getSequence());
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}


Get Offset

GetSubscriptionOffsetResult getSubscriptionOffset(String projectName, String topicName, String subId, List shardIds);


getSubscriptionOffset returns a GetSubscriptionOffsetResult, which is essentially the same as the result of openSubscriptionSession except that the offsets it contains have no sessionId; it is intended as a read-only call.

  • Parameters
    • projectName The name of the project.
    • topicName The name of the topic.
    • subId The id of the subscription.
    • shardIds The id list of the shards.
  • Exception
    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceNotFoundException
  • Example

// Get offsets
public static void getSubscriptionOffset() {
    String shardId = "4";
    List<String> shardIds = new ArrayList<String>();
    shardIds.add("0");
    shardIds.add("4");
    try {
        GetSubscriptionOffsetResult getSubscriptionOffsetResult = datahubClient.getSubscriptionOffset(Constant.projectName, Constant.topicName, subId, shardIds);
        SubscriptionOffset subscriptionOffset = getSubscriptionOffsetResult.getOffsets().get(shardId);
        System.out.println(subscriptionOffset.getVersionId() + "\t"
                + subscriptionOffset.getSequence());
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}


Commit Offset

CommitSubscriptionOffsetResult commitSubscriptionOffset(String projectName, String topicName, String subId, Map offsets);


Committing an offset verifies the versionId and sessionId, which must match the current ones. The committed offset values themselves are not strictly validated, but it is recommended to fill in the real sequence and timestamp of the record.

  • Parameters
    • projectName The name of the project.
    • topicName The name of the topic.
    • subId The id of the subscription.
    • offsets The offset map of shards.
  • Exception
    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceNotFoundException
    • SubscriptionOffsetResetException
    • SubscriptionSessionInvalidException
    • SubscriptionOfflineException
  • Example

// Commit offsets
public static void commitSubscriptionOffset() {
    try {
        OpenSubscriptionSessionResult openSubscriptionSessionResult = datahubClient.openSubscriptionSession(Constant.projectName, Constant.topicName, subId, shardIds);
        SubscriptionOffset subscriptionOffset = openSubscriptionSessionResult.getOffsets().get(shardId);
        // This only demonstrates the commit call; see the offset consumption example for the full flow
        subscriptionOffset.setSequence(10);
        subscriptionOffset.setTimestamp(100);
        Map<String, SubscriptionOffset> offsets = new HashMap<String, SubscriptionOffset>();
        offsets.put(shardId, subscriptionOffset);
        // Commit the offset
        datahubClient.commitSubscriptionOffset(Constant.projectName, Constant.topicName, subId, offsets);
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}


Reset Offset

ResetSubscriptionOffsetResult resetSubscriptionOffset(String projectName, String topicName, String subId, Map offsets);


Resetting the offset moves the consumption offset to a point in time; if several records share that time, the offset is set to the first of them. Resetting updates the versionId in addition to the offset, so a running task that commits with the old versionId receives SubscriptionOffsetResetException; the new versionId can be obtained through getSubscriptionOffset.

  • Parameters
    • projectName The name of the project.
    • topicName The name of the topic.
    • subId The id of the subscription.
    • offsets The offset map of shards.
  • Exception
    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceNotFoundException
  • Example

// Reset offsets
public static void resetSubscriptionOffset() throws ParseException {
    List<String> shardIds = Arrays.asList("0");
    // Choose the time to reset the offset to and convert it to a timestamp
    String time = "2019-07-09 10:00:00";
    SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    Date date = simpleDateFormat.parse(time);
    long timestamp = date.getTime(); // timestamp of the given time
    Map<String, SubscriptionOffset> offsets = new HashMap<String, SubscriptionOffset>();
    for (String shardId : shardIds) {
        // Get the sequence of the first record at or after the given time for each shard
        long sequence = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.SYSTEM_TIME, timestamp).getSequence();
        SubscriptionOffset offset = new SubscriptionOffset();
        offset.setTimestamp(timestamp);
        offset.setSequence(sequence);
        offsets.put(shardId, offset);
    }
    try {
        datahubClient.resetSubscriptionOffset(Constant.projectName, Constant.topicName, subId, offsets);
        System.out.println("reset successful");
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}


Consume DataHub Data with a Subscription


Consuming through a subscription is similar to reading DataHub data directly; the difference is that the subscription stores the consumption offset, and the user is free to choose where to start consuming.

  • Notes:
    1. First call openSubscriptionSession to initialize the offset and obtain the version and session information. Initialize only once globally; calling it repeatedly invalidates the existing session so that it can no longer commit offsets.
    2. Call getCursor to get the cursor of the subscription offset and consume from there; after consuming a record, use getNextCursor to get the cursor of the next record and continue.
    3. To commit an offset, call commitSubscriptionOffset; the commit checks the version and session information, which must match exactly for the commit to succeed.
// Offset-based consumption example, committing the offset while consuming
public static void example() {
    String shardId = "0";
    List<String> shardIds = Arrays.asList("0", "1");
    OpenSubscriptionSessionResult openSubscriptionSessionResult = datahubClient.openSubscriptionSession(Constant.projectName, Constant.topicName, subId, shardIds);
    SubscriptionOffset subscriptionOffset = openSubscriptionSessionResult.getOffsets().get(shardId);
    // 1. Get the cursor for the current offset; if the offset has expired, or nothing has been consumed yet,
    //    get the cursor of the first record within the data lifecycle instead
    String cursor = null;
    // sequence < 0 means nothing has been consumed yet
    if (subscriptionOffset.getSequence() < 0) {
        // Get the cursor of the first record within the lifecycle
        cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.OLDEST).getCursor();
    } else {
        // Get the cursor of the next record
        long nextSequence = subscriptionOffset.getSequence() + 1;
        try {
            // getCursor by SEQUENCE may throw SeekOutOfRange, meaning the data at this cursor has expired
            cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.SEQUENCE, nextSequence).getCursor();
        } catch (SeekOutOfRangeException e) {
            // Get the cursor of the first record within the lifecycle
            cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.OLDEST).getCursor();
        }
    }
    // 2. Read data and save the offset; this example reads TUPLE records and commits the offset every 1000 records
    long recordCount = 0L;
    // Read 10 records at a time
    int fetchNum = 10;
    while (true) {
        try {
            GetRecordsResult getRecordsResult = datahubClient.getRecords(Constant.projectName, Constant.topicName, shardId, schema, cursor, fetchNum);
            if (getRecordsResult.getRecordCount() <= 0) {
                // No data; sleep and read again
                Thread.sleep(1000);
                continue;
            }
            for (RecordEntry recordEntry : getRecordsResult.getRecords()) {
                // Consume the data
                TupleRecordData data = (TupleRecordData) recordEntry.getRecordData();
                System.out.println("field1:" + data.getField("field1") + "\t"
                        + "field2:" + data.getField("field2"));
                // After processing the record, update the offset
                ++recordCount;
                subscriptionOffset.setSequence(recordEntry.getSequence());
                subscriptionOffset.setTimestamp(recordEntry.getSystemTime());
                if (recordCount % 1000 == 0) {
                    // Commit the offset
                    Map<String, SubscriptionOffset> offsetMap = new HashMap<>();
                    offsetMap.put(shardId, subscriptionOffset);
                    datahubClient.commitSubscriptionOffset(Constant.projectName, Constant.topicName, subId, offsetMap);
                    System.out.println("commit offset successful");
                }
            }
            cursor = getRecordsResult.getNextCursor();
        } catch (SubscriptionOfflineException | OffsetSessionChangedException e) {
            // Exit. Offline: the subscription is offline; SessionChange: the subscription is being consumed by another client
            break;
        } catch (OffsetResetedException e) {
            // The offset was reset; fetch the SubscriptionOffset again. This example assumes a reset by sequence;
            // if the offset was reset by timestamp, get the cursor with CursorType.SYSTEM_TIME instead
            subscriptionOffset = datahubClient.getSubscriptionOffset(Constant.projectName, Constant.topicName, subId, shardIds).getOffsets().get(shardId);
            long nextSequence = subscriptionOffset.getSequence() + 1;
            cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.SEQUENCE, nextSequence).getCursor();
        } catch (DatahubClientException e) {
            // TODO: decide whether to exit depending on the exception
        } catch (Exception e) {
            break;
        }
    }
}

Connector Operations


A DataHub connector synchronizes streaming data from DataHub to other cloud services. Currently data in a topic can be synchronized in real time or near real time to MaxCompute (ODPS), OSS, RDS/MySQL, TableStore, ElasticSearch and Function Compute. Users only need to write the data to DataHub once and configure the synchronization in DataHub, and the data becomes available in the other services.

Create Connector

CreateConnectorResult createConnector(String projectName, String topicName, ConnectorType connectorType, List columnFields, SinkConfig config);

CreateConnectorResult createConnector(String projectName, String topicName, ConnectorType connectorType, long sinkStartTime, List columnFields, SinkConfig config);

  • Parameters
    • projectName The name of the project.
    • topicName The name of the topic.
    • ConnectorType The type of connector which you want to create.
    • columnFields The fields you want to synchronize.
    • sinkStartTime Start time to sink from datahub. Unit: Ms
    • config Detail config of specified connector type.
  • Exception
    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceNotFoundException
  • ODPS example

public static void createConnector() {
    List<String> columnFields = Arrays.asList("field1", "field2");
    SinkOdpsConfig config = new SinkOdpsConfig() {{
        setEndpoint(Constant.odps_endpoint);
        setProject(Constant.odps_project);
        setTable(Constant.odps_table);
        setAccessId(Constant.odps_accessId);
        setAccessKey(Constant.odps_accessKey);
        setPartitionMode(PartitionMode.SYSTEM_TIME);
        setTimeRange(60);
    }};
    // Configure the partition format
    SinkOdpsConfig.PartitionConfig partitionConfig = new SinkOdpsConfig.PartitionConfig() {{
        addConfig("ds", "%Y%m%d");
        addConfig("hh", "%H");
        addConfig("mm", "%M");
    }};
    config.setPartitionConfig(partitionConfig);
    try {
        // Create the connector
        datahubClient.createConnector(Constant.projectName, Constant.topicName, ConnectorType.SINK_ODPS, columnFields, config);
        System.out.println("create connector successful");
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}
  • OSS example:

public static void createOssConnector() {
    List<String> columnFields = Arrays.asList("field1", "field2");
    SinkOssConfig ossConfig = new SinkOssConfig() {{
        setAccessId(Constant.oss_accessId);
        setAccessKey(Constant.oss_accessKey);
        setAuthMode(AuthMode.STS);
        setBucket(Constant.oss_bucket);
        setEndpoint(Constant.oss_endpoint);
        setPrefix(Constant.oss_prefix);
        setTimeFormat(Constant.oss_timeFormat);
        setTimeRange(60);
    }};
    try {
        // Create the connector
        datahubClient.createConnector(Constant.projectName, Constant.topicName, ConnectorType.SINK_OSS, columnFields, ossConfig);
        System.out.println("create connector successful");
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}
  • TableStore example:

public static void createOtsConnector() {
    List<String> columnFields = Arrays.asList("field1", "field2");
    final SinkOtsConfig otsConfig = new SinkOtsConfig() {{
        setAccessId(Constant.ots_accessId);
        setAccessKey(Constant.ots_accessKey);
        setEndpoint(Constant.ots_endpoint);
        setInstance(Constant.ots_instance);
        setTable(Constant.ots_table);
        setAuthMode(AuthMode.AK);
    }};
    try {
        // Create the connector
        datahubClient.createConnector(Constant.projectName, Constant.topicName, ConnectorType.SINK_OTS, columnFields, otsConfig);
        System.out.println("create connector successful");
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}
  • Hologres example:

public static void createHoloConnector() {
    List<String> columnFields = Arrays.asList("field1", "field2");
    final SinkHologresConfig holoConfig = new SinkHologresConfig() {{
        setAccessId(Constant.accessId);
        setAccessKey(Constant.accessKey);
        setProjectName(Constant.projectName);
        setTopicName(Constant.topicName);
        setAuthMode(AuthMode.AK);
        setInstanceId(Constant.instanceId);
        // Configure the timestamp unit
        setTimestampUnit(TimestampUnit.MILLISECOND);
    }};
    try {
        // Create the connector
        datahubClient.createConnector(Constant.projectName, Constant.topicName, ConnectorType.SINK_HOLOGRES, columnFields, holoConfig);
        System.out.println("create connector successful");
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}
  • ElasticSearch example:

public static void createEsConnector() {
    List<String> columnFields = Arrays.asList("field1", "field2");
    final SinkEsConfig esConfig = new SinkEsConfig() {{
        setEndpoint(Constant.es_endpoint);
        setIdFields(Constant.es_fields);
        setIndex(Constant.es_index);
        setPassword(Constant.es_password);
        setProxyMode(Constant.es_proxyMode);
        setTypeFields(Constant.es_typeFields);
        setUser(Constant.es_user);
    }};
    try {
        // Create the connector
        datahubClient.createConnector(Constant.projectName, Constant.topicName, ConnectorType.SINK_ES, columnFields, esConfig);
        System.out.println("create connector successful");
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}
  • Function Compute example:

public static void createFcConnector() {
    List<String> columnFields = Arrays.asList("field1", "field2");
    final SinkFcConfig fcConfig = new SinkFcConfig() {{
        setEndpoint(Constant.fc_endpoint);
        setAccessId(Constant.fc_accessId);
        setAccessKey(Constant.fc_accessKey);
        setAuthMode(AuthMode.AK);
        setFunction(Constant.fc_function);
        setService(Constant.fc_service);
    }};
    try {
        // Create the connector
        datahubClient.createConnector(Constant.projectName, Constant.topicName, ConnectorType.SINK_FC, columnFields, fcConfig);
        System.out.println("create connector successful");
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}
  • MySQL example:

public static void createMysqlConnector() {
    List<String> columnFields = Arrays.asList("field1", "field2");
    final SinkMysqlConfig mysqlConfig = new SinkMysqlConfig() {{
        setDatabase(Constant.mysql_database);
        setHost(Constant.mysql_host);
        setInsertMode(InsertMode.OVERWRITE);
        setPassword(Constant.mysql_password);
        setPort(Constant.mysql_port);
        setTable(Constant.mysql_table);
        setUser(Constant.mysql_user);
    }};
    try {
        // Create the connector
        datahubClient.createConnector(Constant.projectName, Constant.topicName, ConnectorType.SINK_MYSQL, columnFields, mysqlConfig);
        System.out.println("create connector successful");
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}


Delete Connector

DeleteConnectorResult deleteConnector(String projectName, String topicName, ConnectorType connectorType);

  • Parameters
    • projectName The name of the project.
    • topicName The name of the topic.
    • ConnectorType The type of connector which you want to delete.
  • Exception
    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceNotFoundException
  • Example

public static void deleteConnector() {
    try {
        datahubClient.deleteConnector(Constant.projectName, Constant.topicName, ConnectorType.SINK_ODPS);
        System.out.println("delete connector successful");
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}


Get Connector

GetConnectorResult getConnector(String projectName, String topicName, ConnectorType connectorType);

  • Parameters
    • projectName The name of the project.
    • topicName The name of the topic.
    • ConnectorType The type of connector which you want to get.
  • Exception
    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceNotFoundException
  • Example

public static void getConnector() {
    try {
        GetConnectorResult getConnectorResult = datahubClient.getConnector(Constant.projectName, Constant.topicName, ConnectorType.SINK_ODPS);
        System.out.println(getConnectorResult.getState() + "\t" + getConnectorResult.getSubId());
        for (String fieldName : getConnectorResult.getColumnFields()) {
            System.out.println(fieldName);
        }
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}


Update Connector


Updates the configuration of a connector.

UpdateConnectorResult updateConnector(String projectName, String topicName, ConnectorType connectorType, SinkConfig config);

  • Parameters
    • projectName The name of the project.
    • topicName The name of the topic.
    • ConnectorType The type of connector which you want to update.
    • config Detail config of specified connector type.
  • Exception
    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceNotFoundException
  • Example

public static void updateConnector() {
    SinkOdpsConfig config = (SinkOdpsConfig) datahubClient.getConnector(Constant.projectName, Constant.topicName, ConnectorType.SINK_ODPS).getConfig();
    // Modify the configuration
    config.setTimeRange(100);
    config.setAccessId(Constant.accessId);
    config.setAccessKey(Constant.accessKey);
    try {
        datahubClient.updateConnector(Constant.projectName, Constant.topicName, ConnectorType.SINK_ODPS, config);
        System.out.println("update connector successful");
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}


Update Connector State

UpdateConnectorStateResult updateConnectorState(String projectName, String topicName, ConnectorType connectorType, ConnectorState connectorState);

  • Parameters
    • projectName The name of the project.
    • topicName The name of the topic.
    • ConnectorType The type of connector.
    • connectorState The state of the connector. Support: ConnectorState.PAUSED, ConnectorState.RUNNING.
  • Exception
    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceNotFoundException
  • 示例
  public static void updateConnectorState() {
      try {
          datahubClient.updateConnectorState(Constant.projectName, Constant.topicName, ConnectorType.SINK_ODPS, ConnectorState.STOPPED);
          datahubClient.updateConnectorState(Constant.projectName, Constant.topicName, ConnectorType.SINK_ODPS, ConnectorState.RUNNING);
          System.out.println("update connector state successful");
      } catch (DatahubClientException e) {
          System.out.println(e.getErrorMessage());
      }
  }


Update Connector Offset

UpdateConnectorOffsetResult updateConnectorOffset(String projectName, String topicName, ConnectorType connectorType, String shardId, ConnectorOffset offset);

  • Parameters
    • projectName The name of the project.
    • topicName The name of the topic.
    • connectorType The type of the connector.
    • shardId The ID of the shard. If shardId is null, the offsets of all shards are updated.
    • offset The connector offset.
  • Exception
    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceNotFoundException
  • Example
  public static void updateConnectorOffset() {
      String shardId = "0"; // example shard ID
      ConnectorOffset offset = new ConnectorOffset();
      offset.setSequence(10);
      offset.setTimestamp(1000);
      try {
          // the connector must be stopped before its offset can be updated
          datahubClient.updateConnectorState(Constant.projectName, Constant.topicName, ConnectorType.SINK_ODPS, ConnectorState.STOPPED);
          datahubClient.updateConnectorOffset(Constant.projectName, Constant.topicName, ConnectorType.SINK_ODPS, shardId, offset);
          datahubClient.updateConnectorState(Constant.projectName, Constant.topicName, ConnectorType.SINK_ODPS, ConnectorState.RUNNING);
          System.out.println("update connector offset successful");
      } catch (DatahubClientException e) {
          System.out.println(e.getErrorMessage());
      }
  }
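
As noted in the parameter description, passing null as shardId updates the offsets of all shards at once. A minimal variation of the example above (same offset object and Constant values assumed):

  // pass null as shardId to update the offsets of all shards of the connector
  datahubClient.updateConnectorOffset(Constant.projectName, Constant.topicName, ConnectorType.SINK_ODPS, null, offset);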


List Connectors

ListConnectorResult listConnector(String projectName, String topicName);

  • Parameters
    • projectName The name of the project.
    • topicName The name of the topic.
  • Exception
    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceNotFoundException
  • Example
  public static void listConnector() {
      try {
          ListConnectorResult listConnectorResult = datahubClient.listConnector(Constant.projectName, Constant.topicName);
          for (String cName : listConnectorResult.getConnectorNames()) {
              System.out.println(cName);
          }
      } catch (DatahubClientException e) {
          System.out.println(e.getErrorMessage());
      }
  }


Get Connector Shard Status

GetConnectorShardStatusResult getConnectorShardStatus(String projectName, String topicName, ConnectorType connectorType);

ConnectorShardStatusEntry getConnectorShardStatus(String projectName, String topicName, ConnectorType connectorType, String shardId);

  • Parameters
    • projectName The name of the project.
    • topicName The name of the topic.
    • connectorType The type of the connector.
    • shardId The ID of the shard.
  • Exception
    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceNotFoundException
  • Example
  public static void getConnectorShardStatusByShard() {
      String shardId = "0"; // example shard ID
      try {
          ConnectorShardStatusEntry connectorShardStatusEntry = datahubClient.getConnectorShardStatus(Constant.projectName, Constant.topicName, ConnectorType.SINK_ODPS, shardId);
          System.out.println(connectorShardStatusEntry.getState() + "\t"
                  + connectorShardStatusEntry.getCurrSequence() + "\t"
                  + connectorShardStatusEntry.getDiscardCount() + "\t"
                  + connectorShardStatusEntry.getUpdateTime());
      } catch (DatahubClientException e) {
          System.out.println(e.getErrorMessage());
      }
  }

  public static void getConnectorShardStatus() {
      try {
          GetConnectorShardStatusResult getConnectorShardStatusResult = datahubClient.getConnectorShardStatus(Constant.projectName, Constant.topicName, ConnectorType.SINK_ODPS);
          for (Map.Entry<String, ConnectorShardStatusEntry> entry : getConnectorShardStatusResult.getStatusEntryMap().entrySet()) {
              System.out.println(entry.getKey() + " : " + entry.getValue().getState() + "\t"
                      + entry.getValue().getCurrSequence() + "\t"
                      + entry.getValue().getDiscardCount() + "\t"
                      + entry.getValue().getUpdateTime());
          }
      } catch (DatahubClientException e) {
          System.out.println(e.getErrorMessage());
      }
  }


Reload Connector

ReloadConnectorResult reloadConnector(String projectName, String topicName, ConnectorType connectorType);

ReloadConnectorResult reloadConnector(String projectName, String topicName, ConnectorType connectorType, String shardId);

  • Parameters
    • projectName The name of the project.
    • topicName The name of the topic.
    • connectorType The type of the connector.
    • shardId The ID of the shard.
  • Exception
    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceNotFoundException
  • Example
  public static void reloadConnector() {
      try {
          datahubClient.reloadConnector(Constant.projectName, Constant.topicName, ConnectorType.SINK_ODPS);
          System.out.println("reload connector successful");
      } catch (DatahubClientException e) {
          System.out.println(e.getErrorMessage());
      }
  }

  public static void reloadConnectorByShard() {
      String shardId = "0"; // example shard ID
      try {
          datahubClient.reloadConnector(Constant.projectName, Constant.topicName, ConnectorType.SINK_ODPS, shardId);
          System.out.println("reload connector successful");
      } catch (DatahubClientException e) {
          System.out.println(e.getErrorMessage());
      }
  }


Get Connector Done Time

GetConnectorDoneTimeResult getConnectorDoneTime(String projectName, String topicName, ConnectorType connectorType);

  • Parameters
    • projectName The name of the project.
    • topicName The name of the topic.
    • connectorType The type of the connector.
  • Exception
    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceNotFoundException
  • Example
  public static void getDoneTime() {
      try {
          GetConnectorDoneTimeResult getConnectorDoneTimeResult = datahubClient.getConnectorDoneTime(Constant.projectName, Constant.topicName, ConnectorType.SINK_ODPS);
          System.out.println(getConnectorDoneTimeResult.getDoneTime());
      } catch (DatahubClientException e) {
          System.out.println(e.getErrorMessage());
      }
  }
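
The example above prints the raw done time. If you want a human-readable view, the following is a minimal sketch; it assumes getDoneTime() returns a Unix timestamp in seconds, which you should verify for your connector type and SDK version.

  // A minimal sketch: print the done time together with an ISO-8601 instant.
  // Assumption: getDoneTime() returns a Unix timestamp in seconds.
  public static void printReadableDoneTime() {
      try {
          long doneTime = datahubClient
                  .getConnectorDoneTime(Constant.projectName, Constant.topicName, ConnectorType.SINK_ODPS)
                  .getDoneTime();
          System.out.println(doneTime + " (" + java.time.Instant.ofEpochSecond(doneTime) + ")");
      } catch (DatahubClientException e) {
          System.out.println(e.getErrorMessage());
      }
  }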


Append a New Field

AppendConnectorFieldResult appendConnectorField(String projectName, String topicName, ConnectorType connectorType, String fieldName);


You can append a new field to a connector, but the MaxCompute table must already contain a column that corresponds to the DataHub field.

  • Parameters
    • projectName The name of the project.
    • topicName The name of the topic.
    • connectorType The type of the connector.
    • fieldName The name of the field to append. The field must allow null values.
  • Exception
    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceNotFoundException
  • Example
  public static void appendConnectorField() {
      String newField = "newfield";
      try {
          // the column newfield must exist in both the topic schema and the MaxCompute table,
          // and the two schemas must match exactly
          datahubClient.appendConnectorField(Constant.projectName, Constant.topicName, ConnectorType.SINK_ODPS, newField);
      } catch (DatahubClientException e) {
          System.out.println(e.getErrorMessage());
      }
  }
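
Because appendConnectorField fails if the field is missing on either side, it can help to confirm the field exists in the topic schema before calling it. The sketch below does this with the RecordSchema returned by getTopic; the getFields()/getName() accessors are assumptions about the com.aliyun.datahub.client.model classes and should be checked against your SDK version.

  // A hedged pre-check sketch: only append the field if the topic schema already contains it.
  // Assumption: RecordSchema.getFields() returns the field list and Field.getName() returns the field name.
  public static void appendConnectorFieldIfPresent(String newField) {
      try {
          RecordSchema schema = datahubClient.getTopic(Constant.projectName, Constant.topicName).getRecordSchema();
          boolean inTopic = schema.getFields().stream()
                  .anyMatch(field -> field.getName().equalsIgnoreCase(newField));
          if (inTopic) {
              datahubClient.appendConnectorField(Constant.projectName, Constant.topicName, ConnectorType.SINK_ODPS, newField);
              System.out.println("append connector field successful");
          } else {
              System.out.println("field not found in topic schema: " + newField);
          }
      } catch (DatahubClientException e) {
          System.out.println(e.getErrorMessage());
      }
  }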

Batch Operations


For batch operations, please use the DataHub console command-line tool.
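
If the console tool is not available and you only need to repeat one of the SDK calls documented above across several topics, a minimal sketch follows. The topic names are hypothetical placeholders; the loop simply calls updateConnectorState on the SINK_ODPS connector of each topic.

  // A minimal sketch (not the console tool): stop the SINK_ODPS connector of several topics.
  // The topic names below are hypothetical placeholders.
  public static void stopConnectorsForTopics() {
      java.util.List<String> topicNames = java.util.Arrays.asList("topic_a", "topic_b", "topic_c");
      for (String topicName : topicNames) {
          try {
              datahubClient.updateConnectorState(Constant.projectName, topicName, ConnectorType.SINK_ODPS, ConnectorState.STOPPED);
              System.out.println("stopped connector of topic: " + topicName);
          } catch (DatahubClientException e) {
              System.out.println(topicName + ": " + e.getErrorMessage());
          }
      }
  }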

Sample Code Download

DataHub Java SDK sample code