文档

表格存储Tablestore(OTS)

更新时间:

本文为您介绍如何使用表格存储Tablestore(OTS)连接器。

背景信息

表格存储Tablestore(又名OTS)面向海量结构化数据提供Serverless表存储服务,同时针对物联网场景深度优化提供一站式的IoTstore解决方案。适用于海量账单、IM消息、物联网、车联网、风控和推荐等场景中的结构化数据存储,提供海量数据低成本存储、毫秒级的在线数据查询和检索以及灵活的数据分析能力。详情请参见表格存储Tablestore

Tablestore连接器支持的信息如下。

类别

详情

运行模式

流模式

API种类

SQL

支持类型

源表、维表和结果表

数据格式

暂不支持

特有监控指标

  • 源表:无

  • 维表:无

  • 结果表:

    • numBytesOut

    • numBytesOutPerSecond

    • numRecordsOut

    • numRecordsOutPerSecond

    • currentSendTime

说明

指标的含义及如何查看监控指标,请参见自定义监控指标上报渠道

是否支持更新或删除结果表数据

前提条件

已购买Tablestore实例并创建表,详情请参见使用流程

使用限制

仅实时计算引擎VVR 3.0.0及以上版本支持表格存储Tablestore连接器。

语法结构

  • 结果表

    CREATE TABLE ots_sink (
      name VARCHAR,
      age BIGINT,
      birthday BIGINT,
      primary key(name,age) not enforced
    ) WITH (
      'connector'='ots',
      'instanceName'='<yourInstanceName>',
      'tableName'='<yourTableName>',
      'accessId'='${ak_id}',
      'accessKey'='${ak_secret}',
      'endPoint'='<yourEndpoint>',
      'valueColumns'='birthday'
    );
    说明

    Tablestore结果表必须定义有Primary Key,输出数据以Update方式追加Tablestore表。

  • 维表

    CREATE TABLE ots_dim (
      id int,
      len int,
      content STRING
    ) WITH (
      'connector'='ots',
      'endPoint'='<yourEndpoint>',
      'instanceName'='<yourInstanceName>',
      'tableName'='<yourTableName>',
      'accessId'='${ak_id}',
      'accessKey'='${ak_secret}'
    );
  • 源表

    CREATE TABLE tablestore_stream(
      `order` VARCHAR,
      orderid VARCHAR,
      customerid VARCHAR,
      customername VARCHAR
    ) WITH (
      'connector'='ots',
      'endPoint' ='<yourEndpoint>',
      'instanceName' = 'flink-source',
      'tableName' ='flink_source_table',
      'tunnelName' = 'flinksourcestream',
      'accessId' ='${ak_id}',
      'accessKey' ='${ak_secret}',
      'ignoreDelete' = 'false'
    );

    属性列支持读取待消费字段和Tunnel Service,以及返回数据中的OtsRecordTypeOtsRecordTimestamp两个字段。字段说明请参见下表。

    字段名

    Flink映射名

    描述

    OtsRecordType

    type

    数据操作类型。

    OtsRecordTimestamp

    timestamp

    数据操作时间,单位为微秒。

    说明

    全量读取数据时,OtsRecordTimestamp取值为0。

    当需要读取OtsRecordTypeOtsRecordTimestamp字段时,Flink提供了METADATA关键字用于获取源表中的属性字段,具体DDL示例如下。

    CREATE TABLE tablestore_stream(
      `order` VARCHAR,
      orderid VARCHAR,
      customerid VARCHAR,
      customername VARCHAR,
      record_type STRING METADATA FROM 'type',
      record_timestamp BIGINT METADATA FROM 'timestamp'
    ) WITH (
      ...
    );

WITH参数

  • 通用

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    connector

    表类型。

    String

    固定值为ots

    instanceName

    实例名。

    String

    无。

    endPoint

    实例访问地址。

    String

    请参见服务地址

    tableName

    表名。

    String

    无。

    accessId

    阿里云账号或者RAM用户的AccessKey ID。

    String

    详情请参见如何查看AccessKey ID和AccessKey Secret信息?

    重要

    为了避免您的AK信息泄露,建议您通过密钥管理的方式填写AccessKey ID取值,详情请参见密钥管理

    accessKey

    阿里云账号或者RAM用户的AccessKey Secret。

    String

    详情请参见如何查看AccessKey ID和AccessKey Secret信息?

    重要

    为了避免您的AK信息泄露,建议您通过密钥管理的方式填写AccessKey Secret取值,详情请参见密钥管理

    retryIntervalMs

    重试间隔时间。

    Integer

    1000

    单位为毫秒。

    maxRetryTimes

    最大重试次数。

    Integer

    100

    无。

    connectTimeout

    连接器连接Tablestore的超时时间。

    Integer

    30000

    单位为毫秒。

    socketTimeout

    连接器连接Tablestore的Socket超时时间。

    Integer

    30000

    单位为毫秒。

  • 源表独有

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    tunnelName

    表格存储数据表的数据通道名称。

    String

    您需要提前在表格存储产品侧创建好通道名称和对应的通道类型(增量、全量和全量加增量)。关于创建通道的具体操作,请参见创建数据通道

    ignoreDelete

    是否忽略DELETE操作类型的实时数据。

    Boolean

    false

    参数取值如下:

    • true:忽略。

    • false(默认值):不忽略。

    skipInvalidData

    是否忽略脏数据。如果不忽略脏数据,则处理脏数据时会进行报错。

    Boolean

    false

    参数取值如下:

    • true:忽略脏数据。

    • false(默认值):不忽略脏数据。

    说明

    仅实时计算引擎VVR 8.0.4及以上版本支持该参数。

  • 结果表独有

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    valueColumns

    插入字段的列名。

    String

    多个字段以英文逗号(,)分割,例如ID或NAME。

    bufferSize

    流入多少条数据后开始输出。

    Integer

    5000

    无。

    batchWriteTimeoutMs

    写入超时的时间。

    Integer

    5000

    单位为毫秒。表示如果缓存中的数据在等待batchWriteTimeoutMs秒后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。

    batchSize

    一次批量写入的条数。

    Integer

    100

    无。

    ignoreDelete

    是否忽略DELETE操作。

    Boolean

    False

    无。

    autoIncrementKey

    当结果表中包含主键自增列时,通过该参数指定主键自增列的列名称。

    String

    当结果表没有主键自增列时,请不要设置此参数。

    说明

    仅实时计算引擎VVR 8.0.4及以上版本支持该参数。

  • 维表独有

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    cache

    缓存策略。

    String

    ALL

    目前Tablestore维表支持以下三种缓存策略:

    • None:无缓存。

    • LRU:缓存维表里的部分数据。源表的每条数据都会触发系统先在Cache中查找数据,如果没有找到,则去物理维表中查找。

      需要配置相关参数:缓存大小(cacheSize)和缓存更新时间间隔(cacheTTLMs)。

    • ALL(默认值):缓存维表里的所有数据。在Job运行前,系统会将维表中所有数据加载到Cache中,之后所有的维表查找数据都会通过Cache进行。如果在Cache中无法找到数据,则KEY不存在,并在Cache过期后重新加载一遍全量Cache。

      适用于远程表数据量小且MISS KEY(源表数据和维表JOIN时,ON条件无法关联)特别多的场景。需要配置相关参数:缓存更新时间间隔cacheTTLMs,更新时间黑名单cacheReloadTimeBlackList

      说明

      因为系统会异步加载维表数据,所以在使用CACHE ALL时,需要增加维表JOIN节点的内存,增加的内存大小为远程表数据量的两倍。

    cacheSize

    缓存大小。

    Integer

    当缓存策略选择LRU时,可以设置缓存大小。

    cacheTTLMs

    缓存失效时间。

    Integer

    单位为毫秒。cacheTTLMs配置和cache有关:

    • 如果cache配置为None,则cacheTTLMs可以不配置,表示缓存不超时。

    • 如果cache配置为LRU,则cacheTTLMs为缓存超时时间。默认不过期。

    • 如果cache配置为ALL,则cacheTTLMs为缓存加载时间。默认不重新加载。

    cacheEmpty

    是否缓存空结果。

    Boolean

    • true:缓存

    • false:不缓存

    cacheReloadTimeBlackList

    更新时间黑名单。在缓存策略选择为ALL时,启用更新时间黑名单,防止在此时间内做Cache更新(例如双11场景)。

    String

    格式为2017-10-24 14:00 -> 2017-10-24 15:00,2017-11-10 23:30 -> 2017-11-11 08:00。分隔符的使用情况如下所示:

    • 用英文逗号(,)来分隔多个黑名单。

    • 用箭头(->)来分割黑名单的起始结束时间。

    async

    是否异步返回数据。

    Boolean

    false

    • true:表示异步返回数据。异步返回数据默认是无序的,可通过asyncResultOrder参数进行配置。

    • false(默认值):表示不进行异步返回数据。

    asyncResultOrder

    异步返回数据时,结果是否需要保序。

    String

    unordered

    • unordered(默认值):表示异步返回数据无序。

    • ordered:表示异步返回数据有序。

    说明

    仅VVR 8.0.0及以上版本支持该参数。

类型映射

Tablestore字段类型

Flink字段类型

INTEGER

BIGINT

STRING

STRING

BOOLEAN

BOOLEAN

DOUBLE

DOUBLE

使用示例

CREATE TEMPORARY TABLE tablestore_stream(
 `order` VARCHAR,
  orderid VARCHAR,
  customerid VARCHAR,
  customername VARCHAR
) WITH 
  'connector'='ots',
  'endPoint' ='<yourEndpoint>',
  'instanceName' = 'flink-source',
  'tableName' ='flink_source_table',
  'tunnelName' = 'flinksourcestream',
  'accessId' ='${ak_id}',
  'accessKey' ='${ak_secret}',
  'ignoreDelete' = 'false',
  'skipInvalidData' ='false' 
);

CREATE TEMPORARY TABLE ots_sink (
  `order` VARCHAR,
  orderid VARCHAR,
  customerid VARCHAR,
  customername VARCHAR,
  PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
  'connector'='ots',
  'endPoint'='<yourEndpoint>',
  'instanceName'='flink-sink',
  'tableName'='flink_sink_table',
  'accessId'='${ak_id}',
  'accessKey'='${ak_secret}',
  'valueColumns'='customerid,customername',
  'autoIncrementKey'='${auto_increment_primary_key_name}' 
);

INSERT INTO ots_sink
SELECT `order`, orderid, customerid, customername FROM tablestore_stream;
  • 本页导读 (1)
文档反馈