全部产品

读写数据

操作视频

  • 用户可以调用sdk方法进行读写数据
    状态为CLOSED和ACTIVE的shard都可以读取数据,不过只有状态为ACTIVE的shard可以写数据。

  • 同时用户可以引入datahub-client-library依赖,datahub-client-library是在Java-SDK读写功能的封装,用户可以使用Producer实现均匀写入shard,也可以使用Consumer实现协同消费,(建议使用)

读数据

读取数据有两种方式

  • 使用SDK读取数据
  • 使用协同消费读取数据

使用SDK(以Java SDK为例)

步骤一:获取cursor

读取Topic下的数据,需要指定对应的shard,同时需要指定数据读取的游标位置Cursor。Cursor的获取方式有四种,分别是OLDEST, LATEST, SEQUENCE, SYSTEM_TIME。

  • OLDEST : 表示获取的cursor指向当前有效数据中时间最久远的record。
  • LATEST : 表示获取的cursor指向当前最新的record。
  • SEQUENCE : 表示获取的cursor指向该序列的record。
  • SYSTEM_TIME : 表示获取的cursor指向该大于等于该时间戳的第一条record。

    GetCursorResult getCursor(String projectName, String topicName, String shardId, CursorType type);GetCursorResult getCursor(String projectName, String topicName, String shardId, CursorType type, long param);

  • 参数

    • projectName The name of the project.
    • topicName The name of the topic.
    • shardId The id of the shard.
    • CursorType Which type used to get cursor.
  • Exception
    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceNotFoundException
    • SeekOutOfRangeException
  • 示例代码
    1. public static void getcursor() {
    2. String shardId = "5";
    3. try {
    4. /* OLDEST用法示例 */
    5. String oldestCursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.OLDEST).getCursor();
    6. /* LATEST用法示例 */
    7. String latestCursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.LATEST).getCursor();
    8. /* SEQUENCE用法示例 */
    9. //获取最新数据的sequence
    10. long seq = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.LATEST).getSequence();
    11. //获取最新的十条数据的读取位置
    12. String seqCursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.SEQUENCE, seq - 9).getCursor();
    13. /* SYSTEM_TIME用法示例 */
    14. //将时间转为时间戳形式
    15. String time = "2019-07-01 10:00:00";
    16. SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    17. long timestamp = 0L;
    18. try {
    19. Date date = simpleDateFormat.parse(time);
    20. timestamp = date.getTime();//获取时间的时间戳
    21. //System.out.println(timestamp);
    22. } catch (ParseException e) {
    23. System.exit(-1);
    24. }
    25. //获取时间time之后的数据读取位置
    26. String timeCursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.SYSTEM_TIME, timestamp).getCursor();
    27. System.out.println("get cursor successful");
    28. } catch (InvalidParameterException e) {
    29. System.out.println("invalid parameter, please check your parameter");
    30. System.exit(1);
    31. } catch (AuthorizationFailureException e) {
    32. System.out.println("AK error, please check your accessId and accessKey");
    33. System.exit(1);
    34. } catch (ResourceNotFoundException e) {
    35. System.out.println("project or topic or shard not found");
    36. System.exit(1);
    37. } catch (SeekOutOfRangeException e) {
    38. System.out.println("offset invalid or has expired");
    39. System.exit(1);
    40. } catch (DatahubClientException e) {
    41. System.out.println("other error");
    42. System.out.println(e);
    43. System.exit(1);
    44. }
    45. }

步骤二:读取数据接口:

GetRecordsResult getRecords(String projectName, String topicName, String shardId, String cursor, int limit);GetRecordsResult getRecords(String projectName, String topicName, String shardId, RecordSchema schema, String cursor, int limit);

  • 参数
    • projectName The name of the project.
    • topicName The name of the topic.
    • shardId The id of the shard.
    • schema If you read TUPLE records, you need this parameter.
    • cursor The start cursor used to read data.
    • limit Max record size to read.
  • Exception
    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceNotFoundException
    • ShardSealedException
    • LimitExceededException
  • 示例

1). 读取Tuple topic数据

  1. public static void example() {
  2. //每次最多读取数据量
  3. int recordLimit = 1000;
  4. String shardId = "7";
  5. // 获取cursor, 这里获取有效数据中时间最久远的record游标
  6. // 注: 正常情况下,getCursor只需在初始化时获取一次,然后使用getRecords的nextCursor进行下一次读取
  7. String cursor = "";
  8. try {
  9. cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.OLDEST).getCursor();
  10. } catch (InvalidParameterException e) {
  11. System.out.println("invalid parameter, please check your parameter");
  12. System.exit(1);
  13. } catch (AuthorizationFailureException e) {
  14. System.out.println("AK error, please check your accessId and accessKey");
  15. System.exit(1);
  16. } catch (ResourceNotFoundException e) {
  17. System.out.println("project or topic or shard not found");
  18. System.exit(1);
  19. } catch (SeekOutOfRangeException e) {
  20. System.out.println("offset invalid or has expired");
  21. System.exit(1);
  22. } catch (DatahubClientException e) {
  23. System.out.println("other error");
  24. System.out.println(e);
  25. System.exit(1);
  26. }
  27. while (true) {
  28. try {
  29. GetRecordsResult result = datahubClient.getRecords(Constant.projectName, Constant.topicName, shardId, recordSchema, cursor, recordLimit);
  30. if (result.getRecordCount() <= 0) {
  31. // 无数据,sleep后读取
  32. Thread.sleep(10000);
  33. continue;
  34. }
  35. for (RecordEntry entry : result.getRecords()) {
  36. TupleRecordData data = (TupleRecordData) entry.getRecordData();
  37. System.out.println("field1:" + data.getField("field1") + "\t"
  38. + "field2:" + data.getField("field2"));
  39. }
  40. // 拿到下一个游标
  41. cursor = result.getNextCursor();
  42. } catch (InvalidCursorException ex) {
  43. // 非法游标或游标已过期,建议重新定位后开始消费
  44. cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.OLDEST).getCursor();
  45. } catch (SeekOutOfRangeException e) {
  46. System.out.println("offset invalid");
  47. System.exit(1);
  48. } catch (ResourceNotFoundException e) {
  49. System.out.println("project or topic or shard not found");
  50. System.exit(1);
  51. } catch (ShardSealedException e) {
  52. System.out.println("shard is closed, all data has been read");
  53. System.exit(1);
  54. } catch (LimitExceededException e) {
  55. System.out.println("maybe exceed limit, retry");
  56. } catch (DatahubClientException e) {
  57. System.out.println("other error");
  58. System.out.println(e);
  59. System.exit(1);
  60. } catch (InterruptedException e) {
  61. e.printStackTrace();
  62. }
  63. }
  64. }

2). 读取Blob topic数据

  1. public static void example() {
  2. //每次最多读取数据量
  3. int recordLimit = 1000;
  4. String shardId = "7";
  5. // 获取cursor, 这里获取有效数据中时间最久远的record游标
  6. // 注: 正常情况下,getCursor只需在初始化时获取一次,然后使用getRecords的nextCursor进行下一次读取
  7. String cursor = "";
  8. try {
  9. cursor = datahubClient.getCursor(Constant.projectName, Constant.blobTopicName, shardId, CursorType.OLDEST).getCursor();
  10. } catch (InvalidParameterException e) {
  11. System.out.println("invalid parameter, please check your parameter");
  12. System.exit(1);
  13. } catch (AuthorizationFailureException e) {
  14. System.out.println("AK error, please check your accessId and accessKey");
  15. System.exit(1);
  16. } catch (ResourceNotFoundException e) {
  17. System.out.println("project or topic or shard not found");
  18. System.exit(1);
  19. } catch (SeekOutOfRangeException e) {
  20. System.out.println("offset invalid or has expired");
  21. System.exit(1);
  22. } catch (DatahubClientException e) {
  23. System.out.println("other error");
  24. System.out.println(e);
  25. System.exit(1);
  26. }
  27. while (true) {
  28. try {
  29. GetRecordsResult result = datahubClient.getRecords(Constant.projectName, Constant.blobTopicName, shardId, recordSchema, cursor, recordLimit);
  30. if (result.getRecordCount() <= 0) {
  31. // 无数据,sleep后读取
  32. Thread.sleep(10000);
  33. continue;
  34. }
  35. /* 消费数据 */
  36. for (RecordEntry record: result.getRecords()){
  37. BlobRecordData data = (BlobRecordData) record.getRecordData();
  38. System.out.println(new String(data.getData()));
  39. }
  40. // 拿到下一个游标
  41. cursor = result.getNextCursor();
  42. } catch (InvalidCursorException ex) {
  43. // 非法游标或游标已过期,建议重新定位后开始消费
  44. cursor = datahubClient.getCursor(Constant.projectName, Constant.blobTopicName, shardId, CursorType.OLDEST).getCursor();
  45. } catch (SeekOutOfRangeException e) {
  46. System.out.println("offset invalid");
  47. System.exit(1);
  48. } catch (ResourceNotFoundException e) {
  49. System.out.println("project or topic or shard not found");
  50. System.exit(1);
  51. } catch (ShardSealedException e) {
  52. System.out.println("shard is closed, all data has been read");
  53. System.exit(1);
  54. } catch (LimitExceededException e) {
  55. System.out.println("maybe exceed limit, retry");
  56. } catch (DatahubClientException e) {
  57. System.out.println("other error");
  58. System.out.println(e);
  59. System.exit(1);
  60. } catch (InterruptedException e) {
  61. e.printStackTrace();
  62. }
  63. }
  64. }

使用协同消费

步骤一:初始化Consumer

配置

名称 描述
autoCommit 是否自动提交点位,默认为true。点位的提交会在后台线程按配置的时间间隔执行,自动提交的逻辑是当read接口被调用时,认为之前读的数据已经处理完毕。如果设置为false,那么每条record处理完必须ack,后台提交点位会保证该点位之前的record全部被ack。
offsetCommitTimeoutMs 点位的提交间隔,单位毫秒,默认30000ms,范围[3000, 300000]
sessionTimeoutMs 会话超时时间,心跳间隔会设为改置的2/3,超过时间没有心跳,认为客户端已停止,服务端会重新分配被占有shard,单位毫秒,默认60000ms,范围[60000, 180000]
fetchSize 单个shard异步读取记录的大小,会缓存2倍于该值的记录,少于2倍会触发异步任务去读取,默认1000,必须大于0
  1. {
  2. // Endpoint以Region: 华东1为例,其他Region请按实际情况填写
  3. String endpoint = "http://dh-cn-hangzhou.aliyuncs.com";
  4. String accessId = "<YourAccessKeyId>";
  5. String accessKey = "<YourAccessKeySecret>";
  6. String projectName = "<YourProjectName>";
  7. String topicName = "<YourTopicName>";
  8. String subId = "<YourSubscriptionId>";
  9. /**
  10. * 1.使用协同消费,使用点位服务
  11. *使用协同消费:
  12. 如果有两台机器使用同一个subid来消费topic(有5个shard)中的数据,不需要手动指定哪台机器消费哪几个shard,
  13. 服务器端会自动分配,当有第三台机器加入了后也会做balance
  14. *
  15. * 使用点位服务:
  16. 消费的时候,会根据服务端subid的点位来读(如果是新建的订阅,还没有点位就从头读),如果要指定从哪个时间点开始读,可以在页面上重置subid的点位
  17. *
  18. * */
  19. ConsumerConfig config = new ConsumerConfig(endpoint, accessId, accessKey);
  20. Consumer consumer = new Consumer(projectName, topicName, subId, config);
  21. /**
  22. * 2. 不使用协同消费,使用点位服务,提供subId和Consumer读取的shard列表
  23. *不使用协同消费:
  24. 如果有两台机器使用同一个subid来消费topic(有5个shard)中的数据,那需要手动指定哪台机器消费哪几个shard,
  25. (客户端机器A,消费0,1,2号shard;客户端机器B,消费3,4号shard)当有第三台机器加入了后需要自己重新指定消费策略
  26. *
  27. * */
  28. //客户端A消费shardid为0,1,2的示例
  29. List<String> shardlists = Arrays.asList("0", "1", "2");
  30. ConsumerConfig config = new ConsumerConfig(endpoint, accessId, accessKey);
  31. Consumer consumer = new Consumer(projectName, topicName, subId, shardlists, config);
  32. //客户端B消费shardid为3,4的示例
  33. // List<String> shardlists = Arrays.asList("3", "4");
  34. // ConsumerConfig config = new ConsumerConfig(endpoint, accessId, accessKey);
  35. // Consumer consumer = new Consumer(projectName, topicName, subId, shardlists, config);
  36. /**
  37. *3.不使用协同消费,不使用点位服务
  38. *不使用点位服务:
  39. 就是自己需要找个存储(db/redis等)来记录自己哪个shard消费到什么时间/Sequence,每次读的时候都要根据自己记录的点位来初始化
  40. *
  41. * */
  42. Map<String, Offset> offsetMap = new HashMap<>();
  43. // 提供sequence和timestamp,若sequence超出范围则使用timestamp获取Cursor
  44. offsetMap.put("0", new Offset(100, 1548573440756L));
  45. // 只提供sequence,按照sequence获取Cursor
  46. offsetMap.put("1", new Offset().setSequence(1));
  47. // 只提供timestamp,按照timestamp获取Cursor
  48. offsetMap.put("2", new Offset().setTimestamp(1548573440756L));
  49. ConsumerConfig config = new ConsumerConfig(endpoint, accessId, accessKey);
  50. Consumer consumer = new Consumer(projectName, topicName, subId, offsetMap, config);
  51. }

步骤二:协同代码示例

  1. import com.aliyun.datahub.client.exception.AuthorizationFailureException;
  2. import com.aliyun.datahub.client.exception.DatahubClientException;
  3. import com.aliyun.datahub.client.exception.InvalidParameterException;
  4. import com.aliyun.datahub.client.exception.NoPermissionException;
  5. import com.aliyun.datahub.client.exception.SubscriptionOfflineException;
  6. import com.aliyun.datahub.client.exception.SubscriptionOffsetResetException;
  7. import com.aliyun.datahub.client.exception.SubscriptionSessionInvalidException;
  8. import com.aliyun.datahub.client.model.RecordEntry;
  9. import com.aliyun.datahub.client.model.TupleRecordData;
  10. import com.aliyun.datahub.clientlibrary.config.ConsumerConfig;
  11. import com.aliyun.datahub.clientlibrary.consumer.Consumer;
  12. import org.slf4j.Logger;
  13. import org.slf4j.LoggerFactory;
  14. import java.util.concurrent.TimeUnit;
  15. public class DatahubReader {
  16. private static final Logger LOG = LoggerFactory.getLogger(DatahubReader.class);
  17. private static void sleep(long milliSeconds) {
  18. try {
  19. TimeUnit.MILLISECONDS.sleep(milliSeconds);
  20. } catch (InterruptedException e) {
  21. // TODO:自行处理异常
  22. }
  23. }
  24. public static Consumer createConsumer(ConsumerConfig config, String project, String topic, String subId)
  25. {
  26. return new Consumer(project, topic, subId, config);
  27. }
  28. public static void main(String[] args) {
  29. String endpoint = "http://dh-cn-hangzhou.aliyuncs.com";
  30. String accessId = "<YourAccessKeyId>";
  31. String accessKey = "<YourAccessKeySecret>";
  32. String projectName = "<YourProjectName>";
  33. String topicName = "<YourTopicName>";
  34. String subId = "<YourSubscriptionId>";
  35. ConsumerConfig config = new ConsumerConfig(endpoint, accessId, accessKey);
  36. Consumer consumer = createConsumer(config, projectName, topicName, subId);
  37. int maxRetry = 3;
  38. boolean stop = false;
  39. try {
  40. while (!stop) {
  41. try {
  42. while (true) {
  43. // 协同消费刚初始化,需要等待服务端分配shard,约40秒,期间只能返回null
  44. // 自动提交模式,每次调用read,认为之前读的数据都已处理完成,自动ack
  45. RecordEntry record = consumer.read(maxRetry);
  46. // 处理数据
  47. if (record != null) {
  48. TupleRecordData data = (TupleRecordData) record.getRecordData();
  49. // 根据自己的schema来处理数据,此处打印第一列的内容
  50. LOG.info("field1: {}", data.getField(0));
  51. // 根据列名取数据
  52. // LOG.info("field2: {}", data.getField("field2"));
  53. // 非自动提交模式,每条record处理完后都需要ack
  54. // 自动提交模式,ack不会做任何操作
  55. // 1.1.7版本及以上
  56. record.getKey().ack();
  57. } else {
  58. LOG.info("read null");
  59. }
  60. }
  61. } catch (SubscriptionOffsetResetException e) {
  62. // 点位被重置,重新初始化consumer
  63. try {
  64. consumer.close();
  65. consumer = createConsumer(config, projectName, topicName, subId);
  66. } catch (DatahubClientException e1) {
  67. // 初始化失败,重试或直接抛异常
  68. LOG.error("create consumer failed", e);
  69. throw e;
  70. }
  71. } catch (InvalidParameterException |
  72. SubscriptionOfflineException |
  73. SubscriptionSessionInvalidException |
  74. AuthorizationFailureException |
  75. NoPermissionException e) {
  76. // 请求参数非法
  77. // 订阅被下线
  78. // 订阅下相同shard被其他客户端占用
  79. // 签名不正确
  80. // 没有权限
  81. LOG.error("read failed", e);
  82. throw e;
  83. } catch (DatahubClientException e) {
  84. // 基类异常,包含网络问题等,可以选择重试
  85. LOG.error("read failed, retry", e);
  86. sleep(1000);
  87. }
  88. }
  89. } catch (Throwable e) {
  90. LOG.error("read failed", e);
  91. } finally {
  92. // 确保资源正确释放
  93. // 会提交已ack的点位
  94. consumer.close();
  95. }
  96. }
  97. }

写数据

写数据有两种方式

  • 使用SDK
  • 使用datahub-client-library 中的Producer,可以实现对shard的均匀写入

使用SDK

服务器2.12之后版本开始支持PutRecordsByShardResult接口,之前版本putRecords接口,使用putRecordsByShard接口时需指定写入的shard,否则会默认写入第一个处于ACTIVE状态的shard。
两个方法中传入参数records是一个List对象,每个元素为一个record,但是必须为相同类型的record,即Tuple类型或者Blob类型。DataHub目前支持按照Shard写入 (服务端 >= 2.12版本) 以及混合写入,分别对应putRecordsByShardputRecords两个接口。针对第二个接口,用户需要判断PutRecordsResult结果以确认数据是否写入成功;而putRecordsByShard接口则直接通过异常告知用户是否成功。
如果服务端支持,建议用户使用putRecordsByShard接口。

PutRecordsResult putRecords(String projectName, String topicName, List records);PutRecordsByShardResult putRecordsByShard(String projectName, String topicName, String shardId, List records);

  • 参数
    • projectName The name of the project.
    • topicName The name of the topic.
    • shardId The id of the shard.
    • records Records list to written.
  • Exception
    • DatahubClientException
    • InvalidParameterException
    • AuthorizationFailureException
    • ResourceNotFoundException
    • ShardSealedException
    • LimitExceededException

1). 写入Tuple topic

  1. // 写入Tuple型数据
  2. public static void tupleExample() {
  3. String shardId = "9";
  4. // 获取schema
  5. recordSchema = datahubClient.getTopic(Constant.projectName, Constant.topicName).getRecordSchema();
  6. // 生成十条数据
  7. List<RecordEntry> recordEntries = new ArrayList<>();
  8. for (int i = 0; i < 10; ++i) {
  9. RecordEntry recordEntry = new RecordEntry();
  10. // 对每条数据设置额外属性,例如ip 机器名等。可以不设置额外属性,不影响数据写入
  11. recordEntry.addAttribute("key1", "value1");
  12. TupleRecordData data = new TupleRecordData(recordSchema);
  13. data.setField("field1", "HelloWorld");
  14. data.setField("field2", 1234567);
  15. recordEntry.setRecordData(data);
  16. recordEntry.setShardId(shardId);
  17. recordEntries.add(recordEntry);
  18. }
  19. try {
  20. // 服务端从2.12版本开始支持,之前版本请使用putRecords接口
  21. //datahubClient.putRecordsByShard(Constant.projectName, Constant.topicName, shardId, recordEntries);
  22. datahubClient.putRecords(Constant.projectName, Constant.topicName, recordEntries);
  23. System.out.println("write data successful");
  24. } catch (InvalidParameterException e) {
  25. System.out.println("invalid parameter, please check your parameter");
  26. System.exit(1);
  27. } catch (AuthorizationFailureException e) {
  28. System.out.println("AK error, please check your accessId and accessKey");
  29. System.exit(1);
  30. } catch (ResourceNotFoundException e) {
  31. System.out.println("project or topic or shard not found");
  32. System.exit(1);
  33. } catch (ShardSealedException e) {
  34. System.out.println("shard status is CLOSED, can not write");
  35. System.exit(1);
  36. } catch (DatahubClientException e) {
  37. System.out.println("other error");
  38. System.out.println(e);
  39. System.exit(1);
  40. }
  41. }

2). 写入Blob topic

  1. // 写入blob型数据
  2. public static void blobExample() {
  3. // 生成十条数据
  4. List<RecordEntry> recordEntries = new ArrayList<>();
  5. String shardId = "4";
  6. for (int i = 0; i < 10; ++i) {
  7. RecordEntry recordEntry = new RecordEntry();
  8. // 对每条数据设置额外属性
  9. recordEntry.addAttribute("key1", "value1");
  10. BlobRecordData data = new BlobRecordData("123456".getBytes(Charsets.UTF_8));
  11. recordEntry.setRecordData(data);
  12. recordEntry.setShardId(shardId);
  13. recordEntries.add(recordEntry);
  14. recordEntry.setShardId("0");
  15. }
  16. while (true) {
  17. try {
  18. // 服务端从2.12版本开始支持,之前版本请使用putRecords接口
  19. //datahubClient.putRecordsByShard(Constant.projectName, Constant.topicName, shardId, recordEntries);
  20. datahubClient.putRecords(Constant.projectName, Constant.topicName, recordEntries);
  21. System.out.println("write data successful");
  22. break;
  23. } catch (InvalidParameterException e) {
  24. System.out.println("invalid parameter, please check your parameter");
  25. System.exit(1);
  26. } catch (AuthorizationFailureException e) {
  27. System.out.println("AK error, please check your accessId and accessKey");
  28. System.exit(1);
  29. } catch (ResourceNotFoundException e) {
  30. System.out.println("project or topic or shard not found");
  31. System.exit(1);
  32. } catch (ShardSealedException e) {
  33. System.out.println("shard status is CLOSED, can not write");
  34. System.exit(1);
  35. } catch (LimitExceededException e) {
  36. System.out.println("maybe qps exceed limit, retry");
  37. } catch (DatahubClientException e) {
  38. System.out.println("other error");
  39. System.out.println(e);
  40. System.exit(1);
  41. }
  42. }
  43. }

使用Producer

步骤一:引入依赖

  1. <dependency>
  2. <groupId>com.aliyun.datahub</groupId>
  3. <artifactId>aliyun-sdk-datahub</artifactId>
  4. <version>2.19.0-public</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>com.aliyun.datahub</groupId>
  8. <artifactId>datahub-client-library</artifactId>
  9. <version>1.1.12-public</version>
  10. </dependency>

步骤二:代码示例

  1. import com.aliyun.datahub.client.exception.AuthorizationFailureException;
  2. import com.aliyun.datahub.client.exception.DatahubClientException;
  3. import com.aliyun.datahub.client.exception.InvalidParameterException;
  4. import com.aliyun.datahub.client.exception.MalformedRecordException;
  5. import com.aliyun.datahub.client.exception.NoPermissionException;
  6. import com.aliyun.datahub.client.exception.ShardNotFoundException;
  7. import com.aliyun.datahub.client.model.Field;
  8. import com.aliyun.datahub.client.model.FieldType;
  9. import com.aliyun.datahub.client.model.RecordEntry;
  10. import com.aliyun.datahub.client.model.RecordSchema;
  11. import com.aliyun.datahub.client.model.TupleRecordData;
  12. import com.aliyun.datahub.clientlibrary.config.ProducerConfig;
  13. import com.aliyun.datahub.clientlibrary.producer.Producer;
  14. import com.aliyun.datahub.exception.ResourceNotFoundException;
  15. import org.slf4j.Logger;
  16. import org.slf4j.LoggerFactory;
  17. import java.util.ArrayList;
  18. import java.util.List;
  19. import java.util.concurrent.TimeUnit;
  20. public class DatahubWriter {
  21. private static final Logger LOG = LoggerFactory.getLogger(DatahubWriter.class);
  22. private static void sleep(long milliSeconds) {
  23. try {
  24. TimeUnit.MILLISECONDS.sleep(milliSeconds);
  25. } catch (InterruptedException e) {
  26. // TODO:自行处理异常
  27. }
  28. }
  29. private static List<RecordEntry> genRecords(RecordSchema schema) {
  30. List<RecordEntry> recordEntries = new ArrayList<>();
  31. for (int cnt = 0; cnt < 10; ++cnt) {
  32. RecordEntry entry = new RecordEntry();
  33. entry.addAttribute("key1", "value1");
  34. entry.addAttribute("key2", "value2");
  35. TupleRecordData data = new TupleRecordData(schema);
  36. data.setField("field1", "testValue");
  37. data.setField("field2", 1);
  38. entry.setRecordData(data);
  39. recordEntries.add(entry);
  40. }
  41. return recordEntries;
  42. }
  43. private static void sendRecords(Producer producer, List<RecordEntry> recordEntries) {
  44. int maxRetry = 3;
  45. while (true) {
  46. try {
  47. // 自动选择shard写入
  48. producer.send(recordEntries, maxRetry);
  49. // 指定写入shard "0"
  50. // producer.send(recordEntries, "0", maxRetry);
  51. LOG.error("send records: {}", recordEntries.size());
  52. break;
  53. } catch (MalformedRecordException e) {
  54. // record 格式非法,根据业务场景选择忽略或直接抛异常
  55. LOG.error("write fail", e);
  56. throw e;
  57. } catch (InvalidParameterException |
  58. AuthorizationFailureException |
  59. NoPermissionException e) {
  60. // 请求参数非法
  61. // 签名不正确
  62. // 没有权限
  63. LOG.error("write fail", e);
  64. throw e;
  65. } catch (ShardNotFoundException e) {
  66. // shard 不存在, 如果不是写入自己指定的shard,可以不用处理
  67. LOG.error("write fail", e);
  68. sleep(1000);
  69. } catch (ResourceNotFoundException e) {
  70. // project, topic 或 shard 不存在
  71. LOG.error("write fail", e);
  72. throw e;
  73. } catch (DatahubClientException e) {
  74. // 基类异常,包含网络问题等,可以选择重试
  75. LOG.error("write fail", e);
  76. sleep(1000);
  77. }
  78. }
  79. }
  80. public static void main(String[] args) {
  81. // Endpoint以Region: 华东1为例,其他Region请按实际情况填写
  82. String endpoint = "http://dh-cn-hangzhou.aliyuncs.com";
  83. String accessId = "<YourAccessKeyId>";
  84. String accessKey = "<YourAccessKeySecret>";
  85. String projectName = "<YourProjectName>";
  86. String topicName = "<YourTopicName>";
  87. RecordSchema schema = datahubClient.getTopic(projectName, topicName).getRecordSchema();
  88. ProducerConfig config = new ProducerConfig(endpoint, accessId, accessKey);
  89. Producer producer = new Producer(projectName, topicName, config);
  90. // 根据场景控制循环
  91. boolean stop = false;
  92. try {
  93. while (!stop) {
  94. List<RecordEntry> recordEntries = genRecords(schema);
  95. sendRecords(producer, recordEntries);
  96. }
  97. } finally {
  98. // 确保资源正确释放
  99. producer.close();
  100. }
  101. }
  102. }

多方式写入

在DataHub 2.12之前版本,DataHub仅支持putRecords接口,在RecordEntry类中包含shardIdpartitionKeyhashKey三个属性,用户通过指定对应属性的值决定数据写入到哪个Shard中注意:开启Shard extend模式无法按Hashkey和PartitionKey方式写入

2.12及之后版本,建议用户使用putRecordsByShard接口,避免服务端partition造成的性能损耗

1). 按照ShardID写入
推荐方式,使用示例如下:

  1. RecordEntry entry = new RecordEntry();
  2. entry.setShardId("0");

2). 按HashKey写入
指定一个128 bit的MD5值。 按照HashKey写入,根据Shard的BeginHashKey与EndHashKey决定数据写入的Shard
使用示例:

  1. RecordEntry entry = new RecordEntry();
  2. entry.setHashKey("7FFFFFFFFFFFFFFD7FFFFFFFFFFFFFFD");

2). 按PartitionKey写入
指定一个String类型参数作为PartitionKey,系统根据该String的MD5值以及Shard的BeginHashKey与EndHashKey决定写入的Shard
使用示例:

  1. RecordEntry entry = new RecordEntry();
  2. entry.setPartitionKey("TestPartitionKey");

注意事项

Consumer和Producer都不支持多线程访问,如果需要使用多线程,则在每个线程都使用不同的Consumer或Producer对象。