写入数据
TSDB SDK 使用 Point 类,表示一个时间点。一个 Point 对象表示一个时间序列(时间线)某个时刻上的数据。
构造时间点
Point(时间点)有多种构造方法,形式比较多样化。本节提供三种构造时间点的方法示例。
Point 的构建至少需要一个 tag 键值对。
示例一
构建一个时间点。用单位为秒的时间戳表示时间,指定 Point 数据的 Metric 与多个 Tag。
// 以'秒'为时间戳
int timestamp = (int)(System.currentTimeMillis()/1000);
// 构造 Point
Point point = Point.metric("test1")
.tag("tagk1", "tagv1")
.tag("tagk2", "tagv2")
.tag("tagk3", "tagv3")
.timestamp(timestamp).value(123.456)
.build();
示例二
构建一个时间点。用单位为毫秒的时间戳表示时间,指定 Point 数据的 Metric,Tag 使用 Map 形式的键值对。
// 也可以'毫秒'为时间戳
long timestamp = System.currentTimeMillis();
// 使用 HashMap 表示 Tags
Map<String,String> tagsMap = new HashMap<String,String>();
tagsMap.put("tagk1", "tagv1");
tagsMap.put("tagk2", "tagv2");
// 构造 Point
Point point = Point.metric("test1")
.tag(tagsMap)
.value(timestamp,123.456)
.build();
示例三
构建一个时间点。使用java.util.Date
表示时间。
// 使用 java.util.Date 表示时间。
Point point = Point.metric("test1")
.tag("tagk1", "tagv1")
.value(new Date,123.456)
.build();
写入数据
TSDB-Client 有两种写数据的方式:同步阻塞的写数据和异步非阻塞的写数据。
同步阻塞的写数据
假设我们现在需要构建 500 个时间点提交给 TSDB。
示例代码
List<Point> points = new ArrayList<Point>();
构建 Point
for(int i = 0; i<500; i++) {
long timestamp = System.currentTimeMillis();
Point point = Point.metric("test1")
.tag("tagk1", "tagv1")
.value(timestamp, Math.random())
.build();
// 手动打包数据
points.add(point);
}
// 手动打包后提交数据
tsdb.putSync(points)
出于写入性能的考虑,同步写的方式一般需要您手动将数据点打包成一批数据,并且建议这批数据包含500~1000个数据点。此外,也建议多个线程并发进行提交
同步写的响应对象
您可以根据需要选择返回的数据内容。
返回空对象,无内容。本质是调用
POST /api/put
。Result result = tsdb.putSync(ps);
返回提交概述。包含成功数和返回数。本质是调用
POST /api/put?summary=true
。SummaryResult summaryResult = tsdb.putSync(ps,SummaryResult.class);
返回提交概述。包含成功数、返回数和失败原因。本质是调用
POST /api/put?details=true
。DetailsResult detailsResult = tsdb.putSync(ps,DetailsResult.class);
异步非阻塞的写数据
异步写数据的方式比较简单,只要有 Point 就可以提交数据。Client 会自动帮助您批量地提交数据,不需要您手动打包。
如果没有特殊需求,推荐您使用异步非阻塞的写数据的方式。
示例代码
// 构建 Point
Point point = Point.metric("test1")
.tag("tagk1", "tagv1")
.value(timestamp, Math.random())
.build();
// 直接提交数据
tsdb.put(point);
异步非阻塞写数据的回调设置
当您使用异步写数据的方式时,若需要获取 Put 接口的响应结果,则需要设置回调接口。
示例代码
final AtomicLong num = new AtomicLong();
// 回调对象
BatchPutCallback cb = new BatchPutCallback() {
@Override
public void response(String address, List<Point> input, Result output) {
long afterNum = num.addAndGet(input.size());
System.out.println("成功处理" + input.size() + ",已处理" + afterNum);
}
@Override
public void failed(String address, List<Point> input, Exception ex) {
ex.printStackTrace();
long afterNum = num.addAndGet(input.size());
System.out.println("失败处理" + input.size() + ",已处理" + afterNum);
}
};
TSDBConfig config = TSDBConfig
.address("example.hitsdb.com", 8242)
.listenBatchPut(cb) // 设置回调接口
.config();
tsdb = TSDBClientFactory.connect(config);
TSDB Client 提供了以下四种类型的 Callback 接口:
BatchPutCallback,即数据写入的极简模式。对应的API调用形式为
POST /api/put
BatchPutSummaryCallback,即数据写入的统计模式。对应的API调用形式为
POST /api/put?summary=true
BatchPutDetailsCallback,即数据写入的详细模式。对应的API调用形式为
POST /api/put?details=true
BatchPutIgnoreErrorsCallback, 即数据写入的容错模式。对应的API调用形式为
POST /api/put?ignoreErrors=true
您可以通过设置不同业务场景来注册相应的回调接口。写入模式与业务场景的说明,可参见写入模式及其响应内容。
不要在回调方法中做耗时操作。若有此需要,可以再把操作交给其他工作线程执行。
回调方法
response()
与failed()
被调用的时机是不一样的:response()
: 指的是写入请求合法且被服务端处理后执行的回调。此时可以根据回调对象种类的不同,获取一些不同的反馈信息。failed()
: 指的是写入请求本身存在问题导致请求不合法(比如报文非法,触发限流,鉴权失败等等)被服务端拒绝服务时所执行的回调。
因此建议在实现Callback接口时,不要忘记实现
failed()
方法。如果不实现的话,则会执行默认的failed()
回调方法,其本身是一个空方法。以上关于回调接口的示例均使用的是单值模型写入接口的例子。对于多值模型的写入与此类似,但对应的回调接口变为下述名称:
MultiFieldbatchPutCallback
MultiFieldbatchPutSummaryCallback
MultiFieldbatchPutDetailsCallback
MultiFieldbatchPutIgnoreErrorsCallback