本文介绍如何在实时计算Flink上创建时序结果表并写入至云原生多模数据库Lindorm时序引擎。

前提条件

  • 已购买实时计算Flink版或者已有自建Flink,实时计算Flink版的购买请参见快速入门
    说明 实时计算Flink需要VVR 4.0.13及以上版本,VVR 4.0.13版本是基于Apache Flink 1.13。
  • 云原生多模数据库Lindorm实例已开通时序引擎功能。
  • Lindorm时序引擎版本为3.4.7及以上版本,查看方法请参见时序引擎版本说明
  • 为了保证网络的连通性,确保云原生多模数据库Lindorm实例和实时计算Flink使用相同的专有网络。
    说明 如果您需要通过公网使用实时计算Flink写入时序引擎,请确保实时计算Flink具备公网访问能力并配置Lindorm白名单,请参见Flink全托管集群如何访问公网

背景信息

时序引擎SINK是一种将时序结果表写入SINK连接器的插件,支持将多种数据源的数据高效写入至Lindorm时序引擎并进行业务分析。使用时序引擎SINK插件,需要先获取时序引擎SINK插件,再将JAR包上传至实时计算Flink版控制台,上传方法请参见Connector使用

语法

在实时计算Flink上创建时序结果表,创建语法代码如下:
CREATE TEMPORARY TABLE tsdb_sink(
  `timestamp` BIGINT,
  tag_tagname VARCHAR,
  field_fieldname1 DOUBLE,
  field_fieldname2 VARCHAR,
  field_fieldname3 BIGINT,
  field_fieldname4 BOOLEAN
  -- table VARCHAR(可选字段)
)
with (
    'connector' = 'lindormtsdb',
    'url'='http://ld-bp159jt4eivt3****-proxy-tsdb.lindorm.rds.aliyuncs.com:8242',
    'table'='mytable',
    'schemaPolicy'='weak',
    'sync'='false',
    'debug'='false'
);

参数说明

默认格式

字段名 数据类型 是否必选 说明
timestamp BIGINT 单位为毫秒。
说明 字段值为13位时间戳,如果是10位时间戳,写入会自动转换为13位。
tag_tagname VARCHAR 指定时序数据标签(Tag)。tag_表示前缀(不能省略),tagname表示时序数据标签名称。
说明 tag_tagname为一列或者多列。
field_fieldname DOUBLE、VARCHAR、BIGINT、BOOLEAN 指定时序数据量测值(Field)。field_表示前缀(不能省略),fieldname表示时序数据量测值名称。
说明 field_fieldname为一列或者多列。
table VARCHAR 指定时序数据表。
  • 如果写入一张时序数据表,建议在WITH参数中配置。
  • 如果写入多张表可以在该字段中自定义表名。

WITH参数

参数 是否必选 说明
connector 固定值lindormtsdb,指定时序引擎SINK插件。
url Lindorm时序引擎的连接地址,获取方法请参见查看连接地址
table 指定时序数据表。
  • 如果写入一张时序数据表,建议配置此参数。
  • 如果写入多张表可以在table字段中自定义表名。
username 默认情况未启用时序引擎的用户认证与权限校验,访问时序引擎时无需输入用户名和密码。但是为了数据安全,建议开启时序引擎的用户认证与权限校验,开启后访问时序引擎都需要输入用户名和密码。通过SQL创建用户和修改用户密码请参见CREATE USERALTER USER
password
defaultDatabase 读写数据库,默认值为default。
schemaPolicy 表的Schema约束策略,可以设置为:
  • Strong:默认值,强约束,时序引擎会严格依据预先定义的表结构对写入数据的表名、字段名、类型进行校验。
  • Weak:弱约束,写入数据的表不存在时引擎不会报错,而是会自动创建对应的表。
  • None:无约束,无法直接通过SQL查询写入的数据。
说明 设置为Strong或者None时需要手动建表。更多信息请参见关于时序数据的Schema约束
ignoreErrorData 是否忽略写入错误。
  • false:默认值,不忽略,如果遇到错误就跳出写入操作。
  • true:忽略,如果遇到错误就忽略继续写入操作。
maxRetries 写入时遇到服务端内部错误或者网络错误时最大重试次数,默认值为3。
batchSize 写入时每次向服务端发送的数据量,默认值为500个数据点。
connectTimeoutMs HTTP连接超时时间,默认值为90000。
debug 是否开启debug模式,用来打印详细数据点日志。
  • false:默认值,不开启。
  • true:开启。
sync 是否同步写入,建议使用false。
  • false:异步写入,默认值,写入效率高。
  • true:同步写入。

示例

以datagen_source随机数据生成器为例,将生成的数据写入Lindorm时序表tsdb_sink中。示例代码如下:
CREATE TEMPORARY TABLE datagen_source (
  id INTEGER,
  score DOUBLE,
  name STRING
)
WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE tsdb_sink(
  tag_tagk VARCHAR,
  field_score DOUBLE,
  field_name STRING,
  `timestamp` BIGINT
)
WITH (
    'connector' = 'lindormtsdb',
    'url'='http://ld-bp159jt4eivt3****-proxy-tsdb.lindorm.rds.aliyuncs.com:8242',
    'table'= 'mytable'
);

INSERT INTO tsdb_sink
SELECT
  CAST(id as STRING) as tag_tagk,
  score as field_score,
  name as field_name,
  UNIX_TIMESTAMP(now()) * 1000  as `timestamp`
FROM datagen_source;