本文以调用Java SDK为例,介绍在主账号与RAM账号场景生成用户名与密码,通过开源SDK实现消息收发的操作过程,帮助您更好地理解消息收发的完整过程,其他语言或框架的SDK消息收发过程相似。

背景信息

开源RabbitMQ客户端接入云上服务时,需要先通过AccessKey和AccessKey Secret生成用户名和密码,将用户名和密码设置到开源客户端SDK的 userNamepassWord参数中。 消息队列RabbitMQ版会通过用户名和密码进行权限认证。
注意 您的客户端在调用SDK收发消息时,请尽可能使用长期存活的Connection,以免每次收发消息时都需要创建新的Connection,消耗大量的网络资源和服务端资源,甚至引起服务端SYN Flood防护。更多信息,请参见 Connection

收发消息流程(以Java语言为例)

开源SDK收发流程
说明 消息队列RabbitMQ版与开源RabbitMQ完全兼容。更多语言SDK,请参见 表 1

获取接入点

您需要在消息队列RabbitMQ版控制台获取实例的接入点。在收发消息时,您需要为发布端和订阅端配置该接入点,通过接入点接入消息队列RabbitMQ版实例。

  1. 登录消息队列RabbitMQ版控制台
  2. 概览页面的资源分布区域,选择地域。
  3. 实例列表页面,单击目标实例名称。
  4. 实例详情页面的接入点信息页签,将鼠标指针移动到目标类型的接入点,单击该接入点右侧的复制图标,复制该接入点。
    类型 说明 示例值
    公网接入点 公网环境可读写。按量付费实例默认支持,预付费实例需在购买时选择才支持。 XXX.mq-amqp.cn-hangzhou-a.aliyuncs.com
    VPC接入点 VPC环境可读写。按量付费实例和预付费实例默认都支持。 XXX.mq-amqp.cn-hangzhou-a-internal.aliyuncs.com

安装Java依赖库

pom.xml添加以下依赖。

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.5.0</version> <!-- 支持开源所有版本 -->
</dependency>

生成用户名密码

  1. 登录消息队列RabbitMQ版控制台
  2. 概览页面的资源分布区域,选择地域。
  3. 实例列表页面,单击目标实例名称。
  4. 在左侧导航栏,单击静态用户名密码
  5. 静态用户名密码页面,单击创建用户名密码
  6. 创建用户名密码面板,输入AccessKey ID,输入AccessKey Secret,单击确定
    静态用户名密码页面,显示创建的静态用户名与密码,密码处于隐藏状态。 用户名密码
  7. 在创建的静态用户名密码的密码列,单击显示密码,可查看用户名的密码。

生产消息

创建并编译运行ProducerTest.java

注意 编译运行 ProducerTest.java生产消息之前,您需要根据代码提示信息配置 表 1中所列举的参数。
表 1. 参数列表
参数 示例值 描述
hostName 1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com 消息队列RabbitMQ版实例接入点。获取方式,请参见获取接入点
Port 5672 默认端口。非加密端口为5672,加密端口为5671。
userName MjoxODgwNzcwODY5MD**** 消息队列RabbitMQ版控制台将阿里云账号或RAM用户的AccessKey ID、AccessKey Secret和消息队列RabbitMQ版实例ID通过Base64编码后生成的静态用户名。您可以在消息队列RabbitMQ版控制台静态用户名密码页面,根据实例ID搜索已创建的用户名。
passWord NDAxREVDQzI2MjA0OT**** 消息队列RabbitMQ版控制台将阿里云账号或RAM用户的AccessKey Secret和timestamp参数(系统当前时间)通过HMAC-SHA1生成一个签名后,再将这个签名和timestamp参数(系统当前时间)通过Base64编码后生成的静态密码。您可以在消息队列RabbitMQ版控制台静态用户名密码页面,根据实例ID搜索已创建的用户名以及对应密码。
virtualHost Test 消息队列RabbitMQ版实例的Vhost。您可以在消息队列RabbitMQ版控制台Vhost 列表页面查看。如何查看Vhost,请参见查看Vhost详情
ExchangeName ExchangeTest 消息队列RabbitMQ版的Exchange。您可以在消息队列RabbitMQ版控制台Exchange 列表页面获取。
BindingKey BindingKeyTest 消息队列RabbitMQ版Exchange与Queue的Binding Key。您可以在消息队列RabbitMQ版控制台Exchange 列表页面查看Exchange的绑定关系,获取Binding Key。
QueueName QueueTest 消息队列RabbitMQ版的Queue。仅在订阅消息时候需要配置,您可以在消息队列RabbitMQ版控制台Exchange 列表页面,查看Exchange的绑定关系,获取Exchange綁定的Queue。
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
import java.util.HashMap;
import java.util.UUID;

public class ProducerTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        // 设置接入点,在消息队列RabbitMQ版控制台实例详情页面查看。
        factory.setHost("xxx.xxx.aliyuncs.com");
        // 用户名,在消息队列RabbitMQ版控制台用户名密码管理页面查看。
        factory.setUsername("${UserName}");
        // 密码,在消息队列RabbitMQ版控制台用户名密码管理页面查看。
        factory.setPassword("${PassWord}");
        //设置为true,开启Connection自动恢复功能;设置为false,关闭Connection自动恢复功能。
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        // 设置Vhost名称,请确保已在消息队列RabbitMQ版控制台上创建完成。
        factory.setVirtualHost("${VhostName}");
        // 默认端口,非加密端口5672,加密端口5671。
        factory.setPort(5672);
        // 基于网络环境合理设置超时时间。
        factory.setConnectionTimeout(30 * 1000);
        factory.setHandshakeTimeout(30 * 1000);
        factory.setShutdownTimeout(0);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();        
        channel.exchangeDeclare("${ExchangeName}", "${ExchangeType}", true, false, false, null);
        channel.queueDeclare("${QueueName}", true, false, false, new HashMap<String, Object>());
        channel.queueBind("${QueueName}", "${ExchangeName}", "${BindingKey}");
        // 开始发送消息。
        for (int i = 0; i < 100; i++  ) {
            // ${ExchangeName}必须在消息队列RabbitMQ版控制台上已存在,并且Exchange的类型与控制台上的类型一致。
            // BindingKey根据业务需求填入相应的BindingKey。
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
            channel.basicPublish("${ExchangeName}", "BindingKey", true, props,
                    ("消息发送Body"  + i).getBytes(StandardCharsets.UTF_8));
        }
        connection.close();
    }
}

订阅消息

创建并编译运行ConsumerTest.java

注意 编译运行 ConsumerTest.java订阅消息之前,您需要根据代码提示信息配置 表 1中所列举的参数。
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;

public class ConsumerTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        // 设置接入点,在消息队列RabbitMQ版控制台实例详情页面查看。
        factory.setHost("xxx.xxx.aliyuncs.com");
        // 用户名,在消息队列RabbitMQ版控制台用户名密码管理页面查看。
        factory.setUsername("${Username}");
        // 密码,在消息队列RabbitMQ版控制台用户名密码管理页面查看。
        factory.setPassword("${Password}");
        //设置为true,开启Connection自动恢复功能;设置为false,关闭Connection自动恢复功能。
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        // 设置Vhost名称,请确保已在消息队列RabbitMQ版控制台上创建完成。
        factory.setVirtualHost("${VhostName}");
        // 默认端口,非加密端口5672,加密端口5671。
        factory.setPort(5672);
        factory.setConnectionTimeout(300 * 1000);
        factory.setHandshakeTimeout(300 * 1000);
        factory.setShutdownTimeout(0);
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        // 创建${ExchangeName},该Exchange必须在消息队列RabbitMQ版控制台上已存在,并且Exchange的类型与控制台上的类型一致。
        AMQP.Exchange.DeclareOk exchangeDeclareOk = channel.exchangeDeclare("${ExchangeName}", "${ExchangeType}", true, false, false, null);
        // 创建${QueueName} ,该Queue必须在消息队列RabbitMQ版控制台上已存在。
        AMQP.Queue.DeclareOk queueDeclareOk = channel.queueDeclare("${QueueName}", true, false, false, new HashMap<String, Object>());
        // Queue与Exchange进行绑定,并确认绑定的BindingKeyTest。
        AMQP.Queue.BindOk bindOk = channel.queueBind("${QueueName}", "${ExchangeName}", "BindingKeyTest");
        // 开始消费消息。
        channel.basicConsume("${QueueName}", false, "ConsumerTag", new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                //接收到的消息,进行业务逻辑处理。
                System.out.println("Received: "  + new String(body, StandardCharsets.UTF_8) +  ", deliveryTag: "  + envelope.getDeliveryTag()  + ", messageId: " +  properties.getMessageId());
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
        connection.close();
    }
}
取消
说明 消息队列RabbitMQ版与开源RabbitMQ完全兼容。更多参数说明,请参见 开源RabbitMQ客户端文档

查询消息

如果您想确认消息是否成功发送至消息队列RabbitMQ版,可以在消息队列RabbitMQ版控制台查询消息。具体操作,请参见查询消息