文档

使用do-while节点实现复杂的数据分析

更新时间:

当您需要对一组数据或任务进行多次处理,直到满足某个条件才停止时,您可以使用do-while节点来自动化重复执行,简化复杂任务的操作步骤,以提高任务的执行效率和可靠性。本文以统计电商行业中的订单数据为示例,为您介绍如何使用do-while节点。

背景信息

DataWorks为您提供循环节点(do-while节点),您可以编排do-while节点内部的业务流程,将需要循环执行的逻辑写在节点内,再编辑end循环判断节点来控制是否退出循环。同时您也可以结合赋值节点来循环遍历赋值节点传递的结果集。本文通过简单的订单统计示例,为您介绍如何使用do-while节点。

本文示例:

  • 原始数据

    在电商行业中,由于每天的订单量可能很大,因此在DataWorks中已创建了一个按天(下单时间的年月日)分区的订单表。即:每天创建的订单放在同一个分区,分区字段值为下单时间的年月日缩写。例如:分区20220901中存放所有2022年9月1日创建的订单数据。

    | id | user_id | order_amount | ds       |
    |----|---------|--------------|----------|
    | 1  | 1001    | 500          | 20220901 |
    | 2  | 1002    | 1500         | 20220901 |
    | 7  | 1003    | 890          | 20221021 |
    | 8  | 1004    | 240          | 20221021 |
  • 数据需求

    现需统计2022年以每个月第1天为基准的近1月(30天)近2月(60天)近3月(90天)的订单金额及订单数量。例如:stat_day=20220901 stat_type=30d代表2022年9月1日的近1月(30天)的统计数据。

    | stat_day | stat_type | order_total | order_amount_total |
    |----------|-----------|-------------|--------------------|
    | 20220901 | 30d       | 10          | 0                  |
    | 20220901 | 60d       | 20          | 0                  |
    | 20220901 | 90d       | 30          | 0                  |
  • 需求分析

    • 最终需要产出12条数据(2022年的12个月份)。

    • 每条数据在统计时,需要查询最近1月(30天)近2月(60天)近3月(90天)的所有分区。这时所查的分区范围是根据每月第1天动态计算出来而不是固定的。

    • 综上所述,我们可以通过使用do-while循环节点进行12次循环来方便的计算每月的数据,同时使用3个SQL节点分别计算近1月(30天)近2月(60天)近3月(90天)的数据。通过使用${dag.loopTimes}来代表当前循环到第几个月,从而在SQL节点中计算出当月第1天的分区及前1月第1天、前2月第1天、前3月第1天的分区。最终实现此数据统计需求。

前提条件

数据准备

  1. 登录MaxCompute控制台,在左侧导航栏单击数据开发,进入数据开发页面。

  2. 在目标业务流程下的MaxCompute > 数据开发下新建ODPS SQL节点。节点命名如:init_test_data

    image.png

  3. init_test_data节点中输出以下SQL脚本。

    -- 创建订单表
    CREATE TABLE orders
    (
        id            BIGINT
        ,user_id      BIGINT -- 用户id
        ,order_amount BIGINT -- 订单金额
    )
    PARTITIONED BY 
    (
        ds            STRING -- 下单时间年月日
    )
    ;
    
    --插入订单数据
    INSERT INTO orders PARTITION (ds = '20220901') VALUES (1,1001,500) ,(2,1002,1500);
    INSERT INTO orders PARTITION (ds = '20220905') VALUES (3,1005,260) ,(4,1002,780);
    INSERT INTO orders PARTITION (ds = '20221010') VALUES (5,1003,890) ,(6,1004,240);
    INSERT INTO orders PARTITION (ds = '20221021') VALUES (7,1003,890) ,(8,1004,240);
    INSERT INTO orders PARTITION (ds = '20221025') VALUES (9,1002,260) ,(10,1007,780);
    
    -- 创建订单统计表
    CREATE TABLE orders_stat
    (
        stat_day            STRING -- 统计日期,每月第1天
        ,stat_type          STRING -- 类型
        ,order_total        BIGINT -- 总订单数
        ,order_amount_total BIGINT -- 订单金额
    )
    ;
  4. 提交init_test_data节点到生产环境并执行补数据。

    1. 单击当前节点工具栏中的保存提交图标,保存并提交节点。提交节点时,请根据提示输入变更描述,并根据需要选择是否进行代码评审及冒烟测试。

      说明
      • 您需在调度配置中设置节点的重跑属性依赖的上游节点,才可以提交节点。

      • 开启代码评审后,开发人员提交的节点代码必须通过评审人员的审核才可发布,详情请参见代码评审

      • 为保障调度节点任务执行符合预期,建议您在发布前对任务进行冒烟测试,详情请参见冒烟测试

    2. 如果您使用的是标准模式的工作空间,提交成功后,需单击右上方的发布,发布节点。相关介绍请参见标准模式的工作空间发布任务

    3. 单击当前节点工具栏右侧的运维,进入运维中心,单击左侧菜单周期任务运维 > 周期任务,进入周期任务页面。在中间的周期任务列表中单击init_test_data,在右侧单击init_test_data并右键单击,在弹出的菜单中单击测试,执行任务测试,以初始化数据。

      image.png

  5. 冻结节点。

    说明

    由于本步骤是执行一次性的数据初始化,因为当您成功执行一次本节点后,可将本节点进行暂停(冻结),避免按周期持续执行。

    在节点任务列表中选中本节点,单击下方的操作按钮,在弹出的菜单中单击暂停(冻结)即可。

    image.png

操作步骤

新建do-while节点

重要

请确保当前工作空间已绑定MaxCompute引擎,否则无法继续如下操作。详情请参见绑定MaxCompute引擎

  1. 进入数据开发页面。

    登录DataWorks控制台,单击左侧导航栏的数据建模与开发 > 数据开发,在下拉框中选择对应工作空间后单击进入数据开发

  2. 创建do-while节点。

    1. 数据开发页面,鼠标悬停至新建图标,单击新建节点 > 通用 > do-while

      您也可以打开相应的业务流程,右键单击通用,选择新建节点 > do-while

    2. 新建节点对话框中,输入节点名称,并选择路径

    3. 单击确认

添加Shell节点和ODPS SQL节点

  1. 双击do-while节点名称,进入内部节点页面。

  2. 删除默认的sql节点。右键单击处于do-while节点中间的sql节点,单击删除节点。在删除对话框中,单击确定

  3. 分别单击左侧MaxCompute > ODPS SQL通用 > Shell,新建3个ODPS SQL节点和一个Shell节点。

    image.png

    1. 3个ODPS SQL节点命名为:30_day60_day90_day

    2. 1个Shell节点命名为:echo

  4. 设置ODPS SQL节点和Shell节点的上下游关系。通过拖拽连线,设置echo节点的上游为start节点,30_day60_day90_day的上游为echo节点,下游为end节点。

    image.png

设置节点代码

重要
  • ${dag.loopTimes}变量是系统的保留变量,代表当前的循环次数,从1开始,do-while的内部节点可以直接引用该变量。更多内置变量请参见 内置变量取值案例

  • Shell节点中的代码修改后请务必保存,提交时不会进行提示。如果未保存,最新的代码不能及时更新。

  1. 设置echo节点代码。此节点用于输出当前循环次数日志。双击echo节点,进入Shell节点的编辑页面,输入以下代码。

    #!/bin/bash
    echo "loop times: ${dag.loopTimes}"
  2. 设置30_day节点代码。此节点用于统计2022年每个月第1天的近1个月订单数据。双击30_day节点,进入ODPS SQL节点的编辑页面,输入以下代码。

    INSERT INTO orders_stat
    SELECT  CONCAT('2022',LPAD(${dag.loopTimes},2,'0'),'01') AS stat_day
            ,'30d' AS stat_type
            ,COUNT(id) AS order_total
            ,nvl(SUM(order_amount),0) AS order_amount_total
    FROM    orders
    WHERE   
    -- 上个月第1天
    ds >= REPLACE(ADD_MONTHS(TO_DATE(CONCAT('2022',LPAD(${dag.loopTimes},2,'0'),'01'),'yyyyMMdd'),-1),'-','') 
    -- 当月第1天
    AND ds < CONCAT('2022',LPAD(${dag.loopTimes},2,'0'),'01')
    ;

    以上代码中使用到的DML及函数如下:

  3. 设置60_day节点。此节点用于统计2022年每个月第1天的近2个月订单数据。

    INSERT INTO orders_stat
    SELECT  CONCAT('2022',LPAD(${dag.loopTimes},2,'0'),'01') AS stat_day
            ,'60d' AS stat_type
            ,COUNT(id) AS order_total
            ,nvl(SUM(order_amount),0) AS order_amount_total
    FROM    orders
    WHERE   
    -- 2个月前第1天
    ds >= REPLACE(ADD_MONTHS(TO_DATE(CONCAT('2022',LPAD(${dag.loopTimes},2,'0'),'01'),'yyyyMMdd'),-2),'-','') 
    -- 当月第1天
    AND ds < CONCAT('2022',LPAD(${dag.loopTimes},2,'0'),'01')
    ;
  4. 设置90_day节点。此节点用于统计2022年每个月第1天的近3个月订单数据。

    INSERT INTO orders_stat
    SELECT  CONCAT('2022',LPAD(${dag.loopTimes},2,'0'),'01') AS stat_day
            ,'90d' AS stat_type
            ,COUNT(id) AS order_total
            ,nvl(SUM(order_amount),0) AS order_amount_total
    FROM    orders
    WHERE   
    -- 3个月前第1天
    ds >= REPLACE(ADD_MONTHS(TO_DATE(CONCAT('2022',LPAD(${dag.loopTimes},2,'0'),'01'),'yyyyMMdd'),-3),'-','') 
    -- 当月第1天
    AND ds < CONCAT('2022',LPAD(${dag.loopTimes},2,'0'),'01')
    ;
  5. 设置end节点。此节点用于控制何时退出循环。

    1. 双击打开end节点的编辑页面。

    2. 请选择赋值语言下拉列表中,选中Python

    3. 输入以下代码,定义do-while节点的结束条件。

      重要
      • do-while节点循环次数上限为128次,即${dag.loopTimes}的最大值为128。

      • do-while节点不支持并发执行。即上次循环完成后才可进入下一次循环。

      if ${dag.loopTimes} < 12:
          print True
      else:
          print False

      ${dag.loopTimes} < 12代表仅循环12次,1次循环统计1个月,即总共统计2022年的12个月的数据。

提交节点

双击do-while节点,在右侧的节点页面进行如下操作:

重要

您需要设置节点的重跑属性依赖的上游节点,才可以提交节点。

  1. 单击工具栏中的提交图标。

  2. 提交对话框中,选中需要提交的节点,输入备注

  3. 单击提交

    如果您使用的是标准模式的工作空间,提交成功后,请单击右上方的发布。具体操作请参见发布任务

测试节点

do-while节点提交发布流程与普通节点一致,线上执行流程与普通节点一致,但不支持数据开发界面测试。

说明

DataWorks为标准模式时,不支持在DataStudio界面直接测试运行do-while节点。

如果您想测试验证do-while节点的运行结果,您需要将包含do-while节点的任务发布提交到运维中心,在运维中心页面运行do-while节点任务。如果您在do-while节点内使用了赋值节点传递的值,请在运维中心测试时,同时运行赋值节点和循环节点。

  1. 进入周期任务页面执行补数据。

    1. 单击页面右上方的运维,进入运维中心

    2. 在左侧导航栏,单击周期任务运维 > 周期任务

    3. 选中相应的节点,在右侧的DAG图中,右键单击do_while_test节点,在弹出菜单中单击补数据 > 当前节点

      image.png

      由于本示例中的SQL脚本逻辑不依赖业务日期,因此可直接使用默认的业务日期执行。

      image.png

    4. 进入补数据实例页面,单击任务名称do_while_test,再右键单击右侧DAG图中的任务名称,在弹出的菜单中单击查看运行日志

      image.png

  2. 查看循环节点执行日志。

    1. 在补数据实例页面,在右侧DAG图中的任务名称上右键单击,在弹出的菜单中单击查看内部节点

      循环节点这类组合节点需要查看内部节点才能看到具体执行日志。

      image.png

      do-while节点的内部循环体分以下三部分:

      • 视图左侧为do-while节点的重跑历史列表,只要do-while实例整体运行一次,历史列表便会产生一条相应的记录。

      • 视图中部为循环记录列表,会列出当前do-while节点共运行多少次循环,以及每次循环的状态。

      • 视图右侧为每次循环的具体信息,单击循环记录列表中的某次循环,即可展示出该循环每个实例的运行情况。

    2. 在内部节点页面,单击左侧的次数,并右键单击相应节点,选中查看运行日志

  3. 查看第N次循环的详细执行日志。

    在内部节点页面,单击左侧的第3次,查看第3次循环echo节点的日志。

    image.png

    由该示例可见,do-while节点的工作流程如下:

    1. 从start节点开始运行。

    2. 按照定义的任务依赖关系依次运行每个任务。

    3. 在end节点中定义循环的结束条件。

    4. 一组任务运行完毕之后,运行end的结束条件语句。

    5. 如果end的判断语句在日志中打印True,则从1开始继续下一个循环。

    6. 如果end的判断语句在日志中打印False,则退出整个循环,do-while节点整体结束。

  4. 查看结果表数据。

    新建ODPS SQL节点,执行如下SQL:

    SELECT * FROM orders_stat;

    输出:

    | stat_day | stat_type | order_total | order_amount_total |
    |----------|-----------|-------------|--------------------|
    | 20220101 | 60d       | 0           | 0                  |
    | 20220101 | 30d       | 0           | 0                  |
    | 20220101 | 90d       | 0           | 0                  |
    | 20220201 | 60d       | 0           | 0                  |
    | 20220201 | 30d       | 0           | 0                  |
    | 20220201 | 90d       | 0           | 0                  |
    | 20220301 | 30d       | 0           | 0                  |
    | 20220301 | 60d       | 0           | 0                  |
    | 20220301 | 90d       | 0           | 0                  |
    | 20220401 | 30d       | 0           | 0                  |
    | 20220401 | 90d       | 0           | 0                  |
    | 20220401 | 60d       | 0           | 0                  |
    | 20220501 | 30d       | 0           | 0                  |
    | 20220501 | 90d       | 0           | 0                  |
    | 20220501 | 60d       | 0           | 0                  |
    | 20220601 | 60d       | 0           | 0                  |
    | 20220601 | 30d       | 0           | 0                  |
    | 20220601 | 90d       | 0           | 0                  |
    | 20220701 | 90d       | 0           | 0                  |
    | 20220701 | 30d       | 0           | 0                  |
    | 20220701 | 60d       | 0           | 0                  |
    | 20220801 | 60d       | 0           | 0                  |
    | 20220801 | 90d       | 0           | 0                  |
    | 20220801 | 30d       | 0           | 0                  |
    | 20220901 | 90d       | 0           | 0                  |
    | 20220901 | 60d       | 0           | 0                  |
    | 20220901 | 30d       | 0           | 0                  |
    | 20221001 | 90d       | 4           | 3040               |
    | 20221001 | 60d       | 4           | 3040               |
    | 20221001 | 30d       | 4           | 3040               |
    | 20221101 | 30d       | 16          | 8860               |
    | 20221101 | 90d       | 20          | 11900              |
    | 20221101 | 60d       | 20          | 11900              |
    | 20221201 | 90d       | 20          | 11900              |
    | 20221201 | 60d       | 16          | 8860               |
    | 20221201 | 30d       | 0           | 0                  |

相关文档

  • 本页导读 (1)
文档反馈