进阶案例:商品订单畅销类目分析

更新时间:2025-04-29 05:52:40

DataWorks基于MaxCompute、Hologres、EMR、AnalyticDB、CDP等大数据引擎,为数据仓库、数据湖、湖仓一体等解决方案提供统一的全链路大数据开发治理平台。本文以DataWorks的部分核心功能为例,指导您使用DataWorks接入数据并进行业务处理、周期调度以及数据可视化。

入门简介

本教程以电商场景为例,演示如何构建从原始数据接入→数据分析计算→可视化输出的完整数据管道,通过标准化的开发流程,快速搭建可复用的数据生产链路,保证调度可靠性与运维可观测性。使业务人员无需深入技术细节即可完成数据价值转换,降低企业大数据应用门槛。

通过本教程,您可以快速完成以下操作。

  1. 数据同步:通过DataWorks的数据集成模块,创建离线同步任务,将业务数据同步至大数据计算平台(如MaxCompute)。

  2. 数据清洗:在DataWorks的数据开发模块中,对业务数据进行处理、分析和挖掘。

  3. 数据展示:在DataWorks的数据分析模块中,将分析结果转化为图表,便于业务人员理解。

  4. 周期性调度:为数据同步和数据清洗流程配置周期性调度,使其定时执行。

image

本教程从公开数据源同步原始商品和订单数据至MaxCompute中,通过如下数据分析流程,产出每日最畅销商品类目排名:

image

前提条件

为确保本教程可以顺利进行,推荐使用阿里云主账号或具备AliyunDataWorksFullAccess权限的RAM用户。具体操作,请参见准备阿里云账号(主账号)准备RAM用户(子账号)

说明

DataWorks提供了完善的权限管控机制,支持在产品级与模块级对权限进行管控,如果您需要更精细的权限控制,请参见DataWorks权限体系功能概述

准备工作

开通DataWorks

本教程以华东2(上海)地域为例,介绍DataWorks快速入门,您需要登录DataWorks管理控制台,切换至华东2(上海)地域,查看该地域是否开通DataWorks。

说明

本教程以华东2(上海)为例,在实际使用中,请根据实际业务数据所在位置确定开通地域:

  • 如果您的业务数据位于阿里云的其他云服务,请选择与其相同的地域。

  • 如果您的业务在本地,需要通过公网访问,请选择与您实际地理位置较近的地域,以降低访问延迟。

全新用户
开通过但已到期
已开通

如果您为新用户,首次使用DataWorks,将显示如下内容,表示当前地域尚未开通DataWorks,需要单击0元组合购买

image

  1. 配置组合购买页相关参数。

    参数

    说明

    示例

    地域

    选择需要开通DataWorks的地域。

    华东2(上海)

    DataWorks版本

    选择需要购买的DataWorks版本。

    说明

    本教程以基础版为例,所有版本均可体验本教程所涉及的功能,您可以参考DataWorks各版本支持的功能详情,根据实际业务需要,选择合适的DataWorks版本。

    基础版

    DataWorks资源组

    通过DataWorks进行数据集成、数据开发、数据调度等任务时,需要消耗计算资源,您需要配套购买资源组,以确保后续任务的顺利运行。

    • 资源组名称:自定义

    • 专有网络(VPC)交换机(V-Switch)

      • 没有VPC和交换机:如果不配置,DataWorks将自动创建。您也可以单击参数说明中对应的控制台链接手动创建。

      • 已有VPC和交换机:选择已有的VPC和交换机。

      说明

      VPC和交换机的更多信息,请参见什么是专有网络

    • 服务关联角色:根据页面提示,单击创建服务关联角色。

  2. 单击确认订单并支付,完成后续支付。

如果您在华东2(上海)地域曾经开通过DataWorks,但DataWorks版本已到期,则会出现如下提示,需要单击购买版本

image

  1. 配置购买页相关参数。

    参数

    说明

    示例

    版本

    选择需要购买的DataWorks版本。

    说明

    本教程以基础版为例,所有版本均可体验本教程所涉及的功能,您可以参考DataWorks各版本支持的功能详情,根据实际业务需要,选择合适的DataWorks版本。

    基础版

    地域和可用区

    选择需要开通DataWorks的地域。

    华东2(上海)

  2. 单击立即购买,完成后续支付。

重要

您在购买DataWorks版本后,如未找到相关DataWorks版本,可进行以下操作:

  • 等待几分钟刷新页面,系统更新可能会有延迟。

  • 查看所在地域是否与购买DataWorks版本地域一致,防止因地域选择问题,未找到相关DataWorks版本。

如果您在华东2(上海)地域已开通DataWorks,将会进入DataWorks概览页,可直接进行下一步。

创建工作空间

  1. 前往DataWorks工作空间列表页,切换至华东2(上海)地域,单击创建工作空间

  2. 创建工作空间页面,自定义工作空间名称,开启参加数据开发(Data Studio)公测,然后单击创建工作空间

    说明

    20250218日后,主账号在华东2(上海)地域首次开通DataWorks并创建工作空间时,默认启用新版数据开发,界面将不展示参加数据开发(Data Studio)公测参数。

创建资源组并绑定工作空间

  1. 前往DataWorks资源组列表页,切换至华东2(上海)地域,单击创建资源组

  2. 在资源组购买页面,配置如下参数。

    参数

    说明

    资源组名称

    自定义。

    专有网络(VPC)交换机(V-Switch)

    选择已有的VPC和交换机,如当前地域没有,请单击参数说明中对应的控制台链接前往创建。

    服务关联角色

    根据页面提示,创建DataWorks服务关联角色

  3. 单击立即购买,完成后续支付。

  4. 前往DataWorks资源组列表页,切换至华东2(上海)地域,找到已创建的资源组,单击操作列的绑定工作空间

  5. 绑定工作空间页面,找到已创建的DataWorks工作空间,单击其操作列的绑定

为资源组开通公网

本教程使用的电商平台公开测试业务数据需要通过公网获取,而上一步创建的通用型资源组默认不具备公网访问能力,需要为资源组绑定的VPC配置公网NAT网关添加EIP使其与公开数据的网络打通,从而获取数据。

  1. 登录专有网络-公网NAT网关控制台,在顶部菜单栏切换至华东2(上海)地域,单击创建公网NAT网关。配置相关参数。

    说明

    表中未说明的参数保持默认值即可。

    参数

    取值

    所属地域

    华东2(上海)。

    所属专有网络

    选择资源组绑定的VPC和交换机。

    您可以前往DataWorks资源组列表页,切换至华东2(上海)地域,找到已创建的资源组,然后单击操作列的网络设置,在数据调度 & 数据集成区域查看绑定专有网络交换机。VPC和交换机的更多信息,请参见什么是专有网络

    关联交换机

    访问模式

    VPC全通模式(SNAT)。

    弹性公网IP实例

    新购弹性公网IP。

    关联角色创建

    首次创建NAT网关时,需要创建服务关联角色,请单击创建关联角色

  2. 单击立即购买,完成后续支付,所有资源状态均为成功后,即表示资源组绑定的VPC已具备公网访问能力。

    image

创建并绑定MaxCompute计算资源

本教程需要创建MaxCompute项目并将其绑定为DataWorks计算资源,用于后续接收数据并进行大数据分析。

  1. 前往DataWorks工作空间列表页,切换至华东2(上海)地域,找到已创建的工作空间,单击工作空间名称,进入空间详情页。

  2. 在左侧导航栏单击计算资源,进入计算资源页面,单击绑定计算资源,选择MaxCompute类型。配置如下关键参数,创建MaxCompute项目并将其绑定为DataWorks的计算资源。

    说明

    表中未说明的参数保持默认值即可。

    参数

    描述

    MaxCompute项目

    在下拉选择框中单击新建,填写如下参数。

    • 项目名称:自定义,全网唯一。

    • 计算资源付费类型:选择按量付费

      说明

      如果按量付费不可选,请单击按量付费后的去开通,完成MaxCompute服务的开通。

    • 默认Quota:下拉选择默认已存在的Quota。

    默认访问身份

    选择阿里云主账号

    计算资源实例名

    在后续任务运行时,通过计算资源实例名称来选择任务运行的计算资源,方便识别,例如本教程命名为MaxCompute_Source

  3. 单击确认

操作步骤

本文以如下场景为例,指导您快速体验DataWorks的相关功能:

假设某一电商平台将商品信息、订单信息存储在MySQL数据库中,需要定期对订单数据进行分析,通过可视化的方式查看每日最畅销商品类目排名表。

步骤一:数据同步

创建数据源

DataWorks通过创建数据源的方式,接入数据来源和数据去向,本步骤需要创建MySQL数据源,用于连接数据来源(存储业务数据的MySQL数据库),为本教程提供原始业务数据。

说明

您无需准备本教程使用的原始业务数据,为方便测试和学习,DataWorks为您提供测试数据集,相关表数据已存储在公网MySQL数据库中,您只需创建MySQL数据源接入即可。

  1. 前往DataWorks管理中心页,切换至华东2(上海)地域,在下拉框中选择已创建的工作空间后,单击进入管理中心

  2. 在左侧导航栏单击数据源,进入数据源列表页,单击新增数据源,选择MySQL类型,配置MySQL数据源相关参数。

    说明
    • 表中未说明的参数保持默认值即可。

    • 首次新增数据源时,需要完成跨服务授权,请根据页面提示,授权服务关联角色AliyunDIDefaultRole

    参数

    描述

    参数

    描述

    数据源名称

    本示例为MySQL_Source

    配置模式

    选择连接串模式

    连接地址

    • 主机地址IP:rm-bp1z69dodhh85z9qa.mysql.rds.aliyuncs.com

    • 端口号:3306

    重要

    本教程提供的数据仅作为阿里云大数据开发治理平台 DataWorks数据应用实操使用,所有数据均为测试数据,并且仅支持在数据集成模块读取数据。

    数据库名称

    配置为retail_e_commerce

    用户名

    输入用户名workshop

    密码

    输入密码workshop#2017

  3. 连接配置区域,切换至数据集成页签,找到工作空间已绑定的资源组,单击连通状态列的测试连通性

    说明

    如果MySQL数据源连通性测试失败,请进行以下操作:

    • 完成连通性诊断工具后续操作。

    • 请检查是否为资源组绑定的VPC配置EIP,MySQL数据源需要资源组具备公网访问能力。详情请参见为资源组开通公网

  4. 单击完成创建

搭建同步链路

本步骤需要搭建同步链路,将电商平台商品订单数据同步至MaxCompute的表中,为后续加工数据做准备。

  1. 单击左上方的图标图标,选择全部产品 > 数据开发与运维 > DataStudio(数据开发),进入数据开发页面。

  2. 在页面顶部切换至本教程创建好的工作空间,在左侧导航栏单击image,进入数据开发-项目目录页面。

  3. 项目目录区域,单击image,选择新建工作流,设置工作流名称。本教程设置为dw_quickstart

  4. 在工作流编辑页面,从左侧拖拽虚拟节点离线同步节点至画布中,分别设置节点名称。

    本教程节点名称示例及作用如下:

    节点类型

    节点名称

    节点作用

    节点类型

    节点名称

    节点作用

    image虚拟节点

    workshop

    用于统筹管理整个用户画像分析工作流,可使数据流转路径更清晰。该节点为空跑任务,无须编辑代码。

    image离线同步节点

    ods_item_info

    用于将存储于MySQL的商品信息源表item_info同步至MaxComputeods_item_info表。

    image离线同步节点

    ods_trade_order

    用于将存储于MySQL的订单信息源表trade_order同步至MaxComputeods_trade_order表。

    手动拖拽连线,将workshop节点设置为两个离线同步节点的上游节点。最终效果如下:

    image
  5. 工作流调度配置。

    在工作流编辑页面右侧单击调度配置,配置相关参数。以下为本教程所需配置的关键参数,未说明参数保持默认即可。

    调度配置参数

    说明

    调度配置参数

    说明

    调度参数

    为整个工作流设置调度参数,工作流中的内部节点可直接使用。

    本教程配置为bizdate=$[yyyymmdd-1],获取前一天的日期。

    说明

    DataWorks提供的调度参数,可实现代码动态入参,您可在SQL代码中通过${变量名}的方式定义代码中的变量,并在调度配置 > 调度参数处,为该变量赋值。调度参数支持的格式,详情请参见调度参数支持的格式

    调度周期

    本教程配置为

    调度时间

    本教程配置调度时间00:30,该工作流会在每日00:30启动。

    周期依赖

    工作流无上游依赖,可不配置。为了方便统一管理,您可以单击使用工作空间根节点,将工作流挂载到工作空间根节点下。

    工作空间根节点命名格式为:工作空间名_root

  6. 在节点工具栏单击保存

配置同步任务

配置初始节点
  1. 在工作流编辑页面中,鼠标悬停至workshop节点上,单击打开节点

  2. workshop节点编辑页面右侧单击调度配置,配置相关参数。以下为本教程所需配置的关键参数,未说明参数保持默认即可。

    调度配置参数

    说明

    调度类型

    本教程配置为空跑调度

    调度资源组

    本教程配置为创建资源组并绑定工作空间时创建的Serverless资源组。

    节点依赖配置

    由于workshop为初始节点,无上游依赖,此时可以单击使用工作空间根节点,由工作空间根节点触发工作流执行。

    工作空间根节点命名为:工作空间名_root

  3. 在节点工具栏单击保存

配置商品信息同步链路(ods_item_info)
  1. 在工作流编辑页面中,鼠标悬停至ods_item_info节点上,单击打开节点

  2. 配置同步链路网络与资源。

    参数

    描述

    数据来源

    • 数据来源:MySQL

    • 数据源名称:MySQL_Source

    我的资源组

    选择创建资源组并绑定工作空间时购买的Serverless资源组。

    数据去向

  3. 单击下一步,配置同步任务。

    1. 配置数据来源与去向

      说明

      表中未说明的参数保持默认值即可。

      配置区域

      参数

      配置说明

      数据来源

      item_info

      数据去向

      单击一键生成目标表结构快速创建MaxCompute表。将如下建表语句粘贴至建表语句区域,单击新建表。该表用于接收数据来源端的商品信息。

      建表SQL

      CREATE TABLE IF NOT EXISTS ods_item_info(
      `id`                            BIGINT COMMENT '',
      `cate_id`                       BIGINT COMMENT '',
      `cate_name`                     STRING COMMENT '',
      `commodity_id`                  BIGINT COMMENT '',
      `commodity_name`                STRING COMMENT '',
      `desc_path`                     STRING COMMENT '',
      `duration`                      BIGINT COMMENT '',
      `features`                      STRING COMMENT '',
      `gmt_create`                    DATETIME COMMENT '',
      `gmt_modified`                  DATETIME COMMENT '',
      `is_deleted`                    BIGINT COMMENT '',
      `is_virtual`                    STRING COMMENT '',
      `item_id`                       BIGINT COMMENT '',
      `item_status`                   BIGINT COMMENT '',
      `last_offline_time`             DATETIME COMMENT '',
      `last_online_quantity`          BIGINT COMMENT '',
      `last_online_time`              DATETIME COMMENT '',
      `pict_url`                      STRING COMMENT '',
      `reserve_price`                 DECIMAL(38,18) COMMENT '',
      `secure_trade_ems_post_fee`     DECIMAL(38,18) COMMENT '',
      `secure_trade_fast_post_fee`    DECIMAL(38,18) COMMENT '',
      `secure_trade_ordinary_post_fee`DECIMAL(38,18) COMMENT '',
      `shop_id`                       BIGINT COMMENT '',
      `shop_nick`                     STRING COMMENT '',
      `sub_title`                     STRING COMMENT '',
      `title`                         STRING COMMENT ''
      )
      COMMENT ''
      PARTITIONED BY (pt STRING) 
      lifecycle 36500;

      分区信息

      本教程填入${bizdate},用于在后续测试阶段为bizdate参数赋值常量,在调度执行时为bizdate参数动态赋值。Data Studio支持的变量格式及配置方法,请参见调度参数

    2. 确认字段映射通道控制

      DataWorks通过配置源端与目标端字段映射关系,实现源端指定字段数据写入目标端指定字段,同时支持设置任务并发数、脏数据策略等。本教程脏数据策略配置为不容忍脏数据,其他配置保持默认。更多信息,请参见通过向导模式配置离线同步任务

  4. 在节点工具栏单击保存

配置订单数据同步链路(ods_trade_order
  1. 在工作流编辑页面中,鼠标悬停至ods_trade_order节点上,单击打开节点

  2. 配置同步链路网络与资源。

    参数

    描述

    数据来源

    • 数据来源:MySQL

    • 数据源名称:MySQL_Source

    我的资源组

    选择创建资源组并绑定工作空间时购买的Serverless资源组。

    数据去向

  3. 单击下一步,配置同步任务。

    1. 配置数据来源与去向

      说明

      表中未说明的参数保持默认值即可。

      配置区域

      参数

      配置说明

      数据来源

      trade_order

      数据去向

      单击一键生成目标表结构快速创建MaxCompute表。将如下建表语句粘贴至建表语句区域,单击新建表。该表用于接收数据来源端的商品信息。

      建表SQL

      CREATE TABLE IF NOT EXISTS ods_trade_order(
      `id`                            BIGINT COMMENT '',
      `biz_type`                      BIGINT COMMENT '',
      `buy_amount`                    BIGINT COMMENT '',
      `buyer_id`                      BIGINT COMMENT '',
      `buyer_memo`                    STRING COMMENT '',
      `buyer_nick`                    STRING COMMENT '',
      `end_time`                      DATETIME COMMENT '',
      `gmt_create`                    DATETIME COMMENT '',
      `gmt_modified`                  DATETIME COMMENT '',
      `ip`                            STRING COMMENT '',
      `is_parent`                     BIGINT COMMENT '',
      `is_sub`                        BIGINT COMMENT '',
      `item_id`                       BIGINT COMMENT '',
      `item_price`                    DECIMAL(38,18) COMMENT '',
      `logistics_status`              BIGINT COMMENT '',
      `memo`                          STRING COMMENT '',
      `parent_order_id`               BIGINT COMMENT '',
      `pay_status`                    BIGINT COMMENT '',
      `pay_time`                      DATETIME COMMENT '',
      `seller_memo`                   STRING COMMENT '',
      `shop_id`                       BIGINT COMMENT '',
      `status`                        BIGINT COMMENT '',
      `sub_order_id`                  BIGINT COMMENT '',
      `total_fee`                     DECIMAL(38,18) COMMENT ''
      )
      COMMENT ''
      PARTITIONED BY (pt STRING) 
      lifecycle 36500;

      分区信息

      本教程填入${bizdate},用于在后续测试阶段为bizdate参数赋值常量,在调度执行时为bizdate参数动态赋值。Data Studio支持的变量格式及配置方法,请参见调度参数

    2. 确认字段映射通道控制

      DataWorks通过配置源端与目标端字段映射关系,实现源端指定字段数据写入目标端指定字段,同时支持设置任务并发数、脏数据策略等。本教程脏数据策略配置为不容忍脏数据,其他配置保持默认。更多信息,请参见通过向导模式配置离线同步任务

  4. 在节点工具栏单击保存

步骤二:数据清洗

数据从MySQL同步至MaxCompute后,获得两张数据表(商品信息表ods_item_info和订单信息表ods_trade_order),您可以在DataWorks的数据开发模块对表中数据进行清洗、处理和分析,从而获取每日最畅销商品类目排名表。

搭建数据加工链路

  1. Data Studio左侧导航栏单击image,进入数据开发页面,然后在项目目录区域找到已创建好的工作流,单击进入工作流编辑页。

  2. 单击编辑,在工作流编辑页面,从左侧拖拽MaxCompute SQL节点至画布中,分别设置节点名称。

    本教程节点名称示例及作用如下:

    节点类型

    节点名称

    节点作用

    节点类型

    节点名称

    节点作用

    imageMaxCompute SQL

    dim_item_info

    基于ods_item_info表,处理商品维度数据,产出商品基础信息维度表dim_item_info

    imageMaxCompute SQL

    dwd_trade_order

    基于ods_trade_order表,对订单的详细交易数据进行初步清洗、转换和业务逻辑处理,产出交易下单明细事实表dwd_trade_order

    imageMaxCompute SQL

    dws_daily_category_sales

    基于dwd_trade_order表和dim_item_info表,对DWD层经过清洗和标准化的明细数据进行汇总,产出每日商品类目销售汇总表dws_daily_category_sales

    imageMaxCompute SQL

    ads_top_selling_categories

    基于dws_daily_category_sales表,产出每日最畅销商品类目排名表ads_top_selling_categories

  3. 手动拖拽连线,配置各节点的上游节点。最终效果如下:

    image
    说明

    工作流中支持通过手动连线方式设置各节点的上下游依赖关系,也支持在子节点中,使用代码解析自动识别节点上下游依赖关系。本教程采用手动连线方式,代码解析的更多信息,请参见自动解析依赖

  4. 在节点工具栏单击保存

配置数据加工节点

配置dim_item_info节点

基于ods_item_info表,处理商品维度数据,产出商品基础信息维度表dim_item_info

  1. 在工作流编辑页面中,鼠标悬停至dim_item_info节点上,单击打开节点

  2. 将如下代码粘贴至节点编辑页面。

    CREATE TABLE IF NOT EXISTS dim_item_info (
        gmt_modified                   STRING COMMENT '商品最后修改日期',
        gmt_create                     STRING COMMENT '商品创建时间',
        item_id                        BIGINT COMMENT '商品数字ID',
        title                          STRING COMMENT '商品标题',
        sub_title                      STRING COMMENT '商品子标题',
        pict_url                       STRING COMMENT '主图URL',
        desc_path                      STRING COMMENT '商品描述的路径',
        item_status                    BIGINT COMMENT '商品状态1:确认通过0:未确认通过',
        last_online_time               DATETIME COMMENT '最近一次开始销售时间,商品上架时间',
        last_offline_time              DATETIME COMMENT '销售结束时间,表示一个销售周期的结束,仅作用于拍卖商品',
        duration                       BIGINT COMMENT '有效期,销售周期,只有两个值,7天或14天',
        reserve_price                  DOUBLE COMMENT '当前价格',
        secure_trade_ordinary_post_fee DOUBLE COMMENT '平邮费用',
        secure_trade_fast_post_fee     DOUBLE COMMENT '快递费用',
        secure_trade_ems_post_fee      DOUBLE COMMENT 'EMS邮费',
        last_online_quantity           BIGINT COMMENT '商品最近一次上架时的库存数量',
        features                       STRING COMMENT '商品特征',
        cate_id                        BIGINT COMMENT '商品叶子类目ID',
        cate_name                      STRING COMMENT '商品叶子类目名称',
        commodity_id                   BIGINT COMMENT '品类ID',
        commodity_name                 STRING COMMENT '品类名称',
        is_virtual                     STRING COMMENT '是否虚拟商品',
        shop_id                        BIGINT COMMENT '商家ID',
        shop_nick                      STRING COMMENT '商家NICK',
        is_deleted                     BIGINT COMMENT '类目是否删除'
    )
    COMMENT '商品基础信息维度表'
    PARTITIONED BY (pt STRING COMMENT '业务日期, yyyymmdd')
    LIFECYCLE 365;
    
    
    -- 插入数据到 dim_item_info 表
    INSERT OVERWRITE TABLE dim_item_info PARTITION(pt='${bizdate}')
    SELECT
        gmt_create,
        gmt_modified,
        item_id,
        title,
        sub_title,
        pict_url,
        desc_path,
        item_status,
        last_online_time,
        last_offline_time,
        duration,
        cast(reserve_price as DOUBLE),
        cast(secure_trade_ordinary_post_fee as DOUBLE),
        cast(secure_trade_fast_post_fee as DOUBLE),
        cast(secure_trade_ems_post_fee as DOUBLE),
        last_online_quantity,
        features,
        cate_id,
        cate_name,
        commodity_id,
        commodity_name,
        is_virtual,
        shop_id,
        shop_nick,
        is_deleted
    FROM ods_item_info
    WHERE pt = '${bizdate}';
  3. 配置调试参数。

    MaxCompute SQL节点编辑页面右侧单击调试配置

  4. 在节点工具栏单击保存

配置dwd_trad_order节点

基于ods_trade_order表,对订单的详细交易数据进行初步清洗、转换和业务逻辑处理,产出交易下单明细事实表dwd_trade_order

  1. 在工作流编辑页面中,鼠标悬停至dwd_trade_order节点上,单击打开节点

  2. 将如下代码粘贴至节点编辑页面。

    CREATE TABLE IF NOT EXISTS dwd_trade_order (
        id               BIGINT COMMENT '主键,去重后的最新id',
        gmt_create       DATETIME COMMENT '创建时间',
        gmt_modified     DATETIME COMMENT '修改时间',
        sub_order_id     BIGINT COMMENT '子订单ID',
        parent_order_id  BIGINT COMMENT '父订单ID',
        buyer_id         BIGINT COMMENT '买家数字id',
        buyer_nick       STRING COMMENT '买家昵称,处理空值',
        item_id          BIGINT COMMENT '商品数字id',
        item_price       DECIMAL(38,18) COMMENT '商品价格,单位分',
        buy_amount       BIGINT COMMENT '购买数量',
        biz_type         BIGINT COMMENT '交易类型',
        memo             STRING COMMENT '备注,处理空值',
        pay_status       BIGINT COMMENT '支付状态',
        logistics_status BIGINT COMMENT '物流状态',
        status           BIGINT COMMENT '状态',
        seller_memo      STRING COMMENT '卖家的给交易的备注',
        buyer_memo       STRING COMMENT '买家给交易的备注',
        clean_ip         STRING COMMENT '清洗后的买家IP,过滤无效格式',
        end_time         DATETIME COMMENT '交易结束时间',
        pay_time         DATETIME COMMENT '付款的时间',
        is_sub           BIGINT COMMENT '是否是子订单1表示子订单',
        is_parent        BIGINT COMMENT '是否是父订单1表示父订单',
        shop_id          BIGINT COMMENT '商家id',
        total_fee        DECIMAL(38,18) COMMENT '去除折扣和调整后的子订单费用',
        is_large_order_flag BOOLEAN COMMENT '是否为大额订单标志'
    )
    COMMENT '交易下单明细事实表,包含初步清洗和业务逻辑处理'
    PARTITIONED BY (pt STRING COMMENT '业务日期, yyyymmdd')
    LIFECYCLE 365; -- 数据生命周期设置为365天
    
    
    INSERT OVERWRITE TABLE dwd_trade_order PARTITION(pt='${bizdate}')
    SELECT
        MAX(id) AS id, -- 假设使用最新的id作为去重标准
        gmt_create,
        gmt_modified,
        sub_order_id,
        parent_order_id,
        buyer_id,
        COALESCE(buyer_nick, '') AS buyer_nick, -- 处理buyer_nick为空的情况
        item_id,
        item_price,
        buy_amount,
        biz_type,
        COALESCE(memo, '') AS memo, -- 处理memo为空的情况
        pay_status,
        logistics_status,
        status,
        seller_memo,
        buyer_memo,
        CASE 
            WHEN ip LIKE '__.__.__.__' THEN NULL -- 过滤无效IP格式
            ELSE ip 
        END AS clean_ip,
        end_time,
        pay_time,
        is_sub,
        is_parent,
        shop_id,
        total_fee,
        CASE 
            WHEN total_fee >= 10000 THEN TRUE -- 假设大于10000分的订单为大额订单
            ELSE FALSE 
        END AS is_large_order_flag -- 添加业务逻辑标志
    FROM (
        SELECT
            *,
            ROW_NUMBER() OVER(PARTITION BY buyer_id, item_id, gmt_create ORDER BY id DESC) AS rn -- 用于去重的行号
        FROM ods_trade_order
        WHERE pt = '${bizdate}'
    ) AS sub_query
    WHERE rn = 1 -- 仅保留每个去重组的第一条记录
    GROUP BY 
        gmt_create,
        gmt_modified,
        sub_order_id,
        parent_order_id,
        buyer_id,
        buyer_nick,
        item_id,
        item_price,
        buy_amount,
        biz_type,
        memo,
        pay_status,
        logistics_status,
        status,
        seller_memo,
        buyer_memo,
        clean_ip,
        end_time,
        pay_time,
        is_sub,
        is_parent,
        shop_id,
        total_fee,
        is_large_order_flag;
  3. 配置调试参数。

    MaxCompute SQL节点编辑页面右侧单击调试配置

  4. 在节点工具栏单击保存

配置dws_daily_category_sales节点

基于dwd_trade_order表和dim_item_info表,对DWD层经过清洗和标准化的明细数据进行汇总,产出每日商品类目销售汇总表dws_daily_category_sales

  1. 在工作流编辑页面中,鼠标悬停至dws_daily_category_sales节点上,单击打开节点

  2. 将如下代码粘贴至节点编辑页面。

    CREATE TABLE IF NOT EXISTS dws_daily_category_sales (
        cate_id             BIGINT COMMENT '商品叶子类目ID',
        cate_name           STRING COMMENT '商品叶子类目名称',
        total_sales_amount  DECIMAL(38,18) COMMENT '商品类目总销售额,单位分',
        order_count         BIGINT COMMENT '订单数量'
    )
    COMMENT '每日商品类目销售汇总表'
    PARTITIONED BY (pt STRING COMMENT '业务日期, yyyymmdd')
    LIFECYCLE 365;
    
    
    INSERT OVERWRITE TABLE dws_daily_category_sales PARTITION(pt='${bizdate}')
    SELECT
        i.cate_id,
        i.cate_name,
        SUM(t.total_fee) AS total_sales_amount,
        COUNT(DISTINCT t.id) AS order_count
    FROM dwd_trade_order t
    JOIN dim_item_info i ON t.item_id = i.item_id AND t.pt = i.pt
    WHERE t.pt = '${bizdate}'
    GROUP BY t.pt, i.cate_id, i.cate_name;
  3. 配置调试参数。

    MaxCompute SQL节点编辑页面右侧单击调试配置

  4. 在节点工具栏单击保存

配置ads_top_selling_categories节点

基于dws_daily_category_sales表,产出每日最畅销商品类目排名表ads_top_selling_categories

  1. 在工作流编辑页面中,鼠标悬停至ads_top_selling_categories节点上,单击打开节点

  2. 将如下代码粘贴至节点编辑页面。

    CREATE TABLE IF NOT EXISTS ads_top_selling_categories (
        rank                BIGINT COMMENT '销量排名',
        cate_id             BIGINT COMMENT '商品叶子类目ID',
        cate_name           STRING COMMENT '商品叶子类目名称',
        total_sales_amount  DECIMAL(38,18) COMMENT '商品类目总销售额,单位分',
        order_count         BIGINT COMMENT '订单数量'
    )
    COMMENT '每日最畅销商品类目排名表'
    PARTITIONED BY (pt STRING COMMENT '业务日期, yyyymmdd');
    
    
    INSERT OVERWRITE TABLE ads_top_selling_categories PARTITION(pt='${bizdate}')
    SELECT
        rank,
        cate_id,
        cate_name,
        total_sales_amount,
        order_count
    FROM (
        SELECT
            DENSE_RANK() OVER(ORDER BY total_sales_amount DESC) AS rank,
            cate_id,
            cate_name,
            total_sales_amount,
            order_count
        FROM (
            SELECT
                cate_id,
                cate_name,
                SUM(total_sales_amount) AS total_sales_amount,
                SUM(order_count) AS order_count
            FROM dws_daily_category_sales
            WHERE pt = '${bizdate}'
            GROUP BY cate_id, cate_name
        ) agg_sub
    ) agg_outer
    WHERE rank <= 10;
  3. 配置调试参数。

    MaxCompute SQL节点编辑页面右侧单击调试配置

  4. 在节点工具栏单击保存

步骤三:调试运行

工作流配置完成后,在发布到生产环境前,您需要运行整个工作流,验证工作流的配置是否正确。

  1. Data Studio左侧导航栏单击image,进入数据开发页面,然后在项目目录区域找到已创建好的工作流。

  2. 单击节点工具栏的运行,填写本次运行值为当前日期的前一天(例如20250416)。

    说明

    在工作流节点配置中,已使用了DataWorks提供的调度参数,实现了代码动态入参,调试运行时需为该参数赋值常量进行测试。

  3. 单击确定,进入调试运行页面。

  4. 等待运行完成,预期运行结果如下:

    image

步骤四:数据查询与展示

您已经将从MySQL中获取的原始测试数据,经过数据开发处理,汇总于表ads_top_selling_categories中,现在可查询表数据,查看数据分析后的结果。

  1. 单击左上角image图标,在弹出页面中单击全部产品 > 数据分析 > SQL查询

  2. 在我的文件后单击image > 新建文件,自定义文件名后单击确定

  3. SQL查询页面,配置如下SQL。

    SELECT * FROM ads_top_selling_categories WHERE pt=${bizdate};
  4. 在右上角选择MaxCompute数据源后单击确定

  5. 单击顶部的运行按钮,在成本预估页面,单击运行

  6. 在查询结果中单击image,查看可视化图表结果,您可以单击图表右上角的image自定义图表样式。自定义图表样式的更多信息,请参见增强分析(卡片和报告)

    image

  7. 您也可以单击图表右上角保存,将图表保存为卡片,然后在左侧导航栏单击卡片image)查看。

步骤五:周期性调度

通过完成前文操作步骤,您已经获取了前一天各类商品的销售数据,但是,如果需要每天获取最新的销售数据,则可以将工作流发布至生产环境,使其周期性定时执行。

说明

在配置数据同步和数据加工时,已同步为工作流、同步节点以及数据加工节点配置了调度相关参数,此时无需再配置,只需将工作流发布到生产环境即可。调度配置的更多详细信息,请参见节点调度配置

  1. 单击左上角image图标,在弹出页面中单击全部产品 > 数据开发与运维 > DataStudio(数据开发)

  2. Data Studio左侧导航栏单击image,进入数据开发页面,切换至本案例使用的项目空间,然后在项目目录区域找到已创建好的工作流。

  3. 单击节点工具栏的发布,在发布面板中单击开始发布生产,等待发布包构建生产检查器完成后,单击确认发布

  4. 发布到生产环境状态为已完成后,单击去运维,前往运维中心。

    image

  5. 周期任务运维 > 周期任务中即可看到工作流的周期任务(本教程工作流命名为dw_quickstart)。

  6. 如需查看工作流内子节点的周期任务详情,请右键工作流的周期任务,选择查看内部任务。

    image

    预期结果如下:

    image

下一步

附录:资源释放与清理

如果您需要释放本次教程所创建的资源,具体操作步骤如下:

  1. 停止周期任务。

    1. 登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的数据开发与运维 > 运维中心,在下拉框中选择对应工作空间后单击进入运维中心

    2. 周期任务运维 > 周期任务中,勾选所有之前创建的周期任务(工作空间root节点无需下线),然后在底部单击操作 > 下线节点

  2. 删除数据开发节点并解绑MaxCompute计算资源。

    1. 进入DataWorks工作空间列表页,在顶部切换至目标地域,找到已创建的工作空间,单击操作列的快速进入 > Data Studio,进入Data Studio。

    2. Data Studio左侧导航栏单击image,进入数据开发页面,然后在项目目录区域找到已创建好的工作流,右键工作流,单击删除

    3. 在左侧导航栏,单击image > 计算资源管理,找到已绑定的MaxCompute计算资源,单击解绑。在确认窗口中勾选选项后按照指引完成解绑。

  3. 删除MySQL数据源。

    1. 登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的更多 > 管理中心,在下拉框中选择对应工作空间后单击进入管理中心

    2. 在左侧导航栏单击数据源,进入数据源列表页,找到已创建的MySQL数据源,单击操作列的删除,按照指引完成删除。

  4. 删除MaxCompute项目。

    前往MaxCompute项目管理页面,找到已创建的MaxCompute项目,单击操作列的删除,按照指引完成删除。

  5. 删除公网NAT网关并释放弹性公网IP。

    1. 前往专有网络-公网NAT网关控制台,在顶部菜单栏切换至华东2(上海)地域。

    2. 找到已创建的公网NAT网关,单击操作列的image > 删除,在确认窗口中,勾选强制删除,然后单击确定

    3. 在左侧导航栏单击公网访问 > 弹性公网IP,找到已创建的弹性公网IP,单击操作列中image > 实例管理 > 释放,在确认窗口中单击确定

  • 本页导读 (1)
  • 入门简介
  • 前提条件
  • 准备工作
  • 开通DataWorks
  • 创建工作空间
  • 创建资源组并绑定工作空间
  • 为资源组开通公网
  • 创建并绑定MaxCompute计算资源
  • 操作步骤
  • 步骤一:数据同步
  • 步骤二:数据清洗
  • 步骤三:调试运行
  • 步骤四:数据查询与展示
  • 步骤五:周期性调度
  • 下一步
  • 附录:资源释放与清理
AI助理

点击开启售前

在线咨询服务

你好,我是AI助理

可以解答问题、推荐解决方案等