管理触发器

当数据处理依赖于不可预测的外部事件(如OSS新文件上传、Kafka新消息到达)时,传统的周期调度任务会因固定时间的轮询等待而造成资源浪费和数据处理延迟。DataWorks触发器专为此类场景设计,它通过实时监听外部事件信号来按需启动关联的数据工作流,让数据管道实现真正的事件驱动,从而获得更高的自动化水平和数据处理时效性。本文将详细介绍如何创建、配置和管理事件调度触发器

功能介绍

DataWorks触发器 (Trigger) 是一个用于实现事件驱动调度的功能组件。其核心功能是监测预先定义的事件,当事件发生时,触发器负责启动关联的触发式工作流,并将事件的相关信息作为参数传递给该工作流。

  • 多源事件监测:触发器可以配置为监测来自不同来源的事件。

    • 外部存储事件:监测OSS Bucket中的新文件创建。

    • 外部消息事件:监测Kafka、RocketMQ等消息队列中新消息的到达。

  • 动态参数捕获与传递:当监测到事件时,触发器会捕获该事件的上下文信息,并将其作为参数向下游传递。

    • 对于OSS事件,捕获的信息包括文件信息等。

    • 对于消息队列事件,捕获的信息包括完整的消息内容(如Key, Value, Headers等)。

    这些被传递的参数,可供被触发的工作流内部节点在执行时引用,从而实现基于事件内容的数据处理。

  • 支持的类型:

    • 消息队列:Kafka、RocketMQ、RabbitMQ。

    • 存储对象:OSS(灰测)。

      重要

      当前OSS触发器功能为灰测阶段,您可通过提交工单申请白名单进行评估,评估通过后可开通此功能。

适用范围

  • 地域范围:仅华东1(杭州)、华北2(北京)、华北3(张家口)、华北6(乌兰察布)、华南1(深圳)地域支持使用该功能。

  • 适用版本:仅DataWorks专业版以上支持使用该功能。如果您的当前版本不支持此功能,您可以升级DataWorks版本到专业版或更高版本

计费说明

触发式工作流配置了触发器并通过事件触发运行工作流,除了任务调度运行产生费用,还会在事件总线侧产生费用。费用详情见:事件流计费说明,计费方式为按事件量计费。

准备工作

进入触发器管理页面

  1. 进入运维中心页面。

    登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的数据开发与运维 > 运维中心,在下拉框中选择对应工作空间后单击进入运维中心

  2. 在运维中心左侧导航栏单击其他 > 调度设置,然后切换至触发器管理页签。

新建触发器

若您已被添加为工作空间的开发、运维或管理员角色,可以在触发器管理页签新建触发器。

  1. 单击新建触发器按钮,进入新建触发器配置页面。

    说明
  2. 根据以下说明完成相关参数的配置。

    Kafka触发器

    参数

    配置说明

    适用工作空间

    选择可使用该触发器的工作空间,仅支持新版数据开发的工作空间。

    适用环境

    触发器只适用于生产环境,在开发环境运行时,需手动填写参数。

    责任人

    下拉配置触发器的责任人。

    触发器类型

    选择云消息队列Kafka

    触发事件

    支持alikafka:Topic:Message类型触发事件,即通过Kafka消息触发,消息发送方式参见:Kafka收发消息

    Kafka实例

    选择与工作空间同地域下的Kafka实例。如没有可用的实例,前往新建:购买Kafka实例

    Topic

    指定触发器需要监听的Topic对象,如果没有可用的Topic,前往新建:创建Topic

    Key

    可预设消息的Key值,只有Key值完全匹配,才触发任务运行。非必填,若未填写,则消费任意一个消息,均会触发工作流运行。

    ConsumerGroupId

    可选择快速创建使用已有。若使用快速创建,系统会自动创建一个自动生成名称的Group ID。

    消息格式示例

    Kafka消息的固定示例。支持在触发工作流

    可以在内部任务中通过${workflow.trigger.message}方式,获取到完整的消息体,也可通过${workflow.trigger.message.xxx}获取到消息体内指定字段的值。

    RocketMQ触发器

    参数

    配置说明

    适用工作空间

    选择可使用该触发器的工作空间,仅支持新版数据开发的工作空间。

    适用环境

    触发器只适用于生产环境,在开发环境运行时,需手动填写参数。

    责任人

    下拉配置触发器的责任人。

    触发器类型

    选择云消息队列RocketMQ

    说明

    暂不支持5.x以前的版本,默认使用5.x版本

    触发事件

    支持mq:Topic:SendMessage类型触发事件,即通过消费RocketMQ消息触发。

    RocketMQ实例

    选择与工作空间同地域下的RocketMQ实例,如没有可用的实例,前往新建:RocketMQ实例管理

    Topic

    指定触发器需要监听的Topic对象。如没有可用的Topic,前往新建:Topic管理

    Tag

    可预设消息的Tag值,只有Tag值完全匹配,才触发任务运行。非必填,若未填写,则消费任意一个消息,均会触发工作流运行。

    安全组

    RocketMQ实例为包年包月类型,支持选择安全组。

    消费者组

    可选择快速创建使用已有。若使用快速创建,系统会自动创建一个自动生成名称的Group ID。

    消息格式示例

    RocketMQ消息的固定示例。支持在触发工作流

    可以在内部任务中通过${workflow.trigger.message}方式,获取到完整的消息体,也可通过${workflow.trigger.message.xxx}获取到消息体内指定字段的值。

    RabbitMQ触发器

    参数

    配置说明

    适用工作空间

    选择可使用该触发器的工作空间,仅支持新版数据开发的工作空间。

    适用环境

    触发器只适用于生产环境,在开发环境运行时,需手动填写参数。

    责任人

    下拉配置触发器的责任人。

    触发器类型

    选择云消息队列RabbitMQ

    触发事件

    支持amqp:Queue:SendMessage类型触发事件,即通过消费RabbitMQ消息触发。

    RabbitMQ实例

    选择与工作空间同地域下的RabbitMQ实例,如没有可用实例,请新建:新建RabbitMQ实例

    Vhost

    RabbitMQ的虚拟主机名称,对Queue进行逻辑隔离。如没有Vhost,新建参考:Vhost管理

    Queue

    指定触发器监听的Queue对象。若没有合适的Queue,新建参考:Queue管理

    消息格式示例

    RabbitMQ消息体示例。支持在触发工作流

    可以在内部任务中通过${workflow.trigger.message}方式,获取到完整的消息体,也可通过${workflow.trigger.message.xxx}获取到消息体内指定字段的值。

    OSS触发器

    参数

    配置说明

    适用工作空间

    选择可使用该触发器的工作空间,仅支持新版数据开发的工作空间。

    适用环境

    触发器只适用于生产环境,在开发环境运行时,需手动填写参数。

    责任人

    下拉配置触发器的责任人。

    触发器类型

    对象存储OSS。

    触发事件

    支持以下三种事件类型:

    Bucket名称

    从下拉列表中选择作为事件源的OSS Bucket名称。如未创建,可创建OSS存储空间

    文件名称

    指定触发事件的文件名称,支持通配符匹配:

    • 文件前缀匹配:

      • 配置示例:task*

      • 配置说明:您在OSS上传以task为前缀的文件(例如task10.txt)时,可触发事件。

    • 文件后缀匹配:

      • 配置示例:*task.txt

      • 配置说明:您在OSS上传以task.txt为后缀的文件(例如work_task.txt)时,可触发事件。

    • 灵活匹配:

      • 配置示例:*task*

      • 配置说明:您在OSS上传task字符相关的文件(例如work_task.txt)时,可触发事件。

  3. 单击确认,完成触发器的创建。

使用触发器

触发器需配置触发式工作流一起使用,且只有发布到运维中心的触发式工作流才可以通过触发器进行触发运行。

若要通过触发器实现事件调度,需结合触发式工作流一起使用。触发式工作流在提交至运维中心后,若命中监听的事件对象,即可自动触发工作流内的任务运行。

  1. 新建触发式工作流

  2. 触发式工作流时,在调度策略中配置您所创建的触发器。

  3. 触发式工作流的内部节点配置方式与普通工作流相同。

    其核心区别在于执行机制:它不依赖于固定的时间周期进行调度,而是由触发器 (Trigger) 响应外部事件来驱动执行。

管理触发器

触发器管理页签,找到目标触发器后,可进行查看引用任务、修改和版本查看及回滚等相关操作。

  • 查看引用任务:当触发器被触发式调度工作流所引用时,您可单击操作栏中的查看引用任务 ,在查看引用任务页面中查看该触发器被哪些触发式调度工作流引用。

  • 修改触发器:单击操作栏中的修改 ,在修改触发器页面中编辑该触发器的相关信息后,单击确认完成修改。

    说明

    修改完成后,系统会自动为触发器创建一个新版本。

  • 查看版本

    1. 单击操作栏中的版本,可在查看版本页面查看该触发器的所有历史版本。

    2. 可单击操作栏中的查看了解某个版本的触发器详情。

    3. 支持版本回滚。如需回滚到某个历史版本,单击历史版本后面的回滚,在弹出的提示框中填写回滚备注信息后,确定完成回滚。

      说明

      回滚时,系统会根据您选择的回滚版本自动生成一个新版本。

  • 删除触发器:删除触发器前,请确保已下线并删除引用该触发器的所有引用任务。然后单击删除按钮,并在提示框中确认删除操作。

后续使用

触发器创建并配置完成后,可以在触发式工作流中使用,详情参见:触发式工作流