使用教程(宽表模型)

表格存储支持作为实时计算Flink的源表和结果表使用,您可以将表格存储数据表中的数据经过Flink处理后得到的结果保存到表格存储的另一张数据表中。

背景信息

实时计算Flink能将Tunnel Service的数据通道作为流式数据的输入,每条数据类似一个JSON格式。示例如下:

{
  "OtsRecordType": "PUT", 
  "OtsRecordTimestamp": 1506416585740836, 
  "PrimaryKey": [
    {
      "ColumnName": "pk_1", 
      "Value": 1506416585881590900
    },
    {
      "ColumnName": "pk_2", 
      "Value": "string_pk_value"
    }
  ],
  "Columns": [
    {
      "OtsColumnType": "Put", 
      "ColumnName": "attr_0",
      "Value": "hello_table_store",
    },
    {
      "OtsColumnType": "DELETE_ONE_VERSION", 
      "ColumnName": "attr_1"
    }
  ]
}

字段名

描述

OtsRecordType

数据操作类型,取值范围如下:

  • PUT:新增数据操作。

  • UPDATE:更新数据操作。

  • DELETE:删除数据操作。

OtsRecordTimestamp

数据操作时间,单位为微秒。全量数据时取值为0。

PrimaryKey

主键列信息,以JSON格式数组表示。支持配置1~4列,请以实际主键列为准。包括如下选项:

  • ColumnName:列名称。

  • Value:列值。

Columns

属性列信息,以JSON格式的数组表示。包括如下选项:

  • OtsColumnType:列操作类型。取值范围为PUT、DELETE_ONE_VERSION、DELETE_ALL_VERSION。

  • ColumnName:列名。

  • Value:列值。

    当设置OtsColumnTypeDELETE_ONE_VERSION或者DELETE_ALL_VERSION时,不需要配置该参数。

Tablestore数据源表

存储在Tablestore中数据的主键和属性列值均可以在Flink中通过数据源表DDL以列名与相应的类型映射进行读取。更多信息,请参见表格存储Tablestore连接器

DDL定义

数据源表的DDL定义示例如下:

CREATE TABLE tablestore_stream(
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR
) WITH (
    'connector'='ots',
    'endPoint' ='https://flink-source.cn-hangzhou.ots.aliyuncs.com',
    'instanceName' = 'flink-source',
    'tableName' ='flink_source_table',
    'tunnelName' = 'flinksourcestream',
    'accessId' ='xxxxxxxxxxx',
    'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'ignoreDelete' = 'false' //是否忽略delete操作的数据.
);

除了待消费的用户数据外,Tunnel Service返回数据中的OtsRecordType、OtsRecordTimestamp字段均支持通过属性字段的方式读取。字段说明请参见下表。

字段名

Flink映射名

描述

OtsRecordType

type

数据操作类型。

OtsRecordTimestamp

timestamp

数据操作时间,单位为微秒。全量数据时取值为0。

当需要读取OtsRecordTypeOtsRecordTimestamp字段时,Flink提供了METADATA关键字用于获取源表中的属性字段,具体DDL示例如下:

CREATE TABLE tablestore_stream(
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR,
    record_type STRING METADATA FROM 'type',
    record_timestamp BIGINT METADATA FROM 'timestamp'
) WITH (
    ...
);

WITH参数

参数

是否必填

描述

connector

源表类型。固定取值为ots。

endPoint

表格存储实例的服务地址。更多信息,请参见服务地址

instanceName

表格存储的实例名称。

tableName

表格存储的数据表名称。

tunnelName

表格存储数据表的数据通道名称。关于创建通道的具体操作,请参见创建数据通道

accessId

阿里云账号或者RAM用户的AccessKey(包括AccessKey IDAccessKey Secret)。获取AccessKey的具体操作,请参见获取AccessKey

accessKey

ignoreDelete

是否忽略DELETE操作类型的实时数据。默认值为false,表示不忽略DELETE操作类型的实时数据。

skipInvalidData

是否忽略脏数据。默认值为false,表示不忽略脏数据。

如果不忽略脏数据,则处理脏数据时会进行报错。如果需要忽略脏数据,请设置此参数为true。

源表字段类型映射

Tablestore字段类型

Flink字段类型

INTEGER

BIGINT

STRING

STRING

BOOLEAN

BOOLEAN

DOUBLE

DOUBLE

BINARY

BINARY

Tablestore数据结果表

Flink支持使用Tablestore存储输出结果。更多信息,请参见表格存储Tablestore连接器

DDL定义

结果表的DDL定义示例如下:

说明

Tablestore数据结果表定义中除了主键列外,需要包含至少一个属性列。

CREATE TABLE ots_sink (
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR,
    PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
     ...
);

WITH参数

参数

是否必填

描述

connector

结果表类型。固定取值为ots。

endPoint

表格存储实例的服务地址。更多信息,请参见服务地址

instanceName

表格存储的实例名称。

tableName

表格存储的数据表名称。

tunnelName

表格存储数据表的数据通道名称。关于创建通道的具体操作,请参见创建数据通道

accessId

阿里云账号或者RAM用户的AccessKey(包括AccessKey IDAccessKey Secret)。获取AccessKey的具体操作,请参见获取AccessKey

accessKey

valueColumns

指定插入的字段列名。插入多个字段以半角逗号(,)分隔。例如ID,NAME

bufferSize

流入多少条数据后开始输出。默认值为5000,表示输入的数据达到5000条就开始输出。

batchWriteTimeoutMs

写入超时的时间。单位为毫秒。默认值为5000,表示如果缓存中的数据在等待5秒后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。

batchSize

一次批量写入的条数。默认值为100。

retryIntervalMs

重试间隔时间,单位毫秒。默认值为1000。

maxRetryTimes

最大重试次数。默认值为100。

ignoreDelete

是否忽略DELETE操作类型的实时数据。默认值为false,表示不忽略DELETE操作类型的实时数据。

autoIncrementKey

当结果表中包含主键自增列时,通过该参数指定主键自增列的列名称。

defaultTimestampInMillisecond

写入结果表的数据的版本号,单位为毫秒。当不进行配置时,版本号取决于写入的时间。

结果表字段类型映射

Flink字段类型

Tablestore字段类型

BINARY

BINARY

VARBINARY

BINARY

CHAR

STRING

VARCHAR

STRING

TINYINT

INTEGER

SMALLINT

INTEGER

INTEGER

INTEGER

BIGINT

INTEGER

FLOAT

DOUBLE

DOUBLE

DOUBLE

BOOLEAN

BOOLEAN

SQL示例

读取数据源表的数据

批量从数据源表ots source中读取数据,SQL示例如下:

CREATE TEMPORARY TABLE tablestore_stream(
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR
) WITH (
    'connector'='ots',
    'endPoint' ='https://flink-source.cn-hangzhou.ots.aliyuncs.com',
    'instanceName' = 'flink-source',
    'tableName' ='flink_source_table',
    'tunnelName' = 'flinksourcestream',
    'accessId' ='xxxxxxxxxxx',
    'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'ignoreDelete' = 'false' //是否忽略delete操作的数据。
);
SELECT * FROM tablestore_stream LIMIT 100;

数据同步到结果表

ots sink数据会以updateRow的方式写入结果表,SQL示例如下:

CREATE TEMPORARY TABLE tablestore_stream(
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR
) WITH (
    'connector'='ots',
    'endPoint' ='https://flink-source.cn-hangzhou.ots.aliyuncs.com',
    'instanceName' = 'flink-source',
    'tableName' ='flink_source_table',
    'tunnelName' = 'flinksourcestream',
    'accessId' ='xxxxxxxxxxx',
    'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'ignoreDelete' = 'false' //是否忽略delete操作的数据。
);

CREATE TEMPORARY TABLE ots_sink (
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR,
    PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
    'connector'='ots',
    'endPoint'='https://flink-sink.cn-hangzhou.ots.aliyuncs.com',
    'instanceName'='flink-sink',
    'tableName'='flink_sink_table',
    'accessId'='xxxxxxxxxxx',
    'accessKey'='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'valueColumns'='customerid,customername'
);

INSERT INTO ots_sink
SELECT `order`, orderid, customerid, customername FROM tablestore_stream;

实时计算作业开发流程

前提条件

  • 已创建AccessKey。具体操作,请参见创建AccessKey

  • 已为表格存储数据表(源表)创建数据通道。具体操作,请参见创建数据通道

步骤一:创建作业

  1. 登录实时计算控制台

  2. Flink全托管页签,单击目标工作空间操作列下的控制台

  3. 在左侧导航栏,单击SQL开发

  4. 单击新建

  5. 新建作业草稿对话框中,单击空白的流作业草稿

    Flink全托管也为您提供了丰富的代码模板和数据同步,每种代码模板都为您提供了具体的使用场景、代码示例和使用指导。您可以直接单击对应的模板快速地了解Flink产品功能和相关语法,实现您的业务逻辑,详情请参见代码模板数据同步模板

  6. 单击下一步

  7. 填写作业配置信息。

    作业参数

    说明

    示例

    文件名称

    作业的名称。

    说明

    作业名称在当前项目中必须保持唯一。

    flink-test

    存储位置

    指定该作业的代码文件所属的文件夹。

    您还可以在现有文件夹右侧,单击新建文件夹图标,新建子文件夹。

    作业草稿

    引擎版本

    当前作业使用的Flink的引擎版本。引擎版本号含义、版本对应关系和生命周期重要时间点详情请参见引擎版本介绍

    vvr-6.0.4-flink-1.15

  8. 单击创建

步骤二:编写作业代码

  1. 创建一个源表和结果表的临时表。

    说明

    在生产作业中,建议您尽量减少临时表的使用,直接使用元数据管理中已经注册的表。

    创建一个tablestore_streamots_sink临时表代码示例如下:

    CREATE TEMPORARY TABLE tablestore_stream(
        `order` VARCHAR,
        orderid VARCHAR,
        customerid VARCHAR,
        customername VARCHAR
    ) WITH (
        'connector'='ots',
        'endPoint' ='https://flink-source.cn-hangzhou.ots.aliyuncs.com',
        'instanceName' = 'flink-source',
        'tableName' ='flink_source_table',
        'tunnelName' = 'flinksourcestream',
        'accessId' ='xxxxxxxxxxx',
        'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
        'ignoreDelete' = 'false' //是否忽略delete操作的数据。
    );
    
    CREATE TEMPORARY TABLE ots_sink (
        `order` VARCHAR,
        orderid VARCHAR,
        customerid VARCHAR,
        customername VARCHAR,
        PRIMARY KEY (`order`,orderid) NOT ENFORCED
    ) WITH (
        'connector'='ots',
        'endPoint'='https://flink-sink.cn-hangzhou.ots.aliyuncs.com',
        'instanceName'='flink-sink',
        'tableName'='flink_sink_table',
        'accessId'='xxxxxxxxxxx',
        'accessKey'='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
        'valueColumns'='customerid,customername'
    );
  2. 编写作业逻辑。

    将源表数据插入到结果表的代码示例如下:

    INSERT INTO ots_sink
    SELECT `order`, orderid, customerid, customername FROM tablestore_stream;

步骤三:进行更多配置

在作业开发页面右侧,单击更多配置后,您可以填写以下参数信息:

  • 引擎版本:修改您创建作业时选择的Flink引擎版本。

    说明 VVR 3.0.3版本(对应Flink 1.12版本)开始,VVP支持同时运行多个不同引擎版本的SQL作业。如果您的作业已使用了Flink 1.12及更早版本的引擎,您需要按照以下情况进行处理:
    • Flink 1.12版本:停止后启动作业,系统将自动将引擎升级为vvr-3.0.3-flink-1.12版本。
    • Flink 1.11Flink 1.10版本:手动将作业引擎版本升级到vvr-3.0.3-flink-1.12vvr-4.0.7-flink-1.13版本后重启作业,否则会在启动作业时超时报错。
  • 附加依赖文件:作业中需要使用到的附加依赖,例如临时函数等。

步骤四:进行深度检查

在作业开发页面顶部,单击深度检查,进行语法检查。

步骤五:(可选)进行作业调试

在作业开发页面顶部,单击调试

您可以使用作业调试功能模拟作业运行、检查输出结果,验证SELECTINSERT业务逻辑的正确性,提升开发效率,降低数据质量风险。具体操作,请参见作业调试

步骤六:作业部署

在作业开发页面顶部,单击部署,在部署新版本对话框,可根据需要填写或选中相关内容,单击确认部署

说明

Session集群适用于非生产环境的开发测试环境,您可以使用Session集群模式部署或调试作业,提高作业JM(Job Manager)资源利用率和提高作业启动速度。但不推荐您将作业提交至Session集群中,因为会存在业务稳定性问题。具体操作,请参见步骤一:创建Session集群

步骤七:启动并查看Flink计算结果

说明

如果您对作业进行了修改(例如更改SQL代码、增删改WITH参数、更改作业版本等),且希望修改生效,则需要先上线,然后停止再启动。另外,如果作业无法复用State,希望作业全新启动时,也需要停止后再启动作业。关于作业停止的具体操作,请参见作业停止

  1. 在左侧导航栏,单击作业运维

  2. 单击目标作业名称操作列中的启动

    作业启动参数配置详情请参见作业启动。单击启动后,您可以看到作业状态变为运行中,则代表作业运行正常。

  3. 在作业运维详情页面,查看Flink计算结果。

    1. 作业运维页面,单击目标作业名称。

    2. 单击作业探查

    3. 运行日志页签,单击运行Task Managers页签下的Path, ID

    4. 单击日志,在页面搜索Sink相关的日志信息。

      image..png