使用教程

本文为您介绍如何使用Flink计算表格存储(Tablestore)的数据,表格存储中的数据表或时序表均可作为实时计算Flink的源表或结果表进行使用。

前提条件

实时计算作业开发流程

步骤一:创建作业

  1. 进入SQL作业创建页面。

    1. 登录实时计算控制台

    2. 单击目标工作空间操作列下的控制台

    3. 在左侧导航栏,单击数据开发 > ETL

  2. 单击新建后,在新建作业草稿对话框,选择空白的流作业草稿,单击下一步

    说明

    Flink也为您提供了丰富的代码模板和数据同步,每种代码模板都为您提供了具体的使用场景、代码示例和使用指导。您可以直接单击对应的模板快速地了解Flink产品功能和相关语法,实现您的业务逻辑,详情请参见代码模板数据同步模板

  3. 填写作业信息

    作业参数

    说明

    示例

    文件名称

    作业的名称。

    说明

    作业名称在当前项目中必须保持唯一。

    flink-test

    存储位置

    指定该作业的代码文件所属的文件夹。

    您还可以在现有文件夹右侧,单击新建文件夹图标,新建子文件夹。

    作业草稿

    引擎版本

    当前作业使用的Flink的引擎版本,引擎版本详情请参见功能发布记录引擎版本介绍

    vvr-8.0.10-flink-1.17

  4. 单击创建

步骤二:编写SQL作业

说明

此处以将数据表中的数据同步至另一个数据表为例,为您介绍如何编写SQL作业。更多SQL示例,请参考SQL示例

  1. 分别创建源表(数据表)和结果表(数据表)的临时表。

    详细配置信息,请参见附录1:Tablestore连接器

    -- 创建源表(数据表)的临时表 tablestore_stream
    CREATE TEMPORARY TABLE tablestore_stream(
        `order` VARCHAR,
        orderid VARCHAR,
        customerid VARCHAR,
        customername VARCHAR
    ) WITH (
        'connector' = 'ots', -- 源表的连接器类型。固定取值为ots。
        'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com', -- 表格存储实例的VPC地址。
        'instanceName' = 'xxx', -- 表格存储的实例名称。
        'tableName' = 'flink_source_table', -- 表格存储的源表名称。
        'tunnelName' = 'flink_source_tunnel', -- 表格存储源表的数据通道名称。
        'accessId' = 'xxxxxxxxxxx', -- 阿里云账号或者RAM用户的AccessKey ID。
        'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx', -- 阿里云账号或者RAM用户的AccessKey Secret。
        'ignoreDelete' = 'false' -- 是否忽略DELETE操作类型的实时数据:不忽略。
    );
    
    -- 创建结果表(数据表)的临时表 tablestore_sink
    CREATE TEMPORARY TABLE tablestore_sink(
       `order` VARCHAR,
        orderid VARCHAR,
        customerid VARCHAR,
        customername VARCHAR,
        PRIMARY KEY (`order`,orderid) NOT ENFORCED -- 主键。
    ) WITH (
        'connector' = 'ots', -- 结果表的连接器类型。固定取值为ots。
        'endPoint'='https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com', -- 表格存储实例的VPC地址。
        'instanceName' = 'xxx', -- 表格存储的实例名称。
        'tableName' = 'flink_sink_table', -- 表格存储的结果表名称。
        'accessId' = 'xxxxxxxxxxx',  -- 阿里云账号或者RAM用户的AccessKey ID。
        'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx', -- 阿里云账号或者RAM用户的AccessKey Secret。
        'valueColumns'='customerid,customername' --插入字段的列名。
    );
  2. 编写作业逻辑。

    将源表数据插入到结果表的代码示例如下:

    --将源表数据插入到结果表
    INSERT INTO tablestore_sink
    SELECT `order`, orderid, customerid, customername FROM tablestore_stream;

步骤三:(可选)查看配置信息

SQL编辑区域右侧页签,您可以查看或上传相关配置。

页签名称

配置说明

更多配置

  • 引擎版本:当前作业使用的Flink的引擎版本。

  • 附加依赖文件:作业中需要使用到的附加依赖,例如临时函数等。

    您可以下载VVR依赖,并在资源文件页签进行上传,然后选择附加依赖文件为上传的VVR依赖即可。具体操作,请参见附录2:配置VVR依赖

代码结构

  • 数据流向图:您可以通过数据流向图快速查看数据的流向。

  • 树状结构图:您可以通过树状结构图快速查看数据的来源。

版本信息

您可以在此处查看作业版本信息,操作列下的功能详情请参见管理作业版本

步骤四:(可选)进行深度检查

深度检查能够检查作业的SQL语义、网络连通性以及作业使用的表的元数据信息。同时,您可以单击结果区域的SQL优化,展开查看SQL风险问题提示以及对应的SQL优化建议。

  1. SQL编辑区域右上方,单击深度检查

  2. 深度检查对话框,单击确认

步骤五:(可选)进行作业调试

您可以使用作业调试功能模拟作业运行、检查输出结果,验证SELECTINSERT业务逻辑的正确性,提升开发效率,降低数据质量风险。

  1. SQL编辑区域右上方,单击调试

  2. 调试对话框,选择调试集群后,单击下一步

    如果没有可用集群则需要创建新的Session集群,Session集群与SQL作业引擎版本需要保持一致并处于运行中。详情请参见创建Session集群

  3. 配置调试数据。

    • 如果您使用线上数据,无需处理。

    • 如果您需要使用调试数据,需要先单击下载调试数据模板,填写调试数据后,上传调试数据。详情请参见作业调试

  4. 确定好调试数据后,单击确定

步骤六:进行作业部署

SQL编辑区域右上方,单击部署,在部署新版本对话框,可根据需要填写或选中相关内容,单击确定

说明

Session集群适用于非生产环境的开发测试环境,通过部署或调试作业提高作业JM(Job Manager)资源利用率和提高作业启动速度。但不推荐您将生产作业提交至Session集群中,可能会导致业务稳定性问题。

步骤七:启动并查看Flink计算结果

  1. 在左侧导航栏,单击运维中心 > 作业运维

  2. 单击目标作业操作列中的启动

    选择无状态启动后,单击启动。当作业状态转变为运行中时,代表作业运行正常。作业启动参数配置,详情请参见作业启动

    说明
    • Flink中的每个TaskManager建议配置2CPU4GB内存,此配置可以充分发挥每个TaskManager的计算能力。单个TaskManager能达到1万/s的写入速率。

    • source表分区数目足够多的情况下,建议Flink中并发配置在16以内,写入速率随并发线性增长。

  3. 在作业运维详情页面,查看Flink计算结果。

    1. 运维中心 > 作业运维页面,单击目标作业名称。

    2. 作业日志页签,单击运行Task Managers页签下Path,ID列的目标任务。

    3. 单击日志,在页面查看相关的日志信息。

  4. (可选)停止作业。

    如果您对作业进行了修改(例如更改代码、增删改WITH参数、更改作业版本等),且希望修改生效,则需要重新部署作业,然后停止再启动。另外,如果作业无法复用State,希望作业全新启动时,或者更新非动态生效的参数配置时,也需要停止后再启动作业。作业停止详情请参见作业停止

附录

附录1:Tablestore连接器

实时计算Flink版内置了表格存储Tablestore连接器,用于Tablestore的数据读写与同步。

源表

DDL定义
数据表

数据表作为源表的DDL定义示例如下:

-- 创建源表(数据表)的临时表 tablestore_stream
CREATE TEMPORARY TABLE tablestore_stream(
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_source_table',
    'tunnelName' = 'flink_source_tunnel',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'ignoreDelete' = 'false'
);
时序表

时序表作为源表的DDL定义示例如下:

-- 创建源表(时序表)的临时表 tablestore_stream
CREATE TEMPORARY TABLE tablestore_stream(
    _m_name STRING,
    _data_source STRING,
    _tags STRING,
    _time BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_source_table',
    'tunnelName' = 'flink_source_tunnel',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx'
);

属性列支持读取待消费字段和Tunnel Service,以及返回数据中的OtsRecordTypeOtsRecordTimestamp两个字段。字段说明请参见下表。

字段名

Flink映射名

描述

OtsRecordType

type

数据操作类型。

OtsRecordTimestamp

timestamp

数据操作时间,单位为微秒。

说明

全量读取数据时,取值为0。

WITH参数

参数

适用表

是否必填

描述

connector

通用参数

源表的连接器类型。固定取值为ots。

endPoint

通用参数

表格存储实例的服务地址,必须使用VPC地址。更多信息,请参见服务地址

instanceName

通用参数

表格存储的实例名称。

tableName

通用参数

表格存储的源表名称。

tunnelName

通用参数

表格存储源表的通道名称。关于创建通道的具体操作,请参见创建数据通道

accessId

通用参数

阿里云账号或者RAM用户的AccessKey(包括AccessKey IDAccessKey Secret)。

重要

为了避免您的AK信息泄露,建议您通过密钥管理的方式填写AccessKey IDAccessKey Secret,详情请参见变量管理

accessKey

通用参数

connectTimeout

通用参数

连接器连接Tablestore的超时时间,单位为毫秒。默认值为30000。

socketTimeout

通用参数

连接器连接TablestoreSocket超时时间,单位为毫秒。默认值为30000。

ioThreadCount

通用参数

IO线程数量。默认值为4。

callbackThreadPoolSize

通用参数

回调线程池大小。默认值为4。

ignoreDelete

数据表

是否忽略DELETE操作类型的实时数据。默认值为false,表示不忽略DELETE操作类型的实时数据。

skipInvalidData

通用参数

是否忽略脏数据。默认值为false,表示不忽略脏数据。如果不忽略脏数据,则处理脏数据时会报错。

重要

仅实时计算引擎VVR 8.0.4及以上版本支持该参数。

retryStrategy

通用参数

重试策略。参数取值如下:

  • TIME(默认值):在超时时间retryTimeoutMs内持续进行重试。

  • COUNT:在最大重试次数retryCount内持续进行重试。

retryCount

通用参数

重试次数。当retryStrategy设置为COUNT时,可以设置重试次数。默认值为3。

retryTimeoutMs

通用参数

重试的超时时间,单位为毫秒。当retryStrategy设置为TIME时,可以设置重试的超时时间。默认值为180000。

streamOriginColumnMapping

通用参数

原始列名到真实列名的映射。

说明

原始列名与真实列名之间,请使用半角冒号(:)隔开;多组映射之间,请使用半角逗号(,)隔开。例如origin_col1:col1,origin_col2:col2

outputSpecificRowType

通用参数

是否透传具体的RowType。参数取值如下:

  • false(默认值):不透传,所有数据RowType均为INSERT。

  • true:透传,将根据透传的类型相应设置为INSERT、DELETEUPDATE_AFTER。

类型映射

Tablestore字段类型

Flink字段类型

INTEGER

BIGINT

STRING

STRING

BOOLEAN

BOOLEAN

DOUBLE

DOUBLE

BINARY

BINARY

结果表

DDL定义
数据表

数据表作为结果表的DDL定义示例如下:

-- 创建结果表(数据表)的临时表 tablestore_sink
CREATE TEMPORARY TABLE tablestore_sink(
   `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR,
    PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
    'connector' = 'ots',
    'endPoint'='https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_sink_table',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'valueColumns'='customerid,customername'
);
说明

Tablestore结果表必须定义主键(Primary Key)和至少一个属性列,输出数据以Update方式追加到Tablestore表。

时序表

时序模型结果表需要指定_m_name_data_source_tags_time四个主键,其余配置与数据表的结果表配置相同。目前支持WITH参数,SINK表主键和Map格式主键三种方式指定时序表主键。三种方式_tags列的转换优先级为WITH参数方式的优先级最高,Map格式主键与SINK表主键方式次之。

使用WITH参数方式

使用WITH参数方式定义DDL的示例如下。

-- 创建结果表(时序表)的临时表 tablestore_sink
CREATE TEMPORARY TABLE tablestore_sink(
    measurement STRING,
    datasource STRING,
    tag_a STRING,
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    tag_b STRING,
    tag_c STRING,
    tag_d STRING,
    tag_e STRING,
    tag_f STRING,
    PRIMARY KEY(measurement, datasource, tag_a, `time`) NOT ENFORCED
) 
WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_sink_table',
    'accessId' ='xxxxxxxxxxx',
    'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'storageType' = 'TIMESERIES',
    'timeseriesSchema' = '{"measurement":"_m_name", "datasource":"_data_source", "tag_a":"_tags", "tag_b":"_tags", "tag_c":"_tags", "tag_d":"_tags", "tag_e":"_tags", "tag_f":"_tags", "time":"_time"}'
);

-- 将源表数据插入到结果表
INSERT INTO tablestore_sink
    select 
    measurement,
    datasource,
    tag_a,
    `time`,
    binary_value,
    bool_value,
    double_value,
    long_value,
    string_value,
    tag_b,
    tag_c,
    tag_d,
    tag_e,
    tag_f
    from tablestore_stream;
使用Map格式主键方式

使用Map格式主键方式定义DDL的示例如下。

说明

Tablestore引入了FlinkMap类型,以便于生成时序模型中时序表的_tags列,Map类型可以支持列的改名、简单函数等映射操作。使用Map时必须保证其中的_tags主键声明位置在第三位。

-- 创建结果表(时序表)的临时表 tablestore_sink
CREATE TEMPORARY TABLE tablestore_sink(
    measurement STRING,
    datasource STRING,
    tags Map<String, String>, 
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    PRIMARY KEY(measurement, datasource, tags, `time`) NOT ENFORCED
)
WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_sink_table',
    'accessId' ='xxxxxxxxxxx',
    'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'storageType' = 'TIMESERIES'
);

-- 将源表数据插入到结果表
INSERT INTO tablestore_sink
    select 
    measurement,
    datasource,
    MAP[`tag_a`, `tag_b`, `tag_c`, `tag_d`, `tag_e`, `tag_f`] AS tags,
    `time`,
    binary_value,
    bool_value,
    double_value,
    long_value,
    string_value
    from timeseries_source;
使用SINK表主键方式

使用SINK表主键方式定义DDL的示例如下。主键定义中的第一位measurement为_m_name列,第二位datasource为_data_source列,最后一位timetime列,中间的多列为tag列。

-- 创建结果表(时序表)的临时表 tablestore_sink
CREATE TEMPORARY TABLE tablestore_sink(
    measurement STRING,
    datasource STRING,
    tag_a STRING,
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    tag_b STRING,
    tag_c STRING,
    tag_d STRING,
    tag_e STRING,
    tag_f STRING
    PRIMARY KEY(measurement, datasource, tag_a, tag_b, tag_c, tag_d, tag_e, tag_f, `time`) NOT ENFORCED
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_sink_table',
    'accessId' ='xxxxxxxxxxx',
    'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'storageType' = 'TIMESERIES'
);

-- 将源表数据插入到结果表
INSERT INTO tablestore_sink
    select 
    measurement,
    datasource,
    tag_a,
    tag_b,
    tag_c,
    tag_d,
    tag_e,
    tag_f,
    `time`,
    binary_value,
    bool_value,
    double_value,
    long_value,
    string_value
    from timeseries_source;
WITH参数

参数

适用表

是否必填

说明

connector

通用参数

结果表的连接器类型。固定取值为ots。

endPoint

通用参数

表格存储实例的服务地址,必须使用VPC地址。更多信息,请参见服务地址

instanceName

通用参数

表格存储的实例名称。

tableName

通用参数

表格存储的时序表名称。

accessId

通用参数

阿里云账号或者RAM用户的AccessKey(包括AccessKey IDAccessKey Secret)。

重要

为了避免您的AK信息泄露,建议您通过密钥管理的方式填写AccessKey IDAccessKey Secret,详情请参见变量管理

accessKey

通用参数

valueColumns

数据表

插入字段的列名。多个字段以半角逗号(,)分割,例如ID,NAME

storageType

通用参数

重要

当时序表作为结果表时,必须配置为TIMESERIES。

数据存储类型。取值范围如下:

  • WIDE_COLUMN(默认值):数据表

  • TIMESERIES:时序表

timeseriesSchema

时序表

重要

当时序表作为结果表时,如果使用WITH参数的方式指定时序表主键,则必须配置该参数。

需要指定为时序表主键的列。

  • JSONkey-value格式来指定时序表主键,例如{"measurement":"_m_name","datasource":"_data_source","tag_a":"_tags","tag_b":"_tags","tag_c":"_tags","tag_d":"_tags","tag_e":"_tags","tag_f":"_tags","time":"_time"}

  • 配置的主键类型必须与时序表中主键类型一致。其中tags主键可以支持同时包含多列。

connectTimeout

通用参数

连接器连接Tablestore的超时时间,单位为毫秒。默认值为30000。

socketTimeout

通用参数

连接器连接TablestoreSocket超时时间,单位为毫秒。默认值为30000。

ioThreadCount

通用参数

IO线程数量。默认值为4。

callbackThreadPoolSize

通用参数

回调线程池大小。默认值为4。

retryIntervalMs

通用参数

重试间隔时间,单位为毫秒。默认值为1000。

maxRetryTimes

通用参数

最大重试次数。默认值为10。

bufferSize

通用参数

流入多少条数据后开始输出。默认值为5000,表示输入的数据达到5000条就开始输出。

batchWriteTimeoutMs

通用参数

写入超时的时间,单位为毫秒。默认值为5000,表示如果缓存中的数据在等待5秒后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。

batchSize

通用参数

一次批量写入的条数。默认值为100,最大值为200。

ignoreDelete

通用参数

是否忽略DELETE操作类型的实时数据。默认值为false,表示不忽略DELETE操作类型的实时数据。

重要

仅数据表作为源表时可以根据需要配置。

autoIncrementKey

数据表

当结果表中包含主键自增列时,通过该参数指定主键自增列的列名称。当结果表没有主键自增列时,请不要设置此参数。

重要

仅实时计算引擎VVR 8.0.4及以上版本支持该参数。

overwriteMode

通用参数

数据覆盖模式。参数取值如下:

  • PUT(默认值):以PUT方式将数据写入到Tablestore表。

  • UPDATE:以UPDATE方式写入到Tablestore表。

说明

动态列模式下只支持UPDATE模式。

defaultTimestampInMillisecond

通用参数

设定写入Tablestore数据的默认时间戳。如果不指定,则使用系统当前的毫秒时间戳。

dynamicColumnSink

通用参数

是否开启动态列模式。默认值为false,表示不开启动态列模式。

说明
  • 动态列模式适用于在表定义中无需指定列名,根据作业运行情况动态插入数据列的场景。建表语句中主键需要定义为前若干列,最后两列中前一列的值作为列名变量,且类型必须为String,后一列的值作为该列对应的值。

  • 开启动态列模式时,不支持主键自增列,且参数overwriteMode必须设置为UPDATE。

checkSinkTableMeta

通用参数

是否检查结果表元数据。默认值为true,表示检查Tablestore表的主键列和此处的建表语句中指定的主键是否一致。

enableRequestCompression

通用参数

数据写入过程中是否开启数据压缩。默认值为false,表示不开启数据压缩。

类型映射

Flink字段类型

Tablestore字段类型

BINARY

BINARY

VARBINARY

BINARY

CHAR

STRING

VARCHAR

STRING

TINYINT

INTEGER

SMALLINT

INTEGER

INTEGER

INTEGER

BIGINT

INTEGER

FLOAT

DOUBLE

DOUBLE

DOUBLE

BOOLEAN

BOOLEAN

SQL示例

同步源表数据到结果表
同步数据表数据到时序表

从源表(数据表)flink_source_table中读取数据,并将结果写入结果表(时序表)flink_sink_table。

SQL示例如下:

-- 创建源表(数据表)的临时表 tablestore_stream
CREATE TEMPORARY TABLE tablestore_stream(
    measurement STRING,
    datasource STRING,
    tag_a STRING,
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    tag_b STRING,
    tag_c STRING,
    tag_d STRING,
    tag_e STRING,
    tag_f STRING
) 
WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_source_table',
    'tunnelName' = 'flink_source_tunnel',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'ignoreDelete' = 'false' 
);

-- 使用With参数方式创建结果表(时序表)的临时表 tablestore_sink
CREATE TEMPORARY TABLE tablestore_sink(
     measurement STRING,
     datasource STRING,
     tag_a STRING,
     `time` BIGINT,
     binary_value BINARY,
     bool_value BOOLEAN,
     double_value DOUBLE,
     long_value BIGINT,
     string_value STRING,
     tag_b STRING,
     tag_c STRING,
     tag_d STRING,
     tag_e STRING,
     tag_f STRING,
     PRIMARY KEY(measurement, datasource, tag_a, `time`) NOT ENFORCED
 ) WITH (
     'connector' = 'ots',
     'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
     'instanceName' = 'xxx',
     'tableName' = 'flink_sink_table',
     'accessId' = 'xxxxxxxxxxx',
     'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
     'storageType' = 'TIMESERIES',
     'timeseriesSchema' = '{"measurement":"_m_name","datasource":"_data_source","tag_a":"_tags","tag_b":"_tags","tag_c":"_tags","tag_d":"_tags","tag_e":"_tags","tag_f":"_tags","time":"_time"}'
 );
 
--将源表数据插入到结果表
INSERT INTO tablestore_sink
    select 
    measurement,
    datasource,
    tag_a,
    `time`,
    binary_value,
    bool_value,
    double_value,
    long_value,
    string_value,
    tag_b,
    tag_c,
    tag_d,
    tag_e,
    tag_f
    from tablestore_stream;
同步时序表数据到数据表

从源表(时序表)flink_source_table中读取数据,并将结果写入结果表(数据表)flink_sink_table。

SQL示例如下:

-- 创建源表(时序表)的临时表 tablestore_stream
CREATE TEMPORARY TABLE tablestore_stream(
    _m_name STRING,
    _data_source STRING,
    _tags STRING,
    _time BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_source_table',
    'tunnelName' = 'flink_source_tunnel',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx'
);

-- 创建结果表(数据表)的临时表 print_table。
CREATE TEMPORARY TABLE tablestore_target(
    measurement STRING,
    datasource STRING,
    tags STRING,
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    PRIMARY KEY (measurement,datasource, tags, `time`) NOT ENFORCED
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_sink_table',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'valueColumns'='binary_value,bool_value,double_value,long_value,string_value'
);

--将源表数据插入到结果表
INSERT INTO tablestore_target
SELECT
    _m_name,
    _data_source,
    _tags,
    _time,
    binary_value,
    bool_value,
    double_value,
    long_value,
    string_value
    from tablestore_stream;
读取源表数据并打印到控制台

批量从源表flink_source_table中读取数据,您可以使用作业调试功能模拟作业运行,调试结果将显示在SQL编辑器下方。

SQL示例如下:

-- 创建源表(数据表)的临时表 tablestore_stream
CREATE TEMPORARY TABLE tablestore_stream(
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_source_table',
    'tunnelName' = 'flink_source_tunnel',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'ignoreDelete' = 'false' 
);

-- 从源表读取数据
SELECT * FROM tablestore_stream LIMIT 100;
读取源表数据并打印到TaskManager日志

从源表flink_source_table中读取数据,并通过Print连接器将结果打印到TaskManager日志中。

SQL示例如下:

-- 创建源表(数据表)的临时表 tablestore_stream
CREATE TEMPORARY TABLE tablestore_stream(
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_source_table',
    'tunnelName' = 'flink_source_tunnel',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'ignoreDelete' = 'false' 
);

-- 创建结果表的临时表 print_table。
CREATE TEMPORARY TABLE print_table(
   `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR
) WITH (
  'connector' = 'print',   -- print连接器
  'logger' = 'true'        -- 控制台显示计算结果
);

-- 打印源表的字段
INSERT INTO print_table
SELECT `order`,orderid,customerid,customername from tablestore_stream;

附录2:配置VVR依赖

  1. 下载VVR依赖

  2. 上传VVR依赖。

    1. 登录实时计算控制台

    2. 单击目标工作空间操作列下的控制台

    3. 在左侧导航栏,单击文件管理

    4. 资源文件页签,单击上传资源,选择要上传的VVR依赖的JAR包。

  3. 在目标作业的SQL编辑区域右侧页签,单击更多配置。在附加依赖文件项,选择目标VVR依赖的JAR包。