调用SDK收发消息
本文以Java SDK为例说明如何调用SDK收发消息。
前提条件
背景信息
客户端接入云消息队列 RabbitMQ 版时,云消息队列 RabbitMQ 版会通过用户名密码进行权限认证。云消息队列 RabbitMQ 版支持通过控制台为客户端生成静态用户名密码,该方式和开源RabbitMQ保持一致。
您的客户端在调用SDK收发消息时,请尽可能使用长期存活的Connection,以免每次收发消息时都需要创建新的Connection,消耗大量的网络资源和服务端资源,甚至引起服务端SYN Flood防护。更多信息,请参见Connection。
收发消息流程(以Java语言为例)

云消息队列 RabbitMQ 版与开源RabbitMQ完全兼容。更多语言SDK,请参见开源RabbitMQ AMQP协议支持的多语言或框架SDK。
获取接入点
您需要在云消息队列 RabbitMQ 版控制台获取实例的接入点。在收发消息时,您需要为发布端和订阅端配置该接入点,通过接入点接入云消息队列 RabbitMQ 版实例。
登录云消息队列 RabbitMQ 版控制台,然后在左侧导航栏选择实例列表。
在实例列表页面的顶部菜单栏选择地域,然后在实例列表中,单击目标实例名称。
在实例详情页面的接入点信息页签,将鼠标指针移动到目标类型的接入点,单击该接入点右侧的
图标,复制该接入点。
类型
说明
示例值
公网接入点
公网环境可读写。按量付费实例默认支持,预付费实例需在购买时选择才支持。
XXX.mq-amqp.cn-hangzhou-a.aliyuncs.com
VPC接入点
VPC环境可读写。按量付费实例和预付费实例默认都支持。
XXX.mq-amqp.cn-hangzhou-a-internal.aliyuncs.com
安装Java依赖库
在pom.xml添加以下依赖。
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.5.0</version> <!-- 支持开源所有版本 -->
</dependency>
生成用户名密码
登录云消息队列 RabbitMQ 版控制台,然后在左侧导航栏选择实例列表。
在实例列表页面的顶部菜单栏选择地域,然后在实例列表中,单击目标实例名称。
在左侧导航栏,单击静态用户名密码。
在静态用户名密码页面,单击创建用户名密码。
在创建用户名密码面板,输入AccessKey ID和AccessKey Secret,然后单击确定。
说明AccessKey ID和AccessKey Secret需要在阿里云RAM控制台获取,具体获取方式,请参见创建AccessKey。
静态用户名密码页面,显示创建的静态用户名与密码,密码处于隐藏状态。
在创建的静态用户名密码的密码列,单击显示密码,可查看用户名的密码。
生产消息
创建并编译运行ProducerTest.java。
编译运行ProducerTest.java生产消息之前,您需要根据代码提示信息配置参数列表中所列举的参数。
参数 | 示例值 | 描述 |
hostName | 1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com | 云消息队列 RabbitMQ 版实例接入点。获取方式,请参见获取接入点。 |
Port | 5672 | 默认端口。非加密端口为5672,加密端口为5671。 |
userName | MjoxODgwNzcwODY5MD**** | 在云消息队列 RabbitMQ 版控制台将阿里云账号或RAM用户的AccessKey ID、AccessKey Secret和云消息队列 RabbitMQ 版实例ID通过Base64编码后生成的静态用户名。您可以在云消息队列 RabbitMQ 版控制台的静态用户名密码页面获取。 |
passWord | NDAxREVDQzI2MjA0OT**** | 在云消息队列 RabbitMQ 版控制台将阿里云账号或RAM用户的AccessKey Secret和timestamp参数(系统当前时间)通过HMAC-SHA1生成一个签名后,再将这个签名和timestamp参数(系统当前时间)通过Base64编码后生成的静态密码。您可以在云消息队列 RabbitMQ 版控制台的静态用户名密码获取。 |
virtualHost | Test | 云消息队列 RabbitMQ 版实例的Vhost。您可以在云消息队列 RabbitMQ 版控制台的Vhost 详情页面查看。如何查看Vhost,请参见查看Vhost连接详情。 |
ExchangeName | ExchangeTest | 云消息队列 RabbitMQ 版的Exchange。您可以在云消息队列 RabbitMQ 版控制台的Exchange 列表页面,结合实例ID与Vhost模糊搜索已创建的Exchange。 |
BindingKey | BindingKeyTest | 云消息队列 RabbitMQ 版Exchange与Queue的Binding Key。您可以在云消息队列 RabbitMQ 版控制台的Exchange 列表页面查看Exchange的绑定关系,获取Binding Key。 |
QueueName | QueueTest | 云消息队列 RabbitMQ 版的Queue。仅在订阅消息时候需要配置,您可以在云消息队列 RabbitMQ 版控制台的Exchange 列表页面,查看Exchange的绑定关系,获取Exchange綁定的Queue。 |
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
import java.util.HashMap;
import java.util.UUID;
public class ProducerTest {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
// 设置接入点,在消息队列RabbitMQ版控制台实例详情页面查看。
factory.setHost("xxx.xxx.aliyuncs.com");
// 用户名,在消息队列RabbitMQ版控制台静态用户名密码页面查看。
factory.setUsername("${UserName}");
// 密码,在消息队列RabbitMQ版控制台静态用户名密码页面查看。
factory.setPassword("${PassWord}");
//设置为true,开启Connection自动恢复功能;设置为false,关闭Connection自动恢复功能。
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(5000);
// 设置Vhost名称,请确保已在消息队列RabbitMQ版控制台上创建完成。
factory.setVirtualHost("${VhostName}");
// 默认端口,非加密端口5672,加密端口5671。
factory.setPort(5672);
// 基于网络环境合理设置超时时间。
factory.setConnectionTimeout(30 * 1000);
factory.setHandshakeTimeout(30 * 1000);
factory.setShutdownTimeout(0);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("${ExchangeName}", "${ExchangeType}", true, false, false, null);
channel.queueDeclare("${QueueName}", true, false, false, new HashMap<String, Object>());
channel.queueBind("${QueueName}", "${ExchangeName}", "${BindingKey}");
// 开始发送消息。
for (int i = 0; i < 100; i++ ) {
// ${ExchangeName}必须在消息队列RabbitMQ版控制台上已存在,并且Exchange的类型与控制台上的类型一致。
// BindingKey根据业务需求填入相应的BindingKey。
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
channel.basicPublish("${ExchangeName}", "${BindingKey}", true, props,
("消息发送Body" + i).getBytes(StandardCharsets.UTF_8));
}
connection.close();
}
}
订阅消息
创建并编译运行ConsumerTest.java。
编译运行ConsumerTest.java订阅消息之前,您需要根据代码提示信息配置参数列表中所列举的参数。
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;
public class ConsumerTest {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
// 设置接入点,在消息队列RabbitMQ版控制台实例详情页面查看。
factory.setHost("xxx.xxx.aliyuncs.com");
// 用户名,在消息队列RabbitMQ版控制台静态用户名密码页面查看。
factory.setUsername("${Username}");
// 密码,在消息队列RabbitMQ版控制台静态用户名密码页面查看。
factory.setPassword("${Password}");
//设置为true,开启Connection自动恢复功能;设置为false,关闭Connection自动恢复功能。
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(5000);
// 设置Vhost名称,请确保已在消息队列RabbitMQ版控制台上创建完成。
factory.setVirtualHost("${VhostName}");
// 默认端口,非加密端口5672,加密端口5671。
factory.setPort(5672);
factory.setConnectionTimeout(300 * 1000);
factory.setHandshakeTimeout(300 * 1000);
factory.setShutdownTimeout(0);
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
// 创建${ExchangeName},该Exchange必须在消息队列RabbitMQ版控制台上已存在,并且Exchange的类型与控制台上的类型一致。
AMQP.Exchange.DeclareOk exchangeDeclareOk = channel.exchangeDeclare("${ExchangeName}", "${ExchangeType}", true, false, false, null);
// 创建${QueueName} ,该Queue必须在消息队列RabbitMQ版控制台上已存在。
AMQP.Queue.DeclareOk queueDeclareOk = channel.queueDeclare("${QueueName}", true, false, false, new HashMap<String, Object>());
// Queue与Exchange进行绑定,并确认绑定的BindingKeyTest。
AMQP.Queue.BindOk bindOk = channel.queueBind("${QueueName}", "${ExchangeName}", "BindingKeyTest");
// 开始消费消息。
channel.basicConsume("${QueueName}", false, "ConsumerTag", new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
//接收到的消息,进行业务逻辑处理。
System.out.println("Received: " + new String(body, StandardCharsets.UTF_8) + ", deliveryTag: " + envelope.getDeliveryTag() + ", messageId: " + properties.getMessageId());
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
云消息队列 RabbitMQ 版与开源RabbitMQ完全兼容。更多参数说明,请参见开源RabbitMQ客户端文档。