for-each节点

在数据处理工作流中,当需要对一个列表(如文件名列表、分区列表)的每一项执行相同子任务时,可使用 for-each 节点。该节点能自动遍历上游节点(通常为赋值节点)输出的结果集,并为其中的每个元素重复执行内部循环体。该节点可避免为每一项手动创建任务的繁琐工作,实现工作流的动态化与自动化。

使用场景

在日常数据开发中,当需要对不同的业务单元、产品线或配置项执行相同的分析或处理逻辑时,for-each节点可以实现参数化执行。例如,当公司有多个产品线,需要为每个产品线单独生成一份日报,此时处理逻辑完全相同,只是处理对象不同。

for-each节点能像编程语言中的 for 循环一样,自动遍历一个列表(如表名、分区名、文件名等),并为列表中的每一项重复执行您预设好的子任务流,从而极大地提升工作流的自动化和灵活性。

适用范围

  • 版本限制:仅支持DataWorks标准版及以上版本。

  • 权限限制:RAM账号已被添加至对应工作空间中,并具有开发空间管理员角色权限。详情请参见为工作空间添加空间成员

工作原理

for-each 节点如同一个容器,其内部封装一个可以自定义的子工作流(循环体)。其工作机制如下:

image
  1. 数据输入:for-each 节点依赖一个上游赋值节点/其他能赋值的节点(例如,EMR Hive等),通过绑定 loopDataArray 参数,获取其输出的数组形式的结果集

  2. 循环执行:节点启动后,会按顺序遍历结果集中的每一个元素。对于每一个元素,它都会完整地执行一次内部循环体(从 开始结束)。

    说明

    开始节点和结束节点不可编辑,仅用于标记整个循环体的开始和结束。

  3. 数据传递:在每次循环中,当前被遍历的元素值会通过内置变量传递给循环体内的节点。内部的业务节点通过 ${dag.foreach.current} 获取当前正在处理的数据项。

内置变量

重要

${...} 形式变量均为DataWorks特有的模板语法,DataWorks将直接解析参数,并做静态替换。

在 for-each 循环体内部的节点中,使用以下内置变量来获取循环状态和数据:

内置变量

含义

与 for 循环类比

${dag.loopDataArray}

获取上游赋值节点传入的完整结果集

以下面的for循环代码为例:

for(int i=0;i<data.length;i++) {
   print(data[i]);
}
  • ${dag.loopDataArray} 相当于data

  • ${dag.foreach.current}相当于data[i]

  • ${dag.offset}相当于i

  • ${dag.loopTimes}相当于 i+1

${dag.foreach.current}

获取当前循环正在处理的数据项。

${dag.offset}

获取当前循环的偏移量,从 0 开始。

${dag.loopTimes}

获取当前是第几次循环,从 1 开始。

当上游输出为二维数组时(如 SQL 查询结果),还可使用以下方式精确取值:

其他变量

含义

${dag.foreach.current}

获取当前数据行(一维数组)基于逗号,分隔后的字符串。

${dag.foreach.current[n]}

获取当前数据行(一维数组)中第 n个数据。

${dag.loopDataArray[i][j]}

获取整个结果集中,第 i 行、第 j 列的数据。

for-each节点暂不支持双重循环,此处仅做取值示范。

注意事项

  • 执行机制:循环采用串行执行,即上一次循环结束后才会开始下一次,不支持并发执行

  • 循环上限:默认最大循环次数为 128 次,可调整的最大循环次数为1024

  • 调试限制:不支持在 Data Studio(数据开发页面)直接运行 for-each 节点。必须将任务发布后,在运维中心通过冒烟测试功能进行测试。

  • 执行限制:for-each 节点不支持单独运行,包括冒烟测试、补数据、手动运行。

  • 循环体内的流程控制:当在 for-each 循环体内使用分支节点时,必须确保所有分支最终都汇集到一个归并节点,然后再连接到 结束 节点,以保证循环体的逻辑完整性。

操作步骤

本操作以赋值节点作为上游节点,循环体中配置一个Shell节点打印结果为例,引导您完成一个完整的 for-each 任务配置:

  1. 准备上游数据(配置赋值节点)

    创建一个赋值节点并配置其输出,为下游的 for-each 节点提供一个待遍历的结果集

    1. 在工作流中,创建赋值节点(如 assign)并将其置于 for-each 节点的上游。

    2. 双击打开赋值节点,选择一种Python 2。例如,使用 Python 2 输出一个包含四个元素的数组:

      此时赋值节点将会输出[10,20,30,40]至下游。因为赋值节点会自动将最后一行输出基于逗号分割成数组形式作为输出结果。
      print "10,20,30,40"
    3. 赋值节点将自动生成一个名为 outputs 的输出参数,代表其结果集。

    4. 保存赋值节点。

  2. 配置 for-each 节点消费数据

    配置 for-each 节点接收上游数据,并在其循环体内使用这些数据。

    1. 双击 for-each 节点,进入其内部编排画布。

    2. 在右侧的调度配置面板,找到调度参数下的 loopDataArray 参数,单击绑定

      image

    3. 在弹出的对话框中,取值来源选择上游赋值节点 (assign) 的 outputs 参数进行绑定。此操作会自动建立两个节点间的依赖关系。

    4. for-each 循环体中,单击新建内部节点,选择创建一个 Shell 节点。

      实际场景中,可以配置任何节点。
    5. 双击打开新建的 Shell 节点,在代码中使用内置变量获取并打印循环信息:

      #!/bin/bash
      # 使用 ${dag.loopTimes} 获取当前循环次数
      echo "Current loop number is: ${dag.loopTimes}"
      
      # 使用 ${dag.foreach.current} 获取当前遍历到的数据
      echo "Current item is: ${dag.foreach.current}"
    6. (可选)在右侧的调度配置面板,配置调度策略下的最大循环次数参数。默认为128,最大可调至1024

      image

      重要

      该参数决定循环体的最大循环次数。当上游数据量较大时,请确保调整此参数以完成所有遍历。

    7. 保存Shell节点。

  3. 发布、运行与验证

    将工作流提交到运维中心执行,并验证 for-each 节点的运行结果。

    1. 返回主工作流画布,单击工具栏发布按钮,发布整个工作流。

    2. 前往运维中心模块的任务运维 > 周期任务运维 > 周期任务,对目标工作流进行冒烟测试

      重要

      请勿单独对 for-each 节点执行冒烟。由于 for-each 节点依赖上游赋值节点的输出,您必须从赋值节点开始测试,以确保数据链路的完整性。

    3. 等待测试实例运行成功后,在实例列表中找到 for-each 节点的实例并单击打开,右键选择查看内部节点

      image

    4. 在内部节点视图中,查看每次循环生成的 Shell 节点实例。打开任一实例的运行日志,即可查看该次循环的输出结果,验证其是否符合预期。

      image

用例:处理不同数据格式

场景1:处理一维数组(Shell/Python 输出)

  • 赋值节点输出:2025-11-01,2025-11-02,2025-11-03

  • 遍历次数:3 次。

  • 2次循环时

    • ${dag.foreach.current} 的值为 2025-11-02

    • ${dag.loopTimes} 的值为 2

场景2:处理二维数组(SQL 输出)

  • 赋值节点 (MaxCompute SQL) 输出

    +-----+----------+
    | id  | city     |
    +-----+----------+
    | 101 | beijing  |
    | 102 | shanghai |
    +-----+----------+
  • 遍历次数:2 次。

  • 2次循环时

    • ${dag.foreach.current} 的值为 102,shanghai

    • ${dag.loopTimes} 的值为 2

    • ${dag.foreach.current[0]} 的值为 102

    • ${dag.foreach.current[1]} 的值为 shanghai

场景实践:批量处理多业务线分区表数据

本示例展示如何使用赋值节点for-each 节点批量处理多个业务线的用户行为数据,实现一套处理逻辑服务多条产品线的自动化数据处理。

image

业务背景

假设您是一家综合性互联网公司的数据开发工程师,负责处理三个核心业务线的数据:电商(ecom)、金融(finance)和物流(logistics),且后续存在增加业务线的可能。您需要每天对这三个业务线的用户行为日志执行相同的聚合逻辑,计算每个用户的日活跃度(PV),并将结果存入统一的汇总表。

  • 上游源表(DWD层):

    • dwd_user_behavior_ecom_d:电商用户行为表。

    • dwd_user_behavior_finance_d :金融用户行为表。

    • dwd_user_behavior_logistics_d :物流用户行为表。

    • dwd_user_behavior_${业务线}_d :后续更多可能的业务线用户行为表。

    • 这些表结构相同,都按天分区(dt)。

  • 下游目标表(DWS):

    • dws_user_summary_d :用户汇总表。

    • 该表按业务线(biz_line)和天(dt)双重分区,用于统一存储所有业务线的聚合结果。

若为每个业务线创建独立的任务,维护成本高且容易出错。使用 for-each 节点后,只需维护一份处理逻辑,系统会自动遍历所有业务线完成计算。

数据准备

首先,创建示例表并插入测试数据(以业务日期 20251010 为例):

  1. 为工作空间绑定MaxCompute计算资源

  2. 进入Data Studio数据开发,创建MaxCompute SQL节点。

  3. 创建源表(DWD层):在MaxCompute SQL节点添加如下代码,并选中运行。

    -- 电商用户行为表
    CREATE TABLE IF NOT EXISTS dwd_user_behavior_ecom_d (
        user_id     STRING COMMENT '用户ID',
        action_type STRING COMMENT '行为类型',
        event_time  BIGINT COMMENT '事件发生的毫秒级Unix时间戳'
    ) 
    COMMENT '电商用户行为日志明细表'
    PARTITIONED BY (dt STRING COMMENT '日期分区,格式 yyyymmdd');
    
    INSERT OVERWRITE TABLE dwd_user_behavior_ecom_d PARTITION (dt='20251010') VALUES
    ('user001', 'click',        1760004060000), -- 2025-10-10 10:01:00.000
    ('user002', 'browse',       1760004150000), -- 2025-10-10 10:02:30.000
    ('user001', 'add_to_cart',  1760004300000); -- 2025-10-10 10:05:00.000
    -- 验证电商用户行为表创建成功
    SELECT * FROM dwd_user_behavior_ecom_d where dt='20251010';
    
    -- 金融用户行为表
    CREATE TABLE IF NOT EXISTS dwd_user_behavior_finance_d (
        user_id     STRING COMMENT '用户ID',
        action_type STRING COMMENT '行为类型',
        event_time  BIGINT COMMENT '事件发生的毫秒级Unix时间戳'
    ) 
    COMMENT '金融用户行为日志明细表'
    PARTITIONED BY (dt STRING COMMENT '日期分区,格式 yyyymmdd');
    
    INSERT OVERWRITE TABLE dwd_user_behavior_finance_d PARTITION (dt='20251010') VALUES
    ('user003', 'open_app',      1760020200000), -- 2025-10-10 14:30:00.000
    ('user003', 'transfer',      1760020215000), -- 2025-10-10 14:30:15.000
    ('user003', 'check_balance', 1760020245000), -- 2025-10-10 14:30:45.000
    ('user004', 'open_app',      1760020300000); -- 2025-10-10 14:31:40.000
    -- 验证金融用户行为表创建成功
    SELECT * FROM dwd_user_behavior_finance_d where dt='20251010';
    
    -- 物流用户行为表
    CREATE TABLE IF NOT EXISTS dwd_user_behavior_logistics_d (
        user_id     STRING COMMENT '用户ID',
        action_type STRING COMMENT '行为类型',
        event_time  BIGINT COMMENT '事件发生的毫秒级Unix时间戳'
    ) 
    COMMENT '物流用户行为日志明细表'
    PARTITIONED BY (dt STRING COMMENT '日期分区,格式 yyyymmdd');
    
    INSERT OVERWRITE TABLE dwd_user_behavior_logistics_d PARTITION (dt='20251010') VALUES
    ('user001', 'check_status',    1760032800000), -- 2025-10-10 18:00:00.000
    ('user005', 'schedule_pickup', 1760032920000); -- 2025-10-10 18:02:00.000
    
    -- 验证物流用户行为表创建成功
    SELECT * FROM dwd_user_behavior_logistics_d where dt='20251010';
  4. 创建目标表(DWS层):在MaxCompute SQL节点上添加如下代码,并选中运行。

    CREATE TABLE IF NOT EXISTS dws_user_summary_d (
        user_id     STRING COMMENT '用户ID',
        pv          BIGINT COMMENT '日活跃度',
    ) 
    COMMENT '用户日活跃度汇总表'
    PARTITIONED BY (
        dt           STRING COMMENT '日期分区,格式 yyyymmdd',
        biz_line     STRING COMMENT '业务线分区,如 ecom, finance, logistics'
    );
    重要

    若工作空间使用标准环境,需要将此节点发布至生产环境,并执行补数据。

工作流实现

  1. 创建一个工作流,并在右侧调度参数设置调度参数bizdate为上一天$[yyyymmdd-1]

    image

  2. 在工作流中,创建名为get_biz_list的赋值节点,并使用MaxCompute SQL语言编写如下代码。该节点输出需要处理的业务线列表:

    -- 输出所有需要处理的业务线
    SELECT 'ecom' AS biz_line
    UNION ALL
    SELECT 'finance' AS biz_line
    UNION ALL
    SELECT 'logistics' AS biz_line;
  3. 配置 for-each 节点

    • 回到工作流界面,给赋值节点get_biz_list创建一个下游的for-each节点。

    • 进入for-each节点设置界面,在右侧调度配置调度参数 > 脚本参数,将 loopDataArray 参数绑定到get_biz_list节点的outputs

      image

    • for-each节点循环体内单击新建内部节点,创建 MaxCompute SQL节点,编写循环体内的处理逻辑。

      说明
      • 该脚本由 for-each 节点驱动,会针对每个业务线执行一次。

      • 内置变量 ${dag.foreach.current} 在每次运行时,会动态地替换为当前的业务线名称。预期的迭代值为:'ecom', 'finance', 'logistics'。

      SET odps.sql.allow.dynamic.partition=true;
      
      INSERT OVERWRITE TABLE dws_user_summary_d PARTITION (dt='${bizdate}', biz_line)
      SELECT
          user_id,
          COUNT(*) AS pv,
          '${dag.foreach.current}' AS biz_line
      FROM
          dwd_user_behavior_${dag.foreach.current}_d
      WHERE
          dt = '${bizdate}'
      GROUP BY
          user_id;
  4. 添加验证节点

    回到工作流,在for-each节点单击新建下游,创建一个MaxCompute SQL节点并添加如下代码。

    SELECT * FROM dws_user_summary_d WHERE dt='20251010' ORDER BY biz_line, user_id;

发布和运行结果

将工作流发布至生产环境,前往运维中心的周期任务运维 > 周期任务,找到目标工作流并执行冒烟测试,选择业务日期为'20251010'

完成运行后,在测试实例中查看运行日志。最终节点预期输出为:

user_id

pv

dt

biz_line

user001

2

20251010

ecom

user002

1

20251010

ecom

user003

3

20251010

finance

user004

1

20251010

finance

user001

1

20251010

logistics

user005

1

20251010

logistics

方案优势

  • 高扩展性:新增业务线时,只需在赋值节点添加一行 SQL,无需修改处理逻辑。

  • 易维护:所有业务线共享同一套处理逻辑,修改一处即可生效于全部。

常见问题

  • Q:为什么无法在 Data Studio 直接运行 for-each 节点进行测试?

    A:此为设计限制。该节点需要在完整的调度环境中解析节点上下文和依赖关系,因此不支持在 Data Studio 中直接运行调试。必须将任务发布运维中心,通过补数据或周期调度的方式进行测试。

  • Q:为什么单独对for-each 节点冒烟测试会失败或不执行任何操作?

    A:for-each 节点的循环数据来源于其 loopDataArray 输入参数,该参数需绑定上游赋值节点outputs 参数。如果单独运行 for-each 节点,它将因无法获取输入结果集而跳过执行或执行失败。

  • Q:我的循环为什么只执行了一次? 

    A:这通常是因为上游赋值节点的输出结果被解析为了单个元素。请检查您的输出:

    • 1. 是否是一个不含分隔符的单一字符串?

    • 2. 如果期望遍历多项,请确保它们由英文逗号 (,) 分隔。例如,'item1,item2,item3' 会循环三次,而 'item1 item2 item3' 只会循环一次。