本文介绍如何在实时计算Flink上创建时序结果表并写入至云原生多模数据库Lindorm时序引擎。
前提条件
- 已购买实时计算Flink版或者已有自建Flink,实时计算Flink版的购买请参见快速入门。
说明 实时计算Flink需要VVR 4.0.13及以上版本,VVR 4.0.13版本是基于Apache Flink 1.13。
- 云原生多模数据库Lindorm实例已开通时序引擎功能。
- Lindorm时序引擎版本为3.4.7及以上版本,查看方法请参见时序引擎版本说明。
- 为了保证网络的连通性,确保云原生多模数据库Lindorm实例和实时计算Flink使用相同的专有网络。
说明 如果您需要通过公网使用实时计算Flink写入时序引擎,请确保实时计算Flink具备公网访问能力并配置Lindorm白名单,请参见Flink全托管集群如何访问公网。
背景信息
时序引擎SINK是一种将时序结果表写入SINK连接器的插件,支持将多种数据源的数据高效写入至Lindorm时序引擎并进行业务分析。使用时序引擎SINK插件,需要先获取时序引擎SINK插件,再将JAR包上传至实时计算Flink版控制台,上传方法请参见Connector使用。
语法
在实时计算Flink上创建时序结果表,创建语法代码如下:
CREATE TEMPORARY TABLE tsdb_sink(
`timestamp` BIGINT,
tag_tagname VARCHAR,
field_fieldname1 DOUBLE,
field_fieldname2 VARCHAR,
field_fieldname3 BIGINT,
field_fieldname4 BOOLEAN
-- table VARCHAR(可选字段)
)
with (
'connector' = 'lindormtsdb',
'url'='http://ld-bp159jt4eivt3****-proxy-tsdb.lindorm.rds.aliyuncs.com:8242',
'table'='mytable',
'schemaPolicy'='weak',
'sync'='false',
'debug'='false'
);
参数说明
默认格式
字段名 | 数据类型 | 是否必选 | 说明 |
---|---|---|---|
timestamp | BIGINT | 是 | 单位为毫秒。
说明 字段值为13位时间戳,如果是10位时间戳,写入会自动转换为13位。
|
tag_tagname | VARCHAR | 是 | 指定时序数据标签(Tag)。tag_表示前缀(不能省略),tagname表示时序数据标签名称。
说明 tag_tagname为一列或者多列。
|
field_fieldname | DOUBLE、VARCHAR、BIGINT、BOOLEAN | 是 | 指定时序数据量测值(Field)。field_表示前缀(不能省略),fieldname表示时序数据量测值名称。
说明 field_fieldname为一列或者多列。
|
table | VARCHAR | 否 | 指定时序数据表。
|
WITH参数
参数 | 是否必选 | 说明 |
---|---|---|
connector | 是 | 固定值lindormtsdb,指定时序引擎SINK插件。 |
url | 是 | Lindorm时序引擎的连接地址,获取方法请参见查看连接地址。 |
table | 否 | 指定时序数据表。
|
username | 否 | 默认情况未启用时序引擎的用户认证与权限校验,访问时序引擎时无需输入用户名和密码。但是为了数据安全,建议开启时序引擎的用户认证与权限校验,开启后访问时序引擎都需要输入用户名和密码。通过SQL创建用户和修改用户密码请参见CREATE USER和ALTER USER。 |
password | 否 | |
defaultDatabase | 否 | 读写数据库,默认值为default。 |
schemaPolicy | 否 | 表的Schema约束策略,可以设置为:
说明 设置为Strong或者None时需要手动建表。更多信息请参见关于时序数据的Schema约束。
|
ignoreErrorData | 否 | 是否忽略写入错误。
|
maxRetries | 否 | 写入时遇到服务端内部错误或者网络错误时最大重试次数,默认值为3。 |
batchSize | 否 | 写入时每次向服务端发送的数据量,默认值为500个数据点。 |
connectTimeoutMs | 否 | HTTP连接超时时间,默认值为90000。 |
debug | 否 | 是否开启debug模式,用来打印详细数据点日志。
|
sync | 否 | 是否同步写入,建议使用false。
|
示例
以datagen_source随机数据生成器为例,将生成的数据写入Lindorm时序表tsdb_sink中。示例代码如下:
CREATE TEMPORARY TABLE datagen_source (
id INTEGER,
score DOUBLE,
name STRING
)
WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE tsdb_sink(
tag_tagk VARCHAR,
field_score DOUBLE,
field_name STRING,
`timestamp` BIGINT
)
WITH (
'connector' = 'lindormtsdb',
'url'='http://ld-bp159jt4eivt3****-proxy-tsdb.lindorm.rds.aliyuncs.com:8242',
'table'= 'mytable'
);
INSERT INTO tsdb_sink
SELECT
CAST(id as STRING) as tag_tagk,
score as field_score,
name as field_name,
UNIX_TIMESTAMP(now()) * 1000 as `timestamp`
FROM datagen_source;