本文介绍如何使用实时计算Flink访问表格存储中的源表和结果表。
背景信息
实时计算Flink能将Tunnel Service的数据通道作为流式数据的输入,每条数据类似一个JSON格式。示例如下:
{
"OtsRecordType": "PUT",
"OtsRecordTimestamp": 1506416585740836,
"PrimaryKey": [
{
"ColumnName": "pk_1",
"Value": 1506416585881590900
},
{
"ColumnName": "pk_2",
"Value": "string_pk_value"
}
],
"Columns": [
{
"OtsColumnType": "Put",
"ColumnName": "attr_0",
"Value": "hello_table_store",
},
{
"OtsColumnType": "DELETE_ONE_VERSION",
"ColumnName": "attr_1"
}
]
}
字段名 | 描述 |
---|---|
OtsRecordType | 数据操作类型,取值范围为PUT、UPDATE、DELETE。 |
OtsRecordTimestamp | 数据操作时间,单位为微秒。全量数据时取值为0。 |
PrimaryKey | 主键列信息,以JSON格式数组表示。支持配置1~4列,请以实际主键列为准。包括如下选项:
|
Columns | 属性列信息,以JSON格式的数组表示。包括如下选项:
|
Tablestore数据源表
存储在Tablestore中数据的主键和属性列值均可以在Flink中通过数据源表DDL以列名与相应的类型映射进行读取。更多信息,请参见表格存储Tablestore源表。
DDL定义
数据源表的DDL定义示例如下:
CREATE TABLE tablestore_stream(
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR
) WITH (
'connector'='ots',
'endPoint' ='https://flink-source.cn-hangzhou.ots.aliyuncs.com',
'instanceName' = 'flink-source',
'tableName' ='flink_source_table',
'tunnelName' = 'flinksourcestream',
'accessId' ='xxxxxxxxxxx',
'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
'ignoreDelete' = 'false' //是否忽略delete操作的数据.
);
除了待消费的用户数据外,Tunnel Service返回数据中的OtsRecordType、OtsRecordTimestamp字段均支持通过属性字段的方式读取。字段说明请参见下表。
字段名 | Flink映射名 | 描述 |
---|---|---|
OtsRecordType | type | 数据操作类型。 |
OtsRecordTimestamp | timestamp | 数据操作时间,单位为微秒。全量数据时取值为0。 |
当需要读取OtsRecordType和OtsRecordTimestamp字段时,Flink提供了METADATA关键字用于获取源表中的属性字段,具体DDL示例如下:
CREATE TABLE tablestore_stream(
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR,
record_type STRING METADATA FROM 'type',
record_timestamp BIGINT METADATA FROM 'timestamp'
) WITH (
...
);
WITH参数
参数 | 是否必填 | 描述 |
---|---|---|
connector | 是 | 原表类型。固定取值为ots。 |
endPoint | 是 | 表格存储实例的服务地址。更多信息,请参见服务地址。 |
instanceName | 是 | 表格存储的实例名称。 |
tableName | 是 | 表格存储的数据表名称。 |
tunnelName | 是 | 表格存储数据表的数据通道名称。关于创建通道的具体操作,请参见创建数据通道。 |
accessId | 是 | 阿里云账号或者RAM用户的AccessKey(包括AccessKey ID和AccessKey Secret)。获取AccessKey的具体操作,请参见获取AccessKey。 |
accessKey | 是 | |
ignoreDelete | 否 | 是否忽略DELETE操作类型的实时数据,可选配置。默认值为false。 |
类型映射
Tablestore字段类型 | Flink字段类型 |
---|---|
INTEGER | BIGINT |
STRING | STRING |
BOOLEAN | BOOLEAN |
DOUBLE | DOUBLE |
Tablestore数据结果表
Flink支持使用Tablestore存储输出结果。更多信息,请参见表格存储Tablestore结果表。
DDL定义
结果表的DDL定义示例如下:
说明 在Tablestore数据结果表定义中除了主键列外,需要包含至少一个属性列。
CREATE TABLE ots_sink (
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR,
PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
...
);
WITH参数
参数 | 是否必填 | 描述 |
---|---|---|
connector | 是 | 结果表类型。固定取值为ots。 |
endPoint | 是 | 表格存储实例的服务地址。更多信息,请参见服务地址。 |
instanceName | 是 | 表格存储的实例名称。 |
tableName | 是 | 表格存储的数据表名称。 |
tunnelName | 是 | 表格存储数据表的数据通道名称。关于创建通道的具体操作,请参见创建数据通道。 |
accessId | 是 | 阿里云账号或者RAM用户的AccessKey(包括AccessKey ID和AccessKey Secret)。获取AccessKey的具体操作,请参见获取AccessKey。 |
accessKey | 是 | |
valueColumns | 是 | 指定插入的字段列名。插入多个字段以半角逗号(,)分隔。例如ID,NAME 。
|
bufferSize | 否 | 流入多少条数据后开始输出。默认值为5000,表示输入的数据达到5000条就开始输出。 |
batchWriteTimeoutMs | 否 | 写入超时的时间。单位为毫秒。默认值为5000,表示如果缓存中的数据在等待5秒后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。 |
batchSize | 否 | 一次批量写入的条数。默认值为100。 |
retryIntervalMs | 否 | 重试间隔时间,单位毫秒。默认值为1000。 |
maxRetryTimes | 否 | 最大重试次数。默认值为100。 |
ignoreDelete | 否 | 是否忽略DELETE操作类型的实时数据。默认值为false,表示不忽略DELETE操作类型的实时数据。 |
SQL示例
读取数据源表的数据
批量从数据源表ots source中读取数据,SQL示例如下:
CREATE TEMPORARY TABLE tablestore_stream(
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR
) WITH (
'connector'='ots',
'endPoint' ='https://flink-source.cn-hangzhou.ots.aliyuncs.com',
'instanceName' = 'flink-source',
'tableName' ='flink_source_table',
'tunnelName' = 'flinksourcestream',
'accessId' ='xxxxxxxxxxx',
'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
'ignoreDelete' = 'false' //是否忽略delete操作的数据。
);
SELECT * FROM tablestore_stream LIMIT 100;
数据同步到结果表
ots sink数据会以updateRow的方式写入结果表,SQL示例如下:
CREATE TEMPORARY TABLE tablestore_stream(
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR
) WITH (
'connector'='ots',
'endPoint' ='https://flink-source.cn-hangzhou.ots.aliyuncs.com',
'instanceName' = 'flink-source',
'tableName' ='flink_source_table',
'tunnelName' = 'flinksourcestream',
'accessId' ='xxxxxxxxxxx',
'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
'ignoreDelete' = 'false' //是否忽略delete操作的数据。
);
CREATE TEMPORARY TABLE ots_sink (
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR,
PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
'connector'='ots',
'endPoint'='https://flink-sink.cn-hangzhou.ots.aliyuncs.com',
'instanceName'='flink-sink',
'tableName'='flink_sink_table',
'accessId'='xxxxxxxxxxx',
'accessKey'='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
'valueColumns'='customerid,customername'
);
INSERT INTO ots_sink
SELECT `order`, orderid, customerid, customername FROM tablestore_stream;
实时计算作业开发流程
- 创建SQL作业。
- 编辑作业并上线。
- 启动作业。