全部产品
云市场

快速指引

更新时间:2019-04-25 10:08:05

快速指引

本节主要描述使用DataHub Java SDK进行数据的读写。

准备工作

  • 使用DataHub服务之前,需要注册阿里云云账号,利用阿里云账号的AccessId与AccessKey接入DataHub服务。

创建Project/Topic

发布数据/订阅数据

发布/订阅数据需要使用sdk完成,以下是Java SDK 2.12基础示例 DataHub Java SDK使用说明

  1. String endpoint = "http://dh-cn-hangzhou.aliyuncs.com";
  2. String accessId = "<YourAccessKeyId>";
  3. String accessKey = "<YourAccessKeySecret>";
  4. // 创建DataHubClient实例
  5. DatahubClient datahubClient = DatahubClientBuilder.newBuilder()
  6. .setDatahubConfig(
  7. new DatahubConfig(endpoint,
  8. // 是否开启二进制传输,服务端2.12版本开始支持
  9. new AliyunAccount(accessId, accessKey), true))
  10. //专有云使用出错尝试将参数设置为 false
  11. // HttpConfig可不设置,不设置时采用默认值
  12. .setHttpConfig(new HttpConfig().setConnTimeout(10000))
  13. .build();
  14. String projectName = "<YourProjectName>";
  15. String topicName = "<YourTopicName>";
  16. // 可通过listShard接口获取shard列表,所有ACTIVE的shard均可使用,本例使用"0"
  17. String shardId = "0";
  18. // TUPLE类型的Topic需要设置schema,也可直接通过getTopic获取
  19. RecordSchema schema = datahubClient.getTopic("test_project", "test_topic").getRecordSchema();
  20. List<RecordEntry> recordEntries = new ArrayList<~>();
  21. RecordEntry entry = new RecordEntry(schema);
  22. // 构造10条records
  23. List<RecordEntry> recordEntries = new ArrayList<>();
  24. for (int i = 0; i < 10; ++i) {
  25. RecordEntry recordEntry = new RecordEntry();
  26. // set attributes
  27. recordEntry.addAttribute("key1", "value1");
  28. // set tuple data
  29. TupleRecordData data = new TupleRecordData(schema);
  30. data.setField("field1", "HelloWorld");
  31. data.setField("field2", 1234567);
  32. recordEntry.setRecordData(data);
  33. recordEntries.add(recordEntry);
  34. }
  35. // 服务端从2.12版本开始支持,之前版本请使用putRecords接口
  36. datahubClient.putRecordsByShard(projectName, topicName, shardId, recordEntries);
  37. // 获取cursor, 这里根据时间获取游标
  38. // 注: 正常情况下,getCursor只需在初始化时获取一次,然后使用getRecords的nextCursor进行下一次读取
  39. String cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.SYSTEM_TIME 1455869335000).getCursor();
  40. // 数据读取
  41. // 每次限读100条,最大不可超过1000
  42. int limit = 100;
  43. while (true) {
  44. try {
  45. GetRecordsResult result = datahubClient.getRecords(projectName, topicName, shardId, schema, cursor, limit);
  46. // 如果有数据则处理,无数据需sleep后重新读取
  47. if (result.getRecordCount() > 0) {
  48. for (RecordEntry entry : result.getRecords()) {
  49. TupleRecordData data = (TupleRecordData) entry.getRecordData();
  50. System.out.println("field1:" + data.getField("field1"));
  51. System.out.println("field2:" + data.getField("field2"));
  52. }
  53. }
  54. // 拿到下一个游标
  55. cursor = result.getNextCursor();
  56. } catch (InvalidCursorException ex) {
  57. // 非法游标或游标已过期,建议重新定位后开始消费
  58. cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.OLDEST).getCursor();
  59. }
  60. }