本文通过案例为您介绍如何使用实时计算完成订单与销量的统计。
背景信息
以下案例是实时计算的合作伙伴袋鼠云通过阿里云实时计算来完成电商订单管理的案例。
业务架构图

业务流程:
- 使用数据传输服务DTS把您的数据同步到大数据总线(DataHub)。具体步骤请参见MySQL到DataHub数据实时同步。
- 阿里云实时计算订阅大数据总线(DataHub)的数据进行实时计算。
- 将实时数据插入到RDS的云数据库。
- 通过阿里云的DataV或者是其他的大屏完成数据展示。
准备工作
将RDS MySQL产生的增量数据实时同步到DataHub中的Topic。由RDS经过DTS数据同步到大数据总线(DataHub)Schema表的信息。
具体步骤请参见MySQL到DataHub数据实时同步。
字段名 | 数据类型 | 详情 |
---|---|---|
dts_ordercodeofsys | VARCHAR | 订单编号 |
dts_paytime | BIGINT | 订单付款时间 |
dts_deliveredtime | VARCHAR | 订单发货时间 |
dts_storecode | VARCHAR | 店铺编号 |
dts_warehousecode | VARCHAR | 仓库code |
dts_cancelled | BIGINT | 是否取消 |
dts_delivered | BIGINT | 是否发货 |
dts_receivercity | VARCHAR | 收货人城市 |
dts_receiverprovince | VARCHAR | 收货人省份 |
dts_record_id | VARCHAR | 记录ID |
dts_operation_flag | VARCHAR | 操作Flag |
dts_instance_id | VARCHAR | 数据库instanceId |
dts_db_name | VARCHAR | 数据库名 |
dts_table_name | VARCHAR | 数据表 |
dts_utc_timestamp | VARCHAR | 更新时间 |
dts_before_flag | VARCHAR | 变更前标识 |
dts_after_flag | VARCHAR | 变更后标识 |
字段名 | 数据类型 | 详情 |
---|---|---|
dts_ordercodeofsys | VARCHAR | 订单编号 |
dts_skuname | VARCHAR | 商品名字 |
dts_skucode | VARCHAR | 商品编号 |
dts_quantity | BIGINT | 数量 |
dts_dividedamount | DOUBLE | 发货金额 |
dts_salechanneldividedamount | DOUBLE | 渠道销售金额 |
dts_initialcost | DOUBLE | 成本 |
dts_record_id | VARCHAR | 记录ID |
dts_operation_flag | VARCHAR | 操作Flag |
dts_instance_id | VARCHAR | 数据库instanceId |
dts_db_name | VARCHAR | 数据库名字 |
dts_table_name | VARCHAR | 表名 |
dts_utc_timestamp | VARCHAR | 更新时间 |
dts_before_flag | VARCHAR | 变更前标识 |
dts_after_flag | VARCHAR | 变更后标识 |
编写业务逻辑
--数据的订单源表。
create table orders_real(
dts_ordercodeofsys varchar,
dts_paytime bigint,
dts_deliveredtime varchar,
dts_storecode varchar,
dts_warehousecode varchar,
dts_cancelled bigint,
dts_delivered bigint,
dts_receivercity varchar,
dts_receiverprovince varchar,
dts_record_id varchar,
dts_operation_flag varchar,
dts_instance_id varchar,
dts_db_name varchar,
dts_table_name varchar,
dts_utc_timestamp varchar,
dts_before_flag varchar,
dts_after_flag varchar
) with (
type='datahub',
endPoint='http://dh-cn-****.com',
project='your',
topic='表名',
accessId='您的ID',
accessKey='您的KEY'
);
create table orderdetail_real(
dts_ordercodeofsys varchar,
dts_skuname varchar,
dts_skucode varchar,
dts_quantity bigint,
dts_dividedamount double,
dts_salechanneldividedamount double,
dts_initialcost double,
dts_record_id varchar,
dts_operation_flag varchar,
dts_instance_id varchar,
dts_db_name varchar,
dts_table_name varchar,
dts_utc_timestamp varchar,
dts_before_flag varchar,
dts_after_flag varchar
) with (
type='datahub',
endPoint='http://dh-cn-****.com',
project='yourPorjectName',
topic='yourTableName',
accessId='yourAccessId',
accessKey='yourAccessSecret'
);
create table ads_all_count_amount(
bill_date varchar,--下单时间。
bill_count bigint,--总的订单总数。
qty bigint,--总的销售量。
primary key (bill_date)
) with (
type='rds',
url='jdbc:mysql://rm-XXXX.mysql.rds.aXXXXcs.com:3306/XXXX',
tableName='yourDatabaseTableName',
userName='yourDatabaseAccount',
password='yourDatabasePassword'
);
--订单源表,最新交易时间的商品编号。
CREATE VIEW new_paytime AS
SELECT
dts_ordercodeofsys,
MAX(dts_paytime) AS dts_paytime
FROM orders_real
GROUP BY dts_ordercodeofsys ;
--订单详情表,有效订单的订单编码、商品名称、商品编号、数量的信息。
CREATE VIEW new_orderdetail AS
SELECT
dts_ordercodeofsys,
dts_skuname,
dts_skucode,
CASE WHEN dts_operation_flag='U'
AND dts_before_flag='Y'
AND dts_after_flag='N' THEN -1*dts_quantity
WHEN dts_operation_flag='U'
AND dts_before_flag='N'
AND dts_after_flag='Y' THEN dts_quantity
WHEN dts_operation_flag='D' THEN -1*dts_quantity
ELSE dts_quantity
END AS dts_quantity
FROM
orderdetail_real;
--订单总单数,总销售量。
INSERT INTO ads_all_count_amount
SELECT
FROM_UNIXTIME(cast(a.dts_paytime/1000000 AS bigint),'yyyyMMdd') AS bill_date,
COUNT(DISTINCT a.dts_ordercodeofsys) AS bill_count,
SUM(b.dts_quantity) AS qty
from
new_paytime a
join
new_orderdetail b
ON a.dts_ordercodeofsys=b.dts_ordercodeofsys
GROUP BY
FROM_UNIXTIME(CAST(a.dts_paytime/1000000 AS bigint),'yyyyMMdd');
难点解析
为了方便您理解结构化代码和代码维护,推荐使用View(数据视图概念)将业务逻辑拆分为三个模块。
- 模块一 :根据订单编号进行分组
同一个编号订单会有多次业务操作(例如下单、付款、发货),并在Binlog日志中形成多条同一订单编号的订单流水记录。使用
MAX(dts_paytime)
获取同一编号的最后一次操作数据库最终付款交易时间。CREATE VIEW new_paytime AS SELECT dts_ordercodeofsys, MAX(dts_paytime) AS dts_paytime FROM orders_real GROUP BY dts_ordercodeofsys;
- 模块二 :生成有效订单的信息
数据库日志会获取所有的数据记录的变更,而每个订单是有状态的,如下表所示。--订单详情表,有效订单的订单编码、商品名称、商品编号、数量的信息。 CREATE VIEW new_orderdetail AS SELECT dts_ordercodeofsys, dts_skuname, dts_skucode, CASE WHEN dts_operation_flag='U' AND dts_before_flag='Y' AND dts_after_flag='N' THEN -1*dts_quantity WHEN dts_operation_flag='U' AND dts_before_flag='N' AND dts_after_flag='Y' THEN dts_quantity WHEN dts_operation_flag='D' THEN -1*dts_quantity ELSE dts_quantity END AS dts_quantity FROM orderdetail_real;
字段名 数据类型 详情 dts_record_id VARCHAR 记录ID。增量日志的唯一标识,唯一递增。如果变更类型为Update,那么增量更新会被拆分成2条,一条Insert,一条Delete。这两条记录具有相同的 record_id
。dts_operation_flag VARCHAR 标示这条增量日志的操作类型,取值为:
- I:Insert
- D:Delete
- U:Update
dts_instance_id VARCHAR 数据库 instanceId
。这条增量日志所对应的数据库的Server ID。dts_db_name VARCHAR 这条增量更新日志更新的表所在的数据库库名。 dts_table_name VARCHAR 这条增量更新日志更新的表名。 dts_utc_timestamp VARCHAR 这条增量日志的操作时间戳,为这个更新操作记录Binlog的时间戳。时间戳为UTC时间。 dts_before_flag VARCHAR 表示这条增量日志后面带的各个Column值是否为更新前的值。取值包括: Y
和N
。当后面的Column为更新前的值时,dts_before_flag=Y
,当后面的Column值为更新后的值时,dts_before_flag=N
。dts_after_flag VARCHAR 表示这条增量日志后面带的各个Column值是否为更新后的值。取值包括: Y
和N
。当后面的Column为更新前的值时,dts_after_flag=N
,当后面的Column值为更新后的值时,dts_after_flag=Y
。对于不同的操作类型,增量日志中的dts_before_flag
和dts_after_flag
定义如下:- 操作类型为Insert
所有Column值为新插入的记录值,即为更新后的值,所以
dts_before_flag=N
,dts_after_flag=Y
。 - 操作类型为Update
Update操作被拆为2条增量日志。这两条增量日志的
dts_record_id
,dts_operation_flag
及dts_utc_timestamp
相同。第一条日志记录更新前的值,所以dts_before_flag=Y
,dts_after_flag=N
。第二条日志记录了更新后的值,所以dts_before_flag=N
,dts_after_flag=Y
。 - 操作类型为Delete
所有Column值为被删除的记录值,即为更新前的值。所以
dts_before_flag=Y
,dts_after_flag=N
。
- 模块三:统计总订单数和销售额
SELECT from_unixtime(CAST(a.dts_paytime/1000000 AS bigint),'yyyyMMdd') AS bill_date, COUNT(DISTINCTa.dts_ordercodeofsys) AS bill_count, SUM(b.dts_quantity) AS qty from new_paytime as a join new_orderdetail as b ON a.dts_ordercodeofsys=b.dts_ordercodeofsys GROUP BY from_unixtime(CAST(a.dts_paytime/1000000 AS bigint),'yyyyMMdd');
DEMO示例以及源代码
根据上文介绍的订单与销量统计解决方案,为您创建了一个包含完整链路的DEMO示例,如下所示:
- DataHub作为源表。
- RDS作为结果表。
常见问题
- Q:模块二中,如何判断有效交易订单?
A:首先是要满足
dts_operation_flag=U
或者dts_operation_flag=I
,然后dts_before_flag
代表的是变更前订单状态,dts_after_flag
是变更后订单状态。所以有效交易订单如下。dts_operation_flag='U' AND dts_before_flag='N' AND dts_after_flag='Y' THEN dts_quantity
- Q:模块二中,为什么
THEN -1*dts_quantity
?A:订单的取消或者是交易没有成功,在总的销量里也会记录。为了保证总销量的正确性,所以把没有成交的订单数量设为负数,在计算总的销量会减去这个数量。
- Q:模块三中,为什么订单源表和订单详情需要进行JOIN操作?
A:
new_paytime
查出的是最新交易的时间的所有的订单编号。new_orderdetail
查询的是所有有效订单的订单编码、商品名称、商品编号、数量的信息。两张表JOIN是为了方便用户来统计订单总数和总的销量。