当需要将上游节点的查询或输出结果传递给下游节点时,可通过赋值节点实现。赋值节点(即上游节点)支持 MaxCompute SQL、Python 2和Shell三种语言,会自动将最后一条查询或输出结果赋值给节点的输出参数(outputs),下游节点可以通过引用该参数来获取赋值节点的输出结果。
适用范围
版本限制:仅支持DataWorks标准版及以上版本。
权限限制:RAM账号已被添加至对应工作空间中,并具有开发或空间管理员角色权限。详情请参见为工作空间添加空间成员。
核心概念:参数的传递与引用
赋值节点的核心是参数传递,负责将上游节点产生的数据传递给下游节点。
上游赋值节点:负责产生数据。它会将最后一条输出或查询结果自动赋值给一个名为
outputs的节点输出参数。下游业务节点:负责接收和使用数据。通过在节点配置中添加一个节点输入参数(例如,
param),并让它引用上游节点的outputs参数,就可以在代码中使用这些数据。
传递参数格式说明
赋值节点的传递参数格式说明如下:
赋值语言 | 传递参数范围 | 传递参数格式 |
MaxCompute SQL | 获取最后一行 | 将输出结果作为一个二维数组传递至下游。 |
Python 2 | 获取最后一行 | 将输出结果转化成字符串后基于逗号 例如,赋值节点最后一行输出为 重要 若输出内容本身包含逗号,需进行转义处理。例如输出 |
Shell | 获取最后一行 |
操作步骤
下面以将赋值节点的结果传递给Shell节点为例,介绍通用的操作流程。 实际场景中,支持任意节点作为下游节点。
配置上游赋值节点
在目标工作流中,创建并编辑一个赋值节点,按需选择MaxCompute SQL、Python 2和Shell,并编写代码以产生需要传递给下游的结果。

配置下游Shell节点
创建Shell节点,在Shell节点的编辑页面,引用上游结果:

在右侧调度配置中,选择节点上下文参数页签。
在节点输入参数区域,单击添加参数。
在弹出的对话框中,选择上游节点的输出参数为上一步赋值节点的
outputs,并为当前节点的输入参数自定义一个参数名称(例如:param)。说明配置完成后,下游节点会自动与上游赋值节点建立依赖关系。
完成参数配置后,即可在下游Shell节点的代码中通过
${param}的格式来使用上游传递过来的值。
运行验证
返回工作流,单击工具栏上方的发布,选择全量发布。
前往运维中心模块的,对目标工作流进行冒烟测试。
在测试实例中,查看最终运行结果是否符合预期。
注意事项
传递层级:赋值节点参数只能传递给直接下游的一层子节点,不支持跨层级的节点传递。
传递大小限制:传递值最大为2MB。如果赋值语句的输出结果超过该限制,赋值节点会运行失败。
语法限制:
赋值节点代码中不支持添加注释,否则可能导致运行结果异常。
MaxCompute SQL模式下暂不支持WITH语法。
使用示例:分语言详解
不同语言的赋值节点,其输出结果(outputs)的数据格式和下游节点的引用方式略有不同。下面以Shell为下游节点,分别举例说明。
示例一:传递 MaxCompute SQL 查询结果
SQL的查询结果会作为一个二维数组传递给下游。
上游节点(赋值节点-SQL)配置
假设SQL代码如下,查询返回两行两列的数据:
SELECT 'beijing', '1001' UNION ALL SELECT 'hangzhou', '1002';下游节点(Shell节点)配置和输出
在Shell节点中添加名为
region的输入参数,并引用上游SQL节点的outputs。编写如下代码读取数据:
echo "整个结果集: ${region}" echo "第一行: ${region[0]}" echo "第一行第二个字段: ${region[0][1]}"DataWorks将直接解析参数,并做静态替换。运行输出如下:
整个结果集: beijing,1001 hangzhou,1002 第一行: beijing,1001 第一行第二个字段: 1001
示例二:传递 Python 2 输出结果
Python 2的print语句输出结果会基于逗号,分割,并作为一个一维数组传递给下游。
上游节点(赋值节点-Python 2)配置
Python 2代码如下:
print 'Electronics, Clothing, Books';下游节点(Shell节点)配置和输出
在Shell节点中添加名为
types的输入参数,并引用上游赋值节点的outputs。编写如下代码读取数据:
# 直接输出整个一维数组 echo "整个结果集: ${types}" # 按索引输出数组中的元素 echo "第二个元素: ${types[1]}"DataWorks将直接解析参数,并做静态替换。运行输出如下:
整个结果集: Electronics,Clothing,Books 第二个元素: Clothing
Shell节点处理逻辑和Python 2类似,不再重复说明。
场景实践:批量处理多业务线分区表数据
本示例展示如何使用赋值节点和for-each 节点批量处理多个业务线的用户行为数据,实现一套处理逻辑服务多条产品线的自动化数据处理。
业务背景
假设您是一家综合性互联网公司的数据开发工程师,负责处理三个核心业务线的数据:电商(ecom)、金融(finance)和物流(logistics),且后续存在增加业务线的可能。您需要每天对这三个业务线的用户行为日志执行相同的聚合逻辑,计算每个用户的日活跃度(PV),并将结果存入统一的汇总表。
上游源表(DWD层):
dwd_user_behavior_ecom_d:电商用户行为表。dwd_user_behavior_finance_d:金融用户行为表。dwd_user_behavior_logistics_d:物流用户行为表。dwd_user_behavior_${业务线}_d:后续更多可能的业务线用户行为表。这些表结构相同,都按天分区(
dt)。
下游目标表(DWS):
dws_user_summary_d:用户汇总表。该表按业务线(
biz_line)和天(dt)双重分区,用于统一存储所有业务线的聚合结果。
若为每个业务线创建独立的任务,维护成本高且容易出错。使用 for-each 节点后,只需维护一份处理逻辑,系统会自动遍历所有业务线完成计算。
数据准备
首先,创建示例表并插入测试数据(以业务日期 20251010 为例):
为工作空间绑定MaxCompute计算资源。
进入Data Studio数据开发,创建MaxCompute SQL节点。
创建源表(DWD层):在MaxCompute SQL节点添加如下代码,并选中运行。
-- 电商用户行为表 CREATE TABLE IF NOT EXISTS dwd_user_behavior_ecom_d ( user_id STRING COMMENT '用户ID', action_type STRING COMMENT '行为类型', event_time BIGINT COMMENT '事件发生的毫秒级Unix时间戳' ) COMMENT '电商用户行为日志明细表' PARTITIONED BY (dt STRING COMMENT '日期分区,格式 yyyymmdd'); INSERT OVERWRITE TABLE dwd_user_behavior_ecom_d PARTITION (dt='20251010') VALUES ('user001', 'click', 1760004060000), -- 2025-10-10 10:01:00.000 ('user002', 'browse', 1760004150000), -- 2025-10-10 10:02:30.000 ('user001', 'add_to_cart', 1760004300000); -- 2025-10-10 10:05:00.000 -- 验证电商用户行为表创建成功 SELECT * FROM dwd_user_behavior_ecom_d where dt='20251010'; -- 金融用户行为表 CREATE TABLE IF NOT EXISTS dwd_user_behavior_finance_d ( user_id STRING COMMENT '用户ID', action_type STRING COMMENT '行为类型', event_time BIGINT COMMENT '事件发生的毫秒级Unix时间戳' ) COMMENT '金融用户行为日志明细表' PARTITIONED BY (dt STRING COMMENT '日期分区,格式 yyyymmdd'); INSERT OVERWRITE TABLE dwd_user_behavior_finance_d PARTITION (dt='20251010') VALUES ('user003', 'open_app', 1760020200000), -- 2025-10-10 14:30:00.000 ('user003', 'transfer', 1760020215000), -- 2025-10-10 14:30:15.000 ('user003', 'check_balance', 1760020245000), -- 2025-10-10 14:30:45.000 ('user004', 'open_app', 1760020300000); -- 2025-10-10 14:31:40.000 -- 验证金融用户行为表创建成功 SELECT * FROM dwd_user_behavior_finance_d where dt='20251010'; -- 物流用户行为表 CREATE TABLE IF NOT EXISTS dwd_user_behavior_logistics_d ( user_id STRING COMMENT '用户ID', action_type STRING COMMENT '行为类型', event_time BIGINT COMMENT '事件发生的毫秒级Unix时间戳' ) COMMENT '物流用户行为日志明细表' PARTITIONED BY (dt STRING COMMENT '日期分区,格式 yyyymmdd'); INSERT OVERWRITE TABLE dwd_user_behavior_logistics_d PARTITION (dt='20251010') VALUES ('user001', 'check_status', 1760032800000), -- 2025-10-10 18:00:00.000 ('user005', 'schedule_pickup', 1760032920000); -- 2025-10-10 18:02:00.000 -- 验证物流用户行为表创建成功 SELECT * FROM dwd_user_behavior_logistics_d where dt='20251010';创建目标表(DWS层):在MaxCompute SQL节点上添加如下代码,并选中运行。
CREATE TABLE IF NOT EXISTS dws_user_summary_d ( user_id STRING COMMENT '用户ID', pv BIGINT COMMENT '日活跃度', ) COMMENT '用户日活跃度汇总表' PARTITIONED BY ( dt STRING COMMENT '日期分区,格式 yyyymmdd', biz_line STRING COMMENT '业务线分区,如 ecom, finance, logistics' );重要若工作空间使用标准环境,需要将此节点发布至生产环境,并执行补数据。
工作流实现
创建一个工作流,并在右侧调度参数设置调度参数bizdate为上一天
$[yyyymmdd-1]。
在工作流中,创建名为get_biz_list的赋值节点,并使用MaxCompute SQL语言编写如下代码。该节点输出需要处理的业务线列表:
-- 输出所有需要处理的业务线 SELECT 'ecom' AS biz_line UNION ALL SELECT 'finance' AS biz_line UNION ALL SELECT 'logistics' AS biz_line;配置 for-each 节点
回到工作流界面,给赋值节点get_biz_list创建一个下游的for-each节点。
进入for-each节点设置界面,在右侧调度配置的,将 loopDataArray 参数绑定到get_biz_list节点的outputs。

在for-each节点循环体内单击新建内部节点,创建 MaxCompute SQL节点,编写循环体内的处理逻辑。
说明该脚本由 for-each 节点驱动,会针对每个业务线执行一次。
内置变量 ${dag.foreach.current} 在每次运行时,会动态地替换为当前的业务线名称。预期的迭代值为:'ecom', 'finance', 'logistics'。
SET odps.sql.allow.dynamic.partition=true; INSERT OVERWRITE TABLE dws_user_summary_d PARTITION (dt='${bizdate}', biz_line) SELECT user_id, COUNT(*) AS pv, '${dag.foreach.current}' AS biz_line FROM dwd_user_behavior_${dag.foreach.current}_d WHERE dt = '${bizdate}' GROUP BY user_id;
添加验证节点
回到工作流,在for-each节点单击新建下游,创建一个MaxCompute SQL节点并添加如下代码。
SELECT * FROM dws_user_summary_d WHERE dt='20251010' ORDER BY biz_line, user_id;
发布和运行结果
将工作流发布至生产环境,前往运维中心的,找到目标工作流并执行冒烟测试,选择业务日期为'20251010'。
完成运行后,在测试实例中查看运行日志。最终节点预期输出为:
user_id | pv | dt | biz_line |
user001 | 2 | 20251010 | ecom |
user002 | 1 | 20251010 | ecom |
user003 | 3 | 20251010 | finance |
user004 | 1 | 20251010 | finance |
user001 | 1 | 20251010 | logistics |
user005 | 1 | 20251010 | logistics |
方案优势
高扩展性:新增业务线时,只需在赋值节点添加一行 SQL,无需修改处理逻辑。
易维护:所有业务线共享同一套处理逻辑,修改一处即可生效于全部。
常见问题
Q:在MaxCompute SQL语言下,报错“find no select sql in sql assignment!”。
A:MaxCompute SQL缺少
SELECT语句,请添加SELECT语句。暂不支持WITH语句,若使用WITH语句,也会报此错误。Q:在Shell、Python语言下,报错“OutPut Result is null, cannot handle!”。
A:缺少输出,请检查代码中是否存在打印语句(
print、echo)。Q:在Shell、Python语言下,输出元素本身带有逗号,如何处理?
A :需要对逗号
,进行转义,处理成\,。以Python为例,处理代码如下。categories = ["Electronics", "Clothing, Shoes & Accessories"] # 对每个元素中包含的逗号进行转义 # 将 ',' 替换为 '\,' escaped_categories = [cat.replace(",", "\,") for cat in categories] # 使用逗号连接转义后的元素 output_string = ",".join(escaped_categories) print output_string # 最终输出到下游的字符串是: # Electronics,Clothing\, Shoes & AccessoriesQ:下游节点支持上游配置多个赋值节点来接收多个结果么?
A :支持,只需给不同节点结果赋值不同的参数即可。

Q:赋值节点支持其他语言类型么?
A:赋值节点目前仅支持MaxCompute SQL、Python 2和Shell三种语言,部分节点(如EMR Hive、Hologres SQL、EMR Spark SQL、AnalyticDB for PostgreSQL、ClickHouse SQL和MySQL节点等)自身支持赋值参数功能,可实现与赋值节点相同的效果。

相关文档
若下游需循环遍历处理,请参见for-each节点、do-while节点配合使用。
若需要跨层级传递参数,请参见参数节点配合使用。
参数的传递配置,更多请参见节点上下文参数。