触发式工作流

DataWorks支持周期调度触发式调度两种工作流调度类型。与按固定时间运行的周期工作流不同,触发式工作流是一种按需执行的模式,由手动、API调用、事件消息等外部信号所触发,为数据处理提供了更高的实时性和灵活性。

功能介绍

DataWorks支持两种工作流调度模式:周期调度触发式调度

触发式调度是一种按需执行、不定时的任务调度模式。与按固定时间自动运行的周期性调度工作流不同,触发式工作流的启动由一个即时的外部信号所驱动。这种模式提供了高度的灵活性,适用于需要程序化集成或对外部事件做出响应的场景。

这个外部信号支持三种类型:

  • 手动触发:用户在控制台手动运行。

  • OpenAPI触发:外部系统通过调用OpenAPI接口触发工作流运行。

  • 事件触发:通过预配置的触发器监听特定事件(如OSS文件上传,Kafka等消息队列消息到达等),自动启动工作流。

    • 配置方式:触发式工作流可在工作流界面右侧的调度配置 > 调度策略中关联触发器,将工作流设置为事件触发。

    • 消息内容获取触发式工作流可以在节点代码中通过参数直接引用触发事件所携带的消息体信息。这使得工作流具备了动态处理能力,能够根据不同的输入(如不同的文件名或消息内容)执行不同的逻辑。

触发式工作流内部节点(如PyODPS、Shell)的配置与普通工作流完全相同,但无需配置调度周期。

说明

事件触发机制仅在生产环境(运维中心)生效。因此,工作流必须发布后才能被事件自动触发。

准备工作

已在运维中心配置触发器

功能入口

  1. 进入DataWorks工作空间列表页,在顶部切换至目标地域,找到已创建的工作空间,单击操作列的快速进入 > Data Studio,进入Data Studio。

  2. 在左侧导航栏单击image,然后在项目目录右侧单击image > 新建工作流,进入新建工作流页面。

    首次使用项目目录时,也可以直接单击新建工作流按钮进行新建。

新建触发式工作流

  1. 新建工作流页面,选择调度类型触发式调度

  2. 输入工作流名称,单击确认创建。

  3. (可选)单击画布右侧调度配置,单击调度策略,在下拉列表中,选择一个已创建好的触发器与当前工作流进行关联。

设计触发式工作流

  1. 节点编排

    在工作流看板左侧,根据需要开发的任务类型选择对应的节点,将其拖拽至画布中,并通过手动连线的方式快速配置节点间的依赖关系。

    节点配置方式与普通工作流类似,但无需配置调度周期。

    单个工作流最多支持创建400个内部节点,但为了获得最佳的展示和维护体验,建议控制在100个节点以内。
  2. 工作流调度参数

    在工作流看板右侧,单击调度配置按钮,在调度参数界面单击添加参数,可以为工作流设置调度参数。该参数的作用域为当前工作流,其内部所有节点均可引用。

    说明

    工作流内部节点若配置了同名调度参数,其优先级会高于工作流调度参数。

开发触发式工作流

  1. 节点开发

    在节点编辑页面,快速编辑节点代码。在代码开发时,需注意以下几点:

    • 代码语法取决于您选择的节点类型。不同类型的任务配置不同,配置详情请参见节点开发

    • 可以启用Copilot智能编程助手,以获得智能代码补全建议,提升开发效率。

    • 对于大多数节点类型,您都可以使用 ${变量名}的方式定义变量,以便在下一步骤调试任务代码时可以代入不同的赋值快速调试。

    • 在执行调度任务时,您可通过${workflow.参数名}方式在内部节点获取工作流参数的赋值。

  2. 使用触发器传递的参数(可选) 触发器可以将事件信息(如文件路径、消息内容)作为事件参数传递给工作流内部节点使用。

    Kafka触发器为例,Kafka消息为:

    消息体格式可在触发器详情的消息格式示例中查看。
    {
      "headers": {
        "headers": [],
        "isReadOnly": false
      },
      "partition": 2,
      "offset": 1,
      "topic": "demo-topic",
      "key": "demo-key",
      "value": "{\"number\":100,\"name\":\"EventBridge\"}",
      "timestamp": 1713852706576
    }

    可以在工作流内部节点中通过${workflow.trigger.message}参数获取到完整的消息体,也可通过${workflow.trigger.message.xxx}获取到消息体内指定字段的值,当触发器触发任务运行时,参数会自动替换。举例如下:

    ${workflow.tigger.message}  #获取整个消息体
    ${workflow.tigger.message.key}  #获取JSONkey的值,结果为:demo-key
    ${workflow.tigger.message.value}  #获取JSONkey的值,结果为:{"number":100,"name":"EventBridge"}

调试触发式工作流

  1. 节点调试

    代码编辑完成后,您可单击节点编辑页右侧的调试配置,配置资源组、脚本参数等调试参数。配置完成后,可单击工具栏中的运行按钮,此时将使用您在调试配置中的调试参数运行节点。

  2. 调试工作流

    若需要在数据开发调试触发式工作流,可以单击工作流画布上方工具栏的image运行图标,在弹窗中填写触发器消息体,来模拟事件产生。

    image

发布触发式工作流

工作流调试完成后,击工具栏中的image发布按钮唤起发布看板,单击开始发布生产,任务将按照发布检查流程执行发布操作,更多信息,请参见节点/工作流发布

运行触发式工作流

工作流发布至运维中心后,进入待命状态,等待信号发生。可以通过以下几种操作,运行触发式工作流。

事件触发

  1. 根据新建触发器的事件类型,执行相关操作至触发器的监控对象,如OSS可以上传文件到您在触发器中配置的OSS Bucket,消息队列可以发送消息到触发器中配置的TopicQueue。

  2. 触发器接收到事件消息后,会触发运维中心手动任务中的触发式工作流去执行。

  3. 您可在运维中心 > 手动任务运维 > 手动实例查看并管理已执行的触发式工作流实例。通过实例日志,可以确认工作流是否被成功触发并执行。

手动触发

  1. 进入运维中心 > 手动任务运维 > 触发式工作流,找到对应的触发式工作流,单击运行。可在运行时配置运行范围、业务日期、触发器消息体等参数来指定本次运行的影响范围。

其他操作

克隆触发式工作流

您可通过克隆功能,快速克隆现有触发式工作流以创建新的工作流。克隆内容包括工作流及其内部节点(含代码调试配置调度配置),以及节点间的依赖关系和工作流本身的调度配置信息。

  1. 在左侧项目目录中,右键单击需要克隆的触发式工作流名称。

  2. 在弹出菜单中选择克隆,进入克隆弹窗。

  3. 在弹窗中修改触发式工作流名称和存储路径(可保留默认值),单击确认开始克隆。

  4. 在克隆过程中,可通过弹窗查看克隆的当前进度持续时间完成节点数等详细信息。

  5. 克隆完成后,可在项目目录中查看新生成的触发式工作流。

  6. 如需在触发式工作流中新增节点,您可以选择通过克隆方式快速创建节点,或者通过拖拽方式创建内部节点

触发式工作流版本管理

系统支持通过版本管理功能将触发式工作流还原到指定的历史版本,同时还提供版本查看与对比功能,方便您分析差异并做出调整。

  1. 在左侧项目目录中,双击目标触发式工作流名称,进入工作流画布。

  2. 单击工作流画布右侧的版本,在版本页面查看和管理开发记录发布记录信息。

    • 查看版本

      1. 您可以在开发记录发布记录页签中,找到需要查看的触发式工作流版本。

      2. 单击操作栏中的查看,在详情页中即可查看触发式工作流代码以及调度配置信息。

        说明

        调度配置信息支持脚本模式可视化模式查看,您可在调度配置页签右上角切换查看模式。

    • 对比版本

      您可以在开发记录发布记录页签中,对不同版本的触发式工作流进行对比。下面以开发记录为例,为您演示对比操作。

      • 开发或发布环境对比:在开发记录页签中,勾选两个版本,单击上方的选择对比按钮,即可对比不同版本的触发式工作流代码信息和调度配置信息。

      • 开发与发布或构建环境对比

        1. 开发记录页签中,定位到触发式工作流的某个版本。

        2. 单击操作栏中的对比按钮,在弹出的请选择对比的内容窗口中选择与发布记录构建记录中的某个版本进行对比。

    • 还原版本

      仅支持将开发记录中的触发式工作流还原到指定的历史版本。您可在开发记录页签中找到目标版本,单击操作栏中的还原按钮,即可将该触发式工作流信息还原至目标版本。

      说明

      还原时,系统会基于目标版本进行恢复,并生成一条新的版本记录。

后续步骤

触发式工作流发布后,可以在运维中心进行运维:手动任务运维