触发式工作流

DataWorks支持周期调度触发式调度两种工作流调度类型。与按固定时间运行的周期工作流不同,触发式工作流是一种按需执行的模式,由手动、API调用、事件消息等外部信号所触发,为数据处理提供了更高的实时性和灵活性。

功能介绍

DataWorks支持两种工作流调度模式:周期调度触发式调度

触发式调度是一种按需执行、不定时的任务调度模式。与按固定时间自动运行的周期性调度工作流不同,触发式工作流的启动由一个即时的外部信号所驱动。这种模式提供了高度的灵活性,适用于需要程序化集成或对外部事件做出响应的场景。

支持以下三种触发方式:

  • 手动触发:在DataWorks控制台手动运行工作流。

  • OpenAPI触发:外部系统通过调用OpenAPI接口触发工作流运行。

  • 事件触发:通过预配置的触发器监听特定事件(如OSS文件上传,Kafka等消息队列消息到达等),自动启动工作流。工作流发布至生产环境后,事件触发机制才会生效。

触发式工作流内部节点(如PyODPS、Shell)的配置与普通工作流完全相同,但无需配置调度周期。

配额与限制

  • 节点数量:单个工作流最多支持400个内部节点。建议将节点数控制在100个以内,以简化工作流的展示与维护。

  • 并发实例:最大并行实例数上限为100,000。

  • 生效环境:事件触发机制仅在生产环境(运维中心)生效。工作流发布后才能被事件自动触发。

  • 配置限制:在节点级别进行调度配置时,仅支持配置优先级,不支持配置优先级加权策略

功能入口

  1. 进入DataWorks工作空间列表页,在顶部切换至目标地域,找到已创建的工作空间,单击操作列的快速进入 > Data Studio,进入Data Studio。

  2. 在左侧导航栏单击image,然后在项目目录右侧单击image > 新建工作流,进入新建工作流页面。

    首次使用项目目录时,也可以直接单击新建工作流按钮进行新建。

新建触发式工作流

  1. 新建工作流页面,选择调度类型触发式调度

  2. 输入工作流名称,单击确认创建。

  3. (可选)单击画布右侧调度配置,单击调度策略,在下拉列表中,选择一个已创建好的触发器与当前工作流进行关联。

设计触发式工作流

  1. 节点编排

    在工作流看板左侧,根据需要开发的任务类型选择对应的节点,将其拖拽至画布中,并通过手动连线的方式快速配置节点间的依赖关系。

    节点配置方式与普通工作流类似,但无需配置调度周期。

  2. 工作流调度参数

    在工作流看板右侧,单击调度配置按钮,在调度参数界面单击添加参数,可以为工作流设置调度参数。该参数的作用域为当前工作流,其内部所有节点均可引用。

    说明

    工作流内部节点若配置了同名调度参数,其优先级会高于工作流调度参数。

  3. 关联触发器(可选)

    要使工作流由事件自动触发,需先在运维中心配置触发器,再于工作流画布右侧的调度配置 > 调度策略中,选择一个已创建的触发器进行关联。

  4. 优先级与并发数(高级配置)

    当多个工作流或任务同时被触发,导致系统资源出现瓶颈时,您可以通过优先级加权策略实现智能化的资源调度,确保最重要的任务优先执行。

    • 保障核心业务:为核心业务的工作流设置一个更高的优先级,使其总能优先于其他非核心工作流运行。

    • 缩短关键流程耗时:在同一个工作流实例内部,通过优先级加权策略,您可以影响节点的执行顺序。例如,使用向下加权策略,可以让处于关键路径上、拥有更多上游依赖的节点获得更高的动态权重,从而优先执行,有效缩短整个工作流的运行时长。

      配置项

      功能说明

      优先级

      定义工作流实例在调度队列中的绝对优先级别。可选级别为1、3、5、7、8(数字越大,优先级越高)。高优先级的任务/工作流总会优先于低优先级的任务/工作流获取调度资源。

      优先级加权策略

      定义同一优先级下,工作流内部各节点(Task)权重的动态计算方式。权重越高的节点将优先获得执行机会。

      • 不加权:所有节点的权重均为固定基准值。

      • 向下加权:节点的权重会动态调整,其上游依赖的节点越多,权重越高。此策略有助于DAG(有向无环图)中关键路径上的节点优先执行。权重计算方式为:权重初始值 + 其上游所有节点的优先级之和

      最大并行实例数

      控制此工作流在同一时间可运行的最大实例数量,用于并发控制和资源保护。当运行中的实例数达到上限时,后续被触发的新实例将进入等待状态。支持设置不限制或自定义一个最大值(上限100,000)。

      说明

      设置上限时若超过资源组最大可承受能力,则实际的并发瓶颈将由资源组的物理上限决定。

    DataWorks的优先级系统遵循层级覆盖规则:运行时指定 > 节点级配置 > 工作流级配置

    1. 工作流级配置 (基准):在工作流的调度策略中配置,作为所有节点的默认设置。

    2. 节点级配置 (局部):在工作流内部单个节点的调度配置 > 调度策略中为特定节点单独设置更高优先级,会覆盖工作流级别的设置。

    3. 运行时指定 (临时):在运维中心手动触发运行时,通过运行时重置优先级开关指定的配置。该配置优先级最高,仅对当次运行生效,不修改任何永久配置。

开发触发式工作流

  1. 节点开发

    在节点编辑页面,快速编辑节点代码。在代码开发时,需注意以下几点:

    • 代码语法取决于您选择的节点类型。不同类型的任务配置不同,配置详情请参见节点开发

    • 可以启用Copilot智能编程助手,以获得智能代码补全建议,提升开发效率。

    • 对于大多数节点类型,您都可以使用 ${变量名}的方式定义变量,以便在下一步骤调试任务代码时可以代入不同的赋值快速调试。

    • 在执行调度任务时,您可通过${workflow.参数名}方式在内部节点获取工作流参数的赋值。

  2. 使用触发器传递的参数(可选) 触发器可以将事件信息(如文件路径、消息内容)作为事件参数传递给工作流内部节点使用。

    Kafka触发器为例,Kafka消息为:

    消息体格式可在触发器详情的消息格式示例中查看。
    {
      "headers": {
        "headers": [],
        "isReadOnly": false
      },
      "partition": 2,
      "offset": 1,
      "topic": "demo-topic",
      "key": "demo-key",
      "value": "{\"number\":100,\"name\":\"EventBridge\"}",
      "timestamp": 1713852706576
    }

    可以在工作流内部节点中通过${workflow.trigger.message}参数获取到完整的消息体,也可通过${workflow.trigger.message.xxx}获取到消息体内指定字段的值,当触发器触发任务运行时,参数会自动替换。举例如下:

    ${workflow.tigger.message}  #获取整个消息体
    ${workflow.tigger.message.key}  #获取JSONkey的值,结果为:demo-key
    ${workflow.tigger.message.value}  #获取JSONkey的值,结果为:{"number":100,"name":"EventBridge"}

调试触发式工作流

  1. 节点调试

    代码编辑完成后,您可单击节点编辑页右侧的调试配置,配置资源组、脚本参数等调试参数。配置完成后,可单击工具栏中的运行按钮,此时将使用您在调试配置中的调试参数运行节点。

  2. 调试工作流

    若需要在数据开发调试触发式工作流,可以单击工作流画布上方工具栏的image运行图标,在弹窗中填写触发器消息体,来模拟事件产生。

    image

发布触发式工作流

工作流调试完成后,击工具栏中的image发布按钮唤起发布看板,单击开始发布生产,任务将按照发布检查流程执行发布操作,更多信息,请参见节点/工作流发布

运行触发式工作流

工作流发布至运维中心后,进入待命状态,等待信号发生。可以通过以下几种操作,运行触发式工作流。

事件触发

  1. 根据新建触发器的事件类型,执行相关操作至触发器的监控对象,如OSS可以上传文件到您在触发器中配置的OSS Bucket,消息队列可以发送消息到触发器中配置的TopicQueue。

  2. 触发器接收到事件消息后,会触发运维中心手动任务中的触发式工作流去执行。

  3. 您可在运维中心 > 手动任务运维 > 手动实例查看并管理已执行的触发式工作流实例。通过实例日志,可以确认工作流是否被成功触发并执行。

手动触发

  1. 进入运维中心 > 手动任务运维 > 触发式工作流,找到对应的触发式工作流,单击运行。可在运行时配置运行范围、业务日期、触发器消息体等参数来指定本次运行的影响范围。

其他操作

克隆触发式工作流

您可通过克隆功能,快速克隆现有触发式工作流以创建新的工作流。克隆内容包括工作流及其内部节点(含代码调试配置调度配置),以及节点间的依赖关系和工作流本身的调度配置信息。

  1. 在左侧项目目录中,右键单击需要克隆的触发式工作流名称。

  2. 在弹出菜单中选择克隆,进入克隆弹窗。

  3. 在弹窗中修改触发式工作流名称和存储路径(可保留默认值),单击确认开始克隆。

  4. 在克隆过程中,可通过弹窗查看克隆的当前进度持续时间完成节点数等详细信息。

  5. 克隆完成后,可在项目目录中查看新生成的触发式工作流。

  6. 如需在触发式工作流中新增节点,您可以选择通过克隆方式快速创建节点,或者通过拖拽方式创建内部节点

触发式工作流版本管理

系统支持通过版本管理功能将触发式工作流还原到指定的历史版本,同时还提供版本查看与对比功能,方便您分析差异并做出调整。

  1. 在左侧项目目录中,双击目标触发式工作流名称,进入工作流画布。

  2. 单击工作流画布右侧的版本,在版本页面查看和管理开发记录发布记录信息。

    • 查看版本

      1. 您可以在开发记录发布记录页签中,找到需要查看的触发式工作流版本。

      2. 单击操作栏中的查看,在详情页中即可查看触发式工作流代码以及调度配置信息。

        说明

        调度配置信息支持脚本模式可视化模式查看,您可在调度配置页签右上角切换查看模式。

    • 对比版本

      您可以在开发记录发布记录页签中,对不同版本的触发式工作流进行对比。下面以开发记录为例,为您演示对比操作。

      • 开发或发布环境对比:在开发记录页签中,勾选两个版本,单击上方的选择对比按钮,即可对比不同版本的触发式工作流代码信息和调度配置信息。

      • 开发与发布或构建环境对比

        1. 开发记录页签中,定位到触发式工作流的某个版本。

        2. 单击操作栏中的对比按钮,在弹出的请选择对比的内容窗口中选择与发布记录构建记录中的某个版本进行对比。

    • 还原版本

      仅支持将开发记录中的触发式工作流还原到指定的历史版本。您可在开发记录页签中找到目标版本,单击操作栏中的还原按钮,即可将该触发式工作流信息还原至目标版本。

      说明

      还原时,系统会基于目标版本进行恢复,并生成一条新的版本记录。

后续步骤

触发式工作流发布后,可以在运维中心进行运维:手动任务运维