本文为您介绍如何使用表格存储Tablestore(OTS)连接器。
背景信息
表格存储Tablestore(又名OTS)面向海量结构化数据提供Serverless表存储服务,同时针对物联网场景深度优化提供一站式的IoTstore解决方案。适用于海量账单、IM消息、物联网、车联网、风控和推荐等场景中的结构化数据存储,提供海量数据低成本存储、毫秒级的在线数据查询和检索以及灵活的数据分析能力。详情请参见表格存储Tablestore。
Tablestore连接器支持的信息如下。
类别 | 详情 |
运行模式 | 流模式 |
API种类 | SQL |
支持类型 | 源表、维表和结果表 |
数据格式 | 暂不支持 |
特有监控指标 |
说明 指标的含义及如何查看监控指标,请参见自定义监控指标上报渠道。 |
是否支持更新或删除结果表数据 | 是 |
前提条件
已购买Tablestore实例并创建表,详情请参见使用流程。
使用限制
仅实时计算引擎VVR 3.0.0及以上版本支持表格存储Tablestore连接器。
语法结构
结果表
CREATE TABLE ots_sink ( name VARCHAR, age BIGINT, birthday BIGINT, primary key(name,age) not enforced ) WITH ( 'connector'='ots', 'instanceName'='<yourInstanceName>', 'tableName'='<yourTableName>', 'accessId'='${ak_id}', 'accessKey'='${ak_secret}', 'endPoint'='<yourEndpoint>', 'valueColumns'='birthday' );
说明Tablestore结果表必须定义有Primary Key,输出数据以Update方式追加Tablestore表。
维表
CREATE TABLE ots_dim ( id int, len int, content STRING ) WITH ( 'connector'='ots', 'endPoint'='<yourEndpoint>', 'instanceName'='<yourInstanceName>', 'tableName'='<yourTableName>', 'accessId'='${ak_id}', 'accessKey'='${ak_secret}' );
源表
CREATE TABLE tablestore_stream( `order` VARCHAR, orderid VARCHAR, customerid VARCHAR, customername VARCHAR ) WITH ( 'connector'='ots', 'endPoint' ='<yourEndpoint>', 'instanceName' = 'flink-source', 'tableName' ='flink_source_table', 'tunnelName' = 'flinksourcestream', 'accessId' ='${ak_id}', 'accessKey' ='${ak_secret}', 'ignoreDelete' = 'false' );
属性列支持读取待消费字段和Tunnel Service,以及返回数据中的
OtsRecordType
和OtsRecordTimestamp
两个字段。字段说明请参见下表。字段名
Flink映射名
描述
OtsRecordType
type
数据操作类型。
OtsRecordTimestamp
timestamp
数据操作时间,单位为微秒。
说明全量读取数据时,OtsRecordTimestamp取值为0。
当需要读取
OtsRecordType
和OtsRecordTimestamp
字段时,Flink提供了METADATA关键字用于获取源表中的属性字段,具体DDL示例如下。CREATE TABLE tablestore_stream( `order` VARCHAR, orderid VARCHAR, customerid VARCHAR, customername VARCHAR, record_type STRING METADATA FROM 'type', record_timestamp BIGINT METADATA FROM 'timestamp' ) WITH ( ... );
WITH参数
通用
参数
说明
数据类型
是否必填
默认值
备注
connector
表类型。
String
是
无
固定值为
ots
。instanceName
实例名。
String
是
无
无。
endPoint
实例访问地址。
String
是
无
请参见服务地址。
tableName
表名。
String
是
无
无。
accessId
阿里云账号或者RAM用户的AccessKey ID。
String
是
无
详情请参见如何查看AccessKey ID和AccessKey Secret信息?
重要为了避免您的AK信息泄露,建议您通过密钥管理的方式填写AccessKey ID取值,详情请参见密钥管理。
accessKey
阿里云账号或者RAM用户的AccessKey Secret。
String
是
无
详情请参见如何查看AccessKey ID和AccessKey Secret信息?
重要为了避免您的AK信息泄露,建议您通过密钥管理的方式填写AccessKey Secret取值,详情请参见密钥管理。
retryIntervalMs
重试间隔时间。
Integer
否
1000
单位为毫秒。
maxRetryTimes
最大重试次数。
Integer
否
100
无。
connectTimeout
连接器连接Tablestore的超时时间。
Integer
否
30000
单位为毫秒。
socketTimeout
连接器连接Tablestore的Socket超时时间。
Integer
否
30000
单位为毫秒。
源表独有
参数
说明
数据类型
是否必填
默认值
备注
tunnelName
表格存储数据表的数据通道名称。
String
是
无
您需要提前在表格存储产品侧创建好通道名称和对应的通道类型(增量、全量和全量加增量)。关于创建通道的具体操作,请参见创建数据通道。
ignoreDelete
是否忽略DELETE操作类型的实时数据。
Boolean
否
false
参数取值如下:
true:忽略。
false(默认值):不忽略。
skipInvalidData
是否忽略脏数据。如果不忽略脏数据,则处理脏数据时会进行报错。
Boolean
否
false
参数取值如下:
true:忽略脏数据。
false(默认值):不忽略脏数据。
说明仅实时计算引擎VVR 8.0.4及以上版本支持该参数。
结果表独有
参数
说明
数据类型
是否必填
默认值
备注
valueColumns
插入字段的列名。
String
是
无
多个字段以英文逗号(,)分割,例如ID或NAME。
bufferSize
流入多少条数据后开始输出。
Integer
否
5000
无。
batchWriteTimeoutMs
写入超时的时间。
Integer
否
5000
单位为毫秒。表示如果缓存中的数据在等待batchWriteTimeoutMs秒后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。
batchSize
一次批量写入的条数。
Integer
否
100
无。
ignoreDelete
是否忽略DELETE操作。
Boolean
否
False
无。
autoIncrementKey
当结果表中包含主键自增列时,通过该参数指定主键自增列的列名称。
String
否
无
当结果表没有主键自增列时,请不要设置此参数。
说明仅实时计算引擎VVR 8.0.4及以上版本支持该参数。
维表独有
参数
说明
数据类型
是否必填
默认值
备注
cache
缓存策略。
String
否
ALL
目前Tablestore维表支持以下三种缓存策略:
None:无缓存。
LRU:缓存维表里的部分数据。源表的每条数据都会触发系统先在Cache中查找数据,如果没有找到,则去物理维表中查找。
需要配置相关参数:缓存大小(cacheSize)和缓存更新时间间隔(cacheTTLMs)。
ALL(默认值):缓存维表里的所有数据。在Job运行前,系统会将维表中所有数据加载到Cache中,之后所有的维表查找数据都会通过Cache进行。如果在Cache中无法找到数据,则KEY不存在,并在Cache过期后重新加载一遍全量Cache。
适用于远程表数据量小且MISS KEY(源表数据和维表JOIN时,ON条件无法关联)特别多的场景。需要配置相关参数:缓存更新时间间隔cacheTTLMs,更新时间黑名单cacheReloadTimeBlackList。
说明因为系统会异步加载维表数据,所以在使用CACHE ALL时,需要增加维表JOIN节点的内存,增加的内存大小为远程表数据量的两倍。
cacheSize
缓存大小。
Integer
否
无
当缓存策略选择LRU时,可以设置缓存大小。
cacheTTLMs
缓存失效时间。
Integer
否
无
单位为毫秒。cacheTTLMs配置和cache有关:
如果cache配置为None,则cacheTTLMs可以不配置,表示缓存不超时。
如果cache配置为LRU,则cacheTTLMs为缓存超时时间。默认不过期。
如果cache配置为ALL,则cacheTTLMs为缓存加载时间。默认不重新加载。
cacheEmpty
是否缓存空结果。
Boolean
否
无
true:缓存
false:不缓存
cacheReloadTimeBlackList
更新时间黑名单。在缓存策略选择为ALL时,启用更新时间黑名单,防止在此时间内做Cache更新(例如双11场景)。
String
否
无
格式为2017-10-24 14:00 -> 2017-10-24 15:00,2017-11-10 23:30 -> 2017-11-11 08:00。分隔符的使用情况如下所示:
用英文逗号(,)来分隔多个黑名单。
用箭头(->)来分割黑名单的起始结束时间。
async
是否异步返回数据。
Boolean
否
false
true:表示异步返回数据。异步返回数据默认是无序的,可通过asyncResultOrder参数进行配置。
false(默认值):表示不进行异步返回数据。
asyncResultOrder
异步返回数据时,结果是否需要保序。
String
否
unordered
unordered(默认值):表示异步返回数据无序。
ordered:表示异步返回数据有序。
说明仅VVR 8.0.0及以上版本支持该参数。
类型映射
Tablestore字段类型 | Flink字段类型 |
INTEGER | BIGINT |
STRING | STRING |
BOOLEAN | BOOLEAN |
DOUBLE | DOUBLE |
使用示例
CREATE TEMPORARY TABLE tablestore_stream(
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR
) WITH
'connector'='ots',
'endPoint' ='<yourEndpoint>',
'instanceName' = 'flink-source',
'tableName' ='flink_source_table',
'tunnelName' = 'flinksourcestream',
'accessId' ='${ak_id}',
'accessKey' ='${ak_secret}',
'ignoreDelete' = 'false',
'skipInvalidData' ='false'
);
CREATE TEMPORARY TABLE ots_sink (
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR,
PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
'connector'='ots',
'endPoint'='<yourEndpoint>',
'instanceName'='flink-sink',
'tableName'='flink_sink_table',
'accessId'='${ak_id}',
'accessKey'='${ak_secret}',
'valueColumns'='customerid,customername',
'autoIncrementKey'='${auto_increment_primary_key_name}'
);
INSERT INTO ots_sink
SELECT `order`, orderid, customerid, customername FROM tablestore_stream;
- 本页导读 (1)