本文介绍如何在事件总线EventBridge控制台添加云消息队列 RocketMQ 版作为事件流中的事件提供方。
前提条件
操作步骤
登录事件总线EventBridge控制台,在左侧导航栏,单击事件流。
在顶部菜单栏,选择地域,然后单击创建事件流。
在创建事件流面板,设置任务名称和描述,配置以下参数,然后单击保存。
任务创建
在Source(源)配置向导,选择数据提供方为消息队列 RocketMQ 版,设置以下参数,然后单击下一步。
参数
说明
示例
地域
选择消息队列RocketMQ版源实例所在的地域。
华东1(杭州)
版本
选择RocketMQ实例版本。支持以下两个版本:
RocketMQ 4.x
RocketMQ 5.x
RocketMQ 5.x
RocketMQ 实例
选择生产消息的源云消息队列 RocketMQ 版实例。
rmq-cn-jte3w5i****
Topic
选择生产消息队列RocketMQ版消息的Topic。
topic
Tag
配置源实例中用于过滤消息的Tag。
test
Group ID
RocketMQ消费组名称。
快速创建:推荐方案。如果您选择自动创建,系统将自动创建以
GID_EVENTBRIDGE_xxx命名的 Group ID。使用已有:如果您选择使用已创建的Group,请选择独立的 Group ID,不要和已有的业务混用,以免影响已有的消息收发。
快速创建
消费位点
选择开始消费消息的位点。
最新位点:从最新位点开始消费。
最早位点:从最初位点开始消费。
指定时间戳:从指定时间开始消费。
指定时间戳
消费时间点
选择消费时间点。仅当消费点位配置为指定时间戳时需配置此参数。
2025-03-10 17:27:01
VPC
为RocketMQ实例的VPC ID,系统会自动选择,无需您手动选择。
vpc-****
交换机
为RocketMQ实例的交换机ID,系统会自动选择,无需您手动选择。
vsw-****
安全组
支持以下两种方案。
快速创建:推荐方案,如果您选择此种方案,会自动创建以eb-毫秒时间戳-security_group命名的安全组。
使用已有:如果您选择此种方案,需要选择该VPC下的安全组,推荐使用默认安全组策略。
快速创建
数据格式(Body)
数据格式是针对支持二进制传递的数据源端推出的指定内容格式的编码能力。支持多种数据格式编码,如无特殊编码诉求可将格式设置为Json。
Json(默认Json格式编码,二进制数据按照utf-8 编码为Json格式放入Payload。)
Text(文本格式编码,二进制数据按照utf-8编码为字符串放入Payload。)
Binary(二进制格式编码,二进制数据按照Base64编码为字符串放入Payload。)
Json
批量推送条数
一次调用函数发送的最大批量消息条数,当积压的消息数量到达设定值时才会发送请求,取值范围为 [1,10000]。
100
批量推送间隔(单位:秒)
调用函数的间隔时间,系统每到间隔时间点会将消息聚合后发送给函数计算,取值范围为[0,15],单位为秒。0秒表示无等待时间,直接投递。
3
在Filtering(过滤)、Transform(转换)及Sink(目标)配置向导,设置事件过滤、转换规则及事件目标。事件转换的配置说明,请参见使用函数计算实现消息数据清洗。
任务属性
设置事件流的重试策略及死信队列。更多信息,请参见重试和死信。
返回事件流页面,找到创建好的事件流,在其右侧操作栏,单击启用。
启用事件流后,会有30秒~60秒的延迟时间,您可以在事件流页面的状态栏查看启动进度。
事件示例
{
"specversion":"1.0",
"id":"94ebc15f-f0db-4bbe-acce-56fb72fb****",
"source":"acs:mq",
"type":"mq:Topic:SendMessage",
"subject":"acs:mq:cn-hangzhou:123456789098****:MQ_INST_123456789098****_BXhFHryi%TopicName",
"datacontenttype":"application/json; charset=utf-8",
"time":"2021-04-08T06:01:20.766Z",
"aliyunpublishtime":"2021-04-08T06:01:20.725Z",
"aliyuneventbusname":"BusName",
"data":{
"topic":"TopicName",
"systemProperties":{
"MIN_OFFSET":"0",
"TRACE_ON":"true",
"MAX_OFFSET":"8",
"MSG_REGION":"cn-hangzhou",
"KEYS":"systemProperties.KEYS",
"CONSUME_START_TIME":1628577790396,
"UNIQ_KEY":"AC14C305069E1B28CDFA3181CDA2****",
"TAGS":"systemProperties.TAGS",
"INSTANCE_ID":"MQ_INST_123456789098****_BXhFHryi"
},
"userProperties":{
},
"body":"TEST"
}
}CloudEvents规范中定义的参数解释,请参见事件概述。
data字段包含的参数解释如下表所示。
参数 | 类型 | 示例 | 描述 |
topic | String | TopicName | Topic名称。 |
systemProperties | Map | 系统属性。 | |
MIN_OFFSET | Int | 0 | 最低位点。 |
TRACE_ON | Boolean | true | 是否有消息轨迹。取值说明如下:
|
MAX_OFFSET | Int | 8 | 最高位点。 |
MSG_REGION | String | cn-hangzhou | 发送消息的地域。 |
KEYS | String | systemProperties.KEYS | 过滤属性。 |
CONSUME_START_TIME | Long | 1628577790396 | 开始消费时间。单位:毫秒。 |
UNIQ_KEY | String | AC14C305069E1B28CDFA3181CDA2**** | 消息唯一键。 |
TAGS | String | systemProperties.TAGS | 过滤属性。 |
INSTANCE_ID | String | MQ_INST_123456789098****_BXhFHryi | 实例ID。 |
userProperties | Map | 用户属性。 | |
body | String | TEST | 消息内容。 |