本文为您介绍如何使用时序数据库InfluxDB连接器。
背景信息
时序数据库InfluxDB®版是一款专门处理高写入和查询负载的时序数据库,用于存储大规模的时序数据并进行实时分析,包括来自DevOps监控、应用指标和IoT传感器上的数据。时序数据库InfluxDB®版详情请参见InfluxDB®️介绍。
InfluxDB连接器支持的信息如下。
类别 | 详情 |
支持类型 | 结果表 |
运行模式 | 流模式 |
数据格式 | Point |
特有监控指标 |
说明 指标含义详情,请参见监控指标说明。 |
API种类 | SQL |
是否支持更新或删除结果表数据 | 否 |
前提条件
已创建InfluxDB的数据库,详情请参见管理用户账号和数据库。
使用限制
仅Flink计算引擎VVR 2.1.5及以上版本支持InfluxDB Connector。
语法结构
CREATE TABLE stream_test_influxdb(
`metric` VARCHAR,
`timestamp` BIGINT,
`tag_value1` VARCHAR,
`field_fieldValue1` DOUBLE
) WITH (
'connector' = 'influxdb',
'url' = 'http://service.cn.influxdb.aliyuncs.com:****',
'database' = '<yourDatabaseName>',
'username' = '<yourDatabaseUserName>',
'password' = '<yourDatabasePassword>',
'batchSize' ='300',
'retentionPolicy' = 'autogen',
'ignoreErrorData' = 'false'
);
建表默认格式:
第0列:metric(VARCHAR),必填。
第1列:timestamp(BIGINT),必填,单位为毫秒。
第2列:tag_value1(VARCHAR),必填,最少填写一个。
第3列:field_fieldValue1(DOUBLE),必填,最少填写一个。
写入多个field_fieldValue时,您需要按照如下格式填写。
field_fieldValue1 类型, field_fieldValue2 类型, ... field_fieldValueN 类型
示例如下。
field_fieldValue1 DOUBLE, field_fieldValue2 INTEGER, ... field_fieldValueNINTEGER
结果表中只支持metric、timestamp、tag_*和field_*,不能出现其他的字段。
WITH参数
参数 | 说明 | 是否必填 | 备注 |
connector | 结果表类型。 | 是 | 固定值为influxdb。 |
url | InfluxDB的服务地址。 | 是 | 在InfluxDB中,URL为VPC网络地址,例如:https://localhost:8086或http://localhost:3242。 URL支持HTTP和HTTPS。 |
database | InfluxDB的数据库名称。 | 是 | 例如db-flink。 |
username | 数据库的用户名。 | 是 | 需要对目标数据库有写权限。用户名详情请参见管理用户账号和数据库。 |
password | 数据库的密码。 | 是 | 密码详情请参见管理用户账号和数据库。 |
batchSize | 批量提交的记录条数。 | 否 | 默认每次批量提交300条记录。 |
retentionPolicy | 保留策略。 | 否 | 如果您不配置该参数时,该参数会被默认填写为每个数据库的默认保留策略autogen,保留策略详情请参见管理用户账号和数据库。 |
ignoreErrorData | 是否忽略异常数据。 | 否 | 参数取值如下:
|
类型映射
InfluxDB字段类型 | Flink版字段类型 |
BOOLEAN | BOOLEAN |
INT | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DECIMAL | DECIMAL |
DOUBLE | DOUBLE |
DATE | DATE |
TIME | TIME |
TIMESTAMP | TIMESTAMP |
VARCHAR | VARCHAR |
使用示例
CREATE TEMPORARY TABLE datahub_source(
`metric` VARCHAR,
`timestamp` BIGINT,
`filedvalue` DOUBLE,
`tagvalue` VARCHAR
) WITH (
'connector' = 'datagen',
'fields.metric.length' = '3',
'fields.tagvalue.length' = '3',
'fields.timestamp.min' = '1587539547000',
'fields.timestamp.max' = '1619075547000',
'fields.filedvalue.min' = '1',
'fields.filedvalue.max' = '100000',
'rows-per-second' = '50'
);
CREATE TEMPORARY TABLE influxdb_sink(
`metric` VARCHAR,
`timestamp` BIGINT,
`field_fieldValue1` DOUBLE,
`tag_value1` VARCHAR
) WITH (
'connector' = 'influxdb',
'url' = 'https://***********.influxdata.tsdb.aliyuncs.com:****',
'database' = '<yourDatabaseName>',
'username' = '<yourDatabaseUserName>',
'password' = '<yourDatabasePassword>',
'batchSize' ='100',
'retentionPolicy' = 'autogen',
'ignoreErrorData' = 'false'
);
INSERT INTO influxdb_sink
SELECT
`metric`,
`timestamp`,
`filedvalue`,
`tagvalue`
FROM datahub_source;