本文通过案例为您介绍如何使用实时计算完成订单与销量的统计。

背景信息

以下案例是实时计算的合作伙伴袋鼠云通过阿里云实时计算来完成电商订单管理的案例。

业务架构图

业务流程:
  1. 使用数据传输服务DTS把您的数据同步到大数据总线(DataHub)。具体步骤请参见MySQL到DataHub数据实时同步
  2. 阿里云实时计算订阅大数据总线(DataHub)的数据进行实时计算。
  3. 将实时数据插入到RDS的云数据库。
  4. 通过阿里云的DataV或者是其他的大屏完成数据展示。

准备工作

将RDS MySQL产生的增量数据实时同步到DataHub中的Topic。由RDS经过DTS数据同步到大数据总线(DataHub)Schema表的信息。

具体步骤请参见MySQL到DataHub数据实时同步

表 1. orders_real源表
字段名 数据类型 详情
dts_ordercodeofsys VARCHAR 订单编号
dts_paytime BIGINT 订单付款时间
dts_deliveredtime VARCHAR 订单发货时间
dts_storecode VARCHAR 店铺编号
dts_warehousecode VARCHAR 仓库code
dts_cancelled BIGINT 是否取消
dts_delivered BIGINT 是否发货
dts_receivercity VARCHAR 收货人城市
dts_receiverprovince VARCHAR 收货人省份
dts_record_id VARCHAR 记录ID
dts_operation_flag VARCHAR 操作Flag
dts_instance_id VARCHAR 数据库instanceId
dts_db_name VARCHAR 数据库名
dts_table_name VARCHAR 数据表
dts_utc_timestamp VARCHAR 更新时间
dts_before_flag VARCHAR 变更前标识
dts_after_flag VARCHAR 变更后标识
表 2. orderdetail_real源表
字段名 数据类型 详情
dts_ordercodeofsys VARCHAR 订单编号
dts_skuname VARCHAR 商品名字
dts_skucode VARCHAR 商品编号
dts_quantity BIGINT 数量
dts_dividedamount DOUBLE 发货金额
dts_salechanneldividedamount DOUBLE 渠道销售金额
dts_initialcost DOUBLE 成本
dts_record_id VARCHAR 记录ID
dts_operation_flag VARCHAR 操作Flag
dts_instance_id VARCHAR 数据库instanceId
dts_db_name VARCHAR 数据库名字
dts_table_name VARCHAR 表名
dts_utc_timestamp VARCHAR 更新时间
dts_before_flag VARCHAR 变更前标识
dts_after_flag VARCHAR 变更后标识

编写业务逻辑

--数据的订单源表。
create table orders_real(
  dts_ordercodeofsys varchar,
  dts_paytime bigint,
  dts_deliveredtime varchar,
  dts_storecode varchar,
  dts_warehousecode varchar,
  dts_cancelled bigint,
  dts_delivered bigint,
  dts_receivercity varchar,
  dts_receiverprovince varchar,
  dts_record_id varchar,
  dts_operation_flag varchar,
  dts_instance_id varchar,
  dts_db_name varchar,
  dts_table_name varchar,
  dts_utc_timestamp varchar,
  dts_before_flag varchar,
  dts_after_flag varchar
) with (
  type='datahub',
  endPoint='http://dh-cn-****.com',
  project='your',
  topic='表名',
  accessId='您的ID',
  accessKey='您的KEY'
);

create table orderdetail_real(
  dts_ordercodeofsys varchar,
  dts_skuname varchar,
  dts_skucode varchar,
  dts_quantity bigint,
  dts_dividedamount double,
  dts_salechanneldividedamount double,
  dts_initialcost double,
  dts_record_id varchar,
  dts_operation_flag varchar,
  dts_instance_id varchar,
  dts_db_name varchar,
  dts_table_name varchar,
  dts_utc_timestamp varchar,
  dts_before_flag varchar,
  dts_after_flag varchar
) with (
  type='datahub',
  endPoint='http://dh-cn-****.com',
  project='yourPorjectName',
  topic='yourTableName',
  accessId='yourAccessId',
  accessKey='yourAccessSecret'
);


create table ads_all_count_amount(
  bill_date varchar,--下单时间。
  bill_count bigint,--总的订单总数。
  qty bigint,--总的销售量。
  primary key (bill_date)
) with (
  type='rds',
  url='jdbc:mysql://rm-XXXX.mysql.rds.aXXXXcs.com:3306/XXXX',
  tableName='yourDatabaseTableName',
  userName='yourDatabaseAccount',
  password='yourDatabasePassword'
);

--订单源表,最新交易时间的商品编号。
CREATE VIEW new_paytime AS
  SELECT
  dts_ordercodeofsys,
  MAX(dts_paytime) AS dts_paytime
FROM orders_real
  GROUP BY dts_ordercodeofsys ;

--订单详情表,有效订单的订单编码、商品名称、商品编号、数量的信息。
CREATE VIEW new_orderdetail AS
SELECT
  dts_ordercodeofsys,
  dts_skuname,
  dts_skucode,
  CASE WHEN dts_operation_flag='U'
  AND dts_before_flag='Y'
  AND dts_after_flag='N' THEN -1*dts_quantity
  WHEN dts_operation_flag='U'
  AND dts_before_flag='N'
  AND dts_after_flag='Y' THEN dts_quantity
  WHEN dts_operation_flag='D' THEN -1*dts_quantity
  ELSE dts_quantity
  END AS dts_quantity
FROM
  orderdetail_real;

--订单总单数,总销售量。
INSERT INTO ads_all_count_amount
  SELECT
  FROM_UNIXTIME(cast(a.dts_paytime/1000000 AS bigint),'yyyyMMdd') AS bill_date,
  COUNT(DISTINCT a.dts_ordercodeofsys) AS bill_count,
  SUM(b.dts_quantity) AS qty
from
  new_paytime a
  join
  new_orderdetail b
  ON a.dts_ordercodeofsys=b.dts_ordercodeofsys
  GROUP BY
  FROM_UNIXTIME(CAST(a.dts_paytime/1000000 AS bigint),'yyyyMMdd');    

难点解析

为了方便您理解结构化代码和代码维护,推荐使用View(数据视图概念)将业务逻辑拆分为三个模块。
  • 模块一 :根据订单编号进行分组
    同一个编号订单会有多次业务操作(例如下单、付款、发货),并在Binlog日志中形成多条同一订单编号的订单流水记录。使用MAX(dts_paytime)获取同一编号的最后一次操作数据库最终付款交易时间。
    CREATE VIEW new_paytime AS
    SELECT
      dts_ordercodeofsys,
      MAX(dts_paytime) AS dts_paytime
    FROM orders_real
    GROUP BY dts_ordercodeofsys;     
  • 模块二 :生成有效订单的信息
    --订单详情表,有效订单的订单编码、商品名称、商品编号、数量的信息。
    CREATE VIEW new_orderdetail AS
    SELECT
        dts_ordercodeofsys,
        dts_skuname,
        dts_skucode,
    CASE WHEN dts_operation_flag='U'
            AND dts_before_flag='Y'
            AND dts_after_flag='N' THEN -1*dts_quantity
        WHEN dts_operation_flag='U'
            AND dts_before_flag='N'
            AND dts_after_flag='Y' THEN dts_quantity
        WHEN dts_operation_flag='D' THEN -1*dts_quantity
        ELSE dts_quantity
        END AS dts_quantity
    FROM orderdetail_real;
    数据库日志会获取所有的数据记录的变更,而每个订单是有状态的,如下所示。
    字段名 数据类型 详情
    dts_record_id VARCHAR 记录ID。增量日志的唯一标识,唯一递增。如果变更类型为Update,那么增量更新会被拆分成2条,一条Insert,一条Delete。这两条记录具有相同的record_id
    dts_operation_flag VARCHAR

    标示这条增量日志的操作类型,取值为:

    • I:Insert
    • D:Delete
    • U:Update
    dts_instance_id VARCHAR 数据库instanceId。这条增量日志所对应的数据库的Server ID。
    dts_db_name VARCHAR 这条增量更新日志更新的表所在的数据库库名。
    dts_table_name VARCHAR 这条增量更新日志更新的表名。
    dts_utc_timestamp VARCHAR 这条增量日志的操作时间戳,为这个更新操作记录Binlog的时间戳。时间戳为UTC时间。
    dts_before_flag VARCHAR 表示这条增量日志后面带的各个Column值是否为更新前的值。取值包括:YN。当后面的Column为更新前的值时,dts_before_flag=Y,当后面的Column值为更新后的值时,dts_before_flag=N
    dts_after_flag VARCHAR 表示这条增量日志后面带的各个Column值是否为更新后的值。取值包括:YN。当后面的Column为更新前的值时,dts_after_flag=N,当后面的Column值为更新后的值时,dts_after_flag=Y
    对于不同的操作类型,增量日志中的dts_before_flagdts_after_flag定义如下:
    1. 操作类型为Insert
      所有Column值为新插入的记录值,即为更新后的值,所以dts_before_flag=Ndts_after_flag=Y
    2. 操作类型为Update
      Update操作被拆为2条增量日志。这两条增量日志的dts_record_iddts_operation_flagdts_utc_timestamp相同。第一条日志记录更新前的值,所以dts_before_flag=Ydts_after_flag=N。第二条日志记录了更新后的值,所以dts_before_flag=Ndts_after_flag=Y
    3. 操作类型为Delete
      所有Column值为被删除的记录值,即为更新前的值。所以dts_before_flag=Ydts_after_flag=N
  • 模块三:统计总订单数和销售额
    SELECT
        from_unixtime(CAST(a.dts_paytime/1000000 AS bigint),'yyyyMMdd') AS bill_date,
        COUNT(DISTINCTa.dts_ordercodeofsys) AS bill_count,
        SUM(b.dts_quantity) AS qty
    from
        new_paytime as a
    join
        new_orderdetail as b
    ON
        a.dts_ordercodeofsys=b.dts_ordercodeofsys
    GROUP BY
    from_unixtime(CAST(a.dts_paytime/1000000 AS bigint),'yyyyMMdd');                     

DEMO示例以及源代码

根据上文介绍的订单与销量统计解决方案,为您创建了一个包含完整链路的DEMO示例,如下所示。
  • DataHub作为源表。
  • RDS作为结果表。
DEMO代码完整,您可参考示例代码,注册上下游数据,制定自己的订单与销量统计解决方案。单击DEMO代码进行下载。

常见问题

  • Q:模块二中,如何判断有效交易订单?
    A:首先是要满足dts_operation_flag=U或者dts_operation_flag=I,然后dts_before_flag代表的是变更前订单状态,dts_after_flag是变更后订单状态。所以有效交易订单如下。
    dts_operation_flag='U'
          AND dts_before_flag='N'
          AND dts_after_flag='Y' THEN dts_quantity
  • Q:模块二中,为什么THEN -1*dts_quantity

    A:订单的取消或者是交易没有成功,在总的销量里也会记录。为了保证总销量的正确性,所以把没有成交的订单数量设为负数,在计算总的销量会减去这个数量。

  • Q:模块三中,为什么订单源表和订单详情需要进行JOIN操作?

    A:new_paytime查出的是最新交易的时间的所有的订单编号。new_orderdetail查询的是所有有效订单的订单编码、商品名称、商品编号、数量的信息。两张表JOIN是为了方便用户来统计订单总数和总的销量。