本文介绍如何应用事件总线EventBridge的事件流功能实现消息队列RabbitMQ版的消息路由。
前提条件
- 您已开通事件总线EventBridge并授权。
- 您已购买并部署消息队列RabbitMQ版实例,且实例处于服务中状态。具体步骤,请参见创建资源。
背景信息
事件流作为更轻量、实时端到端的流式事件通道,提供轻量流式数据的过滤和转换的能力,在不同的数据仓库之间、数据处理程序之间、数据分析和处理系统之间进行数据同步。源端消息队列RabbitMQ版生产的消息可以通过事件流这个通道被路由到目标端的消息队列RabbitMQ版,无需定义事件总线。更多信息,请参见事件流概述。
步骤一:在目标端创建事件流
说明 事件总线EventBridge暂不支持跨地域创建消息队列RabbitMQ版的事件流。
- 登录事件总线EventBridge控制台。
- 在顶部菜单栏,选择地域。
- 在左侧导航栏,单击事件流。
- 在事件流页面,单击创建事件流。
- 在创建事件流面板,完成以下操作。
- 在基本信息配置向导,输入事件流名称和描述,然后单击下一步。
- 在事件源配置向导,选择事件提供方为消息队列 RabbitMQ 版,配置以下参数,然后单击下一步。
参数 说明 示例 地域 选择消息队列RabbitMQ版源实例所在的地域。 华东1(杭州) RabbitMQ 实例 选择生产消息队列RabbitMQ版消息的源实例。 amqp-cn-7pp2mwbc**** Vhost 选择生产消息队列RabbitMQ版消息的Vhost。 test Queue 配置源实例中用于生产消息的队列。 test - 在规则配置向导,单击下一步。
- 在目标配置向导,选择服务类型为消息队列 RabbitMQ 版,配置以下参数,然后单击创建。
参数 说明 示例 实例ID 选择接收消息队列RabbitMQ版消息的目标实例。 amqp-cn-zvp2pny6**** Vhost 选择目标实例的Vhost。 test 目标类型 - Exchange 模式:生产者将消息发送到Exchange,由Exchange将消息路由到一个或多个Queue中。
- Queue 模式:每个消息都会被投入到一个或多个Queue里。
Queue 模式 Queue 选择接收源端RabbitMQ消息的队列。 queue Body 选择部分事件。事件总线EventBridge通过JSONPath提取事件中的数据,将指定的事件内容路由到事件目标。 $.data.body 说明 如果需要全量传递源端的RabbitMQ消息的属性,推荐使用此配置。MessageId 选择部分事件。事件总线EventBridge通过JSONPath提取事件中的数据,将指定的事件内容路由到事件目标。 $.data.props.messageId 说明 如果需要全量传递源端的RabbitMQ消息的属性,推荐使用此配置。Properties 选择部分事件。事件总线EventBridge通过JSONPath提取事件中的数据,将指定的事件内容路由到事件目标。 $.data.props 说明 如果需要全量传递源端的RabbitMQ消息的属性,推荐使用此配置。
- 返回事件流页面,找到创建好的事件流,在其右侧操作栏,单击启用。
- 在提示对话框,阅读提示信息,然后单击确认。启用事件流后,会有30秒~60秒的延迟时间,您可以在事件流页面的状态栏查看启动进度。
步骤二:调用SDK发送消息
- 获取接入点。您需要在消息队列RabbitMQ版控制台获取实例的接入点。在发送消息时,您需要为发布端配置该接入点,通过接入点接入消息队列RabbitMQ版实例。
- 登录消息队列RabbitMQ版控制台。
- 在概览页面的资源分布区域,选择地域。
- 在实例列表页面,单击目标实例名称。
- 在实例详情页面的接入点信息页签,将鼠标指针移动到目标类型的接入点,单击该接入点右侧的
图标,复制该接入点。
类型 说明 示例值 公网接入点 公网环境可读写。按量付费实例默认支持,预付费实例需在购买时选择才支持。 XXX.mq-amqp.cn-hangzhou-a.aliyuncs.com VPC接入点 VPC环境可读写。按量付费实例和预付费实例默认都支持。 XXX.mq-amqp.cn-hangzhou-a-internal.aliyuncs.com
- 安装Java依赖库。在pom.xml添加以下依赖。
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.5.0</version> <!-- 支持开源所有版本 --> </dependency>
- 生成用户名密码。
- 登录消息队列RabbitMQ版控制台。
- 在概览页面的资源分布区域,选择地域。
- 在实例列表页面,单击目标实例名称。
- 在左侧导航栏,单击静态用户名密码。
- 在静态用户名密码页面,单击创建用户名密码。
- 在创建用户名密码面板,输入AccessKey ID,输入AccessKey Secret,单击确定。静态用户名密码页面,显示创建的静态用户名与密码,密码处于隐藏状态。
- 在创建的静态用户名密码的密码列,单击显示密码,可查看用户名的密码。
- 生产消息。创建并编译运行ProducerTest.java。注意 编译运行ProducerTest.java生产消息之前,您需要根据代码提示信息配置表 1中所列举的参数。
表 1. 参数列表 参数 示例值 描述 hostName 1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com 消息队列RabbitMQ版实例接入点。 Port 5672 默认端口。非加密端口为5672,加密端口为5671。 userName MjoxODgwNzcwODY5MD**** 在消息队列RabbitMQ版控制台将阿里云账号或RAM用户的AccessKey ID、AccessKey Secret和消息队列RabbitMQ版实例ID通过Base64编码后生成的静态用户名。您可以在消息队列RabbitMQ版控制台的静态用户名密码页面获取。 passWord NDAxREVDQzI2MjA0OT**** 在消息队列RabbitMQ版控制台将阿里云账号或RAM用户的AccessKey Secret和timestamp参数(系统当前时间)通过HMAC-SHA1生成一个签名后,再将这个签名和timestamp参数(系统当前时间)通过Base64编码后生成的静态密码。您可以在消息队列RabbitMQ版控制台的静态用户名密码获取。 virtualHost Test 消息队列RabbitMQ版实例的Vhost。您可以在消息队列RabbitMQ版控制台的Vhost 详情页面查看。如何查看Vhost,请参见查看Vhost连接详情。 ExchangeName ExchangeTest 消息队列RabbitMQ版的Exchange。您可以在消息队列RabbitMQ版控制台的Exchange 列表页面,结合实例ID与Vhost模糊搜索已创建的Exchange。 BindingKey BindingKeyTest 消息队列RabbitMQ版Exchange与Queue的Binding Key。您可以在消息队列RabbitMQ版控制台的Exchange 列表页面查看Exchange的绑定关系,获取Binding Key。 QueueName QueueTest 消息队列RabbitMQ版的Queue。仅在订阅消息时候需要配置,您可以在消息队列RabbitMQ版控制台的Exchange 列表页面,查看Exchange的绑定关系,获取Exchange綁定的Queue。 import com.rabbitmq.client.*; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; import java.util.HashMap; import java.util.UUID; public class ProducerTest { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); // 设置接入点,在消息队列RabbitMQ版控制台实例详情页面查看。 factory.setHost("xxx.xxx.aliyuncs.com"); // 用户名,在消息队列RabbitMQ版控制台静态用户名密码页面查看。 factory.setUsername("${UserName}"); // 密码,在消息队列RabbitMQ版控制台静态用户名密码页面查看。 factory.setPassword("${PassWord}"); //设置为true,开启Connection自动恢复功能;设置为false,关闭Connection自动恢复功能。 factory.setAutomaticRecoveryEnabled(true); factory.setNetworkRecoveryInterval(5000); // 设置Vhost名称,请确保已在消息队列RabbitMQ版控制台上创建完成。 factory.setVirtualHost("${VhostName}"); // 默认端口,非加密端口5672,加密端口5671。 factory.setPort(5672); // 基于网络环境合理设置超时时间。 factory.setConnectionTimeout(30 * 1000); factory.setHandshakeTimeout(30 * 1000); factory.setShutdownTimeout(0); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("${ExchangeName}", "${ExchangeType}", true, false, false, null); channel.queueDeclare("${QueueName}", true, false, false, new HashMap<String, Object>()); channel.queueBind("${QueueName}", "${ExchangeName}", "${BindingKey}"); // 开始发送消息。 for (int i = 0; i < 100; i++ ) { // ${ExchangeName}必须在消息队列RabbitMQ版控制台上已存在,并且Exchange的类型与控制台上的类型一致。 // BindingKey根据业务需求填入相应的BindingKey。 AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build(); channel.basicPublish("${ExchangeName}", "${BindingKey}", true, props, ("消息发送Body" + i).getBytes(StandardCharsets.UTF_8)); } connection.close(); } }
步骤三:验证事件流
- 登录消息队列RabbitMQ版控制台。
- 在概览页面的资源分布区域,选择地域。
- 在实例列表页面,单击步骤一:在目标端创建事件流中配置的目标实例名称。
- 在实例详情页面的基本信息区域,单击消息查询。
- 设置查询方式为按 Message ID 查询或按 Queue 查询,然后设置时间范围,单击查询。
- 查看查询到的消息内容是否与步骤二:调用SDK发送消息中发送的消息一致。