赋值节点

当需要将上游节点的查询或输出结果传递给下游节点时,可通过赋值节点实现。赋值节点(即上游节点)支持 MaxCompute SQLPython 2Shell三种语言,会自动将最后一条查询或输出结果赋值给节点的输出参数(outputs),下游节点可以通过引用该参数来获取赋值节点的输出结果。

适用范围

  • 版本限制:仅支持DataWorks标准版及以上版本。

  • 权限限制:RAM账号已被添加至对应工作空间中,并具有开发空间管理员角色权限。详情请参见为工作空间添加空间成员

核心概念:参数的传递与引用

赋值节点的核心是参数传递,负责将上游节点产生的数据传递给下游节点。

  • 上游赋值节点:负责产生数据。它会将最后一条输出或查询结果自动赋值给一个名为outputs节点输出参数

  • 下游业务节点:负责接收和使用数据。通过在节点配置中添加一个节点输入参数(例如,param),并让它引用上游节点的outputs参数,就可以在代码中使用这些数据。

    image

传递参数格式说明

赋值节点的传递参数格式说明如下:

赋值语言

传递参数范围

传递参数格式

MaxCompute SQL

获取最后一行 SELECT 语句的输出。

将输出结果作为一个二维数组传递至下游。

Python 2

获取最后一行 print 语句的输出。

将输出结果转化成字符串后基于逗号,分割为一维数组

例如,赋值节点最后一行输出为'Electronics,Clothing,Books',传递给下游的格式为['Electronics','Clothing','Books']
重要

若输出内容本身包含逗号,需进行转义处理。例如输出 'Electronics,Clothing\, Shoes & Accessories',下游将正确解析为 ['Electronics', 'Clothing, Shoes & Accessories']

Shell

获取最后一行 echo 语句的输出。

操作步骤

下面以将赋值节点的结果传递给Shell节点为例,介绍通用的操作流程。 实际场景中,支持任意节点作为下游节点。

  1. 配置上游赋值节点

    在目标工作流中,创建并编辑一个赋值节点,按需选择MaxCompute SQLPython 2Shell,并编写代码以产生需要传递给下游的结果。

    image

  2. 配置下游Shell节点

    创建Shell节点,在Shell节点的编辑页面,引用上游结果:

    image

    1. 在右侧调度配置中,选择节点上下文参数页签。

    2. 节点输入参数区域,单击添加参数

    3. 在弹出的对话框中,选择上游节点的输出参数为上一步赋值节点的outputs,并为当前节点的输入参数自定义一个参数名称(例如:param)。

      说明

      配置完成后,下游节点会自动与上游赋值节点建立依赖关系。

    4. 完成参数配置后,即可在下游Shell节点的代码中通过${param}的格式来使用上游传递过来的值。

  3. 运行验证

    1. 返回工作流,单击工具栏上方的发布,选择全量发布。

    2. 前往运维中心模块的任务运维 > 周期任务运维 > 周期任务,对目标工作流进行冒烟测试

    3. 在测试实例中,查看最终运行结果是否符合预期。

注意事项

  • 传递层级:赋值节点参数只能传递给直接下游的一层子节点,不支持跨层级的节点传递。

  • 传递大小限制:传递值最大为2MB。如果赋值语句的输出结果超过该限制,赋值节点会运行失败。

  • 语法限制

    • 赋值节点代码中不支持添加注释,否则可能导致运行结果异常。

    • MaxCompute SQL模式下暂不支持WITH语法

使用示例:分语言详解

不同语言的赋值节点,其输出结果(outputs)的数据格式和下游节点的引用方式略有不同。下面以Shell为下游节点,分别举例说明。

示例一:传递 MaxCompute SQL 查询结果

SQL的查询结果会作为一个二维数组传递给下游。

  • 上游节点(赋值节点-SQL)配置

    假设SQL代码如下,查询返回两行两列的数据:

    SELECT 'beijing', '1001'
    UNION ALL 
    SELECT 'hangzhou', '1002';
  • 下游节点(Shell节点)配置和输出

    Shell节点中添加名为region的输入参数,并引用上游SQL节点的outputs

    编写如下代码读取数据:

    echo "整个结果集: ${region}"
    echo "第一行: ${region[0]}"
    echo "第一行第二个字段: ${region[0][1]}"

    DataWorks将直接解析参数,并做静态替换。运行输出如下:

    整个结果集: beijing,1001
    hangzhou,1002
    第一行: beijing,1001
    第一行第二个字段: 1001

示例二:传递 Python 2 输出结果

Python 2print语句输出结果会基于逗号,分割,并作为一个一维数组传递给下游。

  • 上游节点(赋值节点-Python 2)配置

    Python 2代码如下:

    print 'Electronics, Clothing, Books';
  • 下游节点(Shell节点)配置和输出

    Shell节点中添加名为types的输入参数,并引用上游赋值节点的outputs

    编写如下代码读取数据:

    # 直接输出整个一维数组
    echo "整个结果集: ${types}"
    
    # 按索引输出数组中的元素
    echo "第二个元素: ${types[1]}"

    DataWorks将直接解析参数,并做静态替换。运行输出如下:

    整个结果集: Electronics,Clothing,Books
    第二个元素: Clothing
说明

Shell节点处理逻辑和Python 2类似,不再重复说明。

场景实践:批量处理多业务线分区表数据

本示例展示如何使用赋值节点for-each 节点批量处理多个业务线的用户行为数据,实现一套处理逻辑服务多条产品线的自动化数据处理。

image

业务背景

假设您是一家综合性互联网公司的数据开发工程师,负责处理三个核心业务线的数据:电商(ecom)、金融(finance)和物流(logistics),且后续存在增加业务线的可能。您需要每天对这三个业务线的用户行为日志执行相同的聚合逻辑,计算每个用户的日活跃度(PV),并将结果存入统一的汇总表。

  • 上游源表(DWD层):

    • dwd_user_behavior_ecom_d:电商用户行为表。

    • dwd_user_behavior_finance_d :金融用户行为表。

    • dwd_user_behavior_logistics_d :物流用户行为表。

    • dwd_user_behavior_${业务线}_d :后续更多可能的业务线用户行为表。

    • 这些表结构相同,都按天分区(dt)。

  • 下游目标表(DWS):

    • dws_user_summary_d :用户汇总表。

    • 该表按业务线(biz_line)和天(dt)双重分区,用于统一存储所有业务线的聚合结果。

若为每个业务线创建独立的任务,维护成本高且容易出错。使用 for-each 节点后,只需维护一份处理逻辑,系统会自动遍历所有业务线完成计算。

数据准备

首先,创建示例表并插入测试数据(以业务日期 20251010 为例):

  1. 为工作空间绑定MaxCompute计算资源

  2. 进入Data Studio数据开发,创建MaxCompute SQL节点。

  3. 创建源表(DWD层):在MaxCompute SQL节点添加如下代码,并选中运行。

    -- 电商用户行为表
    CREATE TABLE IF NOT EXISTS dwd_user_behavior_ecom_d (
        user_id     STRING COMMENT '用户ID',
        action_type STRING COMMENT '行为类型',
        event_time  BIGINT COMMENT '事件发生的毫秒级Unix时间戳'
    ) 
    COMMENT '电商用户行为日志明细表'
    PARTITIONED BY (dt STRING COMMENT '日期分区,格式 yyyymmdd');
    
    INSERT OVERWRITE TABLE dwd_user_behavior_ecom_d PARTITION (dt='20251010') VALUES
    ('user001', 'click',        1760004060000), -- 2025-10-10 10:01:00.000
    ('user002', 'browse',       1760004150000), -- 2025-10-10 10:02:30.000
    ('user001', 'add_to_cart',  1760004300000); -- 2025-10-10 10:05:00.000
    -- 验证电商用户行为表创建成功
    SELECT * FROM dwd_user_behavior_ecom_d where dt='20251010';
    
    -- 金融用户行为表
    CREATE TABLE IF NOT EXISTS dwd_user_behavior_finance_d (
        user_id     STRING COMMENT '用户ID',
        action_type STRING COMMENT '行为类型',
        event_time  BIGINT COMMENT '事件发生的毫秒级Unix时间戳'
    ) 
    COMMENT '金融用户行为日志明细表'
    PARTITIONED BY (dt STRING COMMENT '日期分区,格式 yyyymmdd');
    
    INSERT OVERWRITE TABLE dwd_user_behavior_finance_d PARTITION (dt='20251010') VALUES
    ('user003', 'open_app',      1760020200000), -- 2025-10-10 14:30:00.000
    ('user003', 'transfer',      1760020215000), -- 2025-10-10 14:30:15.000
    ('user003', 'check_balance', 1760020245000), -- 2025-10-10 14:30:45.000
    ('user004', 'open_app',      1760020300000); -- 2025-10-10 14:31:40.000
    -- 验证金融用户行为表创建成功
    SELECT * FROM dwd_user_behavior_finance_d where dt='20251010';
    
    -- 物流用户行为表
    CREATE TABLE IF NOT EXISTS dwd_user_behavior_logistics_d (
        user_id     STRING COMMENT '用户ID',
        action_type STRING COMMENT '行为类型',
        event_time  BIGINT COMMENT '事件发生的毫秒级Unix时间戳'
    ) 
    COMMENT '物流用户行为日志明细表'
    PARTITIONED BY (dt STRING COMMENT '日期分区,格式 yyyymmdd');
    
    INSERT OVERWRITE TABLE dwd_user_behavior_logistics_d PARTITION (dt='20251010') VALUES
    ('user001', 'check_status',    1760032800000), -- 2025-10-10 18:00:00.000
    ('user005', 'schedule_pickup', 1760032920000); -- 2025-10-10 18:02:00.000
    
    -- 验证物流用户行为表创建成功
    SELECT * FROM dwd_user_behavior_logistics_d where dt='20251010';
  4. 创建目标表(DWS层):在MaxCompute SQL节点上添加如下代码,并选中运行。

    CREATE TABLE IF NOT EXISTS dws_user_summary_d (
        user_id     STRING COMMENT '用户ID',
        pv          BIGINT COMMENT '日活跃度',
    ) 
    COMMENT '用户日活跃度汇总表'
    PARTITIONED BY (
        dt           STRING COMMENT '日期分区,格式 yyyymmdd',
        biz_line     STRING COMMENT '业务线分区,如 ecom, finance, logistics'
    );
    重要

    若工作空间使用标准环境,需要将此节点发布至生产环境,并执行补数据。

工作流实现

  1. 创建一个工作流,并在右侧调度参数设置调度参数bizdate为上一天$[yyyymmdd-1]

    image

  2. 在工作流中,创建名为get_biz_list的赋值节点,并使用MaxCompute SQL语言编写如下代码。该节点输出需要处理的业务线列表:

    -- 输出所有需要处理的业务线
    SELECT 'ecom' AS biz_line
    UNION ALL
    SELECT 'finance' AS biz_line
    UNION ALL
    SELECT 'logistics' AS biz_line;
  3. 配置 for-each 节点

    • 回到工作流界面,给赋值节点get_biz_list创建一个下游的for-each节点。

    • 进入for-each节点设置界面,在右侧调度配置调度参数 > 脚本参数,将 loopDataArray 参数绑定到get_biz_list节点的outputs

      image

    • for-each节点循环体内单击新建内部节点,创建 MaxCompute SQL节点,编写循环体内的处理逻辑。

      说明
      • 该脚本由 for-each 节点驱动,会针对每个业务线执行一次。

      • 内置变量 ${dag.foreach.current} 在每次运行时,会动态地替换为当前的业务线名称。预期的迭代值为:'ecom', 'finance', 'logistics'。

      SET odps.sql.allow.dynamic.partition=true;
      
      INSERT OVERWRITE TABLE dws_user_summary_d PARTITION (dt='${bizdate}', biz_line)
      SELECT
          user_id,
          COUNT(*) AS pv,
          '${dag.foreach.current}' AS biz_line
      FROM
          dwd_user_behavior_${dag.foreach.current}_d
      WHERE
          dt = '${bizdate}'
      GROUP BY
          user_id;
  4. 添加验证节点

    回到工作流,在for-each节点单击新建下游,创建一个MaxCompute SQL节点并添加如下代码。

    SELECT * FROM dws_user_summary_d WHERE dt='20251010' ORDER BY biz_line, user_id;

发布和运行结果

将工作流发布至生产环境,前往运维中心的周期任务运维 > 周期任务,找到目标工作流并执行冒烟测试,选择业务日期为'20251010'

完成运行后,在测试实例中查看运行日志。最终节点预期输出为:

user_id

pv

dt

biz_line

user001

2

20251010

ecom

user002

1

20251010

ecom

user003

3

20251010

finance

user004

1

20251010

finance

user001

1

20251010

logistics

user005

1

20251010

logistics

方案优势

  • 高扩展性:新增业务线时,只需在赋值节点添加一行 SQL,无需修改处理逻辑。

  • 易维护:所有业务线共享同一套处理逻辑,修改一处即可生效于全部。

常见问题

  • Q:在MaxCompute SQL语言下,报错“find no select sql in sql assignment!”。

    A:MaxCompute SQL缺少SELECT语句,请添加SELECT语句。暂不支持WITH语句,若使用WITH语句,也会报此错误。

  • Q:在Shell、Python语言下,报错“OutPut Result is null, cannot handle!”。

    A:缺少输出,请检查代码中是否存在打印语句(printecho)。

  • Q:在Shell、Python语言下,输出元素本身带有逗号,如何处理?

    A :需要对逗号,进行转义,处理成\,。以Python为例,处理代码如下。

    categories = ["Electronics", "Clothing, Shoes & Accessories"]
    
    # 对每个元素中包含的逗号进行转义
    # 将 ',' 替换为 '\,'
    escaped_categories = [cat.replace(",", "\,") for cat in categories]
    
    # 使用逗号连接转义后的元素
    output_string = ",".join(escaped_categories)
    print output_string
    # 最终输出到下游的字符串是:
    # Electronics,Clothing\, Shoes & Accessories
  • Q:下游节点支持上游配置多个赋值节点来接收多个结果么?

    A :支持,只需给不同节点结果赋值不同的参数即可。

    image

  • Q:赋值节点支持其他语言类型么?

    A:赋值节点目前仅支持MaxCompute SQLPython 2Shell三种语言,部分节点(如EMR HiveHologres SQLEMR Spark SQLAnalyticDB for PostgreSQLClickHouse SQLMySQL节点等)自身支持赋值参数功能,可实现与赋值节点相同的效果。

    image

相关文档