电商管理者希望能够按商品类别统计并查看前一天的订单数量和金额,本文通过在任务编排中创建跨库Spark任务,实现定期将在线库中的订单表和商品表同步到OSS中进行数据分析,管理者可以在OSS中查看分析结果。
前提条件
- 准备一个MySQL数据库作为在线库,用于存放订单表和商品表,且您拥有该数据库的查询权限。如需申请权限,请参见DMS访问控制权限概述。
- 创建一个OSS Bucket,并录入DMS中,用于存储数据。在DMS中录入OSS实例,请参见录入对象存储OSS。
背景信息
购物平台会产生大量的数据,对这些数据进行分析时,如果直接在在线库进行分析,轻则会使在线库响应变慢,重则会导致在线库无法响应业务处理。一般通过把业务数据同步到离线库或存储的方式对在线业务进行分析,如果您不需要将分析结果同步回在线库,可以将在线业务的数据同步到专用于数据存储的OSS中进行数据加工,您可以直接在OSS中查看数据加工结果。说明
- 阿里云对象存储OSS是阿里云提供的海量、安全、低成本、高持久的云存储服务。
- 如果您需要将分析结果同步回在线库,请参见通过任务编排实现跨库数据同步。
操作步骤
准备工作
在线库MySQL中创建订单表、商品表。
- 登录数据管理DMS 5.0。
- 在顶部菜单栏中,选择 。
- 在请先选择数据库弹框中,搜索并选择MySQL数据库,单击确认。
- 在MySQL数据库中新建订单表t_order、商品表t_product。
- 订单表t_order和商品表t_product中写入测试数据。使用测试数据构建功能生成数据,具体请参见测试数据构建。
- 构建订单表t_order的数据,数据量为2000万行。说明 自由操作模式的实例,每个工单最大生成数据行数为100万行。
- 构建商品表t_product的数据,数据量为1万行。
- 构建订单表t_order的数据,数据量为2000万行。
新增跨库Spark SQL任务
- 登录数据管理DMS 5.0。
- 在顶部菜单栏中,选择 。
- 新增任务流。
- 单击新增任务流。
- 在新建任务流对话框中,输入任务流名称和描述,单击确认。
- 在画布左侧任务类型列表中,拖拽配置跨库Spark SQL节点到画布空白区域。
配置跨库Spark SQL任务
- 在任务流详情页面,双击配置跨库Spark SQL节点。
- 在配置页面,配置当前日期变量${today}。关于变量的详细介绍,请参见变量概述。
- 添加用于数据存储和分析的OSS。
- 添加商品订单所在的MySQL数据库。
- 在SQL区域,编写Spark SQL语句,并单击保存。
/* 请使用Spark SQL的语法编写SQL,表的引用方式为 alias.table_name */ /*创建oss t_order表引用,添加dt作为分区字段*/ CREATE TABLE oss.t_order ( id bigint COMMENT '主键', product_id bigint COMMENT '产品id', gmt_create timestamp COMMENT '创建时间', gmt_modified timestamp COMMENT '修改时间', customer_id bigint COMMENT '客户id', price decimal(38,8) COMMENT '价格', status string COMMENT '订单状态', province string COMMENT '交易省份', dt string comment '业务日期分区' ) partitioned by (dt) COMMENT '订单表'; insert overwrite oss.t_order partition(dt='${bizdate}') select id, product_id, gmt_create, gmt_modified, customer_id, price, status, province from demo_id.t_order o where o.gmt_create>= '${bizdate}' and o.gmt_create< '${today}'; /*创建oss t_product表引用*/ CREATE TABLE oss.t_product ( id bigint COMMENT '主键', name string COMMENT '产品名', type string COMMENT '产品类别', gmt_create timestamp COMMENT '创建时间', gmt_modified timestamp COMMENT '修改时间' ) COMMENT '产品表'; /*全量同步商品表数据*/ insert overwrite oss.t_product select id, name, type, gmt_create, gmt_modified from demo_id.t_product; /*创建oss t_order_report_daily表引用,以dt字段作为分区字段*/ CREATE TABLE oss.t_order_report_daily( dt string comment '业务日期', product_type string comment '商品类别', order_cnt bigint comment '订单数', order_amt decimal(38, 8) comment '订单金额' ) partitioned by (dt) comment '订单统计日表'; /*按分区插入数据*/ insert overwrite oss.t_order_report_daily partition(dt='${bizdate}') select p.type as product_type, count(*) order_cnt, sum(price) order_amt from oss.t_product p join oss.t_order o on o.product_id= p.id where o.gmt_create>= '${bizdate}' and o.gmt_create< '${today}' group by product_type;
OSS支持4种文件存储格式:CSV、Parquet、ORC、JSON,默认使用CSV格式。您可以在CREATE TABLE语句中通过
USING
指定。例如,将表的存储格式指定为Parquet:
CREATE TABLE oss.t_order ( id bigint COMMENT '主键', product_id bigint COMMENT '产品id', gmt_create timestamp COMMENT '创建时间', gmt_modified timestamp COMMENT '修改时间', customer_id bigint COMMENT '客户id', price decimal(38,8) COMMENT '价格', status string COMMENT '订单状态', province string COMMENT '交易省份', dt string comment '业务日期分区' ) USING PARQUET partitioned by (dt) COMMENT '订单表';
发布跨库Spark SQL任务
- 在任务流详情页面,单击画布左上角的试运行。单击执行日志页签,查看执行结果。
- 如果执行日志的最后一行出现
status SUCCEEDED
,表明任务流试运行成功。 - 如果执行日志的最后一行出现
status FAILED
,表明任务流试运行失败。说明 如果试运行失败,在执行日志中查看执行失败的节点和原因,修改节点的配置后重新尝试。
- 如果执行日志的最后一行出现
- 配置周期调度。
- 单击画布空白区域。
- 单击任务流信息页签。
- 在调度配置区域,打开开启调度开关,配置调度。具体配置,请参见调度周期配置表。
- 发布任务流。任务流发布后,此任务流会根据设置的调度周期自动执行任务。
- 单击画布左上角的发布。
- 在发布对话框中输入备注信息,并单击确认,发布任务流。