TimeStream管理Elasticsearch时序数据快速入门

TimeStream是阿里云Elasticsearch团队自研,并结合Elastic社区时序类产品特性共建的时序引擎。阿里云Elasticsearch提供Aliyun-TimeStream时序增强功能插件,支持通过API接口完成TimeStream索引的增删改查,以及数据写入和查询。本文介绍如何使用TimeStream管理Elasticsearch时序数据。

背景信息

阿里云Elasticsearch提供的TimeStream时序增强功能插件,优化了Elasticsearch在存储指标数据方面的DSL(Domain-Specific Language)查询复杂且慢以及存储成本过高等问题,详细信息请参见TimeStream时序增强引擎介绍

本文主要介绍如何使用TimeStream,涉及到的API可通过查询API文档获得完整信息,详细信息请参见TimeStream API功能介绍TimeStream集成Prometheus接口

前提条件

已创建阿里云Elasticsearch实例,且实例版本为通用商业版7.16及以上、内核版本为1.7.0及以上,或者实例版本为通用商业版7.10、内核版本为1.8.0及以上。具体操作请参见创建阿里云Elasticsearch实例

管理时序索引

创建时序索引

使用如下命令,通过TimeStream的创建索引接口创建一个名称为test_stream的时序索引。
PUT _time_stream/test_stream

与直接通过Elasticsearch create index命令(PUT test_stream)创建的索引相比,通过TimeStream的创建索引接口创建的索引是DataStream索引,而不是一个具体的索引,并且自动集成了Elasticsearch在时序场景的最佳实践配置。

您可以通过如下命令,查看test_stream索引的具体配置。
GET _time_stream/test_stream
预期返回结果如下。
{
  "time_streams" : {
    "test_stream" : {
      "name" : "test_stream",
      "data_stream_name" : "test_stream",
      "time_stream_config" : {
        "labels_fields" : {
          "includes" : [
            "labels.*"
          ],
          "excludes" : [ ]
        },
        "metrics_fields" : {
          "includes" : [
            "metrics.*"
          ],
          "excludes" : [ ]
        },
        "label_prefix" : "labels.",
        "metric_prefix" : "metrics.",
        "downsample" : [ ]
      },
      "template_name" : ".timestream_test_stream",
      "template" : {
        "index_patterns" : [
          "test_stream"
        ],
        "template" : {
          "settings" : {
            "index" : {
              "mode" : "time_series",
              "codec" : "ali",
              "refresh_interval" : "10s",
              "ali_codec_service" : {
                "enabled" : "true",
                "source_reuse_doc_values" : {
                  "enabled" : "true"
                }
              },
              "translog" : {
                "durability" : "ASYNC"
              },
              "doc_value" : {
                "compression" : {
                  "default" : "zstd"
                }
              },
              "postings" : {
                "compression" : "zstd"
              },
              "source" : {
                "compression" : "zstd"
              },
              "routing_path" : [
                "labels.*"
              ]
            }
          },
          "mappings" : {
            "numeric_detection" : true,
            "dynamic_templates" : [
              {
                "labels_template_match_labels.*" : {
                  "path_match" : "labels.*",
                  "mapping" : {
                    "time_series_dimension" : "true",
                    "type" : "keyword"
                  },
                  "match_mapping_type" : "*"
                }
              },
              {
                "metrics_double_match_metrics.*" : {
                  "path_match" : "metrics.*",
                  "mapping" : {
                    "index" : "false",
                    "type" : "double"
                  },
                  "match_mapping_type" : "double"
                }
              },
              {
                "metrics_long_match_metrics.*" : {
                  "path_match" : "metrics.*",
                  "mapping" : {
                    "index" : "false",
                    "type" : "long"
                  },
                  "match_mapping_type" : "long"
                }
              }
            ],
            "properties" : {
              "@timestamp" : {
                "format" : "epoch_millis||strict_date_optional_time",
                "type" : "date"
              }
            }
          }
        },
        "composed_of" : [ ],
        "data_stream" : {
          "hidden" : false
        }
      },
      "version" : 1
    }
  }
}
从返回结果可以看到,TimeStream的创建索引接口默认会创建一个.timestream_test_streamindex_template,index_template settings中配置了以下关键参数。
参数说明
index.mode取值time_series,表示创建的索引类型是time_series索引,系统会自动集成Elasticsearch在时序场景的最佳实践配置。
index.codec取值ali,表示使用aliyun-codec索引压缩插件。与以下参数配合使用,可以极大减少磁盘存储空间:
  • index.ali_codec_service.enabled=true:开启codec压缩功能。
  • index.doc_value.compression.default=zstd:doc_values使用zstd压缩。
  • index.postings.compression=zstd:倒排数据使用zstd压缩。
  • index.ali_codec_service.source_reuse_doc_values.enabled=true:不存储source,使用doc_values拼装source。
  • index.source.compression=zstd:正排数据使用zstd压缩。
index_template mappings配置了时序模型对应的dynamic_templates配置:
  • 维度字段:默认使用keyword类型,然后配置time_series_dimension=true,标识为维度字段。index.mode=time_series会把所有time_series_dimension=true的字段拼装成一个时间线id(_tsid)的内部字段。
  • 指标字段:支持doublelong类型,只存储doc_values,不存储索引。
如果您需要自定义索引的模板信息,可直接使用index_template语法,示例如下:
  • 自定义索引的shard数量
    PUT _time_stream/test_stream
    {
      "template": {
        "settings": {
          "index": {
            "number_of_shards": "2"
          }
        }
      }
    }
                            
  • 自定义索引的数据模型
    PUT _time_stream/test_stream
    {
      "template": {
        "settings": {
          "index": {
            "number_of_shards": "2"
          }
        }
      },
      "time_stream": {
        "labels_fields": ["labels_*"],
        "metrics_fields": ["metrics_*"]
    }

更新时序索引

使用如下命令,更新test_stream时序索引的shard数量。
POST _time_stream/test_stream/_update
{
  "template": {
    "settings": {
      "index": {
        "number_of_shards": "4"
      }
    }
  }
}
重要
  • 更新内容会全量覆盖索引的配置,因此在执行更新命令时,需要保留无需更新的配置。建议通过GET _time_stream/test_stream命令获取索引的全量配置信息,再在其基础上进行修改。
  • 更新时序索引配置后,新配置不会立即生效,需要等到索引进行一次rollover,生成新索引后,在新索引上生效。您可以通过POST test_stream/_rollover命令手动执行rollover。

删除时序索引

使用如下命令,删除test_stream时序索引。
Delete _time_stream/test_stream
说明 删除接口会直接删除目标时序索引全部的数据和相关配置。

使用时序索引

时序索引的使用方式与普通索引一致,具体说明如下。

写入时序数据

使用bulkindex API,按照时序模型写入数据,示例如下(@timestamp需调整为当前时间)。
POST test_stream/_doc
{
  "@timestamp": 1630465208722,
  "metrics": {
    "cpu.idle": 79.67298116109929,
    "disk_ioutil": 17.630910821570456,
    "mem.free": 75.79973639970004
  },
  "labels": {
    "disk_type": "disk_type2",
    "namespace": "namespaces1",
    "clusterId": "clusterId3",
    "nodeId": "nodeId5"
  }
}
时序索引的时间范围是TimeStream使用DataStream功能在内部生成的,原理为:DataStream的每个索引都配置了index.time_series.start_timeindex.time_series.end_time两个settings配置,索引只允许@timestamp在[start_time,end_time)区间范围内的数据写入,是一个前闭后开区间。这个是DataStream内部为索引生成的,无需用户配置。索引一旦创建,index.time_series.start_time就固定不变了,而index.time_series.end_time会不断扩大,直到rollover生成一个新索引,此时旧索引的index.time_series.end_time就固定不变了,新索引的index.time_series.start_time就是旧索引的index.time_series.end_time,这样DataStream内部的索引就是时间范围连续的多个索引。原理图

写入数据时,DataStream会根据@timestamp的值来决定写到哪个索引,所以上述用例中@timestamp要设置在当前test_stream索引的时间区间上。

时间区间中的时间为UTC时间格式,例如2022-06-21T00:00:00.000Z。如果您所在时区为东8区(北京时间),需要在UTC时间上加8小时转换为北京时间,转换后对应的时间为2022-06-21T00:00:00.000+08:00,即2022-06-21T08:00:00.000。

查询时序数据

使用search接口查询数据。
GET test_stream/_search
使用_cat/indices接口,查看索引的详细信息。
GET _cat/indices/test_stream?v&s=i

查询时序索引指标

使用TimeStream的_stats接口,查看索引stats信息。
GET _time_stream/test_stream/_stats
预期结果如下。
{
  "_shards" : {
    "total" : 2,
    "successful" : 2,
    "failed" : 0
  },
  "time_stream_count" : 1,
  "indices_count" : 1,
  "total_store_size_bytes" : 19132,
  "time_streams" : [
    {
      "time_stream" : "test_stream",
      "indices_count" : 1,
      "store_size_bytes" : 19132,
      "tsid_count" : 2
    }
  ]
}
说明 由于shard上获取时间线数量是从内部的时间线id(_tsid)字段的doc_values数据直接获取,查询代价过高,所以增加了缓存策略。只读索引获取一次后就不再获取,正常索引缓存默认失效时间是5分钟,通过设置索引配置index.time_series.stats.refresh_interval可修改缓存更新时间,最小为1分钟。

使用Prometheus接口查询数据

在使用Prometheus接口查询数据前,您需要配置Prometheus数据源,支持以下两种方式:
  • Grafana控制台中配置
    通过在Grafana控制台中配置Prometheus数据源,并在URL中指定/_time_stream/prom/test_stream URI,直接将TimeStream索引作为GrafanaPrometheus数据源使用,如下图所示。配置Grafana数据源
  • 使用Prometheus API配置

    使用Prometheus API,去掉返回的指标和维度字段的关键字前后缀。例如,使用默认的数据模型,指标前缀metric.会被去掉,维度前缀label.会被去掉。

    如果自定义了数据模型,则需要额外配置前后缀关键字,否则Prometheus API返回的就是实际的数据。配置前后缀关键字的示例如下。
    PUT _time_stream/{name}
    {
      "time_stream": {
        "labels_fields": "@labels.*_l",
        "metrics_fields": "@metrics.*_m",
        "label_prefix": "@labels.",
        "label_suffix": "_l",
        "metric_prefix": "@metrics.",
        "metric_suffix": "_m"
      }
    }

元数据查询

  • 查看test_stream索引中的所有指标。
    GET /_time_stream/prom/test_stream/metadata
    预期结果如下。
    {
      "status" : "success",
      "data" : {
        "cpu.idle" : [
          {
            "type" : "gauge",
            "help" : "",
            "unit" : ""
          }
        ],
        "disk_ioutil" : [
          {
            "type" : "gauge",
            "help" : "",
            "unit" : ""
          }
        ],
        "mem.free" : [
          {
            "type" : "gauge",
            "help" : "",
            "unit" : ""
          }
        ]
      }
    }
  • 查看test_stream索引中的所有维度。
    GET /_time_stream/prom/test_stream/labels
    预期结果如下。
    {
      "status" : "success",
      "data" : [
        "__name__",
        "clusterId",
        "disk_type",
        "namespace",
        "nodeId"
      ]
    }
  • 查看test_stream索引中,某个具体维度的全部value。
    GET /_time_stream/prom/test_stream/label/clusterId/values
    预期结果如下。
    {
      "status" : "success",
      "data" : [
        "clusterId1",
        "clusterId3"
      ]
    }
  • 查看test_stream索引中,cpu.idle指标全部的时间线。
    GET /_time_stream/prom/test_stream/series?match[]=cpu.idle
    预期结果如下。
    {
      "status" : "success",
      "data" : [
        {
          "__name__" : "cpu.idle",
          "disk_type" : "disk_type1",
          "namespace" : "namespaces2",
          "clusterId" : "clusterId1",
          "nodeId" : "nodeId2"
        },
        {
          "__name__" : "cpu.idle",
          "disk_type" : "disk_type1",
          "namespace" : "namespaces2",
          "clusterId" : "clusterId1",
          "nodeId" : "nodeId5"
        },
        {
          "__name__" : "cpu.idle",
          "disk_type" : "disk_type2",
          "namespace" : "namespaces1",
          "clusterId" : "clusterId3",
          "nodeId" : "nodeId5"
        }
      ]
    }

数据查询

您可以通过Prometheus instant query和range query接口,使用PromQL查询Elasticsearch数据。PromQL的支持情况请参见TimeStream PromQL支持情况
  • 通过Prometheus instant query接口查询数据
    GET /_time_stream/prom/test_stream/query?query=cpu.idle&time=1655769837
    说明 time的单位为秒,不传递时,默认查询最新5分钟内的数据。
    返回结果为Prometheus query接口的数据格式,如下所示。
    {
      "status" : "success",
      "data" : {
        "resultType" : "vector",
        "result" : [
          {
            "metric" : {
              "__name__" : "cpu.idle",
              "clusterId" : "clusterId1",
              "disk_type" : "disk_type1",
              "namespace" : "namespaces2",
              "nodeId" : "nodeId2"
            },
            "value" : [
              1655769837,
              "79.672981161"
            ]
          },
          {
            "metric" : {
              "__name__" : "cpu.idle",
              "clusterId" : "clusterId1",
              "disk_type" : "disk_type1",
              "namespace" : "namespaces2",
              "nodeId" : "nodeId5"
            },
            "value" : [
              1655769837,
              "79.672981161"
            ]
          },
          {
            "metric" : {
              "__name__" : "cpu.idle",
              "clusterId" : "clusterId3",
              "disk_type" : "disk_type2",
              "namespace" : "namespaces1",
              "nodeId" : "nodeId5"
            },
            "value" : [
              1655769837,
              "79.672981161"
            ]
          }
        ]
      }
    }
  • 通过Prometheus range query接口查询数据
    GET /_time_stream/prom/test_stream/query_range?query=cpu.idle&start=1655769800&end=16557699860&step=1m
    预期结果如下。
    {
      "status" : "success",
      "data" : {
        "resultType" : "matrix",
        "result" : [
          {
            "metric" : {
              "__name__" : "cpu.idle",
              "clusterId" : "clusterId1",
              "disk_type" : "disk_type1",
              "namespace" : "namespaces2",
              "nodeId" : "nodeId2"
            },
            "value" : [
              [
                1655769860,
                "79.672981161"
              ]
            ]
          },
          {
            "metric" : {
              "__name__" : "cpu.idle",
              "clusterId" : "clusterId1",
              "disk_type" : "disk_type1",
              "namespace" : "namespaces2",
              "nodeId" : "nodeId5"
            },
            "value" : [
              [
                1655769860,
                "79.672981161"
              ]
            ]
          },
          {
            "metric" : {
              "__name__" : "cpu.idle",
              "clusterId" : "clusterId3",
              "disk_type" : "disk_type2",
              "namespace" : "namespaces1",
              "nodeId" : "nodeId5"
            },
            "value" : [
              [
                1655769860,
                "79.672981161"
              ]
            ]
          }
        ]
      }
    }

使用DownSample功能

DownSample(降采样)是在时序场景的常用功能,用来加速大范围数据查询的性能,DownSample功能的详细介绍请参见DownSample使用说明。通过TimeStream的创建索引接口创建索引时,可直接指定DownSample规则,示例如下。
PUT _time_stream/test_stream
{
  "time_stream": {
    "downsample": [
      {
        "interval": "1m"
      },
      {
        "interval": "10m"
      },
      {
        "interval": "60m"
      }
    ]
  }
}
生成DownSample索引的流程如下:
  1. DownSample操作是对一个原始索引执行DownSample,生成DownSample索引。DownSample操作是在索引rollover后产生了一个新索引,然后旧索引过了一段时间,不再写入数据时进行的。目前默认是当前时间比旧索引的end_time大两小时才开始进行DownSample。为了模拟这个效果,创建索引时可以手动指定start_timeend_time
    重要 最新索引的end_time会被Elasticsearch修改为最新时间,影响DownSample演示,默认是5分钟修改一次。DownSample演示操作要确保end_time不被修改,end_time值可通过GET {index}/_settings命令查看。
    PUT _time_stream/test_stream
    {
      "template": {
        "settings": {
          "index.time_series.start_time": "2022-06-20T00:00:00.000Z",
          "index.time_series.end_time": "2022-06-21T00:00:00.000Z"
        }
      },
      "time_stream": {
        "downsample": [
          {
            "interval": "1m"
          },
          {
            "interval": "10m"
          },
          {
            "interval": "60m"
          }
        ]
      }
    }
  2. 设置索引的end_time比当前时间至少小两个小时,然后写入一些数据(需要调整@timestampstart_timeend_time之间的时间)。
    POST test_stream/_doc
    {
      "@timestamp": 1655706106000,
      "metrics": {
        "cpu.idle": 79.67298116109929,
        "disk_ioutil": 17.630910821570456,
        "mem.free": 75.79973639970004
      },
      "labels": {
        "disk_type": "disk_type2",
        "namespace": "namespaces1",
        "clusterId": "clusterId3",
        "nodeId": "nodeId5"
      }
    }
  3. 写入一些数据后,需要更新TimeStream索引,去掉start_timeend_time
    POST _time_stream/test_stream/_update
    {
      "time_stream": {
        "downsample": [
          {
            "interval": "1m"
          },
          {
            "interval": "10m"
          },
          {
            "interval": "60m"
          }
        ]
      }
    }
  4. 对索引执行rollover命令。
    POST test_stream/_rollover
  5. rollover完成后,通过GET _cat/indices/test_stream?v&s=i命令,查看test_stream生成的DownSample索引。
    预期结果如下。
    health status index                                          uuid                   pri rep docs.count docs.deleted store.size pri.store.size
    green  open   .ds-test_stream-2022.06.21-000001              vhEwKIlwSGO3ax4RKn****   1   1          9            0     18.5kb         12.1kb
    green  open   .ds-test_stream-2022.06.21-000001_interval_10m r9Tsj0v-SyWJDc64oC****   1   1          1            0     15.8kb          7.9kb
    green  open   .ds-test_stream-2022.06.21-000001_interval_1h  cKsAlMK-T2-luefNAF****   1   1          1            0     15.8kb          7.9kb
    green  open   .ds-test_stream-2022.06.21-000001_interval_1m  L6ocasDFTz-c89KjND****   1   1          1            0     15.8kb          7.9kb
    green  open   .ds-test_stream-2022.06.21-000002              42vlHEFFQrmMAdNdCz****   1   1          0            0       452b           226b
DownSample查询功能的原理:使用DownSample查询功能时,您可以根据查询数据的时间范围,设置合理的interval值。如下图所示,用户传递原始的索引名称(例如test_stream),并在date_histogram里传递interval参数,TimeStream在进行DownSample操作时,能自动选择最合适精度的索引查询数据。
说明 下图以1小时、4小时、1天和7天数据为例,您可以根据需求设置interval值。DownSample会根据您设置的fixed_interval值去自动匹配一个最大精度。例如设置fixed_interval值为120m,您自定义的interval值有1m、10m、60m,那么TimeStream在进行DownSample操作时会自动取60m。
DownSample原理图
例如,执行以下查询:
GET test_stream/_search?size=0&request_cache=false
{
  "aggs": {
    "1": {
      "terms": {
        "field": "labels.disk_type",
        "size": 10
      },
      "aggs": {
        "2": {
          "date_histogram": {
            "field": "@timestamp",
            "fixed_interval": "120m"
          }
        }
      }
    }
  }
}
其中fixed_interval设置为120m,表示可以使用到DownSample最大精度为60分钟的索引,预期返回结果如下。
{
  "took" : 15,
  "timed_out" : false,
  "_shards" : {
    "total" : 2,
    "successful" : 2,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : null,
    "hits" : [ ]
  },
  "aggregations" : {
    "1" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : "disk_type2",
          "doc_count" : 9,
          "2" : {
            "buckets" : [
              {
                "key_as_string" : "2022-06-20T06:00:00.000Z",
                "key" : 1655704800000,
                "doc_count" : 9
              }
            ]
          }
        }
      ]
    }
  }
}
                

根据hits.total.value=1可以看到,只命中了一条记录。aggs结果的doc_count=9,表示实际索引数据量是9,因此可以看到用户查询的不是原始索引,而是DownSample索引。

如果将fixed_interval改为20s,那么可以看到结果的hits.total.value=9,与aggs结果的doc_count结果一致,说明查询到了原始索引。

由此可见,这些DownSample索引跟原始索引的settingsmappings是一致的,只是数据是按时间范围做了降采样。