TSDB Reader插件实现了从TSDB读取数据的功能,本文为您介绍TSDB Reader支持的数据类型、字段映射和数据源等参数及配置示例。

注意 TSDB Reader仅支持使用独享数据集成资源组,不支持使用公共资源组和自定义资源组

背景信息

时序数据库TSDB是一种高性能、低成本、稳定可靠的在线时序时空数据库服务,为您提供高效读写、高压缩比存储、时序数据插值及聚合计算等服务。TSDB广泛应用于物联网和互联网领域,可以实时监控设备和业务服务,并实时预测告警,详情请参见时序数据库TSDB

在底层实现上,TSDB Reader插件通过HTTP请求连接TSDB实例,使用/api/query/api/mquery接口扫描数据点,详情请参见HTTP API概览。TSDB Reader插件通过时间线和查询时间范围来切分整个同步任务。

使用限制

  • 某一个Metric下,一个小时内的数据量过大。您需要通过设置-j参数,调整JVM的内存大小。
    如果下游Writer的写入速度不及TSDB Reader的查询速度,会存在任务积压的情况,您需要适当调整JVM的参数。例如,从阿里云TSDB数据库同步抽取数据到本地,启动命令如下。
    python datax/bin/datax.py tsdb2stream.json -j "-Xms4096m -Xmx4096m"
  • 指定的起止时间会自动被转为整点时刻。

    例如,2019年4月18日的[3:35, 4:55)会被转为[3:00, 4:00)

类型转换列表

TSDB Reader针对TSDB类型的转换列表,如下所示。
数据集成内部类型 TSDB数据类型
STRING TSDB数据点的序列化字符串,包括Timestamp、Metric、Tags、Fields和Value。

参数说明

参数 描述 必选 默认值
name TSDB Reader插件的名称。 tsdbreader
sinkDbType 目标数据库的类型。
sinkDbType包括TSDBRDB两个取值:
  • TSDB包括阿里云TSDB、OpenTSDB、InfluxDB、Prometheus和TimeScale。
  • RDB包括AnalyticDB for MySQL、Oracle、MySQL、PostgreSQL和DRDS等。
TSDB
endpoint TSDB的HTTP连接地址,格式为http://IP:Port
column 不同场景下的字段说明如下:
  • TSDB场景下:数据迁移任务需要迁移的Metric列表。
  • RDB场景下:映射到关系型数据库中的表字段,且增加__metric____ts____value__三个字段。

    其中__metric__用于映射度量字段,__ts__用于映射timestamp字段,__value__仅适用于单值场景,用于映射度量值。多值场景下,直接指定field字段。

metric 仅适用于RDB场景,表示数据迁移任务需要迁移的Metric列表。
field 仅适用于多值场景,表示数据迁移任务需要迁移的Field列表。
tag 数据迁移任务需要迁移的TagKeyTagValue,用于进一步过滤时间线。
splitIntervalMs 用于数据集成内部切分Task,每个Task仅查询一小部分的时间段。单位为毫秒(ms)。
beginDateTime 配合endDateTime使用,用于指定需要被迁移的数据点所在的时间段,格式为yyyy-MM-dd HH:mm:ss
说明 指定的起止时间会自动忽略分钟和秒,转为整点时刻。例如2019年4月18日的[3:35, 4:55)会被转为[3:00, 4:00)
endDateTime 配合beginDateTime使用,用于指定需要被迁移的数据点所在的时间段,格式为yyyy-MM-dd HH:mm:ss
说明 指定的起止时间会自动忽略分钟和秒,转为整点时刻。例如2019年4月18日的[3:35, 4:55)会被转为[3:00, 4:00)
combine 用于合并多行数据为一行进行输出。

combine主要适用于数据源为稀疏表的场景,开启该参数后,可以合并多行数据为一行。但仅拥有相同timestamp的多行数据,即同一个时刻的多条数据,才会被合并为一行数据。

combine模式下,__metric__.xxx字段会输出对应的Value值,您无需定义__value__字段。同时,您需要确保__metric__.xxx字段的定义在column数组的最前面,以保证获取metric

false

向导开发介绍

暂不支持向导开发模式。

脚本开发介绍

脚本开发的详情请参见通过脚本模式配置任务,配置示例如下:
  • 配置一个从阿里云TSDB数据库同步抽取数据至本地的作业,并以时序数据的格式输出。
    时序数据示例如下。
    {"metric":"m","tags":{"app":"a19","cluster":"c5","group":"g10","ip":"i999","zone":"z1"},"timestamp":1546272263,"value":1}
    作业配置如下。
    {
        "type":"job",
        "version":"2.0",//版本号。
        "steps":[
            {
                "stepType":"tsdb",
                  "parameter": {
                    "sinkDbType": "TSDB", //目标数据库的类型。 
                    "endpoint": "http://localhost:8242",
                    "column": [
                      "m"
                    ],
                    "combine":false,
                    "splitIntervalMs": 60000,
                    "beginDateTime": "2019-01-01 00:00:00",
                    "endDateTime": "2019-01-01 01:00:00"
                  },
                "name":"Reader",
                "category":"reader"
            },
            {
                "stepType":"stream",
                "parameter":{
    
                },
                "name":"Writer",
                "category":"writer"
            }
        ],
        "setting":{
            "errorLimit":{
                "record":"0"//错误记录数。
            },
            "speed":{
                "throttle":false,//false代表不限流,下面的限流的速度不生效;true代表限流。
                "concurrent":1 //作业并发数。
            }
        },
        "order":{
            "hops":[
                {
                    "from":"Reader",
                    "to":"Writer"
                }
            ]
        }
    }
  • 配置一个从阿里云TSDB数据库同步抽取数据至本地的作业,并以关系型数据的格式输出。
    关系型数据示例如下。
    m 1546272125 a1    c1 g2 i3021 z4 1.0
    作业配置如下。
    {
        "type":"job",
        "version":"2.0",//版本号。
        "steps":[
            {
                "stepType":"tsdb",
                  "parameter": {
                    "sinkDbType": "RDB", //目标数据库的类型。 
                    "endpoint": "http://localhost:8242",
                    "column": [
                      "__metric__",
                      "__ts__",
                      "app",
                      "cluster",
                      "group",
                      "ip",
                      "zone",
                      "__value__"
                    ],
                    "metric": [
                      "m"
                    ],
                    "splitIntervalMs": 60000,
                    "beginDateTime": "2019-01-01 00:00:00",
                    "endDateTime": "2019-01-01 01:00:00"
                  },
                "name":"Reader",
                "category":"reader"
            },
            {
                "stepType":"stream",
                "parameter":{
    
                },
                "name":"Writer",
                "category":"writer"
            }
        ],
        "setting":{
            "errorLimit":{
                "record":"0"//错误记录数。
            },
            "speed":{
                "throttle":false,//false代表不限流,下面的限流的速度不生效;true代表限流。
                "concurrent":1 //作业并发数。
            }
        },
        "order":{
            "hops":[
                {
                    "from":"Reader",
                    "to":"Writer"
                }
            ]
        }
    }
  • 配置一个从阿里云TSDB数据库同步抽取单值数据至AnalyticDB for MySQL的作业。
    
    {
        "type":"job",
        "version":"2.0",//版本号。
        "steps":[
            {
                "stepType":"tsdb",
                  "parameter": {
                    "sinkDbType": "RDB",
                    "endpoint": "http://localhost:8242",
                    "column": [
                      "__metric__",
                      "__ts__",
                      "app",
                      "cluster",
                      "group",
                      "ip",
                      "zone",
                      "__value__"
                    ],
                    "metric": [
                      "m"
                    ],
                    "splitIntervalMs": 60000,
                    "beginDateTime": "2019-01-01 00:00:00",
                    "endDateTime": "2019-01-01 01:00:00"
                  },
                "name":"Reader",
                "category":"reader"
            },
            {
                "stepType":"stream",
                "parameter":{
    
                },
                "name":"Writer",
                "category":"writer"
            }
        ],
        "setting":{
            "errorLimit":{
                "record":"0"//错误记录数。
            },
            "speed":{
                "throttle":false,//false代表不限流,下面的限流的速度不生效;true代表限流。
                "concurrent":1 //作业并发数。
            }
        },
        "order":{
            "hops":[
                {
                    "from":"Reader",
                    "to":"Writer"
                }
            ]
        }
    }
  • 配置一个从阿里云TSDB数据库同步抽取多值数据至AnalyticDB for MySQL的作业。
    {
        "type":"job",
        "version":"2.0",//版本号。
        "steps":[
            {
                "stepType":"tsdb",
                  "parameter": {
                    "sinkDbType": "RDB",
                    "endpoint": "http://localhost:8242",
                    "column": [
                      "__metric__",
                      "__ts__",
                      "app",
                      "cluster",
                      "group",
                      "ip",
                      "zone",
                      "load",
                      "memory",
                      "cpu"
                    ],
                    "metric": [
                      "m_field"
                    ],
                    "field": {
                      "m_field": [
                        "load",
                        "memory",
                        "cpu"
                      ]
                    },
                    "splitIntervalMs": 60000,
                    "beginDateTime": "2019-01-01 00:00:00",
                    "endDateTime": "2019-01-01 01:00:00"
                  },
                "name":"Reader",
                "category":"reader"
            },
            {
                "stepType":"ads",
                  "parameter": {
                    "username": "******",
                    "password": "******",
                    "column": [
                      "`metric`",
                      "`ts`",
                      "`app`",
                      "`cluster`",
                      "`group`",
                      "`ip`",
                      "`zone`",
                      "`load`",
                      "`memory`",
                      "`cpu`"
                    ],
                    "url": "http://localhost:3306",
                    "schema": "datax_test",
                    "table": "datax_test_multi_field",
                    "writeMode": "insert",
                    "opIndex": "0",
                    "batchSize": "2"
                  },
                "name":"Writer",
                "category":"writer"
            }
        ],
        "setting":{
            "errorLimit":{
                "record":"0"//错误记录数。
            },
            "speed":{
                "throttle":false,//false代表不限流,下面的限流的速度不生效;true代表限流。
                "concurrent":1 //作业并发数。
            }
        },
        "order":{
            "hops":[
                {
                    "from":"Reader",
                    "to":"Writer"
                }
            ]
        }
    }
  • 配置一个从阿里云TSDB数据库同步抽取单值数据至AnalyticDB for MySQL,并指定过滤部分时间线的作业。
    {
        "type":"job",
        "version":"2.0",//版本号。
        "steps":[
            {
                "stepType":"tsdb",
                  "parameter": {
                    "sinkDbType": "RDB",
                    "endpoint": "http://localhost:8242",
                    "column": [
                      "__metric__",
                      "__ts__",
                      "app",
                      "cluster",
                      "group",
                      "ip",
                      "zone",
                      "__value__"
                    ],
                    "metric": [
                      "m"
                    ],
                    "tag": {
                      "m": {
                        "app": "a1",
                        "cluster": "c1"
                      }
                    },
                    "splitIntervalMs": 60000,
                    "beginDateTime": "2019-01-01 00:00:00",
                    "endDateTime": "2019-01-01 01:00:00"
                  },
                "name":"Reader",
                "category":"reader"
            },
            {
                "stepType":"ads",
                  "parameter": {
                    "username": "******",
                    "password": "******",
                    "column": [
                      "`metric`",
                      "`ts`",
                      "`app`",
                      "`cluster`",
                      "`group`",
                      "`ip`",
                      "`zone`",
                      "`value`"
                    ],
                    "url": "http://localhost:3306",
                    "schema": "datax_test",
                    "table": "datax_test",
                    "writeMode": "insert",
                    "opIndex": "0",
                    "batchSize": "2"
                  },
                "name":"Writer",
                "category":"writer"
            }
        ],
        "setting":{
            "errorLimit":{
                "record":"0"//错误记录数。
            },
            "speed":{
                "throttle":false,//false代表不限流,下面的限流的速度不生效;true代表限流。
                "concurrent":1 //作业并发数。
            }
        },
        "order":{
            "hops":[
                {
                    "from":"Reader",
                    "to":"Writer"
                }
            ]
        }
    }
  • 配置一个从阿里云TSDB数据库同步抽取多值数据至AnalyticDB for MySQL,并指定过滤部分时间线的作业。
    {
        "type":"job",
        "version":"2.0",//版本号。
        "steps":[
            {
                "stepType":"tsdb",
                  "parameter": {
                    "sinkDbType": "RDB",
                    "endpoint": "http://localhost:8242",
                    "column": [
                      "__metric__",
                      "__ts__",
                      "app",
                      "cluster",
                      "group",
                      "ip",
                      "zone",
                      "load",
                      "memory",
                      "cpu"
                    ],
                    "metric": [
                      "m_field"
                    ],
                    "field": {
                      "m_field": [
                        "load",
                        "memory",
                        "cpu"
                      ]
                    },
                    "tag": {
                      "m_field": {
                        "ip": "i999"
                      }
                    },
                    "splitIntervalMs": 60000,
                    "beginDateTime": "2019-01-01 00:00:00",
                    "endDateTime": "2019-01-01 01:00:00"
                  },
                "name":"Reader",
                "category":"reader"
            },
            {
                "stepType":"ads",
                  "parameter": {
                    "username": "******",
                    "password": "******",
                    "column": [
                      "`metric`",
                      "`ts`",
                      "`app`",
                      "`cluster`",
                      "`group`",
                      "`ip`",
                      "`zone`",
                      "`load`",
                      "`memory`",
                      "`cpu`"
                    ],
                    "url": "http://localhost:3306",
                    "schema": "datax_test",
                    "table": "datax_test_multi_field",
                    "writeMode": "insert",
                    "opIndex": "0",
                    "batchSize": "2"
                  },
                "name":"Writer",
                "category":"writer"
            }
        ],
        "setting":{
            "errorLimit":{
                "record":"0"//错误记录数。
            },
            "speed":{
                "throttle":false,//false代表不限流,下面的限流的速度不生效;true代表限流。
                "concurrent":1 //作业并发数。
            }
        },
        "order":{
            "hops":[
                {
                    "from":"Reader",
                    "to":"Writer"
                }
            ]
        }
    }
  • 配置一个从阿里云TSDB数据库同步抽取单值数据至另一个阿里云TSDB数据库的作业。
    {
        "type":"job",
        "version":"2.0",//版本号。
        "steps":[
            {
                "stepType":"tsdb",
                  "parameter": {
                    "sinkDbType": "TSDB",
                    "endpoint": "http://localhost:8242",
                    "column": [
                      "m"
                    ],
                    "splitIntervalMs": 60000,
                    "beginDateTime": "2019-01-01 00:00:00",
                    "endDateTime": "2019-01-01 01:00:00"
                  },
                "name":"Reader",
                "category":"reader"
            },
            {
                "stepType":"tsdb",
                  "parameter": {
                    "endpoint": "http://localhost:8240"
                  },
                "name":"Writer",
                "category":"writer"
            }
        ],
        "setting":{
            "errorLimit":{
                "record":"0"//错误记录数。
            },
            "speed":{
                "throttle":false,//false代表不限流,下面的限流的速度不生效;true代表限流。
                "concurrent":1 //作业并发数。
            }
        },
        "order":{
            "hops":[
                {
                    "from":"Reader",
                    "to":"Writer"
                }
            ]
        }
    }
  • 配置一个从阿里云TSDB数据库同步抽取多值数据至另一个阿里云TSDB数据库的作业。
    {
        "type":"job",
        "version":"2.0",//版本号。
        "steps":[
            {
                "stepType":"tsdb",
                  "parameter": {
                    "sinkDbType": "TSDB",
                    "endpoint": "http://localhost:8242",
                    "column": [
                      "m_field"
                    ],
                    "field": {
                      "m_field": [
                        "load",
                        "memory",
                        "cpu"
                      ]
                    },
                    "splitIntervalMs": 60000,
                    "beginDateTime": "2019-01-01 00:00:00",
                    "endDateTime": "2019-01-01 01:00:00"
                  },
                "name":"Reader",
                "category":"reader"
            },
            {
                "stepType":"tsdb",
                  "parameter": {
                    "multiField": true,
                    "endpoint": "http://localhost:8240"
                  },
                "name":"Writer",
                "category":"writer"
            }
        ],
        "setting":{
            "errorLimit":{
                "record":"0"//错误记录数。
            },
            "speed":{
                "throttle":false,//false代表不限流,下面的限流的速度不生效;true代表限流。
                "concurrent":1 //作业并发数。
            }
        },
        "order":{
            "hops":[
                {
                    "from":"Reader",
                    "to":"Writer"
                }
            ]
        }
    }