通过Flink SQL模式配置ETL任务

Flink SQL是ETL为了简化计算模型、降低使用门槛而设计的一套符合标准SQL语义的开发语言。相对于DAG模式(可视化拖拽方式),Flink SQL的功能更为强大,您可在Flink SQL的命令窗口中输入DAG模式暂不支持的语法。本文将介绍如何通过Flink SQL模式配置ETL任务。

背景信息

说明

此功能即将下线,仅部分用户可以免费体验,未曾使用过该功能的用户已无法体验,建议您在同步或迁移实例中配置ETL任务。更多信息,请参见在DTS迁移或同步任务中配置ETL如在体验过程中遇到问题,请加钉钉群沟通(钉钉群号:32326646)。

  • 在配置ETL任务前,请您了解以下信息:

    • 输入/维表指ETL的源库。

    • 输出指经过ETL处理后写入的目标库。

  • 数据库传输服务DTS为数据同步过程提供了流式的ETL功能,您可以在源库和目标库之间添加各种转换组件,实现丰富的转换操作,并将处理后的数据实时写入目标库。例如将两张流表做JOIN操作后形成一张大表,写入目标库;或者给源表新增一个字段,并为该字段配置函数进行赋值,源表该字段经过赋值转换后写入目标库。

前提条件

  • 当前仅支持在华东1(杭州)、华东2(上海)、华北1(青岛)、华北2(北京)、华北3(张家口)、华南1(深圳)、华南3(广州)和中国香港创建ETL任务。

  • 当前源库支持MySQLPolarDB MySQLOraclePostgreSQLDB2 iSeries(AS/400)DB2 LUWDRDS(PolarDB-X 1.0)、PolarDB PostgreSQLMariaDBPolarDB OracleSQLServerPolarDB-X 2.0

  • 当前目标库支持MySQLPolarDB MySQLOracleAnalyticDB MySQL 3.0PolarDB PostgreSQLPostgreSQLDB2 LUWDB2 iSeries(AS/400)AnalyticDB PostgreSQLSQLServerMariaDBDRDS(PolarDB-X 1.0)、PolarDB OracleTablestore

  • 由于ETL功能暂不支持结构迁移,所以您需要根据转换条件在目标库侧完成对应表结构的创建。例如A表中包含字段1、字段2和字段3,B表中包含字段2、字段3和字段4,对两张表通过做JOIN操作后,需要输出字段2和字段3,则需要在目标库侧创建做JOIN操作后的C表,C表中包含字段2和字段3。

  • 由于ETL功能暂不支持全量数据同步,所以您只能对增量数据进行实时转换。

注意事项

  • 所有的源库和目标库属于同一地域。

  • 所有流表均来源于同一实例。

  • 数据库的库名和表名唯一。

  • 当前暂不支持配置跨账号的任务。

操作步骤

  1. 进入ETL任务的列表页面。

    1. 登录数据传输服务DTS控制台

    2. 在左侧导航栏,单击ETL

  2. 单击左上角的新增数据流,在新增数据流对话框中,您需在数据流名称配置ETL任务名称,选择开发方式FlinkSQL

  3. 单击确认

  4. 流式ETL页面的数据流信息部分,添加源库和目标库。

    参数

    说明

    地区

    选择数据源所在地域。

    类型

    选择库表类型。

    • 配置源表信息时,如源表为流表(实时发生变化的表,可以关联一个维表,用于数据关联查询),则需选择流表;如源表为维表(更新不频繁或非实时更新的表,一般用于结合实时数据拼装成宽表进行数据分析),则需选择维表

    • 配置目标表信息时,则需选择输出

    数据库类型

    选择源库或目标库的数据库类型。

    实例

    输入实例名称或实例ID,搜索并选择源和目标实例。

    重要

    您需要先在DMS中录入源实例和目标实例。录入方式,请参见实例管理

    数据库

    选择数据加工对象所属的源库或目标库。

    物理表

    选择数据加工对象所属的源表或目标表。

    物理表别名

    为源表或目标表设置精简易读的别名,便于ETL在运行SQL语句时定位至具体的表。

  5. 流式ETL页面的SQL命令窗口,添加用于配置ETL任务的SQL语句。

    本案例以如下SQL语句为例,配置ETL任务,将流表test_orders与维表product结合至目标表test_orders_new中。

    重要

    SQL语句间需以英文分号(;)分割。

    CREATE TABLE `etltest_test_orders` (
      `order_id` BIGINT,
      `user_id` BIGINT,
      `product_id` BIGINT,
      `total_price` DECIMAL(15,2),
      `order_date` TIMESTAMP(6),
      `dts_etl_schema_db_table` STRING,
      `dts_etl_db_log_time` BIGINT,
      `pt` AS PROCTIME(),
      WATERMARK FOR `order_date` AS `order_date` - INTERVAL '5' SECOND
    ) WITH (
      'streamType'= 'append',
      'alias'= 'test_orders',
      'vertexType'= 'stream'
    );
    CREATE TABLE `etltest_product` (
      `product_id` BIGINT,
      `product_name` STRING,
      `product_price` DECIMAL(15,2)
    ) WITH (
      'alias'= 'product',
      'vertexType'= 'lookup'
    );
    CREATE VIEW `etltest_test_orders_JOIN_etltest_product` AS
    SELECT
      `etltest_test_orders`.`order_id` AS `order_id`,
      `etltest_test_orders`.`user_id` AS `user_id`,
      `etltest_test_orders`.`product_id` AS `product_id`,
      `etltest_test_orders`.`total_price` AS `total_price`,
      `etltest_test_orders`.`order_date` AS `order_date`,
      `etltest_test_orders`.`dts_etl_schema_db_table` AS `dts_etl_schema_db_table`,
      `etltest_test_orders`.`dts_etl_db_log_time` AS `dts_etl_db_log_time`,
      `etltest_product`.`product_id` AS `product_id_0001011101`,
      `etltest_product`.`product_name` AS `product_name`,
      `etltest_product`.`product_price` AS `product_price`
    FROM `etltest_test_orders` LEFT JOIN `etltest_product` FOR SYSTEM_TIME AS OF `etltest_test_orders`.`pt` ON etltest_test_orders.product_id = etltest_product.product_id
    ;
    CREATE TABLE `test_orders_new` (
      `order_id` BIGINT,
      `user_id` BIGINT,
      `product_id` BIGINT,
      `total_price` DECIMAL(15,2),
      `order_date` TIMESTAMP(6),
      `product_name` STRING,
      `product_price` DECIMAL(15,2)
    ) WITH (
      'alias'= 'test_orders_new',
      'vertexType'= 'sink'
    );
    INSERT INTO `test_orders_new` (
      `order_id`,
      `user_id`,
      `product_id`,
      `total_price`,
      `order_date`,
      `product_name`,
      `product_price`
    )
    SELECT
      `etltest_test_orders_JOIN_etltest_product`.`order_id`,
      `etltest_test_orders_JOIN_etltest_product`.`user_id`,
      `etltest_test_orders_JOIN_etltest_product`.`product_id`,
      `etltest_test_orders_JOIN_etltest_product`.`total_price`,
      `etltest_test_orders_JOIN_etltest_product`.`order_date`,
      `etltest_test_orders_JOIN_etltest_product`.`product_name`,
      `etltest_test_orders_JOIN_etltest_product`.`product_price`
    FROM `etltest_test_orders_JOIN_etltest_product`;

    类型

    说明

    源表和目标表信息

    • 您需使用CREATE TABLE语句定义源表和目标表信息。

    • SQL语句的WITH从句中可设置三个参数:streamType aliasvertexType 。其中流表必须设置以上三个参数,维表和输出仅需设置aliasvertexType

      • streamType :流类型。ETL在处理数据时会将流转换为动态表,在该动态表上进行持续查询(即动态表会被INSERT、UPDATE、DELETE操作持续更改),产生一个新的动态表。最终写入目标库时,再将新的动态表会转化为流。当新的动态表转化为流时,您需要指定本参数,对动态表前后更改信息进行编码。

        • Upsert:Upsert流。动态表中的数据支持通过INSERT、UPDATE和DELETE操作修改,当转换为流时,会将INSERT和UPDATE操作编码为upsert message,将DELETE操作编码为delete message。

          说明

          该编码方式要求动态表具有唯一键(可能是复合的)。

        • append: Append-only流。动态表中的数据仅支持INSERT操作修改,当转换为流时仅需发送INSERT的数据。

      • alias:在步骤3配置源库和目标库时设置的物理表别名

    • vertexType :表类型。

      • stream:流表。

      • lookup:维表。

      • sink:目标表。

    数据加工的计算逻辑

    您需使用CREATE VIEW语句描述数据加工的计算逻辑。

    加工后的目标表信息

    您需使用INSERT INTO语句定义加工后的目标表信息。

  6. 配置完成源库和目标库信息,以及SQL语句后,单击生成 Flink SQL校验

    说明
    • 您也可以单击发布,直接执行校验和预检查。

    • 如Flink SQL校验成功,您可单击ETL校验成功,查看Flink SQL校验详情。

    • 如Flink SQL校验失败,您可单击ETL校验成功,根据提示信息修复SQL语句,并重新进行生成Flink SQL校验。

  7. Flink SQL校验成功后,单击发布进入预检查阶段。

  8. 预检查通过率显示为100%时,单击下一步购买(免费)

    说明

    如果预检查失败,请单击检查失败项后的查看详情,根据提示信息修复后,重新进行预检查。

  9. 购买页面,选择链路规格并确认计算资源(CU)(公测期间,固定为2)。阅读并勾选数据传输(按量付费)服务条款公测协议条款

    说明

    ETL功能公测中,每个用户可以免费创建并使用两个ETL实例。

  10. 单击购买并启动,ETL任务正式开始。