当数据处理依赖于不可预测的外部事件(如OSS新文件上传、Kafka新消息到达)时,传统的周期调度任务会因固定时间的轮询等待而造成资源浪费和数据处理延迟。DataWorks触发器专为此类场景设计,它通过实时监听外部事件信号来按需启动关联的数据工作流,让数据管道实现真正的事件驱动,从而获得更高的自动化水平和数据处理时效性。本文将详细介绍如何创建、配置和管理事件调度触发器。
功能介绍
DataWorks触发器 (Trigger) 是一个用于实现事件驱动调度的功能组件。其核心功能是监测预先定义的事件,当事件发生时,触发器负责启动关联的触发式工作流,并将事件的相关信息作为参数传递给该工作流。
- 多源事件监测:触发器可以配置为监测来自不同来源的事件。 - 外部存储事件:监测OSS Bucket中的新文件创建。 
- 外部消息事件:监测Kafka、RocketMQ等消息队列中新消息的到达。 
 
- 动态参数捕获与传递:当监测到事件时,触发器会捕获该事件的上下文信息,并将其作为参数向下游传递。 - 对于OSS事件,捕获的信息包括文件信息等。 
- 对于消息队列事件,捕获的信息包括完整的消息内容(如Key, Value, Headers等)。 
 - 这些被传递的参数,可供被触发的工作流内部节点在执行时引用,从而实现基于事件内容的数据处理。 
- 支持的类型: - 消息队列:Kafka、RocketMQ、RabbitMQ。 
- 存储对象:OSS(灰测)。 重要- 当前OSS触发器功能为灰测阶段,您可通过提交工单申请白名单进行评估,评估通过后可开通此功能。 
 
适用范围
- 地域范围:仅华东1(杭州)、华北2(北京)、华北3(张家口)、华北6(乌兰察布)、华南1(深圳)地域支持使用该功能。 
- 适用版本:仅DataWorks专业版以上支持使用该功能。如果您的当前版本不支持此功能,您可以升级DataWorks版本到专业版或更高版本。 
计费说明
触发式工作流配置了触发器并通过事件触发运行工作流,除了任务调度运行产生费用,还会在事件总线侧产生费用。费用详情见:事件流计费说明,计费方式为按事件量计费。
准备工作
- 触发器依赖事件总线EnentBridge的事件规则,需要开通事件总线EventBridge并授权。 
- 已在工作空间同地域存在要监听的对象,如OSS、云消息队列(Kafka、RocketMQ、RabbitMQ)等实例,并具备该实例的访问权限。 
- 已添加用户至指定工作空间并授予开发、运维、空间管理员角色权限。授权详情请参见添加空间成员并管理成员角色权限。 
进入触发器管理页面
- 进入运维中心页面。 - 登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的,在下拉框中选择对应工作空间后单击进入运维中心。 
- 在运维中心左侧导航栏单击,然后切换至触发器管理页签。 
新建触发器
若您已被添加为工作空间的开发、运维或管理员角色,可以在触发器管理页签新建触发器。
- 单击新建触发器按钮,进入新建触发器配置页面。 说明- DataWorks事件触发依赖于事件总线EventBridge,初次使用或缺少AliyunServiceRoleForDataWorksScheduler服务关联角色时,您可单击一键授权进行授权。 
- 执行该操作所需的用户权限: - ram:CreateServiceLinkedRole,具体可参见跨云服务授权。
 
- 根据以下说明完成相关参数的配置。 - Kafka触发器- 参数 - 配置说明 - 适用工作空间 - 选择可使用该触发器的工作空间,仅支持新版数据开发的工作空间。 - 适用环境 - 触发器只适用于生产环境,在开发环境运行时,需手动填写参数。 - 责任人 - 下拉配置触发器的责任人。 - 触发器类型 - 选择云消息队列Kafka版。 - 触发事件 - 支持 - alikafka:Topic:Message类型触发事件,即通过Kafka消息触发,消息发送方式参见:Kafka收发消息。- Kafka实例 - 选择与工作空间同地域下的Kafka实例。如没有可用的实例,前往新建:购买Kafka实例。 - Topic - 指定触发器需要监听的Topic对象,如果没有可用的Topic,前往新建:创建Topic。 - Key - 可预设消息的Key值,只有Key值完全匹配,才触发任务运行。非必填,若未填写,则消费任意一个消息,均会触发工作流运行。 - ConsumerGroupId - 可选择快速创建或使用已有。若使用快速创建,系统会自动创建一个自动生成名称的Group ID。 - 消息格式示例 - Kafka消息的固定示例。支持在触发工作流 - 可以在内部任务中通过${workflow.trigger.message}方式,获取到完整的消息体,也可通过${workflow.trigger.message.xxx}获取到消息体内指定字段的值。 - RocketMQ触发器- 参数 - 配置说明 - 适用工作空间 - 选择可使用该触发器的工作空间,仅支持新版数据开发的工作空间。 - 适用环境 - 触发器只适用于生产环境,在开发环境运行时,需手动填写参数。 - 责任人 - 下拉配置触发器的责任人。 - 触发器类型 - 选择云消息队列RocketMQ版。 说明- 暂不支持5.x以前的版本,默认使用5.x版本 - 触发事件 - 支持 - mq:Topic:SendMessage类型触发事件,即通过消费RocketMQ消息触发。- RocketMQ实例 - 选择与工作空间同地域下的RocketMQ实例,如没有可用的实例,前往新建:RocketMQ实例管理。 - Topic - 指定触发器需要监听的Topic对象。如没有可用的Topic,前往新建:Topic管理。 - Tag - 可预设消息的Tag值,只有Tag值完全匹配,才触发任务运行。非必填,若未填写,则消费任意一个消息,均会触发工作流运行。 - 安全组 - 若RocketMQ实例为包年包月类型,支持选择安全组。 - 消费者组 - 可选择快速创建或使用已有。若使用快速创建,系统会自动创建一个自动生成名称的Group ID。 - 消息格式示例 - RocketMQ消息的固定示例。支持在触发工作流 - 可以在内部任务中通过${workflow.trigger.message}方式,获取到完整的消息体,也可通过${workflow.trigger.message.xxx}获取到消息体内指定字段的值。 - RabbitMQ触发器- 参数 - 配置说明 - 适用工作空间 - 选择可使用该触发器的工作空间,仅支持新版数据开发的工作空间。 - 适用环境 - 触发器只适用于生产环境,在开发环境运行时,需手动填写参数。 - 责任人 - 下拉配置触发器的责任人。 - 触发器类型 - 选择云消息队列RabbitMQ版。 - 触发事件 - 支持 - amqp:Queue:SendMessage类型触发事件,即通过消费RabbitMQ消息触发。- RabbitMQ实例 - 选择与工作空间同地域下的RabbitMQ实例,如没有可用实例,请新建:新建RabbitMQ实例。 - Vhost - RabbitMQ的虚拟主机名称,对Queue进行逻辑隔离。如没有Vhost,新建参考:Vhost管理。 - Queue - 指定触发器监听的Queue对象。若没有合适的Queue,新建参考:Queue管理。 - 消息格式示例 - RabbitMQ消息体示例。支持在触发工作流 - 可以在内部任务中通过${workflow.trigger.message}方式,获取到完整的消息体,也可通过${workflow.trigger.message.xxx}获取到消息体内指定字段的值。 - OSS触发器- 参数 - 配置说明 - 适用工作空间 - 选择可使用该触发器的工作空间,仅支持新版数据开发的工作空间。 - 适用环境 - 触发器只适用于生产环境,在开发环境运行时,需手动填写参数。 - 责任人 - 下拉配置触发器的责任人。 - 触发器类型 - 对象存储OSS。 - 触发事件 - 支持以下三种事件类型: - oss:ObjectCreated:PutObject:上传文件。 
- oss:ObjectCreated:PostObject:通过HTML表单上传文件。 
- oss:ObjectCreated:CompleteMultipartUpload:文件分片上传已完成。 
 - Bucket名称 - 从下拉列表中选择作为事件源的OSS Bucket名称。如未创建,可创建OSS存储空间。 - 文件名称 - 指定触发事件的文件名称,支持通配符匹配: - 文件前缀匹配: - 配置示例: - task*。
- 配置说明:您在OSS上传以 - task为前缀的文件(例如task10.txt)时,可触发事件。
 
- 文件后缀匹配: - 配置示例: - *task.txt。
- 配置说明:您在OSS上传以 - task.txt为后缀的文件(例如work_task.txt)时,可触发事件。
 
- 灵活匹配: - 配置示例: - *task*。
- 配置说明:您在OSS上传 - task字符相关的文件(例如work_task.txt)时,可触发事件。
 
 
- 单击确认,完成触发器的创建。 
使用触发器
触发器需配置触发式工作流一起使用,且只有发布到运维中心的触发式工作流才可以通过触发器进行触发运行。
若要通过触发器实现事件调度,需结合触发式工作流一起使用。触发式工作流在提交至运维中心后,若命中监听的事件对象,即可自动触发工作流内的任务运行。
管理触发器
在触发器管理页签,找到目标触发器后,可进行查看引用任务、修改和版本查看及回滚等相关操作。
- 查看引用任务:当触发器被触发式调度工作流所引用时,您可单击操作栏中的查看引用任务 ,在查看引用任务页面中查看该触发器被哪些触发式调度工作流引用。 
- 修改触发器:单击操作栏中的修改 ,在修改触发器页面中编辑该触发器的相关信息后,单击确认完成修改。 说明- 修改完成后,系统会自动为触发器创建一个新版本。 
- 查看版本: - 单击操作栏中的版本,可在查看版本页面查看该触发器的所有历史版本。 
- 可单击操作栏中的查看了解某个版本的触发器详情。 
- 支持版本回滚。如需回滚到某个历史版本,单击历史版本后面的回滚,在弹出的提示框中填写回滚备注信息后,确定完成回滚。 说明- 回滚时,系统会根据您选择的回滚版本自动生成一个新版本。 
 
- 删除触发器:删除触发器前,请确保已下线并删除引用该触发器的所有引用任务。然后单击删除按钮,并在提示框中确认删除操作。 
后续使用
触发器创建并配置完成后,可以在触发式工作流中使用,详情参见:触发式工作流。、