TimeStream是阿里云Elasticsearch团队自研,并结合Elastic社区时序类产品特性共建的时序引擎。阿里云Elasticsearch提供Aliyun-TimeStream时序增强功能插件,支持通过API接口完成TimeStream索引的增删改查,以及数据写入和查询。本文介绍如何使用TimeStream管理Elasticsearch时序数据。
背景信息
阿里云Elasticsearch提供的TimeStream时序增强功能插件,优化了Elasticsearch在存储指标数据方面的DSL(Domain-Specific Language)查询复杂且慢以及存储成本过高等问题,详细信息请参见TimeStream时序增强引擎介绍。本文主要介绍如何使用TimeStream,涉及到的API可通过查询API文档获得完整信息,详细信息请参见TimeStream API功能介绍和TimeStream集成Prometheus接口。
前提条件
已创建阿里云Elasticsearch实例,且实例版本为通用商业版7.16及以上、内核版本为1.7.0及以上,或者实例版本为通用商业版7.10、内核版本为1.8.0及以上。具体操作请参见创建阿里云Elasticsearch实例。
管理时序索引
创建时序索引
PUT _time_stream/test_stream
与直接通过Elasticsearch create index命令(PUT test_stream
)创建的索引相比,通过TimeStream的创建索引接口创建的索引是DataStream索引,而不是一个具体的索引,并且自动集成了Elasticsearch在时序场景的最佳实践配置。
GET _time_stream/test_stream
{
"time_streams" : {
"test_stream" : {
"name" : "test_stream",
"data_stream_name" : "test_stream",
"time_stream_config" : {
"labels_fields" : {
"includes" : [
"labels.*"
],
"excludes" : [ ]
},
"metrics_fields" : {
"includes" : [
"metrics.*"
],
"excludes" : [ ]
},
"label_prefix" : "labels.",
"metric_prefix" : "metrics.",
"downsample" : [ ]
},
"template_name" : ".timestream_test_stream",
"template" : {
"index_patterns" : [
"test_stream"
],
"template" : {
"settings" : {
"index" : {
"mode" : "time_series",
"codec" : "ali",
"refresh_interval" : "10s",
"ali_codec_service" : {
"enabled" : "true",
"source_reuse_doc_values" : {
"enabled" : "true"
}
},
"translog" : {
"durability" : "ASYNC"
},
"doc_value" : {
"compression" : {
"default" : "zstd"
}
},
"postings" : {
"compression" : "zstd"
},
"source" : {
"compression" : "zstd"
},
"routing_path" : [
"labels.*"
]
}
},
"mappings" : {
"numeric_detection" : true,
"dynamic_templates" : [
{
"labels_template_match_labels.*" : {
"path_match" : "labels.*",
"mapping" : {
"time_series_dimension" : "true",
"type" : "keyword"
},
"match_mapping_type" : "*"
}
},
{
"metrics_double_match_metrics.*" : {
"path_match" : "metrics.*",
"mapping" : {
"index" : "false",
"type" : "double"
},
"match_mapping_type" : "double"
}
},
{
"metrics_long_match_metrics.*" : {
"path_match" : "metrics.*",
"mapping" : {
"index" : "false",
"type" : "long"
},
"match_mapping_type" : "long"
}
}
],
"properties" : {
"@timestamp" : {
"format" : "epoch_millis||strict_date_optional_time",
"type" : "date"
}
}
}
},
"composed_of" : [ ],
"data_stream" : {
"hidden" : false
}
},
"version" : 1
}
}
}
参数 | 说明 |
index.mode | 取值time_series,表示创建的索引类型是time_series索引,系统会自动集成Elasticsearch在时序场景的最佳实践配置。 |
index.codec | 取值ali,表示使用aliyun-codec索引压缩插件。与以下参数配合使用,可以极大减少磁盘存储空间:
|
- 维度字段:默认使用keyword类型,然后配置time_series_dimension=true,标识为维度字段。index.mode=time_series会把所有time_series_dimension=true的字段拼装成一个时间线id(_tsid)的内部字段。
- 指标字段:支持double和long类型,只存储doc_values,不存储索引。
- 自定义索引的shard数量
PUT _time_stream/test_stream { "template": { "settings": { "index": { "number_of_shards": "2" } } } }
- 自定义索引的数据模型
PUT _time_stream/test_stream { "template": { "settings": { "index": { "number_of_shards": "2" } } }, "time_stream": { "labels_fields": ["labels_*"], "metrics_fields": ["metrics_*"] }
更新时序索引
POST _time_stream/test_stream/_update
{
"template": {
"settings": {
"index": {
"number_of_shards": "4"
}
}
}
}
- 更新内容会全量覆盖索引的配置,因此在执行更新命令时,需要保留无需更新的配置。建议通过
GET _time_stream/test_stream
命令获取索引的全量配置信息,再在其基础上进行修改。 - 更新时序索引配置后,新配置不会立即生效,需要等到索引进行一次rollover,生成新索引后,在新索引上生效。您可以通过
POST test_stream/_rollover
命令手动执行rollover。
删除时序索引
Delete _time_stream/test_stream
使用时序索引
时序索引的使用方式与普通索引一致,具体说明如下。
写入时序数据
POST test_stream/_doc
{
"@timestamp": 1630465208722,
"metrics": {
"cpu.idle": 79.67298116109929,
"disk_ioutil": 17.630910821570456,
"mem.free": 75.79973639970004
},
"labels": {
"disk_type": "disk_type2",
"namespace": "namespaces1",
"clusterId": "clusterId3",
"nodeId": "nodeId5"
}
}
写入数据时,DataStream会根据@timestamp的值来决定写到哪个索引,所以上述用例中@timestamp要设置在当前test_stream索引的时间区间上。
时间区间中的时间为UTC时间格式,例如2022-06-21T00:00:00.000Z。如果您所在时区为东8区(北京时间),需要在UTC时间上加8小时转换为北京时间,转换后对应的时间为2022-06-21T00:00:00.000+08:00,即2022-06-21T08:00:00.000。
查询时序数据
GET test_stream/_search
GET _cat/indices/test_stream?v&s=i
查询时序索引指标
GET _time_stream/test_stream/_stats
{
"_shards" : {
"total" : 2,
"successful" : 2,
"failed" : 0
},
"time_stream_count" : 1,
"indices_count" : 1,
"total_store_size_bytes" : 19132,
"time_streams" : [
{
"time_stream" : "test_stream",
"indices_count" : 1,
"store_size_bytes" : 19132,
"tsid_count" : 2
}
]
}
使用Prometheus接口查询数据
- 在Grafana控制台中配置通过在Grafana控制台中配置Prometheus数据源,并在URL中指定
/_time_stream/prom/test_stream
URI,直接将TimeStream索引作为Grafana的Prometheus数据源使用,如下图所示。 - 使用Prometheus API配置
使用Prometheus API,去掉返回的指标和维度字段的关键字前后缀。例如,使用默认的数据模型,指标前缀metric.会被去掉,维度前缀label.会被去掉。
如果自定义了数据模型,则需要额外配置前后缀关键字,否则Prometheus API返回的就是实际的数据。配置前后缀关键字的示例如下。PUT _time_stream/{name} { "time_stream": { "labels_fields": "@labels.*_l", "metrics_fields": "@metrics.*_m", "label_prefix": "@labels.", "label_suffix": "_l", "metric_prefix": "@metrics.", "metric_suffix": "_m" } }
元数据查询
- 查看test_stream索引中的所有指标。
GET /_time_stream/prom/test_stream/metadata
预期结果如下。{ "status" : "success", "data" : { "cpu.idle" : [ { "type" : "gauge", "help" : "", "unit" : "" } ], "disk_ioutil" : [ { "type" : "gauge", "help" : "", "unit" : "" } ], "mem.free" : [ { "type" : "gauge", "help" : "", "unit" : "" } ] } }
- 查看test_stream索引中的所有维度。
GET /_time_stream/prom/test_stream/labels
预期结果如下。{ "status" : "success", "data" : [ "__name__", "clusterId", "disk_type", "namespace", "nodeId" ] }
- 查看test_stream索引中,某个具体维度的全部value。
GET /_time_stream/prom/test_stream/label/clusterId/values
预期结果如下。{ "status" : "success", "data" : [ "clusterId1", "clusterId3" ] }
- 查看test_stream索引中,cpu.idle指标全部的时间线。
GET /_time_stream/prom/test_stream/series?match[]=cpu.idle
预期结果如下。{ "status" : "success", "data" : [ { "__name__" : "cpu.idle", "disk_type" : "disk_type1", "namespace" : "namespaces2", "clusterId" : "clusterId1", "nodeId" : "nodeId2" }, { "__name__" : "cpu.idle", "disk_type" : "disk_type1", "namespace" : "namespaces2", "clusterId" : "clusterId1", "nodeId" : "nodeId5" }, { "__name__" : "cpu.idle", "disk_type" : "disk_type2", "namespace" : "namespaces1", "clusterId" : "clusterId3", "nodeId" : "nodeId5" } ] }
数据查询
- 通过Prometheus instant query接口查询数据
GET /_time_stream/prom/test_stream/query?query=cpu.idle&time=1655769837
说明 time的单位为秒,不传递时,默认查询最新5分钟内的数据。返回结果为Prometheus query接口的数据格式,如下所示。{ "status" : "success", "data" : { "resultType" : "vector", "result" : [ { "metric" : { "__name__" : "cpu.idle", "clusterId" : "clusterId1", "disk_type" : "disk_type1", "namespace" : "namespaces2", "nodeId" : "nodeId2" }, "value" : [ 1655769837, "79.672981161" ] }, { "metric" : { "__name__" : "cpu.idle", "clusterId" : "clusterId1", "disk_type" : "disk_type1", "namespace" : "namespaces2", "nodeId" : "nodeId5" }, "value" : [ 1655769837, "79.672981161" ] }, { "metric" : { "__name__" : "cpu.idle", "clusterId" : "clusterId3", "disk_type" : "disk_type2", "namespace" : "namespaces1", "nodeId" : "nodeId5" }, "value" : [ 1655769837, "79.672981161" ] } ] } }
- 通过Prometheus range query接口查询数据
GET /_time_stream/prom/test_stream/query_range?query=cpu.idle&start=1655769800&end=16557699860&step=1m
预期结果如下。{ "status" : "success", "data" : { "resultType" : "matrix", "result" : [ { "metric" : { "__name__" : "cpu.idle", "clusterId" : "clusterId1", "disk_type" : "disk_type1", "namespace" : "namespaces2", "nodeId" : "nodeId2" }, "value" : [ [ 1655769860, "79.672981161" ] ] }, { "metric" : { "__name__" : "cpu.idle", "clusterId" : "clusterId1", "disk_type" : "disk_type1", "namespace" : "namespaces2", "nodeId" : "nodeId5" }, "value" : [ [ 1655769860, "79.672981161" ] ] }, { "metric" : { "__name__" : "cpu.idle", "clusterId" : "clusterId3", "disk_type" : "disk_type2", "namespace" : "namespaces1", "nodeId" : "nodeId5" }, "value" : [ [ 1655769860, "79.672981161" ] ] } ] } }
使用DownSample功能
PUT _time_stream/test_stream
{
"time_stream": {
"downsample": [
{
"interval": "1m"
},
{
"interval": "10m"
},
{
"interval": "60m"
}
]
}
}
- DownSample操作是对一个原始索引执行DownSample,生成DownSample索引。DownSample操作是在索引rollover后产生了一个新索引,然后旧索引过了一段时间,不再写入数据时进行的。目前默认是当前时间比旧索引的end_time大两小时才开始进行DownSample。为了模拟这个效果,创建索引时可以手动指定start_time和end_time。重要 最新索引的end_time会被Elasticsearch修改为最新时间,影响DownSample演示,默认是5分钟修改一次。DownSample演示操作要确保end_time不被修改,end_time值可通过
GET {index}/_settings
命令查看。PUT _time_stream/test_stream { "template": { "settings": { "index.time_series.start_time": "2022-06-20T00:00:00.000Z", "index.time_series.end_time": "2022-06-21T00:00:00.000Z" } }, "time_stream": { "downsample": [ { "interval": "1m" }, { "interval": "10m" }, { "interval": "60m" } ] } }
- 设置索引的end_time比当前时间至少小两个小时,然后写入一些数据(需要调整@timestamp为start_time和end_time之间的时间)。
POST test_stream/_doc { "@timestamp": 1655706106000, "metrics": { "cpu.idle": 79.67298116109929, "disk_ioutil": 17.630910821570456, "mem.free": 75.79973639970004 }, "labels": { "disk_type": "disk_type2", "namespace": "namespaces1", "clusterId": "clusterId3", "nodeId": "nodeId5" } }
- 写入一些数据后,需要更新TimeStream索引,去掉start_time和end_time。
POST _time_stream/test_stream/_update { "time_stream": { "downsample": [ { "interval": "1m" }, { "interval": "10m" }, { "interval": "60m" } ] } }
- 对索引执行rollover命令。
POST test_stream/_rollover
- rollover完成后,通过
GET _cat/indices/test_stream?v&s=i
命令,查看test_stream生成的DownSample索引。预期结果如下。health status index uuid pri rep docs.count docs.deleted store.size pri.store.size green open .ds-test_stream-2022.06.21-000001 vhEwKIlwSGO3ax4RKn**** 1 1 9 0 18.5kb 12.1kb green open .ds-test_stream-2022.06.21-000001_interval_10m r9Tsj0v-SyWJDc64oC**** 1 1 1 0 15.8kb 7.9kb green open .ds-test_stream-2022.06.21-000001_interval_1h cKsAlMK-T2-luefNAF**** 1 1 1 0 15.8kb 7.9kb green open .ds-test_stream-2022.06.21-000001_interval_1m L6ocasDFTz-c89KjND**** 1 1 1 0 15.8kb 7.9kb green open .ds-test_stream-2022.06.21-000002 42vlHEFFQrmMAdNdCz**** 1 1 0 0 452b 226b
GET test_stream/_search?size=0&request_cache=false
{
"aggs": {
"1": {
"terms": {
"field": "labels.disk_type",
"size": 10
},
"aggs": {
"2": {
"date_histogram": {
"field": "@timestamp",
"fixed_interval": "120m"
}
}
}
}
}
}
{
"took" : 15,
"timed_out" : false,
"_shards" : {
"total" : 2,
"successful" : 2,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 1,
"relation" : "eq"
},
"max_score" : null,
"hits" : [ ]
},
"aggregations" : {
"1" : {
"doc_count_error_upper_bound" : 0,
"sum_other_doc_count" : 0,
"buckets" : [
{
"key" : "disk_type2",
"doc_count" : 9,
"2" : {
"buckets" : [
{
"key_as_string" : "2022-06-20T06:00:00.000Z",
"key" : 1655704800000,
"doc_count" : 9
}
]
}
}
]
}
}
}
根据hits.total.value=1可以看到,只命中了一条记录。aggs结果的doc_count=9,表示实际索引数据量是9,因此可以看到用户查询的不是原始索引,而是DownSample索引。
如果将fixed_interval改为20s,那么可以看到结果的hits.total.value=9,与aggs结果的doc_count结果一致,说明查询到了原始索引。
由此可见,这些DownSample索引跟原始索引的settings和mappings是一致的,只是数据是按时间范围做了降采样。