时序数据库InfluxDB

本文为您介绍如何使用时序数据库InfluxDB连接器。

背景信息

时序数据库InfluxDB®版是一款专门处理高写入和查询负载的时序数据库,用于存储大规模的时序数据并进行实时分析,包括来自DevOps监控、应用指标和IoT传感器上的数据。时序数据库InfluxDB®版详情请参见InfluxDB®️介绍

InfluxDB连接器支持的信息如下。

类别

详情

支持类型

结果表

运行模式

流模式

数据格式

Point

特有监控指标

  • numRecordsOut

  • numRecordsOutPerSecond

  • currentSendTime

说明

指标含义详情,请参见监控指标说明

API种类

SQL

是否支持更新或删除结果表数据

前提条件

已创建InfluxDB的数据库,详情请参见管理用户账号和数据库

使用限制

仅Flink计算引擎VVR 2.1.5及以上版本支持InfluxDB Connector。

语法结构

CREATE TABLE stream_test_influxdb(
 `metric` VARCHAR,
 `timestamp` BIGINT,
 `tag_value1` VARCHAR,
 `field_fieldValue1` DOUBLE
) WITH (
  'connector' = 'influxdb',
  'url' = 'http://service.cn.influxdb.aliyuncs.com:****',
  'database' = '<yourDatabaseName>',
  'username' = '<yourDatabaseUserName>',
  'password' = '<yourDatabasePassword>',
  'batchSize' ='300',
  'retentionPolicy' = 'autogen',
  'ignoreErrorData' = 'false'
);

建表默认格式:

  • 第0列:metric(VARCHAR),必填。

  • 第1列:timestamp(BIGINT),必填,单位为毫秒。

  • 第2列:tag_value1(VARCHAR),必填,最少填写一个。

  • 第3列:field_fieldValue1(DOUBLE),必填,最少填写一个。

    写入多个field_fieldValue时,您需要按照如下格式填写。

    field_fieldValue1 类型,
    field_fieldValue2 类型,
    ...  
    field_fieldValueN 类型

    示例如下。

    field_fieldValue1 DOUBLE,
    field_fieldValue2 INTEGER,
    ...   
    field_fieldValueNINTEGER
说明

结果表中只支持metrictimestamptag_*field_*,不能出现其他的字段。

WITH参数

参数

说明

是否必填

备注

connector

结果表类型。

固定值为influxdb。

url

InfluxDB的服务地址。

在InfluxDB中,URL为VPC网络地址,例如:https://localhost:8086http://localhost:3242。

URL支持HTTP和HTTPS。

database

InfluxDB的数据库名称。

例如db-flink。

username

数据库的用户名。

需要对目标数据库有写权限。用户名详情请参见管理用户账号和数据库

password

数据库的密码。

密码详情请参见管理用户账号和数据库

batchSize

批量提交的记录条数。

默认每次批量提交300条记录。

retentionPolicy

保留策略。

如果您不配置该参数时,该参数会被默认填写为每个数据库的默认保留策略autogen,保留策略详情请参见管理用户账号和数据库

ignoreErrorData

是否忽略异常数据。

参数取值如下:

  • true:忽略异常数据。

  • false(默认值):不忽略异常数据。

类型映射

InfluxDB字段类型

Flink版字段类型

BOOLEAN

BOOLEAN

INT

INT

BIGINT

BIGINT

FLOAT

FLOAT

DECIMAL

DECIMAL

DOUBLE

DOUBLE

DATE

DATE

TIME

TIME

TIMESTAMP

TIMESTAMP

VARCHAR

VARCHAR

使用示例

CREATE TEMPORARY TABLE datahub_source(
 `metric` VARCHAR,
 `timestamp` BIGINT,
 `filedvalue` DOUBLE,
 `tagvalue` VARCHAR
) WITH (
  'connector' = 'datagen',
  'fields.metric.length' = '3',
  'fields.tagvalue.length' = '3',
  'fields.timestamp.min' = '1587539547000',
  'fields.timestamp.max' = '1619075547000',
  'fields.filedvalue.min' = '1',
  'fields.filedvalue.max' = '100000',
  'rows-per-second' = '50'
);

CREATE TEMPORARY TABLE influxdb_sink(
  `metric` VARCHAR,
  `timestamp` BIGINT,
  `field_fieldValue1` DOUBLE,
  `tag_value1` VARCHAR
) WITH (
  'connector' = 'influxdb',
  'url' = 'https://***********.influxdata.tsdb.aliyuncs.com:****',
  'database' = '<yourDatabaseName>',
  'username' = '<yourDatabaseUserName>',
  'password' = '<yourDatabasePassword>',
  'batchSize' ='100',
  'retentionPolicy' = 'autogen',
  'ignoreErrorData' = 'false'
);

INSERT INTO influxdb_sink
SELECT 
  `metric`,
  `timestamp`,
  `filedvalue`,
  `tagvalue`
FROM datahub_source;