基于事件流实现RabbitMQ消息路由

本文介绍如何应用事件总线EventBridge的事件流功能实现云消息队列 RabbitMQ 版的消息路由。

前提条件

背景信息

事件流作为更轻量、实时端到端的流式事件通道,提供轻量流式数据的过滤和转换的能力,在不同的数据仓库之间、数据处理程序之间、数据分析和处理系统之间进行数据同步。源端云消息队列 RabbitMQ 版生产的消息可以通过事件流这个通道被路由到目标端的云消息队列 RabbitMQ 版,无需定义事件总线。更多信息,请参见事件流概述

步骤一:在目标端创建事件流

说明

事件总线EventBridge暂不支持跨地域创建云消息队列 RabbitMQ 版的事件流。

  1. 登录事件总线EventBridge控制台
  2. 在顶部菜单栏,选择地域。
  3. 在左侧导航栏,单击事件流
  4. 事件流页面,单击创建事件流

  5. 创建事件流面板,设置任务名称描述,配置以下参数,然后单击保存

    • 任务创建

        1. Source(源)配置向导,选择数据提供方消息队列 RabbitMQ 版,设置以下参数,然后单击下一步

          参数

          说明

          示例

          地域

          选择云消息队列 RabbitMQ 版源实例所在的地域。

          华东1(杭州)

          RabbitMQ 实例

          选择生产云消息队列 RabbitMQ 版消息的源实例。

          amqp-cn-7pp2mwbc****

          Vhost

          选择源实例中的Vhost。

          test

          Queue

          选择存储消息的队列。

          test

          批量推送条数

          调用函数发送的最大批量消息条数,当积压的消息数量到达设定值时才会发送请求,取值范围为 [1,10000]。

          100

          批量推送间隔(单位:秒)

          调用函数的间隔时间,系统每到间隔时间点会将消息聚合后发给函数计算,取值范围为[0,15],单位为秒。0秒表示无等待时间,直接投递。

          3

        2. Filtering(过滤)配置向导,设置事件过滤规则,单击下一步。消息路由场景无需配置Transform(转换)

        3. Sink(目标)配置向导,选择服务类型消息队列RabbitMQ,配置以下参数,单击保存

        4. 参数

          说明

          示例

          实例ID

          选择已创建的云消息队列 RabbitMQ 版实例。

          amqp-cn-zvp2pny6****

          Vhost

          选择已创建的Vhost。

          test

          目标类型

          • Exchange:生产者将消息发送到Exchange,由Exchange将消息路由到一个或多个Queue中。

          • Queue:每个消息都会被投入到一个或多个Queue里。

          Queue 模式

          Exchange

          目标类型Exchange时,选择云消息队列 RabbitMQ 版中的Exchange。

          exchange

          Queue

          目标类型Queue时,选择云消息队列 RabbitMQ 版中的选择接收消息的队列。

          queue

          消息路由规则(Routing Key)

          事件总线EventBridge通过JSONPath提取事件中的数据,将指定的事件内容路由到事件目标。当目标类型Exchange时需要配置。

          部分事件

          $.data.key

          消息体(body)

          事件总线EventBridge通过JSONPath提取事件中的数据,将指定的事件内容路由到事件目标。

          部分事件

          $.data.body

          MessageId

          事件总线EventBridge通过JSONPath提取事件中的数据,将指定的事件内容路由到事件目标。

          部分事件

          $.data.props.messageId

          自定义属性(Properties)

          事件总线EventBridge通过JSONPath提取事件中的数据,将指定的事件内容路由到事件目标。

          部分事件

          $.data.props
    • 任务属性

      设置事件流的重试策略及死信队列。更多信息,请参见重试和死信

  6. 返回事件流页面,找到创建好的事件流,在其右侧操作栏,单击启用

  7. 提示对话框,阅读提示信息,然后单击确认

    启用事件流后,会有30秒~60秒的延迟时间,您可以在事件流页面的状态栏查看启动进度。

步骤二:调用SDK发送消息

  1. 获取接入点。您需要在云消息队列 RabbitMQ 版控制台获取实例的接入点。在发送消息时,您需要为发布端配置该接入点,通过接入点接入云消息队列 RabbitMQ 版实例。

    1. 登录云消息队列 RabbitMQ 版控制台

    2. 概览页面的资源分布区域,选择地域。

    3. 实例列表页面,单击目标实例名称。

    4. 实例详情页面的接入点信息页签,将鼠标指针移动到目标类型的接入点,单击该接入点右侧的复制图标,复制该接入点。

      类型

      说明

      示例值

      公网接入点

      公网环境可读写。按量付费实例默认支持,预付费实例需在购买时选择才支持。

      XXX.mq-amqp.cn-hangzhou-a.aliyuncs.com

      VPC接入点

      VPC环境可读写。按量付费实例和预付费实例默认都支持。

      XXX.mq-amqp.cn-hangzhou-a-internal.aliyuncs.com

  2. 安装Java依赖库。在pom.xml添加以下依赖。

    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.5.0</version> <!-- 支持开源所有版本 -->
    </dependency>
  3. 生成用户名密码。

    1. 登录云消息队列 RabbitMQ 版控制台

    2. 概览页面的资源分布区域,选择地域。

    3. 实例列表页面,单击目标实例名称。

    4. 在左侧导航栏,单击静态用户名密码

    5. 静态用户名密码页面,单击创建用户名密码

    6. 创建用户名密码面板,输入AccessKey ID,输入AccessKey Secret,单击确定

      静态用户名密码页面,显示创建的静态用户名与密码,密码处于隐藏状态。用户名密码

    7. 在创建的静态用户名密码的密码列,单击显示密码,可查看用户名的密码。

  4. 生产消息。创建并编译运行Qava

    重要

    编译运行ProducerTest.java生产消息之前,您需要根据代码提示信息配置参数列表中所列举的参数。

    表 3. 参数列表

    参数

    示例值

    描述

    hostName

    1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com

    云消息队列 RabbitMQ 版实例接入点。

    Port

    5672

    默认端口。非加密端口为5672,加密端口为5671。

    userName

    MjoxODgwNzcwODY5MD****

    云消息队列 RabbitMQ 版控制台将阿里云账号或RAM用户的AccessKey ID、AccessKey Secret云消息队列 RabbitMQ 版实例ID通过Base64编码后生成的静态用户名。您可以在云消息队列 RabbitMQ 版控制台静态用户名密码页面获取。

    passWord

    NDAxREVDQzI2MjA0OT****

    云消息队列 RabbitMQ 版控制台将阿里云账号或RAM用户的AccessKey Secrettimestamp参数(系统当前时间)通过HMAC-SHA1生成一个签名后,再将这个签名和timestamp参数(系统当前时间)通过Base64编码后生成的静态密码。您可以在云消息队列 RabbitMQ 版控制台静态用户名密码获取。

    virtualHost

    Test

    云消息队列 RabbitMQ 版实例的Vhost。您可以在云消息队列 RabbitMQ 版控制台Vhost 详情页面查看。如何查看Vhost,请参见查看Vhost连接详情

    ExchangeName

    ExchangeTest

    云消息队列 RabbitMQ 版Exchange。您可以在云消息队列 RabbitMQ 版控制台Exchange 列表页面,结合实例IDVhost模糊搜索已创建的Exchange。

    BindingKey

    BindingKeyTest

    云消息队列 RabbitMQ 版ExchangeQueueBinding Key。您可以在云消息队列 RabbitMQ 版控制台Exchange 列表页面查看Exchange的绑定关系,获取Binding Key。

    QueueName

    QueueTest

    云消息队列 RabbitMQ 版Queue。仅在订阅消息时候需要配置,您可以在云消息队列 RabbitMQ 版控制台Exchange 列表页面,查看Exchange的绑定关系,获取Exchange綁定的Queue。

    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.HashMap;
    import java.util.UUID;
    import java.util.concurrent.TimeoutException;
    
    public class ProducerTest {
        public static void main(String[] args) throws IOException, TimeoutException {
            //设置实例的接入点。
            String hostName = "xxx.xxx.aliyuncs.com";
            //设置实例的静态用户名密码。
            String userName = "${UserName}";
            String passWord = "${PassWord}";
            //设置实例的Vhost。
            String virtualHost = "${VirtualHost}";
    
            //在生产环境中,建议提前创建好Connection,并在需要时重复使用,避免频繁创建和关闭Connection,以提高性能、复用连接资源,以及保证系统的稳定性。
            Connection connection = createConnection(hostName, userName, passWord, virtualHost);
            Channel channel = connection.createChannel();
    
            //设置Exchange、Queue和绑定关系。
            String exchangeName = "${ExchangeName}";
            String queueName = "${QueueName}";
            String routingKey = "${RoutingKey}";
            //设置Exchange类型。
            String exchangeType = "${ExchangeType}";
    
            //此处为了体验流畅,确保了ExchangeQueue的创建过程。
            //在生产环境中,建议在控制台提前创建,尽量避免在代码中直接声明,否则可能触发单API调用的限流。
            channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
            channel.queueDeclare(queueName, true, false, false, new HashMap<String, Object>());
            channel.queueBind(queueName, exchangeName, routingKey);
            //开始发送消息。
            for (int i = 0; i < 10; i++  ) {
                AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
                channel.basicPublish(exchangeName, routingKey, true, props,
                        ("消息发送示例Body-"  + i).getBytes(StandardCharsets.UTF_8));
                System.out.println("[SendResult] Message sent successfully, messageId: " + props.getMessageId() + ", exchange: " + exchangeName + ", routingKey: " + routingKey);
            }
            connection.close();
        }
    
        public static Connection createConnection(String hostName, String userName, String passWord, String virtualHost) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(hostName);
            factory.setUsername(userName);
            factory.setPassword(passWord);
            //设置为true,开启Connection自动恢复功能;设置为false,关闭Connection自动恢复功能。
            factory.setAutomaticRecoveryEnabled(true);
            factory.setNetworkRecoveryInterval(5000);
            factory.setVirtualHost(virtualHost);
            //默认端口,非加密端口5672,加密端口5671。
            factory.setPort(5672);
            //基于网络环境合理设置超时时间。
            factory.setConnectionTimeout(30 * 1000);
            factory.setHandshakeTimeout(30 * 1000);
            factory.setShutdownTimeout(0);
            Connection connection = factory.newConnection();
            return connection;
        }
    }

步骤三:验证事件流

  1. 登录云消息队列 RabbitMQ 版控制台,然后在左侧导航栏选择实例列表

  2. 实例列表页面的顶部菜单栏选择地域,然后在实例列表中,单击目标实例名称。

  3. 实例列表页面,单击步骤一:在目标端创建事件流中配置的目标实例名称。

  4. 实例详情页面的基本信息区域,单击消息查询

  5. 设置查询方式为按 Message ID 查询按 Queue 查询,然后设置时间范围,单击查询

    查询消息

  6. 查看查询到的消息内容是否与步骤二:调用SDK发送消息中发送的消息一致。