本文介绍如何应用事件总线EventBridge的事件流功能实现云消息队列 RabbitMQ 版的消息路由。
前提条件
您已购买并部署云消息队列 RabbitMQ 版实例,且实例处于服务中状态。具体步骤,请参见创建资源。
背景信息
事件流作为更轻量、实时端到端的流式事件通道,提供轻量流式数据的过滤和转换的能力,在不同的数据仓库之间、数据处理程序之间、数据分析和处理系统之间进行数据同步。源端云消息队列 RabbitMQ 版生产的消息可以通过事件流这个通道被路由到目标端的云消息队列 RabbitMQ 版,无需定义事件总线。更多信息,请参见事件流概述。
步骤一:在目标端创建事件流
事件总线EventBridge暂不支持跨地域创建云消息队列 RabbitMQ 版的事件流。
- 登录事件总线EventBridge控制台。
- 在顶部菜单栏,选择地域。
- 在左侧导航栏,单击事件流。
在事件流页面,单击创建事件流。
在创建事件流面板,设置任务名称和描述,配置以下参数,然后单击保存。
任务创建
在Source(源)配置向导,选择数据提供方为消息队列 RabbitMQ 版,设置以下参数,然后单击下一步。
参数
说明
示例
地域
选择云消息队列 RabbitMQ 版源实例所在的地域。
华东1(杭州)
RabbitMQ 实例
选择生产云消息队列 RabbitMQ 版消息的源实例。
amqp-cn-7pp2mwbc****
Vhost
选择源实例中的Vhost。
test
Queue
选择存储消息的队列。
test
批量推送条数
调用函数发送的最大批量消息条数,当积压的消息数量到达设定值时才会发送请求,取值范围为 [1,10000]。
100
批量推送间隔(单位:秒)
调用函数的间隔时间,系统每到间隔时间点会将消息聚合后发给函数计算,取值范围为[0,15],单位为秒。0秒表示无等待时间,直接投递。
3
在Filtering(过滤)配置向导,设置事件过滤规则,单击下一步。消息路由场景无需配置Transform(转换)。
在Sink(目标)配置向导,选择服务类型为消息队列RabbitMQ版,配置以下参数,单击保存。
Exchange:生产者将消息发送到Exchange,由Exchange将消息路由到一个或多个Queue中。
Queue:每个消息都会被投入到一个或多个Queue里。
参数
说明
示例
实例ID
选择已创建的云消息队列 RabbitMQ 版实例。
amqp-cn-zvp2pny6****
Vhost
选择已创建的Vhost。
test
目标类型
Queue 模式
Exchange
当目标类型为Exchange时,选择云消息队列 RabbitMQ 版中的Exchange。
exchange
Queue
当目标类型为Queue时,选择云消息队列 RabbitMQ 版中的选择接收消息的队列。
queue
消息路由规则(Routing Key)
事件总线EventBridge通过JSONPath提取事件中的数据,将指定的事件内容路由到事件目标。当目标类型为Exchange时需要配置。
部分事件
$.data.key
消息体(body)
事件总线EventBridge通过JSONPath提取事件中的数据,将指定的事件内容路由到事件目标。
部分事件
$.data.body
MessageId
事件总线EventBridge通过JSONPath提取事件中的数据,将指定的事件内容路由到事件目标。
部分事件
$.data.props.messageId
自定义属性(Properties)
事件总线EventBridge通过JSONPath提取事件中的数据,将指定的事件内容路由到事件目标。
部分事件
$.data.props
任务属性
设置事件流的重试策略及死信队列。更多信息,请参见重试和死信。
返回事件流页面,找到创建好的事件流,在其右侧操作栏,单击启用。
在提示对话框,阅读提示信息,然后单击确认。
启用事件流后,会有30秒~60秒的延迟时间,您可以在事件流页面的状态栏查看启动进度。
步骤二:调用SDK发送消息
获取接入点。您需要在云消息队列 RabbitMQ 版控制台获取实例的接入点。在发送消息时,您需要为发布端配置该接入点,通过接入点接入云消息队列 RabbitMQ 版实例。
在概览页面的资源分布区域,选择地域。
在实例列表页面,单击目标实例名称。
在实例详情页面的接入点信息页签,将鼠标指针移动到目标类型的接入点,单击该接入点右侧的图标,复制该接入点。
类型
说明
示例值
公网接入点
公网环境可读写。按量付费实例默认支持,预付费实例需在购买时选择才支持。
XXX.mq-amqp.cn-hangzhou-a.aliyuncs.com
VPC接入点
VPC环境可读写。按量付费实例和预付费实例默认都支持。
XXX.mq-amqp.cn-hangzhou-a-internal.aliyuncs.com
安装Java依赖库。在pom.xml添加以下依赖。
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.5.0</version> <!-- 支持开源所有版本 --> </dependency>
生成用户名密码。
在概览页面的资源分布区域,选择地域。
在实例列表页面,单击目标实例名称。
在左侧导航栏,单击静态用户名密码。
在静态用户名密码页面,单击创建用户名密码。
在创建用户名密码面板,输入AccessKey ID,输入AccessKey Secret,单击确定。
静态用户名密码页面,显示创建的静态用户名与密码,密码处于隐藏状态。
在创建的静态用户名密码的密码列,单击显示密码,可查看用户名的密码。
生产消息。创建并编译运行Qava。
重要编译运行ProducerTest.java生产消息之前,您需要根据代码提示信息配置参数列表中所列举的参数。
表 3. 参数列表 参数
示例值
描述
hostName
1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com
云消息队列 RabbitMQ 版实例接入点。
Port
5672
默认端口。非加密端口为5672,加密端口为5671。
userName
MjoxODgwNzcwODY5MD****
在云消息队列 RabbitMQ 版控制台将阿里云账号或RAM用户的AccessKey ID、AccessKey Secret和云消息队列 RabbitMQ 版实例ID通过Base64编码后生成的静态用户名。您可以在云消息队列 RabbitMQ 版控制台的静态用户名密码页面获取。
passWord
NDAxREVDQzI2MjA0OT****
在云消息队列 RabbitMQ 版控制台将阿里云账号或RAM用户的AccessKey Secret和timestamp参数(系统当前时间)通过HMAC-SHA1生成一个签名后,再将这个签名和timestamp参数(系统当前时间)通过Base64编码后生成的静态密码。您可以在云消息队列 RabbitMQ 版控制台的静态用户名密码获取。
virtualHost
Test
云消息队列 RabbitMQ 版实例的Vhost。您可以在云消息队列 RabbitMQ 版控制台的Vhost 详情页面查看。如何查看Vhost,请参见查看Vhost连接详情。
ExchangeName
ExchangeTest
云消息队列 RabbitMQ 版的Exchange。您可以在云消息队列 RabbitMQ 版控制台的Exchange 列表页面,结合实例ID与Vhost模糊搜索已创建的Exchange。
BindingKey
BindingKeyTest
云消息队列 RabbitMQ 版Exchange与Queue的Binding Key。您可以在云消息队列 RabbitMQ 版控制台的Exchange 列表页面查看Exchange的绑定关系,获取Binding Key。
QueueName
QueueTest
云消息队列 RabbitMQ 版的Queue。仅在订阅消息时候需要配置,您可以在云消息队列 RabbitMQ 版控制台的Exchange 列表页面,查看Exchange的绑定关系,获取Exchange綁定的Queue。
import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.UUID; import java.util.concurrent.TimeoutException; public class ProducerTest { public static void main(String[] args) throws IOException, TimeoutException { //设置实例的接入点。 String hostName = "xxx.xxx.aliyuncs.com"; //设置实例的静态用户名密码。 String userName = "${UserName}"; String passWord = "${PassWord}"; //设置实例的Vhost。 String virtualHost = "${VirtualHost}"; //在生产环境中,建议提前创建好Connection,并在需要时重复使用,避免频繁创建和关闭Connection,以提高性能、复用连接资源,以及保证系统的稳定性。 Connection connection = createConnection(hostName, userName, passWord, virtualHost); Channel channel = connection.createChannel(); //设置Exchange、Queue和绑定关系。 String exchangeName = "${ExchangeName}"; String queueName = "${QueueName}"; String routingKey = "${RoutingKey}"; //设置Exchange类型。 String exchangeType = "${ExchangeType}"; //此处为了体验流畅,确保了Exchange和Queue的创建过程。 //在生产环境中,建议在控制台提前创建,尽量避免在代码中直接声明,否则可能触发单API调用的限流。 channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null); channel.queueDeclare(queueName, true, false, false, new HashMap<String, Object>()); channel.queueBind(queueName, exchangeName, routingKey); //开始发送消息。 for (int i = 0; i < 10; i++ ) { AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build(); channel.basicPublish(exchangeName, routingKey, true, props, ("消息发送示例Body-" + i).getBytes(StandardCharsets.UTF_8)); System.out.println("[SendResult] Message sent successfully, messageId: " + props.getMessageId() + ", exchange: " + exchangeName + ", routingKey: " + routingKey); } connection.close(); } public static Connection createConnection(String hostName, String userName, String passWord, String virtualHost) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(hostName); factory.setUsername(userName); factory.setPassword(passWord); //设置为true,开启Connection自动恢复功能;设置为false,关闭Connection自动恢复功能。 factory.setAutomaticRecoveryEnabled(true); factory.setNetworkRecoveryInterval(5000); factory.setVirtualHost(virtualHost); //默认端口,非加密端口5672,加密端口5671。 factory.setPort(5672); //基于网络环境合理设置超时时间。 factory.setConnectionTimeout(30 * 1000); factory.setHandshakeTimeout(30 * 1000); factory.setShutdownTimeout(0); Connection connection = factory.newConnection(); return connection; } }
步骤三:验证事件流
登录云消息队列 RabbitMQ 版控制台,然后在左侧导航栏选择实例列表。
在实例列表页面的顶部菜单栏选择地域,然后在实例列表中,单击目标实例名称。
在实例列表页面,单击步骤一:在目标端创建事件流中配置的目标实例名称。
在实例详情页面的基本信息区域,单击消息查询。
设置查询方式为按 Message ID 查询或按 Queue 查询,然后设置时间范围,单击查询。
查看查询到的消息内容是否与步骤二:调用SDK发送消息中发送的消息一致。