本文介绍如何在事件总线EventBridge控制台添加云消息队列 RocketMQ 版作为事件流中的事件提供方。
前提条件
您已购买并部署云消息队列 RocketMQ 版实例,且实例处于服务中状态。具体步骤,请参见创建实例。
操作步骤
- 登录事件总线EventBridge控制台,在左侧导航栏,单击事件流。
- 在顶部菜单栏,选择地域,然后单击创建事件流。
在创建事件流面板,设置任务名称和描述,配置以下参数,然后单击保存。
任务创建
在Source(源)配置向导,选择数据提供方为消息队列 RocketMQ 版,设置以下参数,然后单击下一步。
参数
说明
示例
地域
选择消息队列RocketMQ版源实例所在的地域。
华东1(杭州)
版本
选择RocketMQ实例版本。
RocketMQ 4.x
RocketMQ 实例
选择生产消息队列RocketMQ版消息的源实例。
MQ_INST_115964845466****_ByBeUp3p
Topic
选择生产消息队列RocketMQ版消息的Topic。
topic
Tag
配置源实例中用于过滤消息的Tag。
test
Group ID
选择源实例中的消费组名称。请使用独立的消费组来创建事件源,不要和已有的业务混用消费组,以免影响已有的消息收发。
GID_http_1
消费位点
选择开始消费消息的位点。
最新位点
批量推送
批量推送可帮您批量聚合多个事件,当批量推送条数和批量推送间隔(单位:秒)两者条件达到其一时即会触发批量推送。
例如:您设置的推送条数为100 条,间隔时间为15 s,在10 s内消息条数已达到100条,那么该次推送则不会等15 s后再推送。
开启
批量推送条数
调用函数发送的最大批量消息条数,当积压的消息数量到达设定值时才会发送请求,取值范围为 [1,10000]。
100
批量推送间隔(单位:秒)
调用函数的间隔时间,系统每到间隔时间点会将消息聚合后发给函数计算,取值范围为[0,15],单位为秒。0秒表示无等待时间,直接投递。
3
在Filtering(过滤)、Transform(转换)及Sink(目标)配置向导,设置事件过滤、转换规则及事件目标。事件转换的配置说明,请参见使用函数计算实现消息数据清洗。
任务属性
设置事件流的重试策略及死信队列。更多信息,请参见重试和死信。
返回事件流页面,找到创建好的事件流,在其右侧操作栏,单击启用。
启用事件流后,会有30秒~60秒的延迟时间,您可以在事件流页面的状态栏查看启动进度。
事件示例
{
"specversion":"1.0",
"id":"94ebc15f-f0db-4bbe-acce-56fb72fb****",
"source":"acs:mq",
"type":"mq:Topic:SendMessage",
"subject":"acs:mq:cn-hangzhou:123456789098****:MQ_INST_123456789098****_BXhFHryi%TopicName",
"datacontenttype":"application/json; charset=utf-8",
"time":"2021-04-08T06:01:20.766Z",
"aliyunpublishtime":"2021-04-08T06:01:20.725Z",
"aliyuneventbusname":"BusName",
"data":{
"topic":"TopicName",
"systemProperties":{
"MIN_OFFSET":"0",
"TRACE_ON":"true",
"MAX_OFFSET":"8",
"MSG_REGION":"cn-hangzhou",
"KEYS":"systemProperties.KEYS",
"CONSUME_START_TIME":1628577790396,
"UNIQ_KEY":"AC14C305069E1B28CDFA3181CDA2****",
"TAGS":"systemProperties.TAGS",
"INSTANCE_ID":"MQ_INST_123456789098****_BXhFHryi"
},
"userProperties":{
},
"body":"TEST"
}
}
CloudEvents规范中定义的参数解释,请参见事件概述。
data字段包含的参数解释如下表所示。
参数 | 类型 | 示例 | 描述 |
topic | String | TopicName | Topic名称。 |
systemProperties | Map | 系统属性。 | |
MIN_OFFSET | Int | 0 | 最低位点。 |
TRACE_ON | Boolean | true | 是否有消息轨迹。取值说明如下:
|
MAX_OFFSET | Int | 8 | 最高位点。 |
MSG_REGION | String | cn-hangzhou | 发送消息的地域。 |
KEYS | String | systemProperties.KEYS | 过滤属性。 |
CONSUME_START_TIME | Long | 1628577790396 | 开始消费时间。单位:毫秒。 |
UNIQ_KEY | String | AC14C305069E1B28CDFA3181CDA2**** | 消息唯一键。 |
TAGS | String | systemProperties.TAGS | 过滤属性。 |
INSTANCE_ID | String | MQ_INST_123456789098****_BXhFHryi | 实例ID。 |
userProperties | Map | 用户属性。 | |
body | String | TEST | 消息内容。 |