触发式工作流

更新时间:
复制为 MD 格式

与按照预定时间表(如每天凌晨1点)运行的周期工作流不同,触发式工作流是一种按需执行、事件驱动的数据处理模式。它的运行由外部信号(如文件上传、消息到达、API调用或手动点击)即时触发,为数据处理提供极高的实时性和灵活性。

特性

周期工作流

触发式工作流

驱动方式

固定时间(Crontab表达式)

外部信号(事件、API、手动)

执行模式

计划性、可预测

响应式、按需执行

适用场景

T+1批量数仓、定时报表

文件到达即处理、与业务系统集成、手动数据修复

核心优势

稳定性、周期性保障

即时性、灵活性

支持的触发方式

触发式工作流支持以下三种触发方式,您可以根据业务场景灵活选择:

触发方式

触发方

核心场景

关键点

事件触发

外部事件源(如OSS、Kafka)

事件驱动ETL:文件落地即处理、消息驱动实时计算。

需要先新建触发器并与工作流关联。仅在生产环境生效

手动触发

用户(开发者/运维人员)

临时任务:一次性数据处理或分析。

开发环境、生产环境均可手动运行。是手动业务流程的推荐替代方案。

API触发

外部系统(通过OpenAPI)

系统集成:被业务系统(如CRM、ERP)回调,触发数据加工。

需要调用OpenAPI,并具备相应权限。

快速入门:创建手动触发工作流

本章节将引导您创建一个最简单的触发式工作流,并通过手动方式运行它,快速体验完整流程。

步骤一:创建触发式工作流

  1. 进入DataWorks工作空间列表页,在顶部切换至目标地域,找到已创建的工作空间,单击操作列的快速进入 > Data Studio,进入Data Studio。

  2. 在左侧导航栏单击image,然后在项目目录右侧单击image > 新建工作流,进入新建工作流页面。

  3. 在弹出的对话框中,在新建工作流页面,选择调度类型触发式调度。输入工作流名称,单击确认创建。

步骤二:编排工作流并开发节点

  1. 点击工具栏上方的+ 添加节点,打开节点列表。从左侧节点类型列表中,拖拽一个 Shell 节点到画布中,输入名称完成创建。

  2. 双击Shell节点,进入代码编辑页面,输入以下代码:

    echo "Hello, Trigger Workflow! Current time is ${bizdate}"
  3. 单击工具栏的保存按钮。

步骤三:调试运行(开发环境)

  1. 返回工作流画布,单击顶部工具栏的image图标。

  2. 在弹出的对话框中,填写工作流的本次运行值(例如今天20260310,bizdate应该被替换为20260309)。

  3. 稍后,在下方的运行日志中,您可以看到节点的运行状态和 echo 命令的输出结果。

步骤四:发布并运行(生产环境)

  1. 在工作流画布上方,单击发布image按钮,根据提示完成发布流程。

  2. 发布成功后,进入 运维中心 > 手动任务运维 > 手动任务 > 触发式工作流

  3. 找到您刚发布的工作流,单击操作列的运行

  4. 在弹出的窗口中再次单击运行,即可在生产环境触发一次该工作流的实例。您可以在手动实例页面查看本次运行的详情。

至此,您已经成功掌握最基础的触发式工作流使用方法。接下来,我们将深入探索更强大的事件触发能力。

进阶案例:创建事件触发式工作流

场景一:OSS新文件到达,自动触发数据处理

目标:当有新的CSV文件上传到OSS指定目录时,自动触发一个工作流,打印出该文件的路径。

步骤一:创建OSS触发器

  1. 进入 运维中心 > 调度设置 > 触发器管理

  2. 单击 新建触发器,并按如下配置:

    说明

    详细参数介绍,请参见OSS触发器

    • 触发器名称:自定义,如 oss_new_file_trigger

    • 适用工作空间:选择作用的目标工作空间,即工作流所在的工作空间。

    • 触发事件类型:选择 对象存储OSS

    • 触发事件:选择 oss:ObjectCreated:PutObject(或其他上传事件)。

    • Bucket名称:选择您的OSS Bucket。

    • 文件名称:指定监听的文件路径和格式,支持通配符。例如,监听 input/ 目录下所有.csv文件,可填写 input/*.csv

    • 角色配置:首次使用需进行一键授权,选择授权生成的名称为DataWorks-EventBridge-OSS-MNS-Role-*************角色。

      *************表示随机生成一串的13id数字,用于作唯一性区别。
  3. 单击确认,完成触发器创建。

步骤二:创建并关联工作流

  1. 按照快速入门:创建手动触发工作流的步骤,新建一个名为 process_oss_file_workflow触发式工作流

  2. 在工作流画布的右侧面板,选择 调度配置 > 调度策略

  3. 触发器下拉框中,选择刚刚创建的 oss_new_file_trigger

    image

步骤三:开发节点,解析事件参数

  1. 点击工具栏上方的+ 添加节点,打开节点列表。从左侧节点类型列表中,拖拽一个 Shell 节点到画布中,输入名称完成创建。

  2. 双击节点,编写代码以获取并打印触发事件中的文件路径。

    # 当触发器触发工作流时,事件信息会通过内置变量 workflow.triggerMessage 传入
    # 我们可以通过 ${workflow.triggerMessage.data.oss.object.key} 获取到上传文件的完整路径
    
    echo "========= Start Processing OSS File ========="
    message="${workflow.triggerMessage}"
    echo "Raw Value: ${message}"
    
    # 从事件消息中提取文件名
    FILE_PATH="${workflow.triggerMessage.data.oss.object.key}"
    echo "A new file has arrived: ${FILE_PATH}"
    
    # 此处可以写具体处理逻辑
    
    echo "========= Finish Processing OSS File ========="
    说明

    ${workflow.triggerMessage}:获取完整的JSON格式事件消息体。OSS具体的消息格式可在EventBridge事件总线 > DATAWORKS_TRIGGER_FOR_BUCKET_<OSS_Bucket_Name> > 事项追踪> 事项详情中获取。

    查看示例OSS消息格式,即workflow.triggerMessage的格式。

    {
        "datacontenttype": "application/json;charset=utf-8",
        "aliyunaccountid": "1***********9",
        "data": {
            "eventVersion": "1.0",
            "responseElements": {
                "requestId": "69B1***********C0A8"
            },
            "eventSource": "acs:oss",
            "eventTime": "2026-03-11T05:40:45.000Z",
            "requestParameters": {
                "sourceIPAddress": "***********"
            },
            "eventName": "ObjectCreated:PostObject",
            "userIdentity": {
                "principalId": "1***********9"
            },
            "region": "cn-hangzhou",
            "oss": {
                "bucket": {
                    "name": "******",
                    "arn": "acs:oss:cn-hangzhou:1***********9:******",
                    "virtualBucket": "",
                    "ownerIdentity": "1***********9"
                },
                "ossSchemaVersion": "1.0",
                "object": {
                    "size": 59537,
                    "objectMeta": {
                        "mimeType": "text/csv"
                    },
                    "deltaSize": 0,
                    "eTag": "63***********D32",
                    "key": "input/***********.csv"
                }
            }
        },
        "subject": "acs:oss:cn-hangzhou:1***********9:dwoss1024/input/******.csv",
        "aliyunoriginalaccountid": "1***********9",
        "source": "acs.oss",
        "type": "oss:ObjectCreated:PostObject",
        "aliyunpublishtime": "2026-03-11T05:40:45.682Z",
        "specversion": "1.0",
        "aliyuneventbusname": "DATAWORKS_TRIGGER_FOR_BUCKET_******",
        "id": "69B1***********0A8",
        "time": "2026-03-11T05:40:45.000Z",
        "aliyunregionid": "cn-hangzhou"
    }

步骤四:调试与发布

  1. 调试

    • 返回工作流画布,单击运行image按钮。

    • 触发器消息体输入框中,粘贴一段模拟的OSS事件JSON。您可以从触发器配置页的“消息格式示例”复制并修改 key 的值。以下为简单示例。

      {
        "data": {
          "oss":{
            "object": {
              "key": "input/test_file_20260310.csv" 
            }
          } 
        }
      }
    • 单击运行,检查日志中是否成功打印出 input/test_file_2026310.csv

  2. 发布:调试通过后,单击发布按钮,将工作流发布到生产环境。事件触发只有在生产环境才会生效。

步骤五:生产验证

  1. 通过OSS控制台或客户端,向您在触发器中配置的Bucket和路径(例如 input/ 目录)上传一个CSV文件。

    如何确保事件触发成功?

    进入https://eventbridge.console.aliyun.com/<regionId>/event-bus/DATAWORKS_TRIGGER_FOR_BUCKET_<OssBucketName>/event-tracing,查询最近触发事件列表即可,同时可点击具体事项详情查看触发具体消息(即workflow.triggerMessage)。

    image

  2. 进入DataWorks 运维中心 > 手动任务运维 > 手动任务 > 触发式工作流,出现发布成功的process_oss_file_workflow

    image

  3. 等待片刻,进入DataWorks 运维中心 > 手动任务运维 > 触发式工作流实例,一个新的工作流实例被自动触发运行。单击查看其日志,确认文件路径被正确处理。

重要

最佳实践:幂等性设计

受网络波动等因素影响,OSS 事件可能被重复投递。为避免数据重复处理,建议在业务逻辑中实现幂等性。常见方案为处理文件前,先检查一个记录表(如MaxCompute表),以文件的ETag或唯一路径为标识,若已处理过则跳过。

场景二:Kafka消息到达,驱动实时计算

目标:监听Kafka中用户的行为日志,当有新消息时,触发工作流进行解析,并根据内容执行不同逻辑。

步骤一:创建Kafka触发器

  1. 进入 运维中心 > 调度设置 > 触发器管理,单击新建触发器

  2. 配置如下:

    • 触发器名称kafka_user_action_trigger

    • 触发事件类型:选择 云消息队列Kafka

    • Kafka实例Topic:选择您要监听的实例和Topic。

    • ConsumerGroupId:建议选择快速创建,系统会自动生成一个消费组ID,避免与其他应用冲突。

    • Key(可选):可指定消息的Key,只有Key完全匹配的消息才会触发工作流。

  3. 单击确认

步骤二:创建并关联工作流

  1. 按照快速入门:创建手动触发工作流的步骤,新建一个名为 handle_user_action_workflow触发式工作流

  2. 在工作流画布的右侧面板,选择调度配置 > 调度策略

  3. 触发器下拉框中,选择刚创建的 kafka_user_action_trigger

    image

  4. (重要) 考虑到消息可能高频到达,建议配置内部任务最大并行实例数,如 100,防止瞬间大量消息拖垮调度资源。

步骤三:开发节点,解析嵌套JSON

假设Kafka消息的value字段是一个JSON字符串,格式如下: {"user_id": "1001", "action_type": "login", "timestamp": 1688888888}

  1. 点击工具栏上方的+ 添加节点,打开节点列表。从左侧节点类型列表中,拖拽一个Python节点到画布中。

  2. 编写代码解析消息。由于value本身是字符串,我们需要在代码中进行二次JSON解析。

    import json
    
    # 1. 使用内置变量获取 Kafka 消息的 value 字段,它是一个JSON字符串
    message_value_str = '${workflow.triggerMessage.value}'
    
    print(f'Received raw message value string: ${message_value_str}')
    
    try:
        # 2. 在Python代码中将这个字符串解析成JSON对象(字典)
        message_data = json.loads(message_value_str)
        
        user_id = message_data.get("user_id")
        action_type = message_data.get("action_type")
        print(f"Successfully parsed message. User ID: ${user_id}, Action: ${action_type}")
        
        # 3. 接下来可以根据 action_type 执行不同的业务逻辑
        if action_type == 'login':
            # o.run_sql(f"INSERT OVERWRITE TABLE user_login_record PARTITION(ds='{bizdate}') VALUES ('{user_id}');")
            print("Processing login action...")
        elif action_type == 'purchase':
            print("Processing purchase action...")
        else:
            print("Unknown action type.")
            
    except json.JSONDecodeError as e:
        print(f"Error decoding JSON: {e}")
        # 异常处理逻辑,例如可以将错误消息写入一个专门的日志表
        raise e # 抛出异常,使节点运行失败,便于排查

步骤四:调试与发布

  1. 调试

    • 返回工作流画布,单击运行image按钮。

    • 触发器消息体 中,粘贴模拟的Kafka事件。注意 value 字段是一个转义后的JSON字符串。

      {
        "topic": "user-behavior-topic",
        "key": "some-key",
        "value": "{\"user_id\": \"1001\", \"action_type\": \"login\", \"timestamp\": 1688888888}"
      }
    • 运行并检查日志,确认Python节点能正确解析出user_idaction_type

  2. 发布:调试通过后,将工作流发布到生产环境。

步骤五:生产验证

  1. 向您配置的Kafka Topic发送一条符合格式的消息。

    image

  2. 进入DataWorks 运维中心 > 手动任务运维 > 手动任务 > 触发式工作流,出现发布成功的handle_user_action_workflow

    image

  3. 运维中心 > 手动任务运维 > 手动实例 > 触发式工作流实例中,观察是否有新的工作流实例被触发,并检查其运行日志。

    image

重要

最佳实践:并发与顺序

  • 并发控制:务必设置合理的最大并行实例数以应对消息洪峰。

  • 顺序保证:DataWorks调度本身不保证消息的严格顺序处理。如果需要保证同一个用户(或分区)的消息按顺序执行,您需要在业务代码中自行实现分布式锁(如基于Redis/MaxCompute),或者将处理逻辑下沉到保证分区有序消费的计算引擎(如Flink)。

核心设计与配置

工作流编排

触发式工作流编排核心流程和周期工作流相似,可参考节点/工作流编排

调度参数

在工作流画布右侧的调度配置面板中,在此为工作流设置全局参数,其内部所有节点均可引用。

  • 引用方式:在节点代码中,通过 ${workflow.参数名} 格式引用工作流参数。

  • 参数优先级:DataWorks中的参数存在层级覆盖关系,生效优先级为:节点参数 > 工作流参数

    更多参数说明,请参见参数设计与流转

调度策略

当多个工作流或任务同时被触发,导致系统资源出现瓶颈时,您可以通过优先级加权策略实现智能化的资源调度,确保最重要的任务优先执行。

  • 保障核心业务:为核心业务的工作流设置一个更高的优先级,使其总能优先于其他非核心工作流运行。

  • 缩短关键流程耗时:在同一个工作流实例内部,通过优先级加权策略,您可以影响节点的执行顺序。例如,使用向下加权策略,可以让处于关键路径上、拥有更多上游依赖的节点获得更高的动态权重,从而优先执行,有效缩短整个工作流的运行时长。

    配置项

    功能说明

    优先级

    定义工作流实例在调度队列中的绝对优先级别。可选级别为1、3、5、7、8(数字越大,优先级越高)。高优先级的任务/工作流总会优先于低优先级的任务/工作流获取调度资源。

    优先级加权策略

    定义同一优先级下,工作流内部各节点(Task)权重的动态计算方式。权重越高的节点将优先获得执行机会。

    • 不加权:所有节点的权重均为固定基准值。

    • 向下加权:节点的权重会动态调整,其上游依赖的节点越多,权重越高。此策略有助于DAG(有向无环图)中关键路径上的节点优先执行。权重计算方式为:权重初始值 + 其上游所有节点的优先级之和

    内部任务最大并行实例数

    控制此工作流在同一时间可运行的最大实例数量,用于并发控制和资源保护。当运行中的实例数达到上限时,后续被触发的新实例将进入等待状态。支持设置不限制或自定义一个最大值(上限100,000)。

    说明

    设置上限时若超过资源组最大可承受能力,则实际的并发瓶颈将由资源组的物理上限决定。

DataWorks的优先级系统遵循层级覆盖规则:运行时指定 > 节点级配置 > 工作流级配置

  1. 工作流级配置 (基准):在工作流的调度策略中配置,作为所有节点的默认设置。

  2. 节点级配置 (局部):在工作流内部单个节点的调度配置 > 调度策略中为特定节点单独设置更高优先级,会覆盖工作流级别的设置。

  3. 运行时指定 (临时):在运维中心手动触发运行时,通过运行时重置优先级开关指定的配置。该配置优先级最高,仅对当次运行生效,不修改任何永久配置。

运维与管理

  • 实例监控:所有被触发或手动运行的实例,都可以在 运维中心 > 手动任务运维 > 手动实例 页面查看、重跑、终止和排查日志。

  • 克隆工作流:在项目目录中右键单击工作流选择克隆,可以快速复制一个包含所有节点和依赖关系的新工作流。更多说明,参见周期工作流的克隆工作流

  • 版本管理:在工作流画布右侧的版本面板,可以查看、对比和还原工作流的历史版本。更多说明,参见周期工作流的版本管理

使用限制与注意事项

  • 生效环境事件触发机制仅在工作流发布到生产环境(运维中心)后生效。

  • 节点数量:单个工作流最多支持400个节点,建议控制在100个以内以简化维护。

  • 并发上限:最大并行实例数上限为100,000,但实际并发能力受您购买的调度资源组规格限制。

  • 节点级调度:在节点级别配置调度时,仅支持配置优先级,不支持配置优先级加权策略

相关文档