本文为您介绍表格存储Tablestore源表的DDL定义、WITH参数和映射关系。

什么是表格存储Tablestore

表格存储Tablestore是基于阿里云飞天分布式系统的分布式NoSQL数据存储服务。表格存储通过数据分片和负载均衡技术,实现数据规模与访问并发的无缝扩展,提供海量结构化数据的存储和实时访问服务。

前提条件

已创建Tablestore数据表,详情请参见步骤三:创建数据表

使用限制

仅Flink计算引擎VVR 6.0.1及以上版本支持表格存储Tablestore Connector。

DDL定义

CREATE TABLE tablestore_stream(
  `order` VARCHAR,
  orderid VARCHAR,
  customerid VARCHAR,
  customername VARCHAR
) WITH (
  'connector'='ots',
  'endPoint' ='<yourEndpoint>',
  'instanceName' = 'flink-source',
  'tableName' ='flink_source_table',
  'tunnelName' = 'flinksourcestream',
  'accessId' ='<yourAccessId>',
  'accessKey' ='<yourAccessSecret>',
  'ignoreDelete' = 'false' 
);

属性列支持读取待消费字段和Tunnel Service返回数据中的OtsRecordType、OtsRecordTimestamp两个字段。字段说明请参见下表。

字段名 Flink映射名 描述
OtsRecordType type 数据操作类型。
OtsRecordTimestamp timestamp 数据操作时间,单位为微秒。
说明 全量读取数据时,OtsRecordTimestamp取值为0。
当需要读取OtsRecordType和OtsRecordTimestamp字段时,Flink提供了METADATA关键字用于获取源表中的属性字段,具体DDL示例如下。
CREATE TABLE tablestore_stream(
  `order` VARCHAR,
  orderid VARCHAR,
  customerid VARCHAR,
  customername VARCHAR,
  record_type STRING METADATA FROM 'type',
  record_timestamp BIGINT METADATA FROM 'timestamp'
) WITH (
  ...
);

WITH参数

参数 说明 是否必填 备注
connector 源表类型。 固定值为ots
instanceName 实例名称。 无。
endPoint 实例访问地址。 请参见服务地址
tableName 数据表名称。 无。
tunnelName 表格存储数据表的数据通道名称。 您需要提前在表格存储产品侧创建好通道名称和对应的通道类型(增量、全量和全量加增量)。关于创建通道的具体操作,请参见创建数据通道
accessId 阿里云账号或者RAM用户的AccessKey ID。 获取AccessKey ID的具体操作,请参见获取AccessKey
accessKey 阿里云账号或者RAM用户的AccessKey Secret。 获取AccessKey Secret的具体操作,请参见获取AccessKey
ignoreDelete 是否忽略DELETE操作类型的实时数据。 参数取值如下:
  • true:忽略。
  • false(默认值):不忽略。

类型映射

Tablestore字段类型 Flink字段类型
INTEGER BIGINT
STRING STRING
BOOLEAN BOOLEAN
DOUBLE DOUBLE

代码示例

CREATE TEMPORARY TABLE tablestore_stream(
 `order` VARCHAR,
  orderid VARCHAR,
  customerid VARCHAR,
  customername VARCHAR
) WITH (
  'connector'='ots',
  'endPoint' ='<yourEndpoint>',
  'instanceName' = 'flink-source',
  'tableName' ='flink_source_table',
  'tunnelName' = 'flinksourcestream',
  'accessId' ='<yourAccessId>',
  'accessKey' ='<yourAccessSecret>',
  'ignoreDelete' = 'false'
);

CREATE TEMPORARY TABLE ots_sink (
  `order` VARCHAR,
  orderid VARCHAR,
  customerid VARCHAR,
  customername VARCHAR,
  PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
  'connector'='ots',
  'endPoint'='<yourEndpoint>',
  'instanceName'='flink-sink',
  'tableName'='flink_sink_table',
  'accessId'='<yourAccessId>',
  'accessKey'='<yourAccessSecret>',
  'valueColumns'='customerid,customername'
);

INSERT INTO ots_sink
SELECT `order`, orderid, customerid, customername FROM tablestore_stream;