通过跨库Spark SQL任务将数据同步到OSS

电商管理者希望能够按商品类别统计并查看前一天的订单数量和金额,本文通过在任务编排中创建跨库Spark任务,实现定期将在线库中的订单表和商品表同步到OSS中进行数据分析,管理者可以在OSS中查看分析结果。

前提条件

  • 准备一个MySQL数据库作为在线库,用于存放订单表和商品表,且您拥有该数据库的查询权限。如需申请权限,请参见访问控制权限概述

  • 创建一个OSS Bucket,并录入DMS中,用于存储数据。在DMS中录入OSS实例,请参见录入对象存储OSS

背景信息

购物平台会产生大量的数据,对这些数据进行分析时,如果直接在在线库进行分析,轻则会使在线库响应变慢,重则会导致在线库无法响应业务处理。一般通过把业务数据同步到离线库或存储的方式对在线业务进行分析,如果您不需要将分析结果同步回在线库,可以将在线业务的数据同步到专用于数据存储的OSS中进行数据加工,您可以直接在OSS中查看数据加工结果。

说明
  • 阿里云对象存储OSS是阿里云提供的海量、安全、低成本、高持久的云存储服务。

  • 如果您需要将分析结果同步回在线库,请参见通过任务编排实现跨库数据同步

操作步骤

  1. 准备工作

  2. 新增跨库Spark SQL任务

  3. 配置跨库Spark SQL任务

  4. 运行及发布跨库Spark SQL任务

准备工作

在线库MySQL中创建订单表、商品表。

  1. 登录数据管理DMS 5.0
  2. 单击控制台左上角的2023-01-28_15-57-17.png图标,选择全部功能 > SQL窗口 > SQL窗口

    说明

    若您使用的是非极简模式的控制台,在顶部菜单栏中,选择SQL窗口 > SQL窗口

  3. 请先选择数据库弹框中,搜索并选择MySQL数据库,单击确认

  4. 在MySQL数据库中新建订单表t_order、商品表t_product。

    1. 创建表名为t_order的订单表。将下列建表SQL语句粘贴到SQL书写区域,单击执行

      创建订单表t_order的SQL语句:

      CREATE TABLE `t_order` (
        `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键',
        `product_id` bigint(20) NOT NULL COMMENT '产品id',
        `gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
        `gmt_modified` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '修改时间',
        `customer_id` bigint(20) NOT NULL COMMENT '客户id',
        `price` decimal(14,2) NOT NULL COMMENT '价格',
        `status` varchar(64) NOT NULL COMMENT '订单状态',
        `province` varchar(256) DEFAULT NULL COMMENT '交易省份',
        PRIMARY KEY (`id`),
        KEY `idx_product_id` (`product_id`),
        KEY `idx_customer_id` (`customer_id`),
        KEY `idx_status` (`status`)
      ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 ROW_FORMAT=COMPACT COMMENT='订单表'
      ;
    2. 创建表名为t_product的商品表。将下列建表SQL语句粘贴到SQL书写区域,单击执行

      创建商品表t_product的SQL语句:

      CREATE TABLE `t_product` (
        `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键',
        `name` varchar(128) NOT NULL COMMENT '产品名',
        `type` varchar(64) NOT NULL COMMENT '产品类别',
        `gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
        `gmt_modified` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '修改时间',
        PRIMARY KEY (`id`)
      ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='产品表'
      ;
  5. 订单表t_order和商品表t_product中写入测试数据。使用测试数据构建功能生成数据,具体请参见测试数据构建

    • 构建订单表t_order的数据,数据量为2000万行。

      说明

      自由操作模式的实例,每个工单最大生成数据行数为100万行。

    • 构建商品表t_product的数据,数据量为1万行。

新增跨库Spark SQL任务

  1. 登录数据管理DMS 5.0
  2. 单击控制台左上角的2023-01-28_15-57-17.png图标,选择全部功能 > 集成与开发 > 数据开发 > 任务编排

    说明

    若您使用的是非极简模式的控制台,在顶部菜单栏中,选择集成与开发 > 数据开发 > 任务编排

  3. 新增任务流。

    1. 单击新增任务流

    2. 选择任务流所在业务场景,输入任务流名称描述,再单击确认

  4. 在画布左侧任务类型列表中,拖拽跨库Spark SQL节点到画布空白区域。

配置跨库Spark SQL任务

  1. 在任务流详情页面,双击跨库Spark SQL节点。

  2. 在配置页面,配置当前日期变量${today}。关于变量的详细介绍,请参见变量

    1. 在界面右侧,单击变量设置

    2. 节点变量页签下,输入变量。

      image

  3. 添加用于数据存储和分析的OSS。

    1. OSS引用区域,单击添加OSS引用

    2. 选择目标OSS Bucket。

      说明

      如果OSS未登录,在登录实例对话框中,输入AccessKey ID和AccessKey Secret登录OSS实例。

    3. 指定数据保存在OSS Bucket上的路径。

      说明
      • 如果路径不存在,将自动创建。

      • 路径中支持使用变量,例如/path/${foldername}

    4. 输入OSS在Spark SQL语句中的引用别名为oss。

    5. 单击保存

  4. 添加商品订单所在的MySQL数据库。

    1. 数据库引用区域,单击添加数据库引用

    2. 选择数据库类型,搜索并选择数据库,输入数据库在Spark SQL语句中的引用别名为demo_id。关于配置项的详细介绍,请参见数据库配置表

      说明

      如果数据库未登录,在登录实例对话框中,输入数据库账号和密码登录数据库实例。

    3. 单击保存

  5. 在SQL区域,编写Spark SQL语句,并单击保存

    bizdate为系统变量,无需您手动配置。

    /* 请使用Spark SQL的语法编写SQL,表的引用方式为 alias.table_name */
    
    /*创建oss t_order表引用,添加dt作为分区字段*/
    CREATE TABLE oss.t_order (
      id bigint COMMENT '主键',
      product_id bigint  COMMENT '产品id',
      gmt_create timestamp  COMMENT '创建时间',
      gmt_modified timestamp  COMMENT '修改时间',
      customer_id bigint COMMENT '客户id',
      price decimal(38,8)  COMMENT '价格',
      status string COMMENT '订单状态',
      province string COMMENT '交易省份',
      dt string comment '业务日期分区'
    )  partitioned by (dt) COMMENT '订单表';
    
    insert overwrite oss.t_order partition(dt='${bizdate}')
    select id, product_id, gmt_create, gmt_modified, customer_id, price, status, province 
    from demo_id.t_order o 
    where o.gmt_create>= '${bizdate}' and o.gmt_create< '${today}';
    
    /*创建oss t_product表引用*/
    CREATE TABLE oss.t_product (
      id bigint COMMENT '主键',
      name string COMMENT '产品名',
      type string COMMENT '产品类别',
      gmt_create timestamp  COMMENT '创建时间',
      gmt_modified timestamp  COMMENT '修改时间'
    )   COMMENT '产品表';
    /*全量同步商品表数据*/
    insert overwrite oss.t_product 
    select id, name, type, gmt_create, gmt_modified 
    from demo_id.t_product; 
    
    /*创建oss t_order_report_daily表引用,以dt字段作为分区字段*/
    CREATE TABLE oss.t_order_report_daily(
       dt string  comment '业务日期',
       product_type string  comment '商品类别',
       order_cnt bigint  comment '订单数',
       order_amt decimal(38, 8)  comment '订单金额'
    )  partitioned by (dt) comment '订单统计日表';
    
    /*按分区插入数据*/
    insert overwrite oss.t_order_report_daily partition(dt='${bizdate}')
    select
           p.type as product_type,
           count(*)  order_cnt,
           sum(price)  order_amt
      from oss.t_product p join oss.t_order o on o.product_id= p.id
     where o.gmt_create>= '${bizdate}'
       and o.gmt_create< '${today}'
     group by product_type;
                            

    OSS支持4种文件存储格式:CSV、Parquet、ORC、JSON,默认使用CSV格式。您可以在CREATE TABLE语句中通过USING指定。

    例如,将表的存储格式指定为Parquet:

    CREATE TABLE oss.t_order (
      id bigint COMMENT '主键',
      product_id bigint  COMMENT '产品id',
      gmt_create timestamp  COMMENT '创建时间',
      gmt_modified timestamp  COMMENT '修改时间',
      customer_id bigint COMMENT '客户id',
      price decimal(38,8)  COMMENT '价格',
      status string COMMENT '订单状态',
      province string COMMENT '交易省份',
      dt string comment '业务日期分区'
    )  USING PARQUET partitioned by (dt) COMMENT '订单表';

运行及发布跨库Spark SQL任务

  1. 在任务流详情页面,单击画布左上角的试运行

    单击执行日志页签,查看执行结果。

    • 如果执行日志的最后一行出现status SUCCEEDED,表明任务流试运行成功。

    • 如果执行日志的最后一行出现status FAILED,表明任务流试运行失败。

      说明

      如果试运行失败,在执行日志中查看执行失败的节点和原因,修改节点的配置后重新尝试。

    执行成功后,您可以在DMS首页的左侧实例列表中,右键单击OSS的Bucket名称,单击查询,查看OSS中同步和分析的数据。

  2. 配置周期调度。

    1. 单击画布空白区域。

    2. 单击任务流信息页签。

    3. 调度配置区域,打开开启调度开关,配置调度。具体配置,请参见调度周期配置表

  3. 发布任务流。任务流发布后,此任务流会根据设置的调度周期自动执行任务。

    1. 单击画布左上角的发布

    2. 输入备注信息,再单击发布