与按照预定时间表(如每天凌晨1点)运行的周期工作流不同,触发式工作流是一种按需执行、事件驱动的数据处理模式。它的运行由外部信号(如文件上传、消息到达、API调用或手动点击)即时触发,为数据处理提供极高的实时性和灵活性。
特性 | 周期工作流 | 触发式工作流 |
驱动方式 | 固定时间(Crontab表达式) | 外部信号(事件、API、手动) |
执行模式 | 计划性、可预测 | 响应式、按需执行 |
适用场景 | T+1批量数仓、定时报表 | 文件到达即处理、与业务系统集成、手动数据修复 |
核心优势 | 稳定性、周期性保障 | 即时性、灵活性 |
支持的触发方式
触发式工作流支持以下三种触发方式,您可以根据业务场景灵活选择:
触发方式 | 触发方 | 核心场景 | 关键点 |
事件触发 | 外部事件源(如OSS、Kafka) | 事件驱动ETL:文件落地即处理、消息驱动实时计算。 | 需要先新建触发器并与工作流关联。仅在生产环境生效。 |
手动触发 | 用户(开发者/运维人员) | 临时任务:一次性数据处理或分析。 | 开发环境、生产环境均可手动运行。是手动业务流程的推荐替代方案。 |
API触发 | 外部系统(通过OpenAPI) | 系统集成:被业务系统(如CRM、ERP)回调,触发数据加工。 | 需要调用OpenAPI,并具备相应权限。 |
快速入门:创建手动触发工作流
本章节将引导您创建一个最简单的触发式工作流,并通过手动方式运行它,快速体验完整流程。
步骤一:创建触发式工作流
进入DataWorks工作空间列表页,在顶部切换至目标地域,找到已创建的工作空间,单击操作列的,进入Data Studio。
在左侧导航栏单击
,然后在项目目录右侧单击,进入新建工作流页面。在弹出的对话框中,在新建工作流页面,选择调度类型为触发式调度。输入工作流名称,单击确认创建。
步骤二:编排工作流并开发节点
点击工具栏上方的+ 添加节点,打开节点列表。从左侧节点类型列表中,拖拽一个 Shell 节点到画布中,输入名称完成创建。
双击Shell节点,进入代码编辑页面,输入以下代码:
echo "Hello, Trigger Workflow! Current time is ${bizdate}"单击工具栏的保存按钮。
步骤三:调试运行(开发环境)
返回工作流画布,单击顶部工具栏的
图标。在弹出的对话框中,填写工作流的本次运行值(例如今天20260310,
bizdate应该被替换为20260309)。稍后,在下方的运行日志中,您可以看到节点的运行状态和
echo命令的输出结果。
步骤四:发布并运行(生产环境)
在工作流画布上方,单击发布
按钮,根据提示完成发布流程。发布成功后,进入 运维中心 > 手动任务运维 > 手动任务 > 触发式工作流。
找到您刚发布的工作流,单击操作列的运行。
在弹出的窗口中再次单击运行,即可在生产环境触发一次该工作流的实例。您可以在手动实例页面查看本次运行的详情。
至此,您已经成功掌握最基础的触发式工作流使用方法。接下来,我们将深入探索更强大的事件触发能力。
进阶案例:创建事件触发式工作流
场景一:OSS新文件到达,自动触发数据处理
目标:当有新的CSV文件上传到OSS指定目录时,自动触发一个工作流,打印出该文件的路径。
步骤一:创建OSS触发器
进入 运维中心 > 调度设置 > 触发器管理。
单击 新建触发器,并按如下配置:
说明详细参数介绍,请参见OSS触发器。
触发器名称:自定义,如
oss_new_file_trigger。适用工作空间:选择作用的目标工作空间,即工作流所在的工作空间。
触发事件类型:选择
对象存储OSS。触发事件:选择
oss:ObjectCreated:PutObject(或其他上传事件)。Bucket名称:选择您的OSS Bucket。
文件名称:指定监听的文件路径和格式,支持通配符。例如,监听
input/目录下所有.csv文件,可填写input/*.csv。角色配置:首次使用需进行一键授权,选择授权生成的名称为
DataWorks-EventBridge-OSS-MNS-Role-*************角色。*************表示随机生成一串的13位id数字,用于作唯一性区别。
单击确认,完成触发器创建。
步骤二:创建并关联工作流
按照快速入门:创建手动触发工作流的步骤,新建一个名为
process_oss_file_workflow的触发式工作流。在工作流画布的右侧面板,选择 调度配置 > 调度策略。
在触发器下拉框中,选择刚刚创建的
oss_new_file_trigger。
步骤三:开发节点,解析事件参数
点击工具栏上方的+ 添加节点,打开节点列表。从左侧节点类型列表中,拖拽一个 Shell 节点到画布中,输入名称完成创建。
双击节点,编写代码以获取并打印触发事件中的文件路径。
# 当触发器触发工作流时,事件信息会通过内置变量 workflow.triggerMessage 传入 # 我们可以通过 ${workflow.triggerMessage.data.oss.object.key} 获取到上传文件的完整路径 echo "========= Start Processing OSS File =========" message="${workflow.triggerMessage}" echo "Raw Value: ${message}" # 从事件消息中提取文件名 FILE_PATH="${workflow.triggerMessage.data.oss.object.key}" echo "A new file has arrived: ${FILE_PATH}" # 此处可以写具体处理逻辑 echo "========= Finish Processing OSS File ========="说明${workflow.triggerMessage}:获取完整的JSON格式事件消息体。OSS具体的消息格式可在EventBridge的事件总线 >
DATAWORKS_TRIGGER_FOR_BUCKET_<OSS_Bucket_Name>> 事项追踪> 事项详情中获取。
步骤四:调试与发布
调试:
返回工作流画布,单击运行
按钮。在触发器消息体输入框中,粘贴一段模拟的OSS事件JSON。您可以从触发器配置页的“消息格式示例”复制并修改
key的值。以下为简单示例。{ "data": { "oss":{ "object": { "key": "input/test_file_20260310.csv" } } } }单击运行,检查日志中是否成功打印出
input/test_file_2026310.csv。
发布:调试通过后,单击发布按钮,将工作流发布到生产环境。事件触发只有在生产环境才会生效。
步骤五:生产验证
通过OSS控制台或客户端,向您在触发器中配置的Bucket和路径(例如
input/目录)上传一个CSV文件。进入DataWorks 运维中心 > 手动任务运维 > 手动任务 > 触发式工作流,出现发布成功的
process_oss_file_workflow。
等待片刻,进入DataWorks 运维中心 > 手动任务运维 > 触发式工作流实例,一个新的工作流实例被自动触发运行。单击查看其日志,确认文件路径被正确处理。
最佳实践:幂等性设计
受网络波动等因素影响,OSS 事件可能被重复投递。为避免数据重复处理,建议在业务逻辑中实现幂等性。常见方案为处理文件前,先检查一个记录表(如MaxCompute表),以文件的ETag或唯一路径为标识,若已处理过则跳过。
场景二:Kafka消息到达,驱动实时计算
目标:监听Kafka中用户的行为日志,当有新消息时,触发工作流进行解析,并根据内容执行不同逻辑。
步骤一:创建Kafka触发器
进入 运维中心 > 调度设置 > 触发器管理,单击新建触发器。
配置如下:
触发器名称:
kafka_user_action_trigger。触发事件类型:选择 云消息队列Kafka版。
Kafka实例、Topic:选择您要监听的实例和Topic。
ConsumerGroupId:建议选择快速创建,系统会自动生成一个消费组ID,避免与其他应用冲突。
Key(可选):可指定消息的Key,只有Key完全匹配的消息才会触发工作流。
单击确认。
步骤二:创建并关联工作流
按照快速入门:创建手动触发工作流的步骤,新建一个名为
handle_user_action_workflow的触发式工作流。在工作流画布的右侧面板,选择调度配置 > 调度策略。
在 触发器下拉框中,选择刚创建的
kafka_user_action_trigger。
(重要) 考虑到消息可能高频到达,建议配置内部任务最大并行实例数,如
100,防止瞬间大量消息拖垮调度资源。
步骤三:开发节点,解析嵌套JSON
假设Kafka消息的value字段是一个JSON字符串,格式如下: {"user_id": "1001", "action_type": "login", "timestamp": 1688888888}。
点击工具栏上方的+ 添加节点,打开节点列表。从左侧节点类型列表中,拖拽一个Python节点到画布中。
编写代码解析消息。由于
value本身是字符串,我们需要在代码中进行二次JSON解析。import json # 1. 使用内置变量获取 Kafka 消息的 value 字段,它是一个JSON字符串 message_value_str = '${workflow.triggerMessage.value}' print(f'Received raw message value string: ${message_value_str}') try: # 2. 在Python代码中将这个字符串解析成JSON对象(字典) message_data = json.loads(message_value_str) user_id = message_data.get("user_id") action_type = message_data.get("action_type") print(f"Successfully parsed message. User ID: ${user_id}, Action: ${action_type}") # 3. 接下来可以根据 action_type 执行不同的业务逻辑 if action_type == 'login': # o.run_sql(f"INSERT OVERWRITE TABLE user_login_record PARTITION(ds='{bizdate}') VALUES ('{user_id}');") print("Processing login action...") elif action_type == 'purchase': print("Processing purchase action...") else: print("Unknown action type.") except json.JSONDecodeError as e: print(f"Error decoding JSON: {e}") # 异常处理逻辑,例如可以将错误消息写入一个专门的日志表 raise e # 抛出异常,使节点运行失败,便于排查
步骤四:调试与发布
调试:
返回工作流画布,单击运行
按钮。在 触发器消息体 中,粘贴模拟的Kafka事件。注意
value字段是一个转义后的JSON字符串。{ "topic": "user-behavior-topic", "key": "some-key", "value": "{\"user_id\": \"1001\", \"action_type\": \"login\", \"timestamp\": 1688888888}" }运行并检查日志,确认Python节点能正确解析出
user_id和action_type。
发布:调试通过后,将工作流发布到生产环境。
步骤五:生产验证
向您配置的Kafka Topic发送一条符合格式的消息。

进入DataWorks 运维中心 > 手动任务运维 > 手动任务 > 触发式工作流,出现发布成功的
handle_user_action_workflow。
在 运维中心 > 手动任务运维 > 手动实例 > 触发式工作流实例中,观察是否有新的工作流实例被触发,并检查其运行日志。

最佳实践:并发与顺序
并发控制:务必设置合理的最大并行实例数以应对消息洪峰。
顺序保证:DataWorks调度本身不保证消息的严格顺序处理。如果需要保证同一个用户(或分区)的消息按顺序执行,您需要在业务代码中自行实现分布式锁(如基于Redis/MaxCompute),或者将处理逻辑下沉到保证分区有序消费的计算引擎(如Flink)。
核心设计与配置
工作流编排
触发式工作流编排核心流程和周期工作流相似,可参考节点/工作流编排。
调度参数
在工作流画布右侧的调度配置面板中,在此为工作流设置全局参数,其内部所有节点均可引用。
引用方式:在节点代码中,通过
${workflow.参数名}格式引用工作流参数。参数优先级:DataWorks中的参数存在层级覆盖关系,生效优先级为:节点参数 > 工作流参数。
更多参数说明,请参见参数设计与流转。
调度策略
当多个工作流或任务同时被触发,导致系统资源出现瓶颈时,您可以通过优先级和加权策略实现智能化的资源调度,确保最重要的任务优先执行。
保障核心业务:为核心业务的工作流设置一个更高的优先级,使其总能优先于其他非核心工作流运行。
缩短关键流程耗时:在同一个工作流实例内部,通过优先级加权策略,您可以影响节点的执行顺序。例如,使用向下加权策略,可以让处于关键路径上、拥有更多上游依赖的节点获得更高的动态权重,从而优先执行,有效缩短整个工作流的运行时长。
配置项
功能说明
优先级
定义工作流实例在调度队列中的绝对优先级别。可选级别为1、3、5、7、8(数字越大,优先级越高)。高优先级的任务/工作流总会优先于低优先级的任务/工作流获取调度资源。
优先级加权策略
定义同一优先级下,工作流内部各节点(Task)权重的动态计算方式。权重越高的节点将优先获得执行机会。
不加权:所有节点的权重均为固定基准值。
向下加权:节点的权重会动态调整,其上游依赖的节点越多,权重越高。此策略有助于DAG(有向无环图)中关键路径上的节点优先执行。权重计算方式为:
权重初始值 + 其上游所有节点的优先级之和。
内部任务最大并行实例数
控制此工作流在同一时间可运行的最大实例数量,用于并发控制和资源保护。当运行中的实例数达到上限时,后续被触发的新实例将进入等待状态。支持设置不限制或自定义一个最大值(上限100,000)。
说明设置上限时若超过资源组最大可承受能力,则实际的并发瓶颈将由资源组的物理上限决定。
DataWorks的优先级系统遵循层级覆盖规则:运行时指定 > 节点级配置 > 工作流级配置。
工作流级配置 (基准):在工作流的调度策略中配置,作为所有节点的默认设置。
节点级配置 (局部):在工作流内部单个节点的调度配置 > 调度策略中为特定节点单独设置更高优先级,会覆盖工作流级别的设置。
运行时指定 (临时):在运维中心手动触发运行时,通过运行时重置优先级开关指定的配置。该配置优先级最高,仅对当次运行生效,不修改任何永久配置。
运维与管理
使用限制与注意事项
生效环境:事件触发机制仅在工作流发布到生产环境(运维中心)后生效。
节点数量:单个工作流最多支持400个节点,建议控制在100个以内以简化维护。
并发上限:最大并行实例数上限为100,000,但实际并发能力受您购买的调度资源组规格限制。
节点级调度:在节点级别配置调度时,仅支持配置优先级,不支持配置优先级加权策略。





