本文介绍如何应用事件总线EventBridge的事件流功能实现云消息队列 RabbitMQ 版的消息路由。
前提条件
您已购买并部署云消息队列 RabbitMQ 版实例,且实例处于服务中状态。具体步骤,请参见创建资源。
背景信息
事件流作为更轻量、实时端到端的流式事件通道,提供轻量流式数据的过滤和转换的能力,在不同的数据仓库之间、数据处理程序之间、数据分析和处理系统之间进行数据同步。源端云消息队列 RabbitMQ 版生产的消息可以通过事件流这个通道被路由到目标端的云消息队列 RabbitMQ 版,无需定义事件总线。更多信息,请参见事件流概述。
步骤一:在目标端创建事件流
事件总线EventBridge暂不支持跨地域创建云消息队列 RabbitMQ 版的事件流。
- 登录事件总线EventBridge控制台。
- 在顶部菜单栏,选择地域。
- 在左侧导航栏,单击事件流。
在事件流页面,单击创建事件流。
在创建事件流面板,设置任务名称和描述,配置以下参数,然后单击保存。
任务创建
在Source(源)配置向导,选择数据提供方为消息队列 RabbitMQ 版,设置以下参数,然后单击下一步。
参数
说明
示例
地域
选择云消息队列 RabbitMQ 版源实例所在的地域。
华东1(杭州)
RabbitMQ 实例
选择生产云消息队列 RabbitMQ 版消息的源实例。
amqp-cn-7pp2mwbc****
Vhost
选择源实例中的Vhost。
test
Queue
选择存储消息的队列。
test
批量推送
批量推送可帮您批量聚合多个事件,当批量推送条数和批量推送间隔(单位:秒)两者条件达到其一时即会触发批量推送。
例如:您设置的推送条数为100 条,间隔时间为15 s,在10 s内消息条数已达到100条,那么该次推送则不会等15 s后再推送。
开启
批量推送条数
调用函数发送的最大批量消息条数,当积压的消息数量到达设定值时才会发送请求,取值范围为 [1,10000]。
100
批量推送间隔(单位:秒)
调用函数的间隔时间,系统每到间隔时间点会将消息聚合后发给函数计算,取值范围为[0,15],单位为秒。0秒表示无等待时间,直接投递。
3
在Filtering(过滤)、Transform(转换)配置向导,设置事件过滤、转换规则,单击下一步。事件转换的配置说明,请参见使用函数计算实现消息数据清洗。
在Sink(目标)配置向导,选择服务类型为消息队列RabbitMQ版,配置以下参数,单击保存。
Exchange:生产者将消息发送到Exchange,由Exchange将消息路由到一个或多个Queue中。
Queue:每个消息都会被投入到一个或多个Queue里。
参数
说明
示例
实例ID
选择已创建的云消息队列 RabbitMQ 版实例。
amqp-cn-zvp2pny6****
Vhost
选择已创建的Vhost。
test
目标类型
Queue 模式
Exchange
当目标类型为Exchange时,选择云消息队列 RabbitMQ 版中的Exchange。
exchange
Queue
当目标类型为Queue时,选择云消息队列 RabbitMQ 版中的选择接收消息的队列。
queue
消息路由规则(Routing Key)
事件总线EventBridge通过JSONPath提取事件中的数据,将指定的事件内容路由到事件目标。当目标类型为Exchange时需要配置。
部分事件
$.data.key
消息体(body)
事件总线EventBridge通过JSONPath提取事件中的数据,将指定的事件内容路由到事件目标。
部分事件
$.data.body
MessageId
事件总线EventBridge通过JSONPath提取事件中的数据,将指定的事件内容路由到事件目标。
部分事件
$.data.props.messageId
自定义属性(Properties)
事件总线EventBridge通过JSONPath提取事件中的数据,将指定的事件内容路由到事件目标。
部分事件
任务属性
设置事件流的重试策略及死信队列。更多信息,请参见重试和死信。
返回事件流页面,找到创建好的事件流,在其右侧操作栏,单击启用。
在提示对话框,阅读提示信息,然后单击确认。
启用事件流后,会有30秒~60秒的延迟时间,您可以在事件流页面的状态栏查看启动进度。
步骤二:调用SDK发送消息
获取接入点。您需要在云消息队列 RabbitMQ 版控制台获取实例的接入点。在发送消息时,您需要为发布端配置该接入点,通过接入点接入云消息队列 RabbitMQ 版实例。
在概览页面的资源分布区域,选择地域。
在实例列表页面,单击目标实例名称。
在实例详情页面的接入点信息页签,将鼠标指针移动到目标类型的接入点,单击该接入点右侧的图标,复制该接入点。
类型
说明
示例值
公网接入点
公网环境可读写。按量付费实例默认支持,预付费实例需在购买时选择才支持。
XXX.mq-amqp.cn-hangzhou-a.aliyuncs.com
VPC接入点
VPC环境可读写。按量付费实例和预付费实例默认都支持。
XXX.mq-amqp.cn-hangzhou-a-internal.aliyuncs.com
安装Java依赖库。在pom.xml添加以下依赖。
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.5.0</version> <!-- 支持开源所有版本 --> </dependency>
生成用户名密码。
在概览页面的资源分布区域,选择地域。
在实例列表页面,单击目标实例名称。
在左侧导航栏,单击静态用户名密码。
在静态用户名密码页面,单击创建用户名密码。
在创建用户名密码面板,输入AccessKey ID,输入AccessKey Secret,单击确定。
静态用户名密码页面,显示创建的静态用户名与密码,密码处于隐藏状态。
在创建的静态用户名密码的密码列,单击显示密码,可查看用户名的密码。
生产消息。创建并编译运行Qava。
重要编译运行ProducerTest.java生产消息之前,您需要根据代码提示信息配置参数列表中所列举的参数。
表 3. 参数列表 参数
示例值
描述
hostName
1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com
云消息队列 RabbitMQ 版实例接入点。
Port
5672
默认端口。非加密端口为5672,加密端口为5671。
userName
MjoxODgwNzcwODY5MD****
在云消息队列 RabbitMQ 版控制台将阿里云账号或RAM用户的AccessKey ID、AccessKey Secret和云消息队列 RabbitMQ 版实例ID通过Base64编码后生成的静态用户名。您可以在云消息队列 RabbitMQ 版控制台的静态用户名密码页面获取。
passWord
NDAxREVDQzI2MjA0OT****
在云消息队列 RabbitMQ 版控制台将阿里云账号或RAM用户的AccessKey Secret和timestamp参数(系统当前时间)通过HMAC-SHA1生成一个签名后,再将这个签名和timestamp参数(系统当前时间)通过Base64编码后生成的静态密码。您可以在云消息队列 RabbitMQ 版控制台的静态用户名密码获取。
virtualHost
Test
云消息队列 RabbitMQ 版实例的Vhost。您可以在云消息队列 RabbitMQ 版控制台的Vhost 详情页面查看。如何查看Vhost,请参见查看Vhost连接详情。
ExchangeName
ExchangeTest
云消息队列 RabbitMQ 版的Exchange。您可以在云消息队列 RabbitMQ 版控制台的Exchange 列表页面,结合实例ID与Vhost模糊搜索已创建的Exchange。
BindingKey
BindingKeyTest
云消息队列 RabbitMQ 版Exchange与Queue的Binding Key。您可以在云消息队列 RabbitMQ 版控制台的Exchange 列表页面查看Exchange的绑定关系,获取Binding Key。
QueueName
QueueTest
云消息队列 RabbitMQ 版的Queue。仅在订阅消息时候需要配置,您可以在云消息队列 RabbitMQ 版控制台的Exchange 列表页面,查看Exchange的绑定关系,获取Exchange綁定的Queue。
import com.rabbitmq.client.*; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; import java.util.HashMap; import java.util.UUID; public class ProducerTest { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); // 设置接入点,在消息队列RabbitMQ版控制台实例详情页面查看。 factory.setHost("xxx.xxx.aliyuncs.com"); // 用户名,在消息队列RabbitMQ版控制台静态用户名密码页面查看。 factory.setUsername("${UserName}"); // 密码,在消息队列RabbitMQ版控制台静态用户名密码页面查看。 factory.setPassword("${PassWord}"); //设置为true,开启Connection自动恢复功能;设置为false,关闭Connection自动恢复功能。 factory.setAutomaticRecoveryEnabled(true); factory.setNetworkRecoveryInterval(5000); // 设置Vhost名称,请确保已在消息队列RabbitMQ版控制台上创建完成。 factory.setVirtualHost("${VhostName}"); // 默认端口,非加密端口5672,加密端口5671。 factory.setPort(5672); // 基于网络环境合理设置超时时间。 factory.setConnectionTimeout(30 * 1000); factory.setHandshakeTimeout(30 * 1000); factory.setShutdownTimeout(0); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("${ExchangeName}", "${ExchangeType}", true, false, false, null); channel.queueDeclare("${QueueName}", true, false, false, new HashMap<String, Object>()); channel.queueBind("${QueueName}", "${ExchangeName}", "${BindingKey}"); // 开始发送消息。 for (int i = 0; i < 100; i++ ) { // ${ExchangeName}必须在消息队列RabbitMQ版控制台上已存在,并且Exchange的类型与控制台上的类型一致。 // BindingKey根据业务需求填入相应的BindingKey。 AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build(); channel.basicPublish("${ExchangeName}", "${BindingKey}", true, props, ("消息发送Body" + i).getBytes(StandardCharsets.UTF_8)); } connection.close(); } }
步骤三:验证事件流
登录云消息队列 RabbitMQ 版控制台,然后在左侧导航栏选择实例列表。
在实例列表页面的顶部菜单栏选择地域,然后在实例列表中,单击目标实例名称。
在实例列表页面,单击步骤一:在目标端创建事件流中配置的目标实例名称。
在实例详情页面的基本信息区域,单击消息查询。
设置查询方式为按 Message ID 查询或按 Queue 查询,然后设置时间范围,单击查询。
查看查询到的消息内容是否与步骤二:调用SDK发送消息中发送的消息一致。
- 本页导读 (1)