本文为您介绍如何使用表格存储Tablestore(OTS)连接器。
背景信息
表格存储Tablestore(又名OTS)面向海量结构化数据提供Serverless表存储服务,同时针对物联网场景深度优化提供一站式的IoTstore解决方案。适用于海量账单、IM消息、物联网、车联网、风控和推荐等场景中的结构化数据存储,提供海量数据低成本存储、毫秒级的在线数据查询和检索以及灵活的数据分析能力。详情请参见表格存储Tablestore。
Tablestore连接器支持的信息如下。
| 类别 | 详情 | 
| 运行模式 | 流模式 | 
| API种类 | SQL | 
| 支持类型 | 源表、维表和结果表 | 
| 数据格式 | 暂不支持 | 
| 特有监控指标 | 
 说明  指标含义详情,请参见监控指标说明。 | 
| 是否支持更新或删除结果表数据 | 是 | 
前提条件
已购买Tablestore实例并创建表,详情请参见使用流程。
语法结构
结果表
CREATE TABLE ots_sink (
  name VARCHAR,
  age BIGINT,
  birthday BIGINT,
  primary key(name,age) not enforced
) WITH (
  'connector'='ots',
  'instanceName'='<yourInstanceName>',
  'tableName'='<yourTableName>',
  'accessId'='${ak_id}',
  'accessKey'='${ak_secret}',
  'endPoint'='<yourEndpoint>',
  'valueColumns'='birthday'
);Tablestore结果表必须定义有Primary Key,输出数据以Update方式追加Tablestore表。
维表
CREATE TABLE ots_dim (
  id int,
  len int,
  content STRING
) WITH (
  'connector'='ots',
  'endPoint'='<yourEndpoint>',
  'instanceName'='<yourInstanceName>',
  'tableName'='<yourTableName>',
  'accessId'='${ak_id}',
  'accessKey'='${ak_secret}'
);源表
CREATE TABLE tablestore_stream(
  `order` VARCHAR,
  orderid VARCHAR,
  customerid VARCHAR,
  customername VARCHAR
) WITH (
  'connector'='ots',
  'endPoint' ='<yourEndpoint>',
  'instanceName' = 'flink-source',
  'tableName' ='flink_source_table',
  'tunnelName' = 'flinksourcestream',
  'accessId' ='${ak_id}',
  'accessKey' ='${ak_secret}',
  'ignoreDelete' = 'false'
);属性列支持读取待消费字段和Tunnel Service,以及返回数据中的OtsRecordType和OtsRecordTimestamp两个字段。字段说明请参见下表。
| 字段名 | Flink映射名 | 描述 | 
| OtsRecordType | type | 数据操作类型。 | 
| OtsRecordTimestamp | timestamp | 数据操作时间,单位为微秒。 说明  全量读取数据时,OtsRecordTimestamp取值为0。 | 
当需要读取OtsRecordType和OtsRecordTimestamp字段时,Flink提供了METADATA关键字用于获取源表中的属性字段,具体DDL示例如下。
CREATE TABLE tablestore_stream(
  `order` VARCHAR,
  orderid VARCHAR,
  customerid VARCHAR,
  customername VARCHAR,
  record_type STRING METADATA FROM 'type',
  record_timestamp BIGINT METADATA FROM 'timestamp'
) WITH (
  ...
);WITH参数
通用
| 参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 | 
| connector | 表类型。 | String | 是 | 无 | 固定值为 | 
| instanceName | 实例名。 | String | 是 | 无 | 无。 | 
| endPoint | 实例访问地址。 | String | 是 | 无 | 请参见服务地址。 | 
| tableName | 表名。 | String | 是 | 无 | 无。 | 
| accessId | 阿里云账号或者RAM用户的AccessKey ID。 | String | 是 | 无 | 详情请参见如何查看AccessKey ID和AccessKey Secret信息?。 重要  为了避免您的AK信息泄露,建议您使用变量的方式填写AccessKey取值,详情请参见项目变量。 | 
| accessKey | 阿里云账号或者RAM用户的AccessKey Secret。 | String | 是 | 无 | |
| connectTimeout | 连接器连接Tablestore的超时时间。 | Integer | 否 | 30000 | 单位为毫秒。 | 
| socketTimeout | 连接器连接Tablestore的Socket超时时间。 | Integer | 否 | 30000 | 单位为毫秒。 | 
| ioThreadCount | IO线程数量。 | Integer | 否 | 4 | 无。 | 
| callbackThreadPoolSize | 回调线程池大小。 | Integer | 否 | 4 | 无。 | 
源表独有
| 参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 | 
| tunnelName | 表格存储数据表的数据通道名称。 | String | 是 | 无 | 您需要提前在表格存储产品侧创建好通道名称和对应的通道类型(增量、全量和全量加增量)。关于创建通道的具体操作,请参见操作步骤。 | 
| ignoreDelete | 是否忽略DELETE操作类型的实时数据。 | Boolean | 否 | false | 参数取值如下: 
 | 
| skipInvalidData | 是否忽略脏数据。如果不忽略脏数据,则处理脏数据时会进行报错。 | Boolean | 否 | false | 参数取值如下: 
 说明  仅实时计算引擎VVR 8.0.4及以上版本支持该参数。 | 
| retryStrategy | 重试策略。 | Enum | 否 | TIME | 参数取值如下: 
 | 
| retryCount | 重试次数。 | Integer | 否 | 3 | 当retryStrategy设置为COUNT时,可以设置重试次数。 | 
| retryTimeoutMs | 重试的超时时间。 | Integer | 否 | 180000 | 当retryStrategy设置为TIME时,可以设置重试的超时时间,单位为毫秒。 | 
| streamOriginColumnMapping | 原始列名到真实列名的映射。 | String | 否 | 无 | 原始列名与真实列名之间,请使用半角冒号(:)隔开;多组映射之间,请使用半角逗号(,)隔开。例如 | 
| outputSpecificRowType | 是否透传具体的RowType。 | Boolean | 否 | false | 参数取值如下: 
 | 
| dataFetchTimeoutMs | 读取表的分区的最大时间开销。 | Integer | 否 | 10000 | 单位为毫秒。当表的分区数量较多且同步任务对延迟较为敏感时,可以适当调小此参数,以降低同步延迟。 说明  仅实时计算引擎VVR 8.0.10及以上版本支持。 | 
| enableRequestCompression | 是否开启数据压缩。 | Boolean | 否 | false | 在读取数据时是否启用数据压缩功能。启用后将节省带宽,但可能会增加CPU负载。 说明  仅实时计算引擎VVR 8.0.10及以上版本支持。 | 
结果表独有
| 参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 | 
| retryIntervalMs | 重试间隔时间。 | Integer | 否 | 1000 | 单位为毫秒。 | 
| maxRetryTimes | 最大重试次数。 | Integer | 否 | 10 | 无。 | 
| valueColumns | 插入字段的列名。 | String | 是 | 无 | 多个字段以半角逗号(,)分割,例如ID或NAME。 | 
| bufferSize | 流入多少条数据后开始输出。 | Integer | 否 | 5000 | 无。 | 
| batchWriteTimeoutMs | 写入超时的时间。 | Integer | 否 | 5000 | 单位为毫秒。表示如果缓存中的数据在等待batchWriteTimeoutMs秒后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。 | 
| batchSize | 一次批量写入的条数。 | Integer | 否 | 100 | 最大值为200。 | 
| ignoreDelete | 是否忽略DELETE操作。 | Boolean | 否 | False | 无。 | 
| autoIncrementKey | 当结果表中包含主键自增列时,通过该参数指定主键自增列的列名称。 | String | 否 | 无 | 当结果表没有主键自增列时,请不要设置此参数。 说明  仅实时计算引擎VVR 8.0.4及以上版本支持该参数。 | 
| overwriteMode | 数据覆盖模式。 | Enum | 否 | PUT | 参数取值如下: 
 说明  动态列模式下只支持UPDATE模式。 | 
| defaultTimestampInMillisecond | 设定写入Tablestrore数据的默认时间戳。 | Long | 否 | -1 | 如果不指定,则会使用系统当前的毫秒时间戳。 | 
| dynamicColumnSink | 是否开启动态列模式。 | Boolean | 否 | false | 动态列模式适用于在表定义中无需指定列名,根据作业运行情况动态插入数据列的场景。建表语句中主键需要定义为前若干列,最后两列中前一列的值作为列名变量,且类型必须为String,后一列的值作为该列对应的值。 说明  开启动态列模式时,不支持主键自增列,且参数overwriteMode必须设置为UPDATE。 | 
| checkSinkTableMeta | 是否检查结果表元数据。 | Boolean | 否 | true | 若设置为true,会检查Tablestore表的主键列和此处的建表语句中指定的主键是否一致。 | 
| enableRequestCompression | 数据写入过程中是否开启数据压缩。 | Boolean | 否 | false | 无。 | 
| maxColumnsCount | 写入下游表的最大列数。 | Integer | 否 | 128 | 如果写入的列数超过128,则可能会出现错误: 说明  仅实时计算引擎VVR 8.0.10及以上版本支持。 | 
| storageType | 写入结果表的类型。 | String | 否 | WIDE_COLUMN | 参数取值如下: 
 说明  仅实时计算引擎VVR 8.0.10及以上版本支持。 | 
维表独有
| 参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 | 
| retryIntervalMs | 重试间隔时间。 | Integer | 否 | 1000 | 单位为毫秒。 | 
| maxRetryTimes | 最大重试次数。 | Integer | 否 | 10 | 无。 | 
| cache | 缓存策略。 | String | 否 | ALL | 目前Tablestore维表支持以下三种缓存策略: 
 | 
| cacheSize | 缓存大小。 | Integer | 否 | 无 | 当缓存策略选择LRU时,可以设置缓存大小。 说明  单位为数据条数。 | 
| cacheTTLMs | 缓存失效时间。 | Integer | 否 | 无 | 单位为毫秒。cacheTTLMs配置和cache有关: 
 | 
| cacheEmpty | 是否缓存空结果。 | Boolean | 否 | 无 | 
 | 
| cacheReloadTimeBlackList | 更新时间黑名单。在缓存策略选择为ALL时,启用更新时间黑名单,防止在此时间内做Cache更新(例如双11场景)。 | String | 否 | 无 | 格式为2017-10-24 14:00 -> 2017-10-24 15:00,2017-11-10 23:30 -> 2017-11-11 08:00。分隔符的使用情况如下所示: 
 | 
| async | 是否异步返回数据。 | Boolean | 否 | false | 
 | 
类型映射
源表
| Tablestore字段类型 | Flink字段类型 | 
| INTEGER | BIGINT | 
| STRING | STRING | 
| BOOLEAN | BOOLEAN | 
| DOUBLE | DOUBLE | 
| BINARY | BINARY | 
结果表
| Flink字段类型 | Tablestore字段类型 | 
| BINARY | BINARY | 
| VARBINARY | |
| CHAR | STRING | 
| VARCHAR | |
| TINYINT | INTEGER | 
| SMALLINT | |
| INTEGER | |
| BIGINT | |
| FLOAT | DOUBLE | 
| DOUBLE | |
| BOOLEAN | BOOLEAN | 
使用示例
示例1
从OTS源表向OTS结果表进行数据写入。
CREATE TEMPORARY TABLE tablestore_stream(
 `order` VARCHAR,
  orderid VARCHAR,
  customerid VARCHAR,
  customername VARCHAR
) WITH 
  'connector'='ots',
  'endPoint' ='<yourEndpoint>',
  'instanceName' = 'flink-source',
  'tableName' ='flink_source_table',
  'tunnelName' = 'flinksourcestream',
  'accessId' ='${ak_id}',
  'accessKey' ='${ak_secret}',
  'ignoreDelete' = 'false',
  'skipInvalidData' ='false' 
);
CREATE TEMPORARY TABLE ots_sink (
  `order` VARCHAR,
  orderid VARCHAR,
  customerid VARCHAR,
  customername VARCHAR,
  PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
  'connector'='ots',
  'endPoint'='<yourEndpoint>',
  'instanceName'='flink-sink',
  'tableName'='flink_sink_table',
  'accessId'='${ak_id}',
  'accessKey'='${ak_secret}',
  'valueColumns'='customerid,customername',
  'autoIncrementKey'='${auto_increment_primary_key_name}' 
);
INSERT INTO ots_sink
SELECT `order`, orderid, customerid, customername FROM tablestore_stream;示例2
宽表模型同步到时序模型。
CREATE TEMPORARY TABLE timeseries_source (
    measurement STRING,
    datasource STRING,
    tag_a STRING,
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    tag_b STRING,
    tag_c STRING,
    tag_d STRING,
    tag_e STRING,
    tag_f STRING
) 
WITH (
    'connector' = 'ots',
    'endPoint' = 'https://iotstore-test.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'iotstore-test',
    'tableName' = 'test_ots_timeseries_2',
    'tunnelName' = 'timeseries_source_tunnel_2',
    'accessId' = '${ak_id}',
    'accessKey' = '${ak_secret}',
    'ignoreDelete' = 'true', //是否忽略delete操作的数据
);
CREATE TEMPORARY TABLE timeseries_sink (
    measurement STRING,
    datasource STRING,
    tags Map<String, String>, 
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    tag_b STRING,
    tag_c STRING,
    tag_d STRING,
    tag_e STRING,
    tag_f STRING,
    PRIMARY KEY(measurement, datasource, tags, `time`) NOT ENFORCED
) 
WITH (
    'connector' = 'ots',
    'endPoint' = 'https://iotstore-test.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'iotstore-test',
    'tableName' = 'test_timeseries_sink_table_2',
    'accessId' = '${ak_id}',
    'accessKey' = '${ak_secret}',
    'storageType' = 'TIMESERIES',
);
--将源表数据插入到结果表。
INSERT INTO timeseries_sink
    select 
        m_name,
        data_source,
        MAP["tag_a":tag_a,"tag_b":tag_b,"tag_c":tag_c,"tag_d":tag_d,"tag_e":tag_e,"tag_f":tag_f] AS tags,
        `time`,
        cpu_sys,
        cpu_user,
        disk_0,
        disk_1,
        disk_2,
        memory_used,
        net_in,
        net_out 
    from
        timeseries_source;