本文为您介绍如何使用表格存储Tablestore(OTS)连接器。
背景信息
表格存储Tablestore(又名OTS)面向海量结构化数据提供Serverless表存储服务,同时针对物联网场景深度优化提供一站式的IoTstore解决方案。适用于海量账单、IM消息、物联网、车联网、风控和推荐等场景中的结构化数据存储,提供海量数据低成本存储、毫秒级的在线数据查询和检索以及灵活的数据分析能力。详情请参见表格存储Tablestore。
Tablestore连接器支持的信息如下。
类别 | 详情 |
运行模式 | 流模式 |
API种类 | SQL |
支持类型 | 源表、维表和结果表 |
数据格式 | 暂不支持 |
特有监控指标 |
说明 指标含义详情,请参见监控指标说明。 |
是否支持更新或删除结果表数据 | 是 |
前提条件
已购买Tablestore实例并创建表,详情请参见使用流程。
语法结构
结果表
CREATE TABLE ots_sink (
name VARCHAR,
age BIGINT,
birthday BIGINT,
primary key(name,age) not enforced
) WITH (
'connector'='ots',
'instanceName'='<yourInstanceName>',
'tableName'='<yourTableName>',
'accessId'='${ak_id}',
'accessKey'='${ak_secret}',
'endPoint'='<yourEndpoint>',
'valueColumns'='birthday'
);
Tablestore结果表必须定义有Primary Key,输出数据以Update方式追加Tablestore表。
维表
CREATE TABLE ots_dim (
id int,
len int,
content STRING
) WITH (
'connector'='ots',
'endPoint'='<yourEndpoint>',
'instanceName'='<yourInstanceName>',
'tableName'='<yourTableName>',
'accessId'='${ak_id}',
'accessKey'='${ak_secret}'
);
源表
CREATE TABLE tablestore_stream(
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR
) WITH (
'connector'='ots',
'endPoint' ='<yourEndpoint>',
'instanceName' = 'flink-source',
'tableName' ='flink_source_table',
'tunnelName' = 'flinksourcestream',
'accessId' ='${ak_id}',
'accessKey' ='${ak_secret}',
'ignoreDelete' = 'false'
);
属性列支持读取待消费字段和Tunnel Service,以及返回数据中的OtsRecordType
和OtsRecordTimestamp
两个字段。字段说明请参见下表。
字段名 | Flink映射名 | 描述 |
OtsRecordType | type | 数据操作类型。 |
OtsRecordTimestamp | timestamp | 数据操作时间,单位为微秒。 说明 全量读取数据时,OtsRecordTimestamp取值为0。 |
当需要读取OtsRecordType
和OtsRecordTimestamp
字段时,Flink提供了METADATA关键字用于获取源表中的属性字段,具体DDL示例如下。
CREATE TABLE tablestore_stream(
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR,
record_type STRING METADATA FROM 'type',
record_timestamp BIGINT METADATA FROM 'timestamp'
) WITH (
...
);
WITH参数
通用
参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 |
connector | 表类型。 | String | 是 | 无 | 固定值为 |
instanceName | 实例名。 | String | 是 | 无 | 无。 |
endPoint | 实例访问地址。 | String | 是 | 无 | 请参见服务地址。 |
tableName | 表名。 | String | 是 | 无 | 无。 |
accessId | 阿里云账号或者RAM用户的AccessKey ID。 | String | 是 | 无 | 详情请参见如何查看AccessKey ID和AccessKey Secret信息?。 重要 为了避免您的AK信息泄露,建议您使用变量的方式填写AccessKey取值,详情请参见项目变量。 |
accessKey | 阿里云账号或者RAM用户的AccessKey Secret。 | String | 是 | 无 | |
connectTimeout | 连接器连接Tablestore的超时时间。 | Integer | 否 | 30000 | 单位为毫秒。 |
socketTimeout | 连接器连接Tablestore的Socket超时时间。 | Integer | 否 | 30000 | 单位为毫秒。 |
ioThreadCount | IO线程数量。 | Integer | 否 | 4 | 无。 |
callbackThreadPoolSize | 回调线程池大小。 | Integer | 否 | 4 | 无。 |
源表独有
参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 |
tunnelName | 表格存储数据表的数据通道名称。 | String | 是 | 无 | 您需要提前在表格存储产品侧创建好通道名称和对应的通道类型(增量、全量和全量加增量)。关于创建通道的具体操作,请参见操作步骤。 |
ignoreDelete | 是否忽略DELETE操作类型的实时数据。 | Boolean | 否 | false | 参数取值如下:
|
skipInvalidData | 是否忽略脏数据。如果不忽略脏数据,则处理脏数据时会进行报错。 | Boolean | 否 | false | 参数取值如下:
说明 仅实时计算引擎VVR 8.0.4及以上版本支持该参数。 |
retryStrategy | 重试策略。 | Enum | 否 | TIME | 参数取值如下:
|
retryCount | 重试次数。 | Integer | 否 | 3 | 当retryStrategy设置为COUNT时,可以设置重试次数。 |
retryTimeoutMs | 重试的超时时间。 | Integer | 否 | 180000 | 当retryStrategy设置为TIME时,可以设置重试的超时时间,单位为毫秒。 |
streamOriginColumnMapping | 原始列名到真实列名的映射。 | String | 否 | 无 | 原始列名与真实列名之间,请使用半角冒号(:)隔开;多组映射之间,请使用半角逗号(,)隔开。例如 |
outputSpecificRowType | 是否透传具体的RowType。 | Boolean | 否 | false | 参数取值如下:
|
dataFetchTimeoutMs | 读取表的分区的最大时间开销。 | Integer | 否 | 10000 | 单位为毫秒。当表的分区数量较多且同步任务对延迟较为敏感时,可以适当调小此参数,以降低同步延迟。 说明 仅实时计算引擎VVR 8.0.10及以上版本支持。 |
enableRequestCompression | 是否开启数据压缩。 | Boolean | 否 | false | 在读取数据时是否启用数据压缩功能。启用后将节省带宽,但可能会增加CPU负载。 说明 仅实时计算引擎VVR 8.0.10及以上版本支持。 |
结果表独有
参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 |
retryIntervalMs | 重试间隔时间。 | Integer | 否 | 1000 | 单位为毫秒。 |
maxRetryTimes | 最大重试次数。 | Integer | 否 | 10 | 无。 |
valueColumns | 插入字段的列名。 | String | 是 | 无 | 多个字段以半角逗号(,)分割,例如ID或NAME。 |
bufferSize | 流入多少条数据后开始输出。 | Integer | 否 | 5000 | 无。 |
batchWriteTimeoutMs | 写入超时的时间。 | Integer | 否 | 5000 | 单位为毫秒。表示如果缓存中的数据在等待batchWriteTimeoutMs秒后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。 |
batchSize | 一次批量写入的条数。 | Integer | 否 | 100 | 最大值为200。 |
ignoreDelete | 是否忽略DELETE操作。 | Boolean | 否 | False | 无。 |
autoIncrementKey | 当结果表中包含主键自增列时,通过该参数指定主键自增列的列名称。 | String | 否 | 无 | 当结果表没有主键自增列时,请不要设置此参数。 说明 仅实时计算引擎VVR 8.0.4及以上版本支持该参数。 |
overwriteMode | 数据覆盖模式。 | Enum | 否 | PUT | 参数取值如下:
说明 动态列模式下只支持UPDATE模式。 |
defaultTimestampInMillisecond | 设定写入Tablestrore数据的默认时间戳。 | Long | 否 | -1 | 如果不指定,则会使用系统当前的毫秒时间戳。 |
dynamicColumnSink | 是否开启动态列模式。 | Boolean | 否 | false | 动态列模式适用于在表定义中无需指定列名,根据作业运行情况动态插入数据列的场景。建表语句中主键需要定义为前若干列,最后两列中前一列的值作为列名变量,且类型必须为String,后一列的值作为该列对应的值。 说明 开启动态列模式时,不支持主键自增列,且参数overwriteMode必须设置为UPDATE。 |
checkSinkTableMeta | 是否检查结果表元数据。 | Boolean | 否 | true | 若设置为true,会检查Tablestore表的主键列和此处的建表语句中指定的主键是否一致。 |
enableRequestCompression | 数据写入过程中是否开启数据压缩。 | Boolean | 否 | false | 无。 |
maxColumnsCount | 写入下游表的最大列数。 | Integer | 否 | 128 | 如果写入的列数超过128,则可能会出现错误: 说明 仅实时计算引擎VVR 8.0.10及以上版本支持。 |
storageType | 写入结果表的类型。 | String | 否 | WIDE_COLUMN | 参数取值如下:
说明 仅实时计算引擎VVR 8.0.10及以上版本支持。 |
维表独有
参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 |
retryIntervalMs | 重试间隔时间。 | Integer | 否 | 1000 | 单位为毫秒。 |
maxRetryTimes | 最大重试次数。 | Integer | 否 | 10 | 无。 |
cache | 缓存策略。 | String | 否 | ALL | 目前Tablestore维表支持以下三种缓存策略:
|
cacheSize | 缓存大小。 | Integer | 否 | 无 | 当缓存策略选择LRU时,可以设置缓存大小。 说明 单位为数据条数。 |
cacheTTLMs | 缓存失效时间。 | Integer | 否 | 无 | 单位为毫秒。cacheTTLMs配置和cache有关:
|
cacheEmpty | 是否缓存空结果。 | Boolean | 否 | 无 |
|
cacheReloadTimeBlackList | 更新时间黑名单。在缓存策略选择为ALL时,启用更新时间黑名单,防止在此时间内做Cache更新(例如双11场景)。 | String | 否 | 无 | 格式为2017-10-24 14:00 -> 2017-10-24 15:00,2017-11-10 23:30 -> 2017-11-11 08:00。分隔符的使用情况如下所示:
|
async | 是否异步返回数据。 | Boolean | 否 | false |
|
类型映射
源表
Tablestore字段类型 | Flink字段类型 |
INTEGER | BIGINT |
STRING | STRING |
BOOLEAN | BOOLEAN |
DOUBLE | DOUBLE |
BINARY | BINARY |
结果表
Flink字段类型 | Tablestore字段类型 |
BINARY | BINARY |
VARBINARY | |
CHAR | STRING |
VARCHAR | |
TINYINT | INTEGER |
SMALLINT | |
INTEGER | |
BIGINT | |
FLOAT | DOUBLE |
DOUBLE | |
BOOLEAN | BOOLEAN |
使用示例
示例1
从OTS源表向OTS结果表进行数据写入。
CREATE TEMPORARY TABLE tablestore_stream(
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR
) WITH
'connector'='ots',
'endPoint' ='<yourEndpoint>',
'instanceName' = 'flink-source',
'tableName' ='flink_source_table',
'tunnelName' = 'flinksourcestream',
'accessId' ='${ak_id}',
'accessKey' ='${ak_secret}',
'ignoreDelete' = 'false',
'skipInvalidData' ='false'
);
CREATE TEMPORARY TABLE ots_sink (
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR,
PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
'connector'='ots',
'endPoint'='<yourEndpoint>',
'instanceName'='flink-sink',
'tableName'='flink_sink_table',
'accessId'='${ak_id}',
'accessKey'='${ak_secret}',
'valueColumns'='customerid,customername',
'autoIncrementKey'='${auto_increment_primary_key_name}'
);
INSERT INTO ots_sink
SELECT `order`, orderid, customerid, customername FROM tablestore_stream;
示例2
宽表模型同步到时序模型。
CREATE TEMPORARY TABLE timeseries_source (
measurement STRING,
datasource STRING,
tag_a STRING,
`time` BIGINT,
binary_value BINARY,
bool_value BOOLEAN,
double_value DOUBLE,
long_value BIGINT,
string_value STRING,
tag_b STRING,
tag_c STRING,
tag_d STRING,
tag_e STRING,
tag_f STRING
)
WITH (
'connector' = 'ots',
'endPoint' = 'https://iotstore-test.cn-hangzhou.vpc.tablestore.aliyuncs.com',
'instanceName' = 'iotstore-test',
'tableName' = 'test_ots_timeseries_2',
'tunnelName' = 'timeseries_source_tunnel_2',
'accessId' = '${ak_id}',
'accessKey' = '${ak_secret}',
'ignoreDelete' = 'true', //是否忽略delete操作的数据
);
CREATE TEMPORARY TABLE timeseries_sink (
measurement STRING,
datasource STRING,
tags Map<String, String>,
`time` BIGINT,
binary_value BINARY,
bool_value BOOLEAN,
double_value DOUBLE,
long_value BIGINT,
string_value STRING,
tag_b STRING,
tag_c STRING,
tag_d STRING,
tag_e STRING,
tag_f STRING,
PRIMARY KEY(measurement, datasource, tags, `time`) NOT ENFORCED
)
WITH (
'connector' = 'ots',
'endPoint' = 'https://iotstore-test.cn-hangzhou.vpc.tablestore.aliyuncs.com',
'instanceName' = 'iotstore-test',
'tableName' = 'test_timeseries_sink_table_2',
'accessId' = '${ak_id}',
'accessKey' = '${ak_secret}',
'storageType' = 'TIMESERIES',
);
--将源表数据插入到结果表。
INSERT INTO timeseries_sink
select
m_name,
data_source,
MAP["tag_a":tag_a,"tag_b":tag_b,"tag_c":tag_c,"tag_d":tag_d,"tag_e":tag_e,"tag_f":tag_f] AS tags,
`time`,
cpu_sys,
cpu_user,
disk_0,
disk_1,
disk_2,
memory_used,
net_in,
net_out
from
timeseries_source;