文档

通过Flink SQL模式配置ETL任务

更新时间:

Flink SQL是ETL为了简化计算模型、降低使用门槛而设计的一套符合标准SQL语义的开发语言。相对于DAG模式(可视化拖拽方式),Flink SQL的功能更为强大,您可在Flink SQL的命令窗口中输入DAG模式暂不支持的语法。本文将介绍如何通过Flink SQL模式配置ETL任务。

背景信息

说明

ETL功能在公测阶段,支持免费体验。如在体验过程中遇到问题,请加钉钉群沟通(钉钉群号:32326646)。

  • 在配置ETL任务前,请您了解以下信息:

    • 输入/维表指ETL的源库。

    • 输出指经过ETL处理后写入的目标库。

  • 数据库传输服务DTS为数据同步过程提供了流式的ETL功能,您可以在源库和目标库之间添加各种转换组件,实现丰富的转换操作,并将处理后的数据实时写入目标库。例如将两张流表做JOIN操作后形成一张大表,写入目标库;或者给源表新增一个字段,并为该字段配置函数进行赋值,源表该字段经过赋值转换后写入目标库。

前提条件

  • 当前仅支持在华东1(杭州)、华东2(上海)、华北1(青岛)、华北2(北京)、华北3(张家口)、华南1(深圳)、华南3(广州)和中国香港创建ETL任务。

  • 当前源库支持MySQLPolarDB MySQLOraclePostgreSQLDB2 iSeries(AS/400)DB2 LUWDRDS(PolarDB-X 1.0)、PolarDB PostgreSQLMariaDBPolarDB OracleSQLServerPolarDB-X 2.0

  • 当前目标库支持MySQLPolarDB MySQLOracleAnalyticDB MySQL 3.0PolarDB PostgreSQLPostgreSQLDB2 LUWDB2 iSeries(AS/400)AnalyticDB PostgreSQLSQLServerMariaDBDRDS(PolarDB-X 1.0)、PolarDB OracleTablestore

  • 由于ETL功能暂不支持结构迁移,所以您需要根据转换条件在目标库侧完成对应表结构的创建。例如A表中包含字段1、字段2和字段3,B表中包含字段2、字段3和字段4,对两张表通过做JOIN操作后,需要输出字段2和字段3,则需要在目标库侧创建做JOIN操作后的C表,C表中包含字段2和字段3。

  • 由于ETL功能暂不支持全量数据同步,所以您只能对增量数据进行实时转换。

注意事项

  • 所有的源库和目标库属于同一地域。

  • 所有流表均来源于同一实例。

  • 数据库的库名和表名唯一。

  • 当前暂不支持配置跨账号的任务。

操作步骤

  1. 进入ETL任务的列表页面。

    1. 登录DMS数据管理服务

    2. 在顶部菜单栏中,单击集成与开发(DTS)

    3. 在左侧导航栏,选择数据集成 > 流式ETL

    说明

    您也可以在DTS控制台的ETL页面,单击去DMS创建流式ETL

  2. 单击左上角的新增数据流,在新增数据流对话框中,您需在数据流名称配置ETL任务名称,选择开发方式FlinkSQL
  3. 单击确认

  4. 数据加工页面的数据流信息部分,添加源库和目标库。
    参数说明
    地区选择数据源所在地域。
    类型选择库表类型。
    • 配置源表信息时,如源表为 流表 ,则需选择流表;如源表为 维表 ,则需选择维表
    • 配置目标表信息时,则需选择输出
    数据库类型选择源库或目标库的数据库类型。
    实例输入实例名称或实例ID,搜索并选择源和目标实例。
    重要 您需要先在DMS中录入源实例和目标实例。录入方式,请参见实例管理
    数据库选择数据加工对象所属的源库或目标库。
    物理表选择数据加工对象所属的源表或目标表。
    物理表别名为源表或目标表设置精简易读的别名,便于ETL在运行SQL语句时定位至具体的表。
  5. 数据加工页面的SQL命令窗口,添加用于配置ETL任务的SQL语句。
    本案例以如下SQL语句为例,配置ETL任务,将流表test_orders与维表product结合至目标表test_orders_new中。
    重要 SQL语句间需以英文分号(;)分割。
    CREATE TABLE `etltest_test_orders` (
      `order_id` BIGINT,
      `user_id` BIGINT,
      `product_id` BIGINT,
      `total_price` DECIMAL(15,2),
      `order_date` TIMESTAMP(6),
      `dts_etl_schema_db_table` STRING,
      `dts_etl_db_log_time` BIGINT,
      `pt` AS PROCTIME(),
      WATERMARK FOR `order_date` AS `order_date` - INTERVAL '5' SECOND
    ) WITH (
      'streamType'= 'append',
      'alias'= 'test_orders',
      'vertexType'= 'stream'
    );
    CREATE TABLE `etltest_product` (
      `product_id` BIGINT,
      `product_name` STRING,
      `product_price` DECIMAL(15,2)
    ) WITH (
      'alias'= 'product',
      'vertexType'= 'lookup'
    );
    CREATE VIEW `etltest_test_orders_JOIN_etltest_product` AS
    SELECT
      `etltest_test_orders`.`order_id` AS `order_id`,
      `etltest_test_orders`.`user_id` AS `user_id`,
      `etltest_test_orders`.`product_id` AS `product_id`,
      `etltest_test_orders`.`total_price` AS `total_price`,
      `etltest_test_orders`.`order_date` AS `order_date`,
      `etltest_test_orders`.`dts_etl_schema_db_table` AS `dts_etl_schema_db_table`,
      `etltest_test_orders`.`dts_etl_db_log_time` AS `dts_etl_db_log_time`,
      `etltest_product`.`product_id` AS `product_id_0001011101`,
      `etltest_product`.`product_name` AS `product_name`,
      `etltest_product`.`product_price` AS `product_price`
    FROM `etltest_test_orders` LEFT JOIN `etltest_product` FOR SYSTEM_TIME AS OF `etltest_test_orders`.`pt` ON etltest_test_orders.product_id = etltest_product.product_id
    ;
    CREATE TABLE `test_orders_new` (
      `order_id` BIGINT,
      `user_id` BIGINT,
      `product_id` BIGINT,
      `total_price` DECIMAL(15,2),
      `order_date` TIMESTAMP(6),
      `product_name` STRING,
      `product_price` DECIMAL(15,2)
    ) WITH (
      'alias'= 'test_orders_new',
      'vertexType'= 'sink'
    );
    INSERT INTO `test_orders_new` (
      `order_id`,
      `user_id`,
      `product_id`,
      `total_price`,
      `order_date`,
      `product_name`,
      `product_price`
    )
    SELECT
      `etltest_test_orders_JOIN_etltest_product`.`order_id`,
      `etltest_test_orders_JOIN_etltest_product`.`user_id`,
      `etltest_test_orders_JOIN_etltest_product`.`product_id`,
      `etltest_test_orders_JOIN_etltest_product`.`total_price`,
      `etltest_test_orders_JOIN_etltest_product`.`order_date`,
      `etltest_test_orders_JOIN_etltest_product`.`product_name`,
      `etltest_test_orders_JOIN_etltest_product`.`product_price`
    FROM `etltest_test_orders_JOIN_etltest_product`;
    类型说明
    源表和目标表信息
    • 您需使用CREATE TABLE语句定义源表和目标表信息。
    • SQL语句的WITH从句中可设置三个参数:streamType aliasvertexType 。其中流表必须设置以上三个参数,维表和输出仅需设置aliasvertexType
      • streamType 流类型。
        • Upsert:Upsert流。动态表中的数据支持通过INSERT、UPDATE和DELETE操作修改,当转换为流时,会将INSERT和UPDATE操作编码为upsert message,将DELETE操作编码为delete message。
          说明 该编码方式要求动态表具有唯一键(可能是复合的)。
        • append: Append-only流。动态表中的数据仅支持INSERT操作修改,当转换为流时仅需发送INSERT的数据。
      • alias:在步骤3配置源库和目标库时设置的物理表别名
    • vertexType :表类型。
      • stream:流表。
      • lookup:维表。
      • sink:目标表。
    数据加工的计算逻辑您需使用CREATE VIEW语句描述数据加工的计算逻辑。
    加工后的目标表信息您需使用INSERT INTO语句定义加工后的目标表信息。
  6. 配置完成源库和目标库信息,以及SQL语句后,单击生成 Flink SQL校验
    说明
    • 您也可以单击发布,直接执行校验和预检查。
    • 如Flink SQL校验成功,您可单击ETL校验成功,查看Flink SQL校验详情。
    • 如Flink SQL校验失败,您可单击ETL校验成功,根据提示信息修复SQL语句,并重新进行生成Flink SQL校验。
  7. Flink SQL校验成功后,单击发布进入预检查阶段。
  8. 预检查通过率显示为100%时,单击下一步购买(免费)
    说明 如果预检查失败,请单击检查失败项后的查看详情,根据提示信息修复后,重新进行预检查。
  9. 购买页面,选择链路规格并确认计算资源(CU)(公测期间,固定为2)。阅读并勾选数据传输(按量付费)服务条款公测协议条款
    说明 ETL功能公测中,每个用户可以免费创建并使用两个ETL实例。
  10. 单击购买并启动,ETL任务正式开始。
  • 本页导读 (1)
文档反馈