文档

通过任务编排实现跨库数据同步

某管理者希望能在购物平台上按商品类别查看每天的订单数量和金额。本文通过在任务编排中创建跨库Spark任务,实现了定期将在线库中的订单表和商品表同步到数据仓库中进行数据分析,并将分析结果回流在线库中供管理者查询。

前提条件

  • 准备一个MySQL数据库作为在线库,用于存放订单表和商品表,且您拥有该数据库的变更权限。如需申请权限,请参见访问控制权限概述

  • 准备一个AnalyticDB for MySQL数据仓库,用于数据加工,且您拥有该数据仓库的变更权限。如需申请权限,请参见访问控制权限概述

背景信息

购物平台会产生大量的数据,对这些数据进行分析时,如果直接在在线库进行分析,轻则会使在线库响应变慢,重则会导致在线库无法响应业务处理。因此,一般在对在线业务进行分析时,需要把在线业务的数据同步到专用于数据加工的数据仓库再进行数据处理。在数据仓库中将数据加工后,还需要把加工结果同步回在线库中,以便于在线应用提供相关的数据分析和统计服务。

操作步骤

  1. 准备工作

  2. 新增跨库Spark任务

  3. 配置跨库Spark任务

  4. 发布跨库Spark任务

准备工作

  1. 在MySQL数据库中创建订单表、商品表和统计表。

    1. 登录数据管理DMS 5.0
    2. 单击控制台左上角的2023-01-28_15-57-17.png图标,选择全部功能 > SQL窗口 > SQL窗口

      说明

      若您使用的是非极简模式的控制台,在顶部菜单栏中,选择SQL窗口 > SQL窗口

    3. 请先选择数据库弹框中,搜索并选择MySQL数据库,单击确认

    4. 在MySQL数据库中新建订单表t_order、商品表t_product和用于同步数据分析结果的统计表t_order_report_daily。

      1. 创建表名为t_order的订单表。将下列建表SQL语句粘贴到SQL书写区域,单击执行

        创建订单表t_order的SQL语句:

        CREATE TABLE `t_order` (
          `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键',
          `product_id` bigint(20) NOT NULL COMMENT '产品id',
          `gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
          `gmt_modified` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '修改时间',
          `customer_id` bigint(20) NOT NULL COMMENT '客户id',
          `price` decimal(14,2) NOT NULL COMMENT '价格',
          `status` varchar(64) NOT NULL COMMENT '订单状态',
          `province` varchar(256) DEFAULT NULL COMMENT '交易省份',
          PRIMARY KEY (`id`),
          KEY `idx_product_id` (`product_id`),
          KEY `idx_customer_id` (`customer_id`),
          KEY `idx_status` (`status`)
        ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 ROW_FORMAT=COMPACT COMMENT='订单表'
        ;

        订单表t_order的表结构如下图所示:

        1任务编排-订单表表结构

      2. 创建表名为t_product的商品表。将下列建表SQL语句粘贴到SQL书写区域,单击执行

        创建商品表t_product的SQL语句:

        CREATE TABLE `t_product` (
          `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键',
          `name` varchar(128) NOT NULL COMMENT '产品名',
          `type` varchar(64) NOT NULL COMMENT '产品类别',
          `gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
          `gmt_modified` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '修改时间',
          PRIMARY KEY (`id`)
        ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COMMENT='产品表'
        ;

        商品表t_product的表结构如下图所示:

        商品表

      3. 创建表名为t_order_report_daily的统计表。将下列建表SQL语句粘贴到SQL书写区域,单击执行

        创建统计表t_order_report_daily的SQL语句:

        CREATE TABLE `t_order_report_daily` (
          `dt` varchar(64) NOT NULL COMMENT '统计日期',
          `product_type` varchar(64) NOT NULL COMMENT '产品类型',
          `order_cnt` bigint(64) NOT NULL COMMENT '笔数',
          `order_amt` decimal(38,2) NOT NULL  COMMENT '金额',
          PRIMARY KEY (`dt`)
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='统计表'
        ;

        统计表t_order_report_daily的表结构如下图所示:

        1任务编排-统计表表结构

  2. AnalyticDB for MySQL数据仓库中创建接收数据的订单表、商品表和存放分析结果的统计表。

    1. 进入SQL窗口页面,在请先选择数据库弹框中,搜索并选择AnalyticDB for MySQL数据仓库,单击确认

    2. AnalyticDB for MySQL数据仓库中新建用于同步数据的订单表t_order、商品表t_product和用于存放数据分析结果的统计表t_order_report_daily。

      说明

      订单表t_order、商品表t_product和统计表t_order_report_daily的建表SQL语句,请参见步骤4

    3. 订单表t_order和商品表t_product中写入测试数据。使用测试数据构建功能生成数据,具体请参见测试数据构建

      • 构建订单表t_order的数据,数据量为2000万行。

        说明

        自由操作模式的实例,每个工单最大生成行数为100万行。

      • 构建商品表t_product的数据,数据量为1万行。

新增跨库Spark任务

  1. 登录数据管理DMS 5.0
  2. 在顶部菜单栏中,选择集成与开发(DTS) > 数据开发 > 任务编排

    说明

    若您使用的是极简模式的控制台,请单击控制台左上角的2023-01-28_15-57-17.png图标,选择全部功能 > 集成与开发(DTS) > 数据开发 > 任务编排

  3. 新增任务流。

    1. 单击新增任务流

    2. 新建任务流对话框中,输入任务流名称描述,单击确认

  4. 在画布左侧任务类型列表中,拖拽跨库Spark SQL节点到画布空白区域。

配置跨库Spark任务

  1. 在任务流详情页面,选中跨库Spark SQL节点。

  2. 配置当前日期变量${today}。关于变量的详细介绍,请参见变量

    1. 单击变量设置页签。

    2. 单击节点变量页签。

    3. 输入节点变量的参数,如下图所示。

      image

  3. 添加商品订单所在的MySQL数据库。

    1. 数据库引用区域,单击添加数据库引用

    2. 选择数据库类型,搜索并选择数据库,输入数据库在Spark SQL语句中的引用别名为demo_id。关于配置项的详细介绍,请参见数据库配置表

    3. 单击保存

  4. 添加用于数据加工的AnalyticDB for MySQL数据仓库。

    1. 单击demo_id数据库右侧的5加6,添加一个新的数据库引用。

    2. 选择数据库类型,搜索并选择数据库,输入数据库在Spark SQL语句中的引用别名为company。

    3. 单击保存

    image

  5. 在SQL区域,编写Spark SQL语句,并单击保存

    /*同步数据*/
    /*全量写商品表*/
    insert overwrite company.t_product
    select id,
           name,
           type,
           gmt_create,
           gmt_modified
      from demo_id.t_product ;
    
    /*增量写订单表*/
    insert into company.t_order
    select id,
           product_id,
           gmt_create,
           gmt_modified,
           customer_id,
           price,
           status,
           province
      from demo_id.t_order
     where gmt_create>= '${bizdate}'
       and gmt_create< '${today}' ;
    
    /*加工数据,按商品类别分组统计*/
    insert into company.t_order_report_daily
    select '${bizdate}',
           p.type as product_type,
           count(*)  order_cnt,
           sum(price)  order_amt
      from company.t_product p join company.t_order o on o.product_id= p.id
     where o.gmt_create>= '${bizdate}'
       and o.gmt_create< '${today}'
     group by product_type;
    
     /*回流AnalyticDB for MySQL中的数据到MySQL中*/
    insert into demo_id.t_order_report_daily
    select dt,
           product_type,
           order_cnt,
           order_amt
    from company.t_order_report_daily
    where dt= '${bizdate}';
    说明

    示例SQL中的bizdate为系统自定义的业务时间变量,默认取值为任务运行时间减一天。

发布跨库Spark任务

  1. 在任务流详情页面,单击画布左上角的试运行

    单击执行日志页签,查看执行结果。

    • 如果执行日志的最后一行出现status SUCCEEDED,表明任务流试运行成功。

    • 如果执行日志的最后一行出现status FAILED,表明任务流试运行失败。

      说明

      如果试运行失败,在执行日志中查看执行失败的节点和原因,修改节点的配置后重新尝试。

    image

    执行成功后,您还可以在MySQL数据库中查看同步到统计表t_order_report_daily中的统计数据。

  2. 配置周期调度。

    1. 单击画布空白区域。

    2. 单击页面下方的任务流信息页签。

    3. 调度配置区域,打开开启调度开关,配置调度。具体配置,请参见调度周期配置表

  3. 发布任务流。任务流发布后,此任务流会根据设置的调度周期自动执行任务。

    1. 单击画布左上角的发布

    2. 发布对话框中输入备注信息,并单击发布