本文通过示例代码指导您使用多值模型写入数据。详细示例请参见Aliyun TSDB SDK for Java中的test/example/SampleOfMultiField和test/TestHiTSDBClientMultiFieldFeatures。
注意:多值模型仅在0.2.0版本及以上的SDK支持。关于最新SDK版本的说明及下载,请参见SDK的《版本说明》。之前版本的SDK所支持的多值模型实际是模拟多值模型,不鼓励继续使用。之前通过 MultiValue 类写入的数据并不能通过最新的 SDK 多值模型相关类 (MultiField) 进行查询。
多值模型的数据写入
示例代码:
/*
* 通过 multiFieldPutSync() API。
* 该 API 支持单点和多点(List)写入。
*
* 下面是必须写入时必须提供的信息:
* Metric: 数据指标的类别。相当于 influxdb 的 Measurement。
* Fields: 数据指标的度量信息(相当于指标下的子类别)。即一个 metric 支持多个 fields。如 metric 为 wind,该 metric 可以有多个 fields:direction, speed, direction, description 和 temperature。
* Timestamp: 数据点的时间戳
* Tags: 时间线额外的信息,如"型号=ABC123"、"出厂编号=1234567890"等。
*/
MultiFieldPoint multiFieldPoint = MultiFieldPoint.metric("wind")
.field("speed", 45.2)
.field("level", 1.2)
.field("direction", "S")
.field("description", "Breeze")
.tag("sensor", "95D8-7913")
.tag("city", "hangzhou")
.tag("province", "zhejiang")
.timestamp(1537170208L)
.build();
// 同步写入
tsdb.multiFieldPutSync(multiFieldPoint);
多值模型的异步写入
注意:多值模型异步仅在 0.2.1 版本及以上的 SDK 支持。关于最新 SDK 版本的说明及下载,请参见SDK的《版本说明》。
不使用回调方法
默认的TSDBConfig没有设置多值异步写入回调方法。示例代码:
TSDBConfig config = TSDBConfig.address("127.0.0.1", 8242)
// 批次写入时每次提交数据量
.batchPutSize(500)
// 单值异步写入线程数
.batchPutConsumerThreadCount(1)
// 多值异步写入缓存队列长度
.multiFieldBatchPutBufferSize(10000)
// 多值异步写入线程数
.multiFieldBatchPutConsumerThreadCount(1)
.config();
// 特别注意,TSDB只需初始化一次即可
TSDB tsdb = TSDBClientFactory.connect(config);
MultiFieldPoint point = MultiFieldPoint
.metric("test-test-test")
.tag("a", "1")
.tag("b", "2")
.timestamp(System.currentTimeMillis())
.field("f1", Math.random())
.field("f2", Math.random())
.build();
tsdb.multiFieldPut(point);
// 特别注意,在应用生命周期内不用关闭TSDB实例,直到应用结束时关闭即可
tsdb.close();
}
使用普通回调方法
普通回调方法仅报告当写入成功时的数据点列表,或者当写入失败时的数据点列表以及对应的写入异常。示例代码:
// 创建一个普通回调方法,用于处理写入成功或者失败后的情况
AbstractMultiFieldBatchPutCallback callback = new MultiFieldBatchPutCallback() {
@Override
public void response(String address, List<MultiFieldPoint> points, Result result) {
int count = num.addAndGet(points.size());
System.out.println(result);
System.out.println("已处理" + count + "个点");
}
final AtomicInteger num = new AtomicInteger();
@Override
public void failed(String address, List<MultiFieldPoint> points, Exception ex) {
System.err.println("业务回调出错!" + points.size() + " error!");
ex.printStackTrace();
}
};
TSDBConfig config = TSDBConfig.address("127.0.0.1", 8242)
.httpConnectTimeout(90)
// 批次写入时每次提交数据量
.batchPutSize(500)
// 单值异步写入线程数
.batchPutConsumerThreadCount(1)
// 多值异步写入缓存队列长度
.multiFieldBatchPutBufferSize(10000)
// 多值异步写入线程数
.multiFieldBatchPutConsumerThreadCount(1)
// 多值异步写入回调方法
.listenMultiFieldBatchPut(callback)
.config();
// 特别注意,TSDB只需初始化一次即可
TSDB tsdb = TSDBClientFactory.connect(config);
MultiFieldPoint point = MultiFieldPoint
.metric("test-test-test")
.tag("a", "1")
.tag("b", "2")
.timestamp(System.currentTimeMillis())
.field("f1", Math.random())
.field("f2", Math.random())
.build();
tsdb.multiFieldPut(point);
// 特别注意,在应用生命周期内不用关闭TSDB实例,直到应用结束时关闭即可
tsdb.close();
}
使用摘要回调方法
摘要回调方法,当写入成功时,报告这次写入成功时的数据点列表,以及对应的写入摘要情况,包括成功点数,失败点数;当写入失败时,报告这次写入失败时的数据点列表以及对应的写入异常。示例代码:
// 创建一个摘要回调方法,用于处理写入成功或者失败后的情况
AbstractMultiFieldBatchPutCallback callback = new MultiFieldBatchPutSummaryCallback() {
final AtomicInteger num = new AtomicInteger();
@Override
public void response(String address, List<MultiFieldPoint> points, SummaryResult result) {
int count = num.addAndGet(points.size());
System.out.println(result);
System.out.println("已处理" + count + "个点");
}
@Override
public void failed(String address, List<MultiFieldPoint> points, Exception ex) {
System.err.println("业务回调出错!" + points.size() + " error!");
ex.printStackTrace();
}
};
TSDBConfig config = TSDBConfig.address("127.0.0.1", 8242)
.httpConnectTimeout(90)
// 批次写入时每次提交数据量
.batchPutSize(500)
// 单值异步写入线程数
.batchPutConsumerThreadCount(1)
// 多值异步写入缓存队列长度
.multiFieldBatchPutBufferSize(10000)
// 多值异步写入线程数
.multiFieldBatchPutConsumerThreadCount(1)
// 多值异步写入回调方法
.listenMultiFieldBatchPut(callback)
.config();
// 特别注意,TSDB只需初始化一次即可
TSDB tsdb = TSDBClientFactory.connect(config);
MultiFieldPoint point = MultiFieldPoint
.metric("test-test-test")
.tag("a", "1")
.tag("b", "2")
.timestamp(System.currentTimeMillis())
.field("f1", Math.random())
.field("f2", Math.random())
.build();
tsdb.multiFieldPut(point);
// 特别注意,在应用生命周期内不用关闭TSDB实例,直到应用结束时关闭即可
tsdb.close();
}
使用详情回调方法
详情回调方法,当写入成功时,报告这次写入成功时的数据点列表,以及对应的写入详情,包括写入失败的点以及对应错误信息等;当写入失败时,报告这次写入失败时的数据点列表以及对应的写入异常。示例代码:
// 创建一个详情回调方法,用于处理写入成功或者失败后的情况
MultiFieldBatchPutDetailsCallback callback = new MultiFieldBatchPutDetailsCallback() {
final AtomicInteger num = new AtomicInteger();
@Override
public void response(String address, List<MultiFieldPoint> points, DetailsResult result) {
int count = num.addAndGet(points.size());
System.out.println(result);
System.out.println("已处理" + count + "个点");
}
@Override
public void failed(String address, List<MultiFieldPoint> points, Exception ex) {
System.err.println("业务回调出错!" + points.size() + " error!");
ex.printStackTrace();
}
};
TSDBConfig config = TSDBConfig.address("127.0.0.1", 8242)
.httpConnectTimeout(90)
// 批次写入时每次提交数据量
.batchPutSize(500)
// 单值异步写入线程数
.batchPutConsumerThreadCount(1)
// 多值异步写入缓存队列长度
.multiFieldBatchPutBufferSize(10000)
// 多值异步写入线程数
.multiFieldBatchPutConsumerThreadCount(1)
// 多值异步写入回调方法
.listenMultiFieldBatchPut(callback)
.config();
// 特别注意,TSDB只需初始化一次即可
TSDB tsdb = TSDBClientFactory.connect(config);
MultiFieldPoint point = MultiFieldPoint
.metric("test-test-test")
.tag("a", "1")
.tag("b", "2")
.timestamp(System.currentTimeMillis())
.field("f1", Math.random())
.field("f2", Math.random())
.build();
tsdb.multiFieldPut(point);
// 特别注意,在应用生命周期内不用关闭TSDB实例,直到应用结束时关闭即可
tsdb.close();
}