本文介绍如何使用实时计算Flink访问表格存储中的源表和结果表。

背景信息

实时计算Flink能将Tunnel Service的数据通道作为流式数据的输入,每条数据类似一个JSON格式。示例如下:
{
  "OtsRecordType": "PUT", 
  "OtsRecordTimestamp": 1506416585740836, 
  "PrimaryKey": [
    {
      "ColumnName": "pk_1", 
      "Value": 1506416585881590900
    },
    {
      "ColumnName": "pk_2", 
      "Value": "string_pk_value"
    }
  ],
  "Columns": [
    {
      "OtsColumnType": "Put", 
      "ColumnName": "attr_0",
      "Value": "hello_table_store",
    },
    {
      "OtsColumnType": "DELETE_ONE_VERSION", 
      "ColumnName": "attr_1"
    }
  ]
}
字段名 描述
OtsRecordType 数据操作类型,取值范围为PUT、UPDATE、DELETE。
OtsRecordTimestamp 数据操作时间,单位为微秒。全量数据时取值为0。
PrimaryKey 主键列信息,以JSON格式数组表示。支持配置1~4列,请以实际主键列为准。包括如下选项:
  • ColumnName:列名称。
  • Value:列值。
Columns 属性列信息,以JSON格式的数组表示。包括如下选项:
  • OtsColumnType:列操作类型。取值范围为PUT、DELETE_ONE_VERSION、DELETE_ALL_VERSION。
  • ColumnName:列名。
  • Value:列值。

    当设置OtsColumnType为DELETE_ONE_VERSION或者DELETE_ALL_VERSION时,没有Value字段。

Tablestore数据源表

存储在Tablestore中数据的主键和属性列值均可以在Flink中通过数据源表DDL以列名与相应的类型映射进行读取。更多信息,请参见表格存储Tablestore源表

DDL定义

数据源表的DDL定义示例如下:
CREATE TABLE tablestore_stream(
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR
) WITH (
    'connector'='ots',
    'endPoint' ='https://flink-source.cn-hangzhou.ots.aliyuncs.com',
    'instanceName' = 'flink-source',
    'tableName' ='flink_source_table',
    'tunnelName' = 'flinksourcestream',
    'accessId' ='xxxxxxxxxxx',
    'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'ignoreDelete' = 'false' //是否忽略delete操作的数据.
);

除了待消费的用户数据外,Tunnel Service返回数据中的OtsRecordType、OtsRecordTimestamp字段均支持通过属性字段的方式读取。字段说明请参见下表。

字段名 Flink映射名 描述
OtsRecordType type 数据操作类型。
OtsRecordTimestamp timestamp 数据操作时间,单位为微秒。全量数据时取值为0。

当需要读取OtsRecordType和OtsRecordTimestamp字段时,Flink提供了METADATA关键字用于获取源表中的属性字段,具体DDL示例如下:

CREATE TABLE tablestore_stream(
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR,
    record_type STRING METADATA FROM 'type',
    record_timestamp BIGINT METADATA FROM 'timestamp'
) WITH (
    ...
);

WITH参数

参数 是否必填 描述
connector 原表类型。固定取值为ots。
endPoint 表格存储实例的服务地址。更多信息,请参见服务地址
instanceName 表格存储的实例名称。
tableName 表格存储的数据表名称。
tunnelName 表格存储数据表的数据通道名称。关于创建通道的具体操作,请参见创建数据通道
accessId 阿里云账号或者RAM用户的AccessKey(包括AccessKey ID和AccessKey Secret)。获取AccessKey的具体操作,请参见获取AccessKey
accessKey
ignoreDelete 是否忽略DELETE操作类型的实时数据,可选配置。默认值为false。

类型映射

Tablestore字段类型 Flink字段类型
INTEGER BIGINT
STRING STRING
BOOLEAN BOOLEAN
DOUBLE DOUBLE

Tablestore数据结果表

Flink支持使用Tablestore存储输出结果。更多信息,请参见表格存储Tablestore结果表

DDL定义

结果表的DDL定义示例如下:

说明 在Tablestore数据结果表定义中除了主键列外,需要包含至少一个属性列。
CREATE TABLE ots_sink (
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR,
    PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
     ...
);

WITH参数

参数 是否必填 描述
connector 结果表类型。固定取值为ots。
endPoint 表格存储实例的服务地址。更多信息,请参见服务地址
instanceName 表格存储的实例名称。
tableName 表格存储的数据表名称。
tunnelName 表格存储数据表的数据通道名称。关于创建通道的具体操作,请参见创建数据通道
accessId 阿里云账号或者RAM用户的AccessKey(包括AccessKey ID和AccessKey Secret)。获取AccessKey的具体操作,请参见获取AccessKey
accessKey
valueColumns 指定插入的字段列名。插入多个字段以半角逗号(,)分隔。例如ID,NAME
bufferSize 流入多少条数据后开始输出。默认值为5000,表示输入的数据达到5000条就开始输出。
batchWriteTimeoutMs 写入超时的时间。单位为毫秒。默认值为5000,表示如果缓存中的数据在等待5秒后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。
batchSize 一次批量写入的条数。默认值为100。
retryIntervalMs 重试间隔时间,单位毫秒。默认值为1000。
maxRetryTimes 最大重试次数。默认值为100。
ignoreDelete 是否忽略DELETE操作类型的实时数据。默认值为false,表示不忽略DELETE操作类型的实时数据。

SQL示例

读取数据源表的数据

批量从数据源表ots source中读取数据,SQL示例如下:

CREATE TEMPORARY TABLE tablestore_stream(
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR
) WITH (
    'connector'='ots',
    'endPoint' ='https://flink-source.cn-hangzhou.ots.aliyuncs.com',
    'instanceName' = 'flink-source',
    'tableName' ='flink_source_table',
    'tunnelName' = 'flinksourcestream',
    'accessId' ='xxxxxxxxxxx',
    'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'ignoreDelete' = 'false' //是否忽略delete操作的数据。
);
SELECT * FROM tablestore_stream LIMIT 100;

数据同步到结果表

ots sink数据会以updateRow的方式写入结果表,SQL示例如下:

CREATE TEMPORARY TABLE tablestore_stream(
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR
) WITH (
    'connector'='ots',
    'endPoint' ='https://flink-source.cn-hangzhou.ots.aliyuncs.com',
    'instanceName' = 'flink-source',
    'tableName' ='flink_source_table',
    'tunnelName' = 'flinksourcestream',
    'accessId' ='xxxxxxxxxxx',
    'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'ignoreDelete' = 'false' //是否忽略delete操作的数据。
);

CREATE TEMPORARY TABLE ots_sink (
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR,
    PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
    'connector'='ots',
    'endPoint'='https://flink-sink.cn-hangzhou.ots.aliyuncs.com',
    'instanceName'='flink-sink',
    'tableName'='flink_sink_table',
    'accessId'='xxxxxxxxxxx',
    'accessKey'='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'valueColumns'='customerid,customername'
);

INSERT INTO ots_sink
SELECT `order`, orderid, customerid, customername FROM tablestore_stream;

实时计算作业开发流程

  1. 创建SQL作业。
    1. 登录实时计算管理控制台
    2. Flink全托管页签,单击目标工作空间操作列下的控制台
    3. 在左侧导航栏,单击作业开发
    4. 单击新建
    5. 新建文件对话框,填写作业配置信息。
      作业参数 示例 说明
      文件名称 flink-test 作业的名称。
      说明 作业名称在当前项目中必须保持唯一。
      文件类型 流作业/SQL 流作业和批作业均支持以下文件类型:
      • SQL
      • JAR
      • PYTHON
      说明 实时计算引擎VVR 3.0.1及以上版本支持批作业。
      部署目标 vvp-workload 选择作业需要部署的集群名称。Flink全托管支持Per-Job集群和Session集群两种集群模式。两种集群模式的区别说明,请参见配置开发测试环境(Session集群)
      存储位置 作业开发 指定该作业的代码文件所属的文件夹。默认存放在作业开发目录。

      您还可以在现有文件夹右侧,单击新建文件夹图标,新建子文件夹。

    6. 单击确认
  2. 编辑作业并上线。
    1. 将作业代码的SQL示例拷贝到作业文本编辑区。关于SQL示例的更多信息,请参见SQL示例
      说明 如果需要配置作业的部署环境,您可以在右侧单击高级配置页签,选择部署目标引擎版本
      CREATE TEMPORARY TABLE tablestore_stream(
          `order` VARCHAR,
          orderid VARCHAR,
          customerid VARCHAR,
          customername VARCHAR
      ) WITH (
          'connector'='ots',
          'endPoint' ='https://flink-source.cn-hangzhou.ots.aliyuncs.com',
          'instanceName' = 'flink-source',
          'tableName' ='flink_source_table',
          'tunnelName' = 'flinksourcestream',
          'accessId' ='xxxxxxxxxxx',
          'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
          'ignoreDelete' = 'false' //是否忽略delete操作的数据。
      );
      SELECT * FROM tablestore_stream LIMIT 100;
      fig_20220808_flink
    2. 单击验证,进行语法检查。
    3. 验证通过后,单击上线
      作业上线后,您可以在作业运维页面管理作业以及查看作业运行相关信息。
  3. 启动作业。
    1. 在左侧导航栏中,单击作业运维
    2. 作业运维页面,单击目标作业操作列的启动
    3. 单击确认启动
      作业启动后,您可以看到作业从当前状态到期望状态的变化过程及最终结果。直到状态变为RUNNING,则表示作业运行正常。

      单击作业名称,在TaskManager页签的日志中,您可以查看Flink计算结果。