首页 高弹性,低成本,云消息队列 RabbitMQ实践

高弹性,低成本,云消息队列 RabbitMQ实践

更新时间: 2024-10-08 15:28:12

手动部署

45

https://www.aliyun.com/solution/tech-solution/rabbitmq-serverless

方案概览

消息队列 RabbitMQ 版由阿里云基于 AMQP 标准协议自研,完全兼容 RabbitMQ 开源生态以及多语言客户端,打造分布式、高吞吐、低延迟、高可扩展的云消息服务。开箱即用,用户无需部署免运维,轻松实现快速上云,阿里云提供全托管服务,更专业、更可靠、更安全。

方案架构

本方案将创建云消息队列 RabbitMQ 版 Serverless 系列实例来进行消息的生产和消费,根据实际使用量计费,并查看指标变化和消息轨迹。

image

本方案架构包含消息生产者、消息队列、消息消费者三个核心模块。消息在云消息队列 RabbitMQ 版的组件之间流动过程为:

  1. 生产者向 Exchange 发送消息;

  2. Exchange 根据消息属性将消息路由到 Queue 进行存储;

  3. 消费者从 Queue 拉取消息进行消费。

部署准备

5

开始部署前,请按以下指引完成账号申请、账号充值。

准备账号

  1. 如果您还没有阿里云账号,请访问阿里云账号注册页面,根据页面提示完成注册并进行个人实名认证。阿里云账号是您使用云资源的付费实体,因此是部署方案的必要前提。

  2. 为阿里云账号充值。本方案的云资源支持按量付费,且默认设置均采用按量付费引导操作。

部署资源

25

1.获取AccessKey并授权

  1. 创建RAM用户。

    1. 使用阿里云账号(主账号)或RAM管理员登录RAM控制台

    2. 在左侧导航栏,选择身份管理 > 用户

    3. 用户页面,单击创建用户

    4. 创建用户页面的用户账号信息区域,设置用户基本信息。

      • 登录名称:可包含英文字母、数字、半角句号(.)、短划线(-)和下划线(_),最多64个字符。

      • 显示名称:最多包含128个字符或汉字。

    5. 访问方式区域,选择OpenAPI 调用访问

    6. 单击确定

    7. 根据界面提示,完成安全验证。

  2. 获取AccessKey。

    创建RAM用户成功后,在创建成功页面中复制AccessKey ID和AccessKey Secret。

    重要

    系统会自动为RAM用户生成一个AccessKey ID和AccessKey Secret,且AccessKey Secret只在创建时显示,不支持查看,请妥善保管。

    image

  3. 创建自定义权限策略。

    1. 使用阿里云账号(主账号)或RAM管理员登录RAM控制台

    2. 在左侧导航栏,选择权限管理 > 权限策略

    3. 权限策略页面,单击创建权限策略

    4. 创建权限策略页面,单击可视化编辑页签。

    5. 配置权限策略。

      1. 效果区域,选择允许

      2. 服务区域,选择其他类目中的云消息队列 MQ / Amqp

      3. 操作区域,选择全部操作

      4. 资源区域,选择全部资源

    6. 单击继续编辑基本信息

    7. 输入权限策略名称备注

    8. 单击确定

    9. 根据界面提示,完成安全验证。

  4. 向RAM用户授权自定义权限策略。

    1. 使用阿里云账号(主账号)或RAM管理员登录RAM控制台

    2. 在左侧导航栏,选择身份管理 > 用户

    3. 用户页面,单击目标RAM用户操作列的添加权限

      image

    4. 新增授权面板,为RAM用户添加权限。

      1. 选择资源范围:账号级别

      2. 选择授权主体:保持默认值即可。

      3. 选择权限策略:单击所有策略类型下拉框,选择自定义策略,在下面的策略列表表格中选中前一步骤中创建的自定义权限策略名称。

      4. 单击确认新增授权

      5. 根据界面提示,完成安全验证。

    5. 单击关闭

2.创建实例和用户名密码

  1. 前往云消息队列 RabbitMQ 版控制台,在“实例列表”页面,点击“创建实例”,在弹窗中选择付费方式为“Serverless 按量后付费”

C36BC4F2-C3F4-490F-AAA0-FEDD44011330.png

EF80CFEB-EDB4-487A-8039-5D10F7D6B5D0.png

  1. 在实例购买页面,“地域和可用区”选择“华东1(杭州)”“公网支持”“消息轨迹支持”选项都选择“支持”,其他选项均保持默认值即可。勾选“服务协议”并点击“立即购买”,点击创建。

0FBAE920-C1AA-4A4E-8385-9E8CE12ADA7D.png

  1. 创建成功后返回管理控制台,在实例列表页面可以看到一个新增的实例,点击实例名称或详情,即可进入实例详情页。

13654F3C-84F6-450B-ADDD-9B424F616CD6.png

  1. 云消息队列 RabbitMQ 版控制台的左侧导航栏中,点击实例列表,打开具体实例,单击静态用户名密码

AAA451F3-8357-41D5-B5DE-06CCDD62D333.png

静态用户名密码页面,单击创建用户名密码

A091286B-21A9-4C82-AB0B-E76539AEDEDC.png

创建用户名密码面板,填写上一步创建的AccessKey IDAccessKey Secret,单击确定

5FB1DB04-C1FA-4BB5-86C6-9E81A30B8FA8.png

返回如下页面,您可查看到创建的用户名和密码。

E758FD82-A171-4939-BD04-F98958F9C96F.png

3.创建资源和绑定关系

重要

为了确保示例程序的正常运行,请严格按照步骤中给出的名称进行命名。如有必要修改,请务必同步更新示例程序中的属性。

  1. 在左侧导航栏中,单击Vhost列表

479726EC-7F59-4B89-A4C0-486696BCE69D.png

Vhost列表页面,单击创建Vhost

9D89755A-E927-4D1C-B598-DE05B539B63B.png

创建Vhost面板,Vhost名称设置为test-vhost

image

  1. 在左侧导航栏中,单击Exchange列表

B54F0C18-43E0-48BA-BC8D-A79722027F6E.png

Exchange列表页面的Vhost test-vhost下,单击创建Exchange

image

创建Exchange面板,Exchange名称设置为test-exchange类型选择directInternal选择,单击确定

说明:更多Exchange类型说明,详情请参见Exchanges and Exchange Types

image

  1. 在左侧导航栏中,单击Queue列表

963D02A1-7552-459C-A6DB-FA1434CC332D.png

Queue列表页面的Vhost test-vhost下,单击创建Queue

image

创建Queue面板,Queue名称设置为test-queueAuto delete选择,其他选项保持默认值即可,单击确定

image

  1. 创建Exchange和Queue的绑定关系。

在左侧导航栏中,单击Exchange列表

2AC51D0E-A744-442B-99E3-43015BC98CE0.png

Exchange列表页面,单击test-exchange

image

在test-exchange详情页面的绑定关系区域,单击添加绑定

image

添加绑定面板,绑定目标类型选择Queue绑定目标选择创建的test-queueRouting Key设置为test-routing-key,单击确定

说明:发送消息时设置的Routing Key需要和当前绑定关系中的Routing Key完全匹配,消息才能路由到Queue。本实验设置路由到test-queue,其Routing Key为test-routing-key。

image

方案验证

10

一、通过Java实现消息收发,验证RabbitMQ消息流转

说明:云消息队列RabbitMQ版与开源RabbitMQ完全兼容,请关注SDK使用注意事项。更多语言SDK,请参见SDK列表及示例代码

说明

您需要提前安装IDE安装JDK安装Maven。您可以使用IntelliJ IDEA或者Eclipse,本文以IntelliJ IDEA Ultimate为例。

安装Java依赖库

  1. 在IDEA中创建一个Java工程。

  2. pom.xml文件中添加以下依赖引入Java依赖库。

    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.5.0</version> 
    </dependency>

生产消息

在已创建的Java工程中,创建消息发送程序,按照代码参数说明部分配置相关参数并运行。

示例代码如下:

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

public class ProducerTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        //设置实例的接入点。
        String hostName = "xxx.xxx.aliyuncs.com";
        //设置实例的静态用户名密码。
        String userName = "UserName";
        String passWord = "PassWord";
        //设置实例的Vhost。
        String virtualHost = "test-vhost";

        //在生产环境中,建议提前创建好Connection,并在需要时重复使用,避免频繁创建和关闭Connection,
        // 以提高性能、复用连接资源,以及保证系统的稳定性。
        Connection connection = createConnection(hostName, userName, passWord, virtualHost);
        Channel channel = connection.createChannel();

        //设置Exchange、Queue和绑定关系。
        String exchangeName = "test-exchange";
        String queueName = "test-queue";
        String bindingKey = "test-routing-key";
        //设置Exchange类型。
        String exchangeType = "direct";

        //开始发送消息。
        for (int i = 0; i < 10; i++) {
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
            channel.basicPublish(exchangeName, bindingKey, true, props,
                    ("消息发送示例Body-" + i).getBytes(StandardCharsets.UTF_8));
            System.out.println("[SendResult] Message sent successfully, messageId: " + props.getMessageId()
                    + ", exchange: " + exchangeName + ", routingKey: " + bindingKey);
        }
        channel.close();
        connection.close();
    }

    public static Connection createConnection(String hostName, String userName, String passWord, String virtualHost)
            throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(hostName);
        factory.setUsername(userName);
        factory.setPassword(passWord);
        //设置为true,开启Connection自动恢复功能;设置为false,关闭Connection自动恢复功能。
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        factory.setVirtualHost(virtualHost);
        //默认端口,非加密端口5672,加密端口5671。
        factory.setPort(5672);
        //基于网络环境合理设置超时时间。
        factory.setConnectionTimeout(30 * 1000);
        factory.setHandshakeTimeout(30 * 1000);
        factory.setShutdownTimeout(0);
        Connection connection = factory.newConnection();
        return connection;
    }
}

订阅消息

在已创建的Java工程中,创建消息订阅程序,按照代码参数说明部分配置相关参数并运行

示例代码如下:

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;

public class ConsumerTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        //设置实例的接入点。
        String hostName = "xxx.xxx.aliyuncs.com";
        //设置实例的静态用户名密码。
        String userName = "UserName";
        String passWord = "PassWord";
        //设置实例的Vhost。
        String virtualHost = "test-vhost";

        Connection connection = createConnection(hostName, userName, passWord, virtualHost);
        final Channel channel = connection.createChannel();

        //声明Queue。
        String queueName = "test-queue";
        //创建${QueueName} ,该Queue必须在云消息队列RabbitMQ版控制台上已存在。
        AMQP.Queue.DeclareOk queueDeclareOk = channel.queueDeclare(queueName, true, false, false,
                new HashMap<String, Object>());

        //开始消费消息。
        channel.basicConsume(queueName, false, "test-consumerTag", new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                //接收到的消息,进行业务逻辑处理。
                System.out.println("Received: " + new String(body, StandardCharsets.UTF_8) + ", deliveryTag: "
                        + envelope.getDeliveryTag() + ", messageId: " + properties.getMessageId());
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }

    public static Connection createConnection(String hostName, String userName, String passWord, String virtualHost)
            throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(hostName);
        factory.setUsername(userName);
        factory.setPassword(passWord);
        //设置为true,开启Connection自动恢复功能;设置为false,关闭Connection自动恢复功能。
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        factory.setVirtualHost(virtualHost);
        // 默认端口,非加密端口5672,加密端口5671。
        factory.setPort(5672);
        factory.setConnectionTimeout(300 * 1000);
        factory.setHandshakeTimeout(300 * 1000);
        factory.setShutdownTimeout(0);
        Connection connection = factory.newConnection();
        return connection;
    }
}

代码参数说明

参数

描述

获取步骤

hostName

云消息队列 RabbitMQ 版实例接入点。

  1. 登录云消息队列 RabbitMQ 版控制台,然后在左侧导航栏选择实例列表

  2. 实例列表页面的顶部菜单栏选择地域,然后在实例列表中,单击目标实例名称。

  3. 实例详情页面的接入点信息页签,将鼠标指针移动到目标类型的接入点,单击复制该接入点。

userName

客户端接入云消息队列 RabbitMQ 版服务端用于权限认证的静态用户名。

部署资源步骤中获取的静态用户名

passWord

客户端接入云消息队列 RabbitMQ 版服务端用于权限认证的静态用户密码。

部署资源步骤中获取的静态用户密码

二、通过消息轨迹,验证消息传递过程

  1. 返回消息队列RabbitMQ版控制台页签,在左侧导航栏中,单击Dashboard

image

  1. 服务关联角色对话框中,单击授权

说明:如果您是首次使用Dashboard功能,需要进行授权。

image

  1. 开通ARMS服务对话框中,单击开通

说明:如果您是首次使用Dashboard功能,需要开通ARMS服务。

image

  1. 查看RabbitMQ实例的概览指标,可以查看每个Queue的消息堆积数,消息速率等指标。

image

  1. 在左侧导航栏中,单击Queue列表

image

  1. Queue列表页面,单击test-queue

image

  1. 在test-queue详情页面,单击Dashboard

image

  1. Dashboard页签,可以查看指定Queue的详细指标变化趋势,用于定位问题。

image

  1. 在左侧导航栏中,单击消息轨迹

image

  1. 在消息轨迹页面,您可以根据按Message ID查询和按Queue查询。

  • 按Message ID查询:根据IntelliJ IDEA控制台打印的messageId可以精确查询对应消息的轨迹。

image

  • 按Queue查询:根据Queue名称可以查询对应Queue下所有消息的轨迹。

image

  1. 选择任意一条消息的轨迹,单击轨迹详情,查询对应消息的生产和投递轨迹详情。

image

三、通过多线程并发测试,验证RabbitMQ高并发性能

1.为了验证RabbitMQ在高并发场景下的性能,我们可以通过多线程的方式来模拟大量消息的并发发送。具体代码如下:

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;

public class ConcurrentProducerTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 设置实例的接入点。
        String hostName = "xxx.xxx.aliyuncs.com";
        // 设置实例的静态用户名密码。
        String userName = "UserName";
        String passWord = "PassWord";
        // 设置实例的Vhost。
        String virtualHost = "test-vhost";

        // 创建连接
        Connection connection = createConnection(hostName, userName, passWord, virtualHost);
        Channel channel = connection.createChannel();

        // 设置Exchange、Queue和绑定关系。
        String exchangeName = "test-exchange";
        String queueName = "test-queue";
        String bindingKey = "test-routing-key";
        String exchangeType = "direct";

        // 创建线程池
        int threadCount = 50; // 线程数量
        ExecutorService executorService = Executors.newFixedThreadPool(threadCount);

        // 开始发送消息
        for (int i = 0; i < threadCount; i++) {
            executorService.submit(() -> {
                try {
                    for (int j = 0; j < 1000; j++) { // 每个线程发送1000条
                        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
                        channel.basicPublish(exchangeName, bindingKey, true, props,
                                ("并发消息发送示例Body-" + Thread.currentThread().getId() + "-" + j).getBytes(StandardCharsets.UTF_8));
                        System.out.println("[SendResult] Thread ID: " + Thread.currentThread().getId()
                                + ", Message sent successfully, messageId: " + props.getMessageId());
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
        }

        // 关闭线程池
        executorService.shutdown();
        while (!executorService.isTerminated()) {
            // 等待所有线程完成
        }

        // 关闭通道和连接
        channel.close();
        connection.close();
    }

    // 创建连接的方法
    public static Connection createConnection(String hostName, String userName, String passWord, String virtualHost)
            throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(hostName);
        factory.setUsername(userName);
        factory.setPassword(passWord);
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        factory.setVirtualHost(virtualHost);
        factory.setPort(5672);
        factory.setConnectionTimeout(30 * 1000);
        factory.setHandshakeTimeout(30 * 1000);
        factory.setShutdownTimeout(0);
        return factory.newConnection();
    }
}

以上代码模拟50个用户并发发送消息的场景,总共发送5万条消息。

2.运行上述的并发生产者代码后,消费者代码依然使用之前的示例代码。参考之前的步骤,查看队列的Dashboard。

image

image

image

可以看到,当前队列的消息处理耗时为22ms,消息堆积在4分钟内完成发送,消息处理延时则最大为37ms。这表明,RabbitMQ能够在高并发场景下处理大量并发消息的发送和接收,保持高效的消息传递。低延时的表现确保了实时性要求较高的应用场景能够正常运行。

完成及清理

5

在完成本方案操作后,如果无需继续使用资源,请根据以下步骤,先删除RAM用户,再删除RabbitMQ实例。

  1. 删除RAM用户。

使用阿里云账号登录控制台,删除RAM用户,详细操作请参见删除RAM用户

  1. 删除RabbitMQ实例。

2.1. 点击RabbitMQ控制台,在实例列表中找到创建的实例,点击更多 > 删除,等待实例删除完成。

image

自建迁移上云(可选)

如果您希望从开源 RabbitMQ 迁移到云消息队列 RabbitMQ 版,请参见开源RabbitMQ迁移上云

一键部署

30

https://www.aliyun.com/solution/tech-solution/rabbitmq-serverless

方案概览

消息队列 RabbitMQ 版由阿里云基于 AMQP 标准协议自研,完全兼容 RabbitMQ 开源生态以及多语言客户端,打造分布式、高吞吐、低延迟、高可扩展的云消息服务。开箱即用,用户无需部署免运维,轻松实现快速上云,阿里云提供全托管服务,更专业、更可靠、更安全。

方案架构

本方案将创建云消息队列 RabbitMQ 版 Serverless 系列实例来进行消息的生产和消费,根据实际使用量计费,并查看指标变化和消息轨迹。

image

本方案架构包含消息生产者、消息队列、消息消费者三个核心模块。消息在云消息队列 RabbitMQ 版的组件之间流动过程为:

  1. 生产者向 Exchange 发送消息;

  2. Exchange 根据消息属性将消息路由到 Queue 进行存储;

  3. 消费者从 Queue 拉取消息进行消费。

部署准备

5

开始部署前,请按以下指引完成账号申请、账号充值。

准备账号

  1. 如果您还没有阿里云账号,请访问阿里云账号注册页面,根据页面提示完成注册并进行个人实名认证。阿里云账号是您使用云资源的付费实体,因此是部署方案的必要前提。

  2. 为阿里云账号充值。本方案的云资源支持按量付费,且默认设置均采用按量付费引导操作。

部署资源

10

一键部署

单击一键部署前往ROS控制台,修改资源栈名称,进行安全确认后点击创建。

说明

ROS控制台目前不支持设置静态用户名和密码的步骤,用户可以在输出中查看AK和SK,然后到RabbitMQ控制台设置静态用户名和密码。

image

开始创建后,等待约5分钟左右,显示创建成功。

查看密钥

点击输出栏,查看AccessKeyId、AccessKeySecret。

image

设置静态用户名密码

  1. 云消息队列 RabbitMQ 版控制台的左侧导航栏中,点击实例列表,打开具体实例,单击静态用户名密码

AAA451F3-8357-41D5-B5DE-06CCDD62D333.png

  1. 静态用户名密码页面,单击创建用户名密码

image

  1. 创建用户名密码面板,填写上一步创建的AccessKey IDAccessKey Secret,单击确定

image

  1. 返回如下页面,您可查看到创建的用户名和密码。

image

方案验证

10

一、通过Java实现消息收发,验证RabbitMQ消息流转

说明:云消息队列RabbitMQ版与开源RabbitMQ完全兼容,请关注SDK使用注意事项。更多语言SDK,请参见SDK列表及示例代码

说明

您需要提前安装IDE安装JDK安装Maven。您可以使用IntelliJ IDEA或者Eclipse,本文以IntelliJ IDEA Ultimate为例。

安装Java依赖库

  1. 在IDEA中创建一个Java工程。

  2. pom.xml文件中添加以下依赖引入Java依赖库。

    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.5.0</version> 
    </dependency>

生产消息

在已创建的Java工程中,创建消息发送程序,按照代码参数说明部分配置相关参数并运行。

示例代码如下:

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

public class ProducerTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        //设置实例的接入点。
        String hostName = "xxx.xxx.aliyuncs.com";
        //设置实例的静态用户名密码。
        String userName = "UserName";
        String passWord = "PassWord";
        //设置实例的Vhost。
        String virtualHost = "test-vhost";

        //在生产环境中,建议提前创建好Connection,并在需要时重复使用,避免频繁创建和关闭Connection,
        // 以提高性能、复用连接资源,以及保证系统的稳定性。
        Connection connection = createConnection(hostName, userName, passWord, virtualHost);
        Channel channel = connection.createChannel();

        //设置Exchange、Queue和绑定关系。
        String exchangeName = "test-exchange";
        String queueName = "test-queue";
        String bindingKey = "test-routing-key";
        //设置Exchange类型。
        String exchangeType = "direct";

        //开始发送消息。
        for (int i = 0; i < 10; i++) {
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
            channel.basicPublish(exchangeName, bindingKey, true, props,
                    ("消息发送示例Body-" + i).getBytes(StandardCharsets.UTF_8));
            System.out.println("[SendResult] Message sent successfully, messageId: " + props.getMessageId()
                    + ", exchange: " + exchangeName + ", routingKey: " + bindingKey);
        }
        channel.close();
        connection.close();
    }

    public static Connection createConnection(String hostName, String userName, String passWord, String virtualHost)
            throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(hostName);
        factory.setUsername(userName);
        factory.setPassword(passWord);
        //设置为true,开启Connection自动恢复功能;设置为false,关闭Connection自动恢复功能。
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        factory.setVirtualHost(virtualHost);
        //默认端口,非加密端口5672,加密端口5671。
        factory.setPort(5672);
        //基于网络环境合理设置超时时间。
        factory.setConnectionTimeout(30 * 1000);
        factory.setHandshakeTimeout(30 * 1000);
        factory.setShutdownTimeout(0);
        Connection connection = factory.newConnection();
        return connection;
    }
}

订阅消息

在已创建的Java工程中,创建消息订阅程序,按照代码参数说明部分配置相关参数并运行

示例代码如下:

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;

public class ConsumerTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        //设置实例的接入点。
        String hostName = "xxx.xxx.aliyuncs.com";
        //设置实例的静态用户名密码。
        String userName = "UserName";
        String passWord = "PassWord";
        //设置实例的Vhost。
        String virtualHost = "test-vhost";

        Connection connection = createConnection(hostName, userName, passWord, virtualHost);
        final Channel channel = connection.createChannel();

        //声明Queue。
        String queueName = "test-queue";
        //创建${QueueName} ,该Queue必须在云消息队列RabbitMQ版控制台上已存在。
        AMQP.Queue.DeclareOk queueDeclareOk = channel.queueDeclare(queueName, true, false, false,
                new HashMap<String, Object>());

        //开始消费消息。
        channel.basicConsume(queueName, false, "test-consumerTag", new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                //接收到的消息,进行业务逻辑处理。
                System.out.println("Received: " + new String(body, StandardCharsets.UTF_8) + ", deliveryTag: "
                        + envelope.getDeliveryTag() + ", messageId: " + properties.getMessageId());
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }

    public static Connection createConnection(String hostName, String userName, String passWord, String virtualHost)
            throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(hostName);
        factory.setUsername(userName);
        factory.setPassword(passWord);
        //设置为true,开启Connection自动恢复功能;设置为false,关闭Connection自动恢复功能。
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        factory.setVirtualHost(virtualHost);
        // 默认端口,非加密端口5672,加密端口5671。
        factory.setPort(5672);
        factory.setConnectionTimeout(300 * 1000);
        factory.setHandshakeTimeout(300 * 1000);
        factory.setShutdownTimeout(0);
        Connection connection = factory.newConnection();
        return connection;
    }
}

代码参数说明

参数

描述

获取步骤

hostName

云消息队列 RabbitMQ 版实例接入点。

  1. 登录云消息队列 RabbitMQ 版控制台,然后在左侧导航栏选择实例列表

  2. 实例列表页面的顶部菜单栏选择地域,然后在实例列表中,单击目标实例名称。

  3. 实例详情页面的接入点信息页签,将鼠标指针移动到目标类型的接入点,单击复制该接入点。

userName

客户端接入云消息队列 RabbitMQ 版服务端用于权限认证的静态用户名。

部署资源步骤中获取的静态用户名

passWord

客户端接入云消息队列 RabbitMQ 版服务端用于权限认证的静态用户密码。

部署资源步骤中获取的静态用户密码

二、通过消息轨迹,验证消息传递过程

  1. 返回消息队列RabbitMQ版控制台页签,在左侧导航栏中,单击Dashboard

image

  1. 服务关联角色对话框中,单击授权

说明:如果您是首次使用Dashboard功能,需要进行授权。

image

  1. 开通ARMS服务对话框中,单击开通

说明:如果您是首次使用Dashboard功能,需要开通ARMS服务。

image

  1. 查看RabbitMQ实例的概览指标,可以查看每个Queue的消息堆积数,消息速率等指标。

image

  1. 在左侧导航栏中,单击Queue列表

image

  1. Queue列表页面,单击test-queue

image

  1. 在test-queue详情页面,单击Dashboard

image

  1. Dashboard页签,可以查看指定Queue的详细指标变化趋势,用于定位问题。

image

  1. 在左侧导航栏中,单击消息轨迹

image

  1. 在消息轨迹页面,您可以根据按Message ID查询和按Queue查询。

  • 按Message ID查询:根据IntelliJ IDEA控制台打印的messageId可以精确查询对应消息的轨迹。

image

  • 按Queue查询:根据Queue名称可以查询对应Queue下所有消息的轨迹。

image

  1. 选择任意一条消息的轨迹,单击轨迹详情,查询对应消息的生产和投递轨迹详情。

image

三、通过多线程并发测试,验证RabbitMQ高并发性能

1.为了验证RabbitMQ在高并发场景下的性能,我们可以通过多线程的方式来模拟大量消息的并发发送。具体代码如下:

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;

public class ConcurrentProducerTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 设置实例的接入点。
        String hostName = "xxx.xxx.aliyuncs.com";
        // 设置实例的静态用户名密码。
        String userName = "UserName";
        String passWord = "PassWord";
        // 设置实例的Vhost。
        String virtualHost = "test-vhost";

        // 创建连接
        Connection connection = createConnection(hostName, userName, passWord, virtualHost);
        Channel channel = connection.createChannel();

        // 设置Exchange、Queue和绑定关系。
        String exchangeName = "test-exchange";
        String queueName = "test-queue";
        String bindingKey = "test-routing-key";
        String exchangeType = "direct";

        // 创建线程池
        int threadCount = 50; // 线程数量
        ExecutorService executorService = Executors.newFixedThreadPool(threadCount);

        // 开始发送消息
        for (int i = 0; i < threadCount; i++) {
            executorService.submit(() -> {
                try {
                    for (int j = 0; j < 1000; j++) { // 每个线程发送1000条
                        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
                        channel.basicPublish(exchangeName, bindingKey, true, props,
                                ("并发消息发送示例Body-" + Thread.currentThread().getId() + "-" + j).getBytes(StandardCharsets.UTF_8));
                        System.out.println("[SendResult] Thread ID: " + Thread.currentThread().getId()
                                + ", Message sent successfully, messageId: " + props.getMessageId());
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
        }

        // 关闭线程池
        executorService.shutdown();
        while (!executorService.isTerminated()) {
            // 等待所有线程完成
        }

        // 关闭通道和连接
        channel.close();
        connection.close();
    }

    // 创建连接的方法
    public static Connection createConnection(String hostName, String userName, String passWord, String virtualHost)
            throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(hostName);
        factory.setUsername(userName);
        factory.setPassword(passWord);
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        factory.setVirtualHost(virtualHost);
        factory.setPort(5672);
        factory.setConnectionTimeout(30 * 1000);
        factory.setHandshakeTimeout(30 * 1000);
        factory.setShutdownTimeout(0);
        return factory.newConnection();
    }
}

以上代码模拟50个用户并发发送消息的场景,总共发送5万条消息。

2.运行上述的并发生产者代码后,消费者代码依然使用之前的示例代码。参考之前的步骤,查看队列的Dashboard。

image

image

image

可以看到,当前队列的消息处理耗时为22ms,消息堆积在4分钟内完成发送,消息处理延时则最大为37ms。这表明,RabbitMQ能够在高并发场景下处理大量并发消息的发送和接收,保持高效的消息传递。低延时的表现确保了实时性要求较高的应用场景能够正常运行。

完成及清理

5

点击ROS控制台,在资源栈列表中找到创建的实例,点击删除,清理创建的资源。

image

自建迁移上云(可选)

如果您希望从开源 RabbitMQ 迁移到云消息队列 RabbitMQ 版,请参见开源RabbitMQ迁移上云