某管理者希望能在购物平台上按商品类别查看每天的订单数量和金额。本文通过在任务编排中创建跨库Spark任务,实现了定期将在线库中的订单表和商品表同步到数据仓库中进行数据分析,并将分析结果回流在线库中供管理者查询。
前提条件
- 准备一个MySQL数据库作为在线库,用于存放订单表和商品表,且您拥有该数据库的变更权限。如需申请权限,请参见DMS访问控制权限概述。
- 准备一个AnalyticDB MySQL数据仓库,用于数据加工,且您拥有该数据仓库的变更权限。如需申请权限,请参见DMS访问控制权限概述。
背景信息
购物平台会产生大量的数据,对这些数据进行分析时,如果直接在在线库进行分析,轻则会使在线库响应变慢,重则会导致在线库无法响应业务处理。因此,一般在对在线业务进行分析时,需要把在线业务的数据同步到专用于数据加工的数据仓库再进行数据处理。在数据仓库中将数据加工后,还需要把加工结果同步回在线库中,以便于在线应用提供相关的数据分析和统计服务。
准备工作
在MySQL数据库中创建订单表、商品表和统计表;在AnalyticDB MySQL数据仓库中创建接收数据的订单表、商品表和存放分析结果的统计表。
- 登录数据管理DMS 5.0。
- 在顶部菜单栏中,选择。
- 在请先选择数据库弹框中,搜索并选择MySQL数据库,单击确认。
- 在MySQL数据库中新建订单表t_order、商品表t_product和用于同步数据分析结果的统计表t_order_report_daily。
- 创建表名为t_order的订单表。将下列建表SQL语句粘贴到SQL书写区域,单击执行。
创建订单表t_order的SQL语句:
CREATE TABLE `t_order` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键',
`product_id` bigint(20) NOT NULL COMMENT '产品id',
`gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`gmt_modified` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '修改时间',
`customer_id` bigint(20) NOT NULL COMMENT '客户id',
`price` decimal(14,2) NOT NULL COMMENT '价格',
`status` varchar(64) NOT NULL COMMENT '订单状态',
`province` varchar(256) DEFAULT NULL COMMENT '交易省份',
PRIMARY KEY (`id`),
KEY `idx_product_id` (`product_id`),
KEY `idx_customer_id` (`customer_id`),
KEY `idx_status` (`status`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 ROW_FORMAT=COMPACT COMMENT='订单表'
;
订单表t_order的表结构如下图所示:

- 创建表名为t_product的商品表。将下列建表SQL语句粘贴到SQL书写区域,单击执行。
创建商品表t_product的SQL语句:
CREATE TABLE `t_product` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键',
`name` varchar(128) NOT NULL COMMENT '产品名',
`type` varchar(64) NOT NULL COMMENT '产品类别',
`gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`gmt_modified` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '修改时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='产品表'
;
商品表t_product的表结构如下图所示:

- 创建表名为t_order_report_daily的统计表。将下列建表SQL语句粘贴到SQL书写区域,单击执行。
创建统计表t_order_report_daily的SQL语句:
CREATE TABLE `t_order_report_daily` (
`dt` varchar(64) NOT NULL COMMENT '统计日期',
`product_type` varchar(64) NOT NULL COMMENT '产品类型',
`cnt` bigint(64) NOT NULL COMMENT '笔数',
`amt` decimal(38,2) NOT NULL COMMENT '金额',
PRIMARY KEY (`dt`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='统计表'
;
统计表t_order_report_daily的表结构如下图所示:

- 在顶部菜单栏中,选择。
- 在请先选择数据库弹框中,搜索并选择AnalyticDB MySQL数据仓库,单击确认。
- 在AnalyticDB MySQL数据仓库中新建用于同步数据的订单表t_order、商品表t_product和用于存放数据分析结果的统计表t_order_report_daily。
说明 订单表t_order、商品表t_product和统计表t_order_report_daily的建表SQL语句,请参见
步骤4。
- 订单表t_order和商品表t_product中写入测试数据。使用测试数据构建功能生成数据,具体请参见测试数据构建。
- 构建订单表t_order的数据,数据量为2000万行。
说明 自由操作模式的实例,每个工单最大生成行数为100万行。
- 构建商品表t_product的数据,数据量为1万行。
新增跨库Spark任务
- 登录数据管理DMS 5.0。
- 在顶部菜单栏中,选择。
- 新增任务流。
- 单击新增任务流。
- 在新建任务流对话框中,输入任务流名称和描述,单击确认。
- 在画布左侧任务类型列表中,拖拽跨库Spark SQL节点到画布空白区域。
配置跨库Spark任务
- 在任务流详情页面,选中跨库Spark SQL节点。
- 配置当前日期变量${today}。关于变量的详细介绍,请参见变量概述。
- 单击节点信息页签。
- 单击节点变量页签。
- 输入节点变量的参数,如下图所示。
- 添加商品订单所在的MySQL数据库。
- 在数据库引用区域,单击添加数据库引用。
- 选择数据库类型,搜索并选择数据库,输入数据库在Spark SQL语句中的引用别名为demo_id。关于配置项的详细介绍,请参见数据库配置表。
- 单击保存。
- 添加用于数据加工的AnalyticDB MySQL数据仓库。
- 单击demo_id数据库右侧的
,添加一个新的数据库引用。
- 选择数据库类型,搜索并选择数据库,输入数据库在Spark SQL语句中的引用别名为company。
- 单击保存。
- 在SQL区域,编写Spark SQL语句,并单击保存。
/*同步数据*/
/*全量写商品表*/
insert overwrite company.t_product
select id,
name,
type,
gmt_create,
gmt_modified
from demo_id.t_product ;
/*增量写订单表*/
insert into company.t_order
select id,
product_id,
gmt_create,
gmt_modified,
customer_id,
price,
status,
province
from demo_id.t_order
where gmt_create>= '${bizdate}'
and gmt_create< '${today}' ;
/*加工数据,按商品类别分组统计*/
insert into company.t_order_report_daily
select '${bizdate}',
p.type as product_type,
count(*) order_cnt,
sum(price) order_amt
from company.t_product p join company.t_order o on o.product_id= p.id
where o.gmt_create>= '${bizdate}'
and o.gmt_create< '${today}'
group by product_type;
/*回流AnalyticDB MySQL中的数据到MySQL中*/
insert into demo_id.t_order_report_daily
select dt,
product_type,
order_cnt,
order_amt
from company.t_order_report_daily
where dt= '${bizdate}';
发布跨库Spark任务
- 在任务流详情页面,单击画布左上角的试运行。
单击
执行日志页签,查看执行结果。
- 如果执行日志的最后一行出现
status SUCCEEDED
,表明任务流试运行成功。
- 如果执行日志的最后一行出现
status FAILED
,表明任务流试运行失败。
说明 如果试运行失败,在执行日志中查看执行失败的节点和原因,修改节点的配置后重新尝试。

执行成功后,您还可以在MySQL数据库中查看同步到统计表t_order_report_daily中的统计数据。
- 配置周期调度。
- 单击画布空白区域。
- 单击任务流信息页签。
- 在调度配置区域,打开开启调度开关,配置调度。具体配置,请参见调度周期配置表。
- 发布任务流。任务流发布后,此任务流会根据设置的调度周期自动执行任务。
- 单击画布左上角的发布。
- 在发布对话框中输入备注信息,并单击确认,发布任务流。