表格存储Tablestore(OTS)

本文为您介绍如何使用表格存储Tablestore(OTS)连接器。

背景信息

表格存储Tablestore(又名OTS)面向海量结构化数据提供Serverless表存储服务,同时针对物联网场景深度优化提供一站式的IoTstore解决方案。适用于海量账单、IM消息、物联网、车联网、风控和推荐等场景中的结构化数据存储,提供海量数据低成本存储、毫秒级的在线数据查询和检索以及灵活的数据分析能力。详情请参见表格存储Tablestore

Tablestore连接器支持的信息如下。

类别

详情

运行模式

流模式

API种类

SQL

支持类型

源表、维表和结果表

数据格式

暂不支持

特有监控指标

  • 源表:无

  • 维表:无

  • 结果表:

    • numBytesOut

    • numBytesOutPerSecond

    • numRecordsOut

    • numRecordsOutPerSecond

    • currentSendTime

说明

指标含义详情,请参见监控指标说明

是否支持更新或删除结果表数据

前提条件

已购买Tablestore实例并创建表,详情请参见使用流程

语法结构

结果表

CREATE TABLE ots_sink (
  name VARCHAR,
  age BIGINT,
  birthday BIGINT,
  primary key(name,age) not enforced
) WITH (
  'connector'='ots',
  'instanceName'='<yourInstanceName>',
  'tableName'='<yourTableName>',
  'accessId'='${ak_id}',
  'accessKey'='${ak_secret}',
  'endPoint'='<yourEndpoint>',
  'valueColumns'='birthday'
);
说明

Tablestore结果表必须定义有Primary Key,输出数据以Update方式追加Tablestore表。

维表

CREATE TABLE ots_dim (
  id int,
  len int,
  content STRING
) WITH (
  'connector'='ots',
  'endPoint'='<yourEndpoint>',
  'instanceName'='<yourInstanceName>',
  'tableName'='<yourTableName>',
  'accessId'='${ak_id}',
  'accessKey'='${ak_secret}'
);

源表

CREATE TABLE tablestore_stream(
  `order` VARCHAR,
  orderid VARCHAR,
  customerid VARCHAR,
  customername VARCHAR
) WITH (
  'connector'='ots',
  'endPoint' ='<yourEndpoint>',
  'instanceName' = 'flink-source',
  'tableName' ='flink_source_table',
  'tunnelName' = 'flinksourcestream',
  'accessId' ='${ak_id}',
  'accessKey' ='${ak_secret}',
  'ignoreDelete' = 'false'
);

属性列支持读取待消费字段和Tunnel Service,以及返回数据中的OtsRecordTypeOtsRecordTimestamp两个字段。字段说明请参见下表。

字段名

Flink映射名

描述

OtsRecordType

type

数据操作类型。

OtsRecordTimestamp

timestamp

数据操作时间,单位为微秒。

说明

全量读取数据时,OtsRecordTimestamp取值为0。

当需要读取OtsRecordTypeOtsRecordTimestamp字段时,Flink提供了METADATA关键字用于获取源表中的属性字段,具体DDL示例如下。

CREATE TABLE tablestore_stream(
  `order` VARCHAR,
  orderid VARCHAR,
  customerid VARCHAR,
  customername VARCHAR,
  record_type STRING METADATA FROM 'type',
  record_timestamp BIGINT METADATA FROM 'timestamp'
) WITH (
  ...
);

WITH参数

通用

参数

说明

数据类型

是否必填

默认值

备注

connector

表类型。

String

固定值为ots

instanceName

实例名。

String

无。

endPoint

实例访问地址。

String

请参见服务地址

tableName

表名。

String

无。

accessId

阿里云账号或者RAM用户的AccessKey ID。

String

详情请参见如何查看AccessKey IDAccessKey Secret信息?

重要

为了避免您的AK信息泄露,建议您使用变量的方式填写AccessKey取值,详情请参见项目变量

accessKey

阿里云账号或者RAM用户的AccessKey Secret。

String

connectTimeout

连接器连接Tablestore的超时时间。

Integer

30000

单位为毫秒。

socketTimeout

连接器连接TablestoreSocket超时时间。

Integer

30000

单位为毫秒。

ioThreadCount

IO线程数量。

Integer

4

无。

callbackThreadPoolSize

回调线程池大小。

Integer

4

无。

源表独有

参数

说明

数据类型

是否必填

默认值

备注

tunnelName

表格存储数据表的数据通道名称。

String

您需要提前在表格存储产品侧创建好通道名称和对应的通道类型(增量、全量和全量加增量)。关于创建通道的具体操作,请参见操作步骤

ignoreDelete

是否忽略DELETE操作类型的实时数据。

Boolean

false

参数取值如下:

  • true:忽略。

  • false(默认值):不忽略。

skipInvalidData

是否忽略脏数据。如果不忽略脏数据,则处理脏数据时会进行报错。

Boolean

false

参数取值如下:

  • true:忽略脏数据。

  • false(默认值):不忽略脏数据。

说明

仅实时计算引擎VVR 8.0.4及以上版本支持该参数。

retryStrategy

重试策略。

Enum

TIME

参数取值如下:

  • TIME:在超时时间retryTimeoutMs内持续进行重试。

  • COUNT:在最大重试次数retryCount内持续进行重试。

retryCount

重试次数。

Integer

3

retryStrategy设置为COUNT时,可以设置重试次数。

retryTimeoutMs

重试的超时时间。

Integer

180000

retryStrategy设置为TIME时,可以设置重试的超时时间,单位为毫秒。

streamOriginColumnMapping

原始列名到真实列名的映射。

String

原始列名与真实列名之间,请使用半角冒号(:)隔开;多组映射之间,请使用半角逗号(,)隔开。例如origin_col1:col1,origin_col2:col2

outputSpecificRowType

是否透传具体的RowType。

Boolean

false

参数取值如下:

  • false:不透传,所有数据RowType均为INSERT。

  • true:透传,将根据透传的类型相应设置为INSERT、DELETEUPDATE_AFTER。

dataFetchTimeoutMs

读取表的分区的最大时间开销。

Integer

10000

单位为毫秒。当表的分区数量较多且同步任务对延迟较为敏感时,可以适当调小此参数,以降低同步延迟。

说明

仅实时计算引擎VVR 8.0.10及以上版本支持。

enableRequestCompression

是否开启数据压缩。

Boolean

false

在读取数据时是否启用数据压缩功能。启用后将节省带宽,但可能会增加CPU负载。

说明

仅实时计算引擎VVR 8.0.10及以上版本支持。

结果表独有

参数

说明

数据类型

是否必填

默认值

备注

retryIntervalMs

重试间隔时间。

Integer

1000

单位为毫秒。

maxRetryTimes

最大重试次数。

Integer

10

无。

valueColumns

插入字段的列名。

String

多个字段以半角逗号(,)分割,例如IDNAME。

bufferSize

流入多少条数据后开始输出。

Integer

5000

无。

batchWriteTimeoutMs

写入超时的时间。

Integer

5000

单位为毫秒。表示如果缓存中的数据在等待batchWriteTimeoutMs秒后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。

batchSize

一次批量写入的条数。

Integer

100

最大值为200。

ignoreDelete

是否忽略DELETE操作。

Boolean

False

无。

autoIncrementKey

当结果表中包含主键自增列时,通过该参数指定主键自增列的列名称。

String

当结果表没有主键自增列时,请不要设置此参数。

说明

仅实时计算引擎VVR 8.0.4及以上版本支持该参数。

overwriteMode

数据覆盖模式。

Enum

PUT

参数取值如下:

  • PUT:以PUT方式将数据写入到Tablestore表。

  • UPDATE:以UPDATE方式写入到Tablestore表。

说明

动态列模式下只支持UPDATE模式。

defaultTimestampInMillisecond

设定写入Tablestrore数据的默认时间戳。

Long

-1

如果不指定,则会使用系统当前的毫秒时间戳。

dynamicColumnSink

是否开启动态列模式。

Boolean

false

动态列模式适用于在表定义中无需指定列名,根据作业运行情况动态插入数据列的场景。建表语句中主键需要定义为前若干列,最后两列中前一列的值作为列名变量,且类型必须为String,后一列的值作为该列对应的值。

说明

开启动态列模式时,不支持主键自增列,且参数overwriteMode必须设置为UPDATE。

checkSinkTableMeta

是否检查结果表元数据。

Boolean

true

若设置为true,会检查Tablestore表的主键列和此处的建表语句中指定的主键是否一致。

enableRequestCompression

数据写入过程中是否开启数据压缩。

Boolean

false

无。

maxColumnsCount

写入下游表的最大列数。

Integer

128

如果写入的列数超过128,则可能会出现错误:The count of attribute columns exceeds the maximum,此时需要调整该参数。

说明

仅实时计算引擎VVR 8.0.10及以上版本支持。

storageType

写入结果表的类型。

String

WIDE_COLUMN

参数取值如下:

  • WIDE_COLUMN:结果表是宽表。

  • TIMESERIES:结果表是时序表。

说明

仅实时计算引擎VVR 8.0.10及以上版本支持。

维表独有

参数

说明

数据类型

是否必填

默认值

备注

retryIntervalMs

重试间隔时间。

Integer

1000

单位为毫秒。

maxRetryTimes

最大重试次数。

Integer

10

无。

cache

缓存策略。

String

ALL

目前Tablestore维表支持以下三种缓存策略:

  • None:无缓存。

  • LRU:缓存维表里的部分数据。源表的每条数据都会触发系统先在Cache中查找数据,如果没有找到,则去物理维表中查找。

    需要配置相关参数:缓存大小(cacheSize)和缓存更新时间间隔(cacheTTLMs)。

  • ALL(默认值):缓存维表里的所有数据。在Job运行前,系统会将维表中所有数据加载到Cache中,之后所有的维表查找数据都会通过Cache进行。如果在Cache中无法找到数据,则KEY不存在,并在Cache过期后重新加载一遍全量Cache。

    适用于远程表数据量小且MISS KEY(源表数据和维表JOIN时,ON条件无法关联)特别多的场景。需要配置相关参数:缓存更新时间间隔cacheTTLMs,更新时间黑名单cacheReloadTimeBlackList

    说明

    因为系统会异步加载维表数据,所以在使用CACHE ALL时,需要增加维表JOIN节点的内存,增加的内存大小为远程表数据量的两倍。

cacheSize

缓存大小。

Integer

当缓存策略选择LRU时,可以设置缓存大小。

说明

单位为数据条数。

cacheTTLMs

缓存失效时间。

Integer

单位为毫秒。cacheTTLMs配置和cache有关:

  • 如果cache配置为None,则cacheTTLMs可以不配置,表示缓存不超时。

  • 如果cache配置为LRU,则cacheTTLMs为缓存超时时间。默认不过期。

  • 如果cache配置为ALL,则cacheTTLMs为缓存加载时间。默认不重新加载。

cacheEmpty

是否缓存空结果。

Boolean

  • true:缓存

  • false:不缓存

cacheReloadTimeBlackList

更新时间黑名单。在缓存策略选择为ALL时,启用更新时间黑名单,防止在此时间内做Cache更新(例如双11场景)。

String

格式为2017-10-24 14:00 -> 2017-10-24 15:00,2017-11-10 23:30 -> 2017-11-11 08:00。分隔符的使用情况如下所示:

  • 用半角逗号(,)来分隔多个黑名单。

  • 用箭头(->)来分割黑名单的起始结束时间。

async

是否异步返回数据。

Boolean

false

  • true:表示异步返回数据。异步返回数据默认是无序的。

  • false(默认值):表示不进行异步返回数据。

类型映射

源表

Tablestore字段类型

Flink字段类型

INTEGER

BIGINT

STRING

STRING

BOOLEAN

BOOLEAN

DOUBLE

DOUBLE

BINARY

BINARY

结果表

Flink字段类型

Tablestore字段类型

BINARY

BINARY

VARBINARY

CHAR

STRING

VARCHAR

TINYINT

INTEGER

SMALLINT

INTEGER

BIGINT

FLOAT

DOUBLE

DOUBLE

BOOLEAN

BOOLEAN

使用示例

示例1

OTS源表向OTS结果表进行数据写入。

CREATE TEMPORARY TABLE tablestore_stream(
 `order` VARCHAR,
  orderid VARCHAR,
  customerid VARCHAR,
  customername VARCHAR
) WITH 
  'connector'='ots',
  'endPoint' ='<yourEndpoint>',
  'instanceName' = 'flink-source',
  'tableName' ='flink_source_table',
  'tunnelName' = 'flinksourcestream',
  'accessId' ='${ak_id}',
  'accessKey' ='${ak_secret}',
  'ignoreDelete' = 'false',
  'skipInvalidData' ='false' 
);

CREATE TEMPORARY TABLE ots_sink (
  `order` VARCHAR,
  orderid VARCHAR,
  customerid VARCHAR,
  customername VARCHAR,
  PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
  'connector'='ots',
  'endPoint'='<yourEndpoint>',
  'instanceName'='flink-sink',
  'tableName'='flink_sink_table',
  'accessId'='${ak_id}',
  'accessKey'='${ak_secret}',
  'valueColumns'='customerid,customername',
  'autoIncrementKey'='${auto_increment_primary_key_name}' 
);

INSERT INTO ots_sink
SELECT `order`, orderid, customerid, customername FROM tablestore_stream;

示例2

宽表模型同步到时序模型。

CREATE TEMPORARY TABLE timeseries_source (
    measurement STRING,
    datasource STRING,
    tag_a STRING,
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    tag_b STRING,
    tag_c STRING,
    tag_d STRING,
    tag_e STRING,
    tag_f STRING
) 
WITH (
    'connector' = 'ots',
    'endPoint' = 'https://iotstore-test.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'iotstore-test',
    'tableName' = 'test_ots_timeseries_2',
    'tunnelName' = 'timeseries_source_tunnel_2',
    'accessId' = '${ak_id}',
    'accessKey' = '${ak_secret}',
    'ignoreDelete' = 'true', //是否忽略delete操作的数据
);
CREATE TEMPORARY TABLE timeseries_sink (
    measurement STRING,
    datasource STRING,
    tags Map<String, String>, 
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    tag_b STRING,
    tag_c STRING,
    tag_d STRING,
    tag_e STRING,
    tag_f STRING,
    PRIMARY KEY(measurement, datasource, tags, `time`) NOT ENFORCED
) 
WITH (
    'connector' = 'ots',
    'endPoint' = 'https://iotstore-test.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'iotstore-test',
    'tableName' = 'test_timeseries_sink_table_2',
    'accessId' = '${ak_id}',
    'accessKey' = '${ak_secret}',
    'storageType' = 'TIMESERIES',
);

--将源表数据插入到结果表。
INSERT INTO timeseries_sink
    select 
        m_name,
        data_source,
        MAP["tag_a":tag_a,"tag_b":tag_b,"tag_c":tag_c,"tag_d":tag_d,"tag_e":tag_e,"tag_f":tag_f] AS tags,
        `time`,
        cpu_sys,
        cpu_user,
        disk_0,
        disk_1,
        disk_2,
        memory_used,
        net_in,
        net_out 
    from
        timeseries_source;