本文介绍如何应用事件总线EventBridge的事件流功能实现消息队列RabbitMQ版的消息路由。

前提条件

背景信息

事件流作为更轻量、实时端到端的流式事件通道,提供轻量流式数据的过滤和转换的能力,在不同的数据仓库之间、数据处理程序之间、数据分析和处理系统之间进行数据同步。源端消息队列RabbitMQ版生产的消息可以通过事件流这个通道被路由到目标端的消息队列RabbitMQ版,无需定义事件总线。更多信息,请参见事件流概述

步骤一:在目标端创建事件流

说明 事件总线EventBridge暂不支持跨地域创建消息队列RabbitMQ版的事件流。
  1. 登录事件总线EventBridge控制台
  2. 在顶部菜单栏,选择地域。
  3. 在左侧导航栏,单击事件流
  4. 事件流页面,单击创建事件流
    创建事件流
  5. 创建事件流面板,完成以下操作。
    1. 基本信息配置向导,输入事件流名称和描述,然后单击下一步
    2. 事件源配置向导,选择事件提供方消息队列 RabbitMQ 版,配置以下参数,然后单击下一步
      参数 说明 示例
      地域 选择消息队列RabbitMQ版源实例所在的地域。 华东1(杭州)
      RabbitMQ 实例 选择生产消息队列RabbitMQ版消息的源实例。 amqp-cn-7pp2mwbc****
      Vhost 选择生产消息队列RabbitMQ版消息的Vhost。 test
      Queue 配置源实例中用于生产消息的队列。 test
    3. 规则配置向导,单击下一步
    4. 目标配置向导,选择服务类型消息队列 RabbitMQ 版,配置以下参数,然后单击创建
      参数 说明 示例
      实例ID 选择接收消息队列RabbitMQ版消息的目标实例。 amqp-cn-zvp2pny6****
      Vhost 选择目标实例的Vhost。 test
      目标类型
      • Exchange 模式:生产者将消息发送到Exchange,由Exchange将消息路由到一个或多个Queue中。
      • Queue 模式:每个消息都会被投入到一个或多个Queue里。
      Queue 模式
      Queue 选择接收源端RabbitMQ消息的队列。 queue
      Body 选择部分事件事件总线EventBridge通过JSONPath提取事件中的数据,将指定的事件内容路由到事件目标。 $.data.body
      说明 如果需要全量传递源端的RabbitMQ消息的属性,推荐使用此配置。
      MessageId 选择部分事件事件总线EventBridge通过JSONPath提取事件中的数据,将指定的事件内容路由到事件目标。 $.data.props.messageId
      说明 如果需要全量传递源端的RabbitMQ消息的属性,推荐使用此配置。
      Properties 选择部分事件事件总线EventBridge通过JSONPath提取事件中的数据,将指定的事件内容路由到事件目标。 $.data.props
      说明 如果需要全量传递源端的RabbitMQ消息的属性,推荐使用此配置。
  6. 返回事件流页面,找到创建好的事件流,在其右侧操作栏,单击启用
  7. 提示对话框,阅读提示信息,然后单击确认
    启用事件流后,会有30秒~60秒的延迟时间,您可以在事件流页面的状态栏查看启动进度。

步骤二:调用SDK发送消息

  1. 获取接入点。您需要在消息队列RabbitMQ版控制台获取实例的接入点。在发送消息时,您需要为发布端配置该接入点,通过接入点接入消息队列RabbitMQ版实例。
    1. 登录消息队列RabbitMQ版控制台
    2. 概览页面的资源分布区域,选择地域。
    3. 实例列表页面,单击目标实例名称。
    4. 实例详情页面的接入点信息页签,将鼠标指针移动到目标类型的接入点,单击该接入点右侧的复制图标,复制该接入点。
      类型 说明 示例值
      公网接入点 公网环境可读写。按量付费实例默认支持,预付费实例需在购买时选择才支持。 XXX.mq-amqp.cn-hangzhou-a.aliyuncs.com
      VPC接入点 VPC环境可读写。按量付费实例和预付费实例默认都支持。 XXX.mq-amqp.cn-hangzhou-a-internal.aliyuncs.com
  2. 安装Java依赖库。在pom.xml添加以下依赖。
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.5.0</version> <!-- 支持开源所有版本 -->
    </dependency>
  3. 生成用户名密码。
    1. 登录消息队列RabbitMQ版控制台
    2. 概览页面的资源分布区域,选择地域。
    3. 实例列表页面,单击目标实例名称。
    4. 在左侧导航栏,单击静态用户名密码
    5. 静态用户名密码页面,单击创建用户名密码
    6. 创建用户名密码面板,输入AccessKey ID,输入AccessKey Secret,单击确定
      静态用户名密码页面,显示创建的静态用户名与密码,密码处于隐藏状态。用户名密码
    7. 在创建的静态用户名密码的密码列,单击显示密码,可查看用户名的密码。
  4. 生产消息。创建并编译运行ProducerTest.java
    注意 编译运行ProducerTest.java生产消息之前,您需要根据代码提示信息配置表 1中所列举的参数。
    表 1. 参数列表
    参数 示例值 描述
    hostName 1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com 消息队列RabbitMQ版实例接入点。
    Port 5672 默认端口。非加密端口为5672,加密端口为5671。
    userName MjoxODgwNzcwODY5MD**** 消息队列RabbitMQ版控制台将阿里云账号或RAM用户的AccessKey ID、AccessKey Secret和消息队列RabbitMQ版实例ID通过Base64编码后生成的静态用户名。您可以在消息队列RabbitMQ版控制台静态用户名密码页面获取。
    passWord NDAxREVDQzI2MjA0OT**** 消息队列RabbitMQ版控制台将阿里云账号或RAM用户的AccessKey Secret和timestamp参数(系统当前时间)通过HMAC-SHA1生成一个签名后,再将这个签名和timestamp参数(系统当前时间)通过Base64编码后生成的静态密码。您可以在消息队列RabbitMQ版控制台静态用户名密码获取。
    virtualHost Test 消息队列RabbitMQ版实例的Vhost。您可以在消息队列RabbitMQ版控制台Vhost 详情页面查看。如何查看Vhost,请参见查看Vhost连接详情
    ExchangeName ExchangeTest 消息队列RabbitMQ版的Exchange。您可以在消息队列RabbitMQ版控制台Exchange 列表页面,结合实例ID与Vhost模糊搜索已创建的Exchange。
    BindingKey BindingKeyTest 消息队列RabbitMQ版Exchange与Queue的Binding Key。您可以在消息队列RabbitMQ版控制台Exchange 列表页面查看Exchange的绑定关系,获取Binding Key。
    QueueName QueueTest 消息队列RabbitMQ版的Queue。仅在订阅消息时候需要配置,您可以在消息队列RabbitMQ版控制台Exchange 列表页面,查看Exchange的绑定关系,获取Exchange綁定的Queue。
    import com.rabbitmq.client.*;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.TimeoutException;
    import java.util.HashMap;
    import java.util.UUID;
    
    public class ProducerTest {
        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            // 设置接入点,在消息队列RabbitMQ版控制台实例详情页面查看。
            factory.setHost("xxx.xxx.aliyuncs.com");
            // 用户名,在消息队列RabbitMQ版控制台静态用户名密码页面查看。
            factory.setUsername("${UserName}");
            // 密码,在消息队列RabbitMQ版控制台静态用户名密码页面查看。
            factory.setPassword("${PassWord}");
            //设置为true,开启Connection自动恢复功能;设置为false,关闭Connection自动恢复功能。
            factory.setAutomaticRecoveryEnabled(true);
            factory.setNetworkRecoveryInterval(5000);
            // 设置Vhost名称,请确保已在消息队列RabbitMQ版控制台上创建完成。
            factory.setVirtualHost("${VhostName}");
            // 默认端口,非加密端口5672,加密端口5671。
            factory.setPort(5672);
            // 基于网络环境合理设置超时时间。
            factory.setConnectionTimeout(30 * 1000);
            factory.setHandshakeTimeout(30 * 1000);
            factory.setShutdownTimeout(0);
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();        
            channel.exchangeDeclare("${ExchangeName}", "${ExchangeType}", true, false, false, null);
            channel.queueDeclare("${QueueName}", true, false, false, new HashMap<String, Object>());
            channel.queueBind("${QueueName}", "${ExchangeName}", "${BindingKey}");
            // 开始发送消息。
            for (int i = 0; i < 100; i++  ) {
                // ${ExchangeName}必须在消息队列RabbitMQ版控制台上已存在,并且Exchange的类型与控制台上的类型一致。
                // BindingKey根据业务需求填入相应的BindingKey。
                AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
                channel.basicPublish("${ExchangeName}", "${BindingKey}", true, props,
                        ("消息发送Body"  + i).getBytes(StandardCharsets.UTF_8));
            }
            connection.close();
        }
    }

步骤三:验证事件流

  1. 登录消息队列RabbitMQ版控制台
  2. 概览页面的资源分布区域,选择地域。
  3. 实例列表页面,单击步骤一:在目标端创建事件流中配置的目标实例名称。
  4. 实例详情页面的基本信息区域,单击消息查询
  5. 设置查询方式为按 Message ID 查询按 Queue 查询,然后设置时间范围,单击查询
    查询消息
  6. 查看查询到的消息内容是否与步骤二:调用SDK发送消息中发送的消息一致。