本文为您介绍表格存储Tablestore源表的DDL定义、WITH参数和映射关系。
什么是表格存储Tablestore
表格存储Tablestore是基于阿里云飞天分布式系统的分布式NoSQL数据存储服务。表格存储通过数据分片和负载均衡技术,实现数据规模与访问并发的无缝扩展,提供海量结构化数据的存储和实时访问服务。
前提条件
已创建Tablestore数据表,详情请参见步骤三:创建数据表。
使用限制
仅Flink计算引擎VVR 6.0.1及以上版本支持表格存储Tablestore Connector。
DDL定义
CREATE TABLE tablestore_stream(
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR
) WITH (
'connector'='ots',
'endPoint' ='<yourEndpoint>',
'instanceName' = 'flink-source',
'tableName' ='flink_source_table',
'tunnelName' = 'flinksourcestream',
'accessId' ='<yourAccessId>',
'accessKey' ='<yourAccessSecret>',
'ignoreDelete' = 'false'
);
属性列支持读取待消费字段和Tunnel Service返回数据中的OtsRecordType、OtsRecordTimestamp两个字段。字段说明请参见下表。
字段名 | Flink映射名 | 描述 |
---|---|---|
OtsRecordType | type | 数据操作类型。 |
OtsRecordTimestamp | timestamp | 数据操作时间,单位为微秒。
说明 全量读取数据时,OtsRecordTimestamp取值为0。
|
当需要读取OtsRecordType和OtsRecordTimestamp字段时,Flink提供了METADATA关键字用于获取源表中的属性字段,具体DDL示例如下。
CREATE TABLE tablestore_stream(
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR,
record_type STRING METADATA FROM 'type',
record_timestamp BIGINT METADATA FROM 'timestamp'
) WITH (
...
);
WITH参数
参数 | 说明 | 是否必填 | 备注 |
---|---|---|---|
connector | 源表类型。 | 是 | 固定值为ots 。
|
instanceName | 实例名称。 | 是 | 无。 |
endPoint | 实例访问地址。 | 是 | 请参见服务地址。 |
tableName | 数据表名称。 | 是 | 无。 |
tunnelName | 表格存储数据表的数据通道名称。 | 是 | 您需要提前在表格存储产品侧创建好通道名称和对应的通道类型(增量、全量和全量加增量)。关于创建通道的具体操作,请参见创建数据通道。 |
accessId | 阿里云账号或者RAM用户的AccessKey ID。 | 是 | 获取AccessKey ID的具体操作,请参见获取AccessKey。 |
accessKey | 阿里云账号或者RAM用户的AccessKey Secret。 | 是 | 获取AccessKey Secret的具体操作,请参见获取AccessKey。 |
ignoreDelete | 是否忽略DELETE操作类型的实时数据。 | 否 | 参数取值如下:
|
类型映射
Tablestore字段类型 | Flink字段类型 |
---|---|
INTEGER | BIGINT |
STRING | STRING |
BOOLEAN | BOOLEAN |
DOUBLE | DOUBLE |
代码示例
CREATE TEMPORARY TABLE tablestore_stream(
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR
) WITH (
'connector'='ots',
'endPoint' ='<yourEndpoint>',
'instanceName' = 'flink-source',
'tableName' ='flink_source_table',
'tunnelName' = 'flinksourcestream',
'accessId' ='<yourAccessId>',
'accessKey' ='<yourAccessSecret>',
'ignoreDelete' = 'false'
);
CREATE TEMPORARY TABLE ots_sink (
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR,
PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
'connector'='ots',
'endPoint'='<yourEndpoint>',
'instanceName'='flink-sink',
'tableName'='flink_sink_table',
'accessId'='<yourAccessId>',
'accessKey'='<yourAccessSecret>',
'valueColumns'='customerid,customername'
);
INSERT INTO ots_sink
SELECT `order`, orderid, customerid, customername FROM tablestore_stream;