Flink可以处理实时数据流,并将处理结果写入Lindorm时序引擎,以实现实时数据监控等场景。本文介绍如何将Flink上实时的数据处理结果写入到时序引擎。
前提条件
已开通实时计算Flink版或者已有自建Flink。实时计算Flink版的开通,请参见开通实时计算Flink版。
说明实时计算Flink版需要VVR 4.0.13及以上版本,VVR 4.0.13版本是基于Apache Flink 1.13。
为了保证网络的连通性,确保云原生多模数据库 Lindorm实例和实时计算Flink使用相同的专有网络。
说明实时计算Flink版默认不具备访问公网的能力,如需通过公网将数据写入时序引擎,请参见Flink全托管集群如何访问公网。
已开通时序引擎。
已将Flink的IP地址添加到Lindorm白名单。如果您使用的是实时计算Flink版,查看IP地址的操作,请参见如何设置白名单。添加Lindorm白名单的操作,请参见设置白名单。
背景信息
时序引擎Sink连接器用于连接其他系统与Lindorm时序引擎,负责从各种数据源接收数据并写入到时序引擎。实时计算Flink版通过Flink SQL定义源表、维表和结果表,通过定义时序引擎Sink连接器的参数,将结果表映射到Lindorm时序表,从而将Flink处理后的结果数据写入Lindorm时序引擎。使用时序引擎SINK插件,需要先获取时序引擎SINK插件,再将JAR包上传至实时计算Flink版控制台,上传方法请参见JAR作业开发。
语法
在实时计算Flink上创建结果表,并配置时序引擎Sink连接器参数,实现Flink结果表到Lindorm时序表的映射。
CREATE TEMPORARY TABLE tsdb_sink(
`timestamp` BIGINT,
tag_<tagname> VARCHAR,
field_<fieldname1> DOUBLE,
field_<fieldname2> VARCHAR,
field_<fieldname3> BIGINT,
field_<fieldname4> BOOLEAN
-- table VARCHAR(可选字段)
)
WITH (
'connector' = 'lindormtsdb',
'url'='<lindormTSDBHttpUrl>',
'table'='<yourTableName>',
'defaultDatabase'='<yourDatabaseName>',
'schemaPolicy'='<schemaPolicy>',
'sink.parallelism'='<sinkParallelism>'
'ignoreErrorData'='<ignoreErrorData>',
'maxRetries'='<maxRetries>',
'batchSize'='<batchSize>',
'connectTimeoutMs'='<connectTimeoutMs>',
'sync'='<sync>',
'debug'='<debug>'
);
参数说明
结果表结构参数说明
字段名 | 数据类型 | 是否必选 | 说明 |
timestamp | BIGINT | 是 | 字段名必须为 单位为毫秒(ms)。 说明
|
tag_tagname | VARCHAR | 是 | 指定时序数据的标签(Tag)。 示例:tag_deviceid。 说明 tag_tagname可以为一列或者多列。 |
field_fieldname | DOUBLE、VARCHAR、BIGINT、BOOLEAN | 是 | 指定时序数据量测值(Field)。 示例:field_humidity。 说明 field_fieldname可以为一列或者多列。 |
table | VARCHAR | 否 | 指定时序数据表。
|
WITH参数说明
参数 | 是否必选 | 说明 |
connector | 是 | 固定值lindormtsdb,指定时序引擎SINK插件。 |
url | 是 | Lindorm时序引擎的HTTP连接地址,获取方法请参见查看连接地址。 |
table | 否 | 指定时序数据表。
|
username | 条件必选 | 连接时序引擎的用户名和密码。 如已开启用户认证与权限校验,则必须输入用户名和密码。否则无需输入。 说明 时序引擎默认未开启用户认证与权限校验。为了数据安全,建议您开启时序引擎的用户认证与权限校验。 |
password | 条件必选 | |
defaultDatabase | 否 | 写入数据的数据库。默认值为default。 |
schemaPolicy | 否 | Schema约束策略。
说明 更多信息请参见Schema约束。 |
sink.parallelism | 否 | 写入并发度,当写入数据量较大时可适当增加并发度,默认值为1 。 |
ignoreErrorData | 否 | 是否忽略写入错误。
|
maxRetries | 否 | 写入时遇到服务端内部错误或者网络错误时最大重试次数,默认值为3。 |
batchSize | 否 | 批处理大小,即每次写入数据库的数据量,默认值为500个数据点。 |
connectTimeoutMs | 否 | HTTP连接超时时间,默认值为90000。单位为毫秒(ms)。 |
debug | 否 | 是否开启debug模式,用来打印详细数据点日志。
|
sync | 否 | 是否同步写入,建议使用false。
|
使用示例
以datagen_source随机数据生成器为例,将生成的数据写入Lindorm时序表mytable中。示例代码如下:
CREATE TEMPORARY TABLE datagen_source (
id INTEGER,
score DOUBLE,
name STRING
)
WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE tsdb_sink(
tag_tagk VARCHAR,
field_score DOUBLE,
field_name STRING,
`timestamp` BIGINT
)
WITH (
'connector' = 'lindormtsdb',
'url'='http://ld-bp159jt4eivt3****-proxy-tsdb.lindorm.rds.aliyuncs.com:8242',
'table'= 'mytable',
'schemaPolicy'='weak'
);
INSERT INTO tsdb_sink
SELECT
CAST(id as STRING) as tag_tagk,
score as field_score,
name as field_name,
UNIX_TIMESTAMP(now()) * 1000 as `timestamp`
FROM datagen_source;