全部产品
云市场

SDK多值写入

更新时间:2019-05-28 10:40:12

本文通过示例代码指导您使用多值模型写入数据

注意:多值模型仅在 0.2.0 版本及以上的 SDK 支持。关于最新 SDK 版本的说明及下载,请参见 SDK 版本说明。之前版本的 SDK 所支持的多值模型实际是模拟多值模型,不鼓励继续使用。之前通过 MultiValue 类写入的数据并不能通过最新的 SDK 多值模型相关类 (MultiField) 进行查询。

多值模型的数据写入

示例代码:

  1. /*
  2. * 通过 multiFieldPutSync() API。
  3. * 该 API 支持单点和多点(List)写入。
  4. *
  5. * 下面是必须写入时必须提供的信息:
  6. * Metric: 数据指标的类别。相当于 influxdb 的 Measurement。
  7. * Fields: 数据指标的度量信息(相当于指标下的子类别)。即一个 metric 支持多个 fields。如 metric 为 wind,该 metric 可以有多个 fields:direction, speed, direction, description 和 temperature。
  8. * Timestamp: 数据点的时间戳
  9. * Tags: 时间线额外的信息,如"型号=ABC123"、"出厂编号=1234567890"等。
  10. */
  11. MultiFieldPoint point1 = MultiFieldPoint.metric("wind").timestamp(1346846400L)
  12. .fields("speed", 20.0)
  13. .fields("level", 5)
  14. .fields("direction", "NE")
  15. .fields("description", "Breeze")
  16. .fields("temperature", "18.9")
  17. .tag("sensor", "IOTE_1988_0001")
  18. .tag("district", "Yuhang")
  19. .tag("city", "Hangzhou")
  20. .tag("province", "Zhejiang")
  21. .build();
  22. // 同步写入
  23. tsdb.multiFieldPutSync(multiValuedPoint);
  24. // 同步写入并且获取写入成功失败总结
  25. tsdb.multiFieldPutSync(multiValuedPoint, SummaryResult.class);
  26. // 同步写入并且获取详细的写入成功或者失败信息
  27. tsdb.multiFieldPutSync(multiValuedPoint, DetailsResult.class);

多值模型的异步写入

注意:多值模型异步仅在 0.2.1 版本及以上的 SDK 支持。关于最新 SDK 版本的说明及下载,请参见 SDK 版本说明

不使用回调方法

默认的TSDBConfig没有设置多值异步写入回调方法。示例代码:

  1. TSDBConfig config = TSDBConfig.address("127.0.0.1", 8242)
  2. // 批次写入时每次提交数据量
  3. .batchPutSize(500)
  4. // 单值异步写入线程数
  5. .batchPutConsumerThreadCount(1)
  6. // 多值异步写入缓存队列长度
  7. .multiFieldBatchPutBufferSize(10000)
  8. // 多值异步写入线程数
  9. .multiFieldBatchPutConsumerThreadCount(1)
  10. .config();
  11. // 特别注意,TSDB只需初始化一次即可
  12. TSDB tsdb = TSDBClientFactory.connect(config);
  13. MultiFieldPoint point = MultiFieldPoint
  14. .metric("test-test-test")
  15. .tag("a", "1")
  16. .tag("b", "2")
  17. .timestamp(System.currentTimeMillis())
  18. .field("f1", Math.random())
  19. .field("f2", Math.random())
  20. .build();
  21. tsdb.multiFieldPut(point);
  22. // 特别注意,在应用生命周期内不用关闭TSDB实例,直到应用结束时关闭即可
  23. tsdb.close();
  24. }

使用普通回调方法

普通回调方法仅报告当写入成功时的数据点列表,或者当写入失败时的数据点列表以及对应的写入异常。

示例代码:

  1. // 创建一个普通回调方法,用于处理写入成功或者失败后的情况
  2. AbstractMultiFieldBatchPutCallback callback = new MultiFieldBatchPutCallback() {
  3. @Override
  4. public void response(String address, List<MultiFieldPoint> points, Result result) {
  5. int count = num.addAndGet(points.size());
  6. System.out.println(result);
  7. System.out.println("已处理" + count + "个点");
  8. }
  9. final AtomicInteger num = new AtomicInteger();
  10. @Override
  11. public void failed(String address, List<MultiFieldPoint> points, Exception ex) {
  12. System.err.println("业务回调出错!" + points.size() + " error!");
  13. ex.printStackTrace();
  14. }
  15. };
  16. TSDBConfig config = TSDBConfig.address("127.0.0.1", 8242)
  17. .httpConnectTimeout(90)
  18. // 批次写入时每次提交数据量
  19. .batchPutSize(500)
  20. // 单值异步写入线程数
  21. .batchPutConsumerThreadCount(1)
  22. // 多值异步写入缓存队列长度
  23. .multiFieldBatchPutBufferSize(10000)
  24. // 多值异步写入线程数
  25. .multiFieldBatchPutConsumerThreadCount(1)
  26. // 多值异步写入回调方法
  27. .listenMultiFieldBatchPut(callback)
  28. .config();
  29. // 特别注意,TSDB只需初始化一次即可
  30. TSDB tsdb = TSDBClientFactory.connect(config);
  31. MultiFieldPoint point = MultiFieldPoint
  32. .metric("test-test-test")
  33. .tag("a", "1")
  34. .tag("b", "2")
  35. .timestamp(System.currentTimeMillis())
  36. .field("f1", Math.random())
  37. .field("f2", Math.random())
  38. .build();
  39. tsdb.multiFieldPut(point);
  40. // 特别注意,在应用生命周期内不用关闭TSDB实例,直到应用结束时关闭即可
  41. tsdb.close();
  42. }

使用摘要回调方法

摘要回调方法,当写入成功时,报告这次写入成功时的数据点列表,以及对应的写入摘要情况,包括成功点数,失败点数;当写入失败时,报告这次写入失败时的数据点列表以及对应的写入异常。

示例代码:

  1. // 创建一个摘要回调方法,用于处理写入成功或者失败后的情况
  2. AbstractMultiFieldBatchPutCallback callback = new MultiFieldBatchPutSummaryCallback() {
  3. final AtomicInteger num = new AtomicInteger();
  4. @Override
  5. public void response(String address, List<MultiFieldPoint> points, SummaryResult result) {
  6. int count = num.addAndGet(points.size());
  7. System.out.println(result);
  8. System.out.println("已处理" + count + "个点");
  9. }
  10. @Override
  11. public void failed(String address, List<MultiFieldPoint> points, Exception ex) {
  12. System.err.println("业务回调出错!" + points.size() + " error!");
  13. ex.printStackTrace();
  14. }
  15. };
  16. TSDBConfig config = TSDBConfig.address("127.0.0.1", 8242)
  17. .httpConnectTimeout(90)
  18. // 批次写入时每次提交数据量
  19. .batchPutSize(500)
  20. // 单值异步写入线程数
  21. .batchPutConsumerThreadCount(1)
  22. // 多值异步写入缓存队列长度
  23. .multiFieldBatchPutBufferSize(10000)
  24. // 多值异步写入线程数
  25. .multiFieldBatchPutConsumerThreadCount(1)
  26. // 多值异步写入回调方法
  27. .listenMultiFieldBatchPut(callback)
  28. .config();
  29. // 特别注意,TSDB只需初始化一次即可
  30. TSDB tsdb = TSDBClientFactory.connect(config);
  31. MultiFieldPoint point = MultiFieldPoint
  32. .metric("test-test-test")
  33. .tag("a", "1")
  34. .tag("b", "2")
  35. .timestamp(System.currentTimeMillis())
  36. .field("f1", Math.random())
  37. .field("f2", Math.random())
  38. .build();
  39. tsdb.multiFieldPut(point);
  40. // 特别注意,在应用生命周期内不用关闭TSDB实例,直到应用结束时关闭即可
  41. tsdb.close();
  42. }

使用详情回调方法

详情回调方法,当写入成功时,报告这次写入成功时的数据点列表,以及对应的写入详情,包括写入失败的点以及对应错误信息等;当写入失败时,报告这次写入失败时的数据点列表以及对应的写入异常。示例代码:

  1. // 创建一个详情回调方法,用于处理写入成功或者失败后的情况
  2. MultiFieldBatchPutDetailsCallback callback = new MultiFieldBatchPutDetailsCallback() {
  3. final AtomicInteger num = new AtomicInteger();
  4. @Override
  5. public void response(String address, List<MultiFieldPoint> points, DetailsResult result) {
  6. int count = num.addAndGet(points.size());
  7. System.out.println(result);
  8. System.out.println("已处理" + count + "个点");
  9. }
  10. @Override
  11. public void failed(String address, List<MultiFieldPoint> points, Exception ex) {
  12. System.err.println("业务回调出错!" + points.size() + " error!");
  13. ex.printStackTrace();
  14. }
  15. };
  16. TSDBConfig config = TSDBConfig.address("127.0.0.1", 8242)
  17. .httpConnectTimeout(90)
  18. // 批次写入时每次提交数据量
  19. .batchPutSize(500)
  20. // 单值异步写入线程数
  21. .batchPutConsumerThreadCount(1)
  22. // 多值异步写入缓存队列长度
  23. .multiFieldBatchPutBufferSize(10000)
  24. // 多值异步写入线程数
  25. .multiFieldBatchPutConsumerThreadCount(1)
  26. // 多值异步写入回调方法
  27. .listenMultiFieldBatchPut(callback)
  28. .config();
  29. // 特别注意,TSDB只需初始化一次即可
  30. TSDB tsdb = TSDBClientFactory.connect(config);
  31. MultiFieldPoint point = MultiFieldPoint
  32. .metric("test-test-test")
  33. .tag("a", "1")
  34. .tag("b", "2")
  35. .timestamp(System.currentTimeMillis())
  36. .field("f1", Math.random())
  37. .field("f2", Math.random())
  38. .build();
  39. tsdb.multiFieldPut(point);
  40. // 特别注意,在应用生命周期内不用关闭TSDB实例,直到应用结束时关闭即可
  41. tsdb.close();
  42. }