do-while节点

更新时间:
复制为 MD 格式

DataWorks为您提供 do-while 节点,用于实现“先执行、后判断”的循环逻辑。当您需要重复执行一系列任务,直到某个动态条件达成时(例如,轮询检查文件是否存在、分批处理数据直至源头为空),do-while 节点是理想的选择。它允许您在循环体内自由编排任务流,并通过一个独立的“结束”节点来精准控制循环的退出。

使用场景

在数据开发中,当您遇到需要根据条件重复执行的场景时,do-while 节点能极大简化您的工作流设计。典型场景包括:

  • API 轮询:循环调用一个 API,直到其返回特定状态(如 SUCCESS),或返回的数据满足要求。

  • 数据就绪等待:等待上游某个文件或数据分区生成。每次循环检查一次,文件生成后即退出循环,继续下游任务。

  • 分批数据处理:上游节点(如赋值节点)一次性提供了一个数据列表(如表名、日期分区),do-while 节点可以逐个处理,直到列表处理完毕。

  • 状态同步:反复检查某个外部服务的状态,直到状态变为“可用”或“完成”,再触发后续的数据同步或处理流程。

适用范围

  • 版本限制:仅支持DataWorks标准版及以上版本。

  • 权限限制:RAM账号已被添加至对应工作空间中,并具有开发空间管理员角色权限。详情请参见为工作空间添加空间成员

工作原理

do-while 节点像一个“循环容器”,其核心机制是 “先执行,后判断”

开始 开始循环体(执行任务)结束 结束(判断条件)

  1. 启动与执行:循环从 开始 节点开始,至少会完整执行一次循环体内的所有任务。

  2. 条件判断:循环体执行完毕后,会运行 结束 节点。您在此节点中编写判断逻辑。

  3. 决策循环

    • 如果 结束 节点最终输出字符串 True,则开启新一轮循环,返回第1步。

    • 如果 结束 节点最终输出字符串 False,则循环终止,整个 do-while 节点运行成功。

关键特性:无论条件是否成立,循环体都会至少执行一次。

节点组成

双击 do-while 节点,进入其内部编排画布,主要由以下三部分构成:

  • 开始 开始:循环的入口标记,不可编辑或删除。

  • 循环体:一个可自由编排的任务区域。可通过“新建内部节点”添加 Shell、SQL、Python 等多种类型的任务节点,形成需重复执行的业务流程。

  • 结束 结束:循环的判断节点,本质是一个赋值节点。支持 ODPS SQL、Shell、Python 编写代码,最终输出 TrueFalse 字符串以决定是否继续循环。

内置变量

在 do-while 节点的循环体和结束节点中,您可以使用以下内置变量获取当前循环状态及输入数据:

内置变量

含义

系统内置变量

${dag.loopTimes}

当前是第几次循环,从 1 开始计数。

${dag.offset}

当前循环的偏移量,从 0 开始。等价于 ${dag.loopTimes} - 1

结合赋值节点使用
(假设输入参数 input 绑定上游赋值节点输出)

${dag.input}

获取上游传入的完整结果集(通常为二维数组)。

${dag.input.length}

获取结果集的行数(即总元素数量)。

${dag.input[${dag.offset}]}

获取当前循环正在处理的那一行数据,返回格式为逗号分隔的字符串(如 "user_A,beijing")。

${dag.input[i][j]}

(仅限二维数组)获取第 i 行、第 j 列的精确值(索引从 0 开始)。

示例:${dag.input[0][1]} 可提取第一行第二列的值 "beijing"

注意事项

  • 版本限制:仅 DataWorks 标准版及以上版本支持该功能。

  • 循环上限:默认最大循环次数为 128 次,可调整的最大循环次数为 1024。超过此限制将导致任务失败。

  • 执行机制:采用串行执行模式,不支持并发处理;每次循环必须完全结束后才启动下一轮。

  • 调试限制:无法在 Data Studio 页面直接运行测试。必须先发布工作流,然后在 运维中心 使用 补数据 功能进行验证。

  • 依赖传递:若依赖上游赋值节点,请务必从上游节点开始补数据,确保数据链路完整。

  • 流程控制:若在循环体内使用 分支(Branch)节点,请确保所有分支路径最终汇聚至 归并(Merge)节点,避免流程断裂。

操作步骤:创建一个简单的循环任务

本示例将引导您创建一个循环 5 次的任务,并在每次循环中打印当前是第几次循环。

步骤一:配置循环体(添加 Shell 节点)

  1. 在您的业务流程中创建一个 do-while 节点。

  2. 双击进入节点内部,在“循环体”区域单击 “新建内部节点”,选择 Shell 节点,并命名为 print_loop_times

  3. 右键单击该节点,选择 “打开节点”

  4. 在代码编辑器中输入以下命令:

    # 使用内置变量 ${dag.loopTimes} 获取当前循环次数
    echo "This is loop number: ${dag.loopTimes}"
  5. 单击保存图标完成保存。

步骤二:定义退出条件

  1. 返回 do-while 内部画布,右键单击 结束 结束 节点,选择 “打开节点”

  2. 将语言切换为 Python

  3. 输入以下 Python 代码:

    # 当循环次数小于5时,输出True,继续循环
    # 当循环次数达到5时,输出False,退出循环
    if ${dag.loopTimes} < 5:
        print True
    else:
        print False
  4. 保存结束节点。

步骤三:发布、运行与验证

  1. 返回主工作流画布,点击工具栏的 发布 按钮,发布整个工作流。

  2. 进入 运维中心,定位到该 do-while 节点。

  3. 右键单击节点 → 补数据 > 当前节点,启动一次测试运行。

  4. 待实例运行成功后,再次右键 → “查看内部节点”

  5. 查看五次循环记录,任意展开某次循环(如第5次),右键其内部的 print_loop_times 实例 → “查看运行日志”,应可见输出:

    This is loop number: 5

进阶用法:结合赋值节点处理数据列表

这是一个常见且实用的场景:利用 do-while 遍历并处理上游赋值节点输出的数据集。场景说明:上游一个 ODPS SQL 赋值节点查询出两行用户信息,do-while 节点对每一行数据进行处理。

步骤一:配置上游赋值节点

  1. 创建一个 赋值节点(如命名 assign_sql_data),并设为 do-while 的上游。

  2. 在节点中使用 ODPS SQL 查询数据:

    SELECT 'user_A', 'beijing'
    UNION ALL
    SELECT 'user_B', 'shanghai';
  3. 保存节点。其 outputs 参数将输出一个包含两行数据的二维数组。

步骤二:配置 do-while 节点消费数据

  1. do-while 节点右侧的 “调度配置” 面板中,找到 “本节点输入参数”

  2. 点击 “添加”,配置如下参数:

    • 参数名input(可自定义)

    • 取值来源:选择 assign_sql_data.outputs

  3. do-while循环体 中新建一个 Shell 节点,输入以下脚本处理当前行:

    # 获取当前循环处理的行数据
    echo "Processing data row: ${dag.input[${dag.offset}]}"
  4. 打开 结束 结束 节点,使用 Python 设置退出条件:

    # 若已执行次数小于数据总行数,则继续循环
    if ${dag.loopTimes} < ${dag.input.length}:
        print True
    else:
        print False
  5. 发布工作流,并在运维中心 assign_sql_data 节点开始补数据,确保数据正确流入循环。

运行成功后,可在日志中看到两次循环分别处理了 "user_A,beijing""user_B,shanghai"

附录:do-while 与 for-each 节点的对比

特性

do-while 节点

for-each 节点

核心逻辑

条件驱动:重复执行,直到某个条件不再满足。

数据驱动:为输入列表中的每一个元素执行一次。

循环次数

不确定,取决于条件何时达成。

确定,等于输入列表的元素个数。

执行保证

至少执行一次(即使初始条件已不满足)。

如果输入为空,则一次也不执行

适用场景

轮询、等待、状态检查、分批处理直到源为空。

批量处理已知列表(如同步一组表、处理多个分区)。

控制方式

通过 结束节点 输出 True / False 控制循环继续与否。

自动遍历完所有元素后自然结束。