步骤三:调用SDK收发消息
本文以Java SDK为例,说明如何将开源SDK客户端接入云消息队列 RabbitMQ 版服务端,并完成消息收发。
前提条件
获取实例接入点
在收发消息时,您需要为发布端和订阅端配置该接入点,客户端通过接入点接入云消息队列 RabbitMQ 版实例。
登录云消息队列 RabbitMQ 版控制台,然后在左侧导航栏选择实例列表。
在实例列表页面的顶部菜单栏选择地域,然后在实例列表中,单击目标实例名称。
在实例详情页面的接入点信息页签,将鼠标指针移动到目标类型的接入点,单击该接入点右侧的
图标,复制该接入点。
类型
说明
示例值
类型
说明
示例值
公网接入点
公网环境可读写。按量付费实例默认支持,预付费实例需在购买时选择才支持。
XXX.net.mq.amqp.aliyuncs.com
VPC接入点
VPC环境可读写。按量付费实例和预付费实例默认都支持。
XXX.vpc.mq.amqp.aliyuncs.com
安装Java依赖库
在IDEA中创建一个Java工程。
在pom.xml文件中添加以下依赖引入Java依赖库。
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.5.0</version> <!-- 支持开源所有版本 --> </dependency>
生成用户名密码
开源RabbitMQ客户端接入云上服务时,需要先通过AccessKey ID和AccessKey Secret生成用户名和密码,将用户名和密码设置到开源客户端SDK的userName和passWord参数中。云消息队列 RabbitMQ 版会通过用户名和密码进行权限认证。
登录云消息队列 RabbitMQ 版控制台,然后在左侧导航栏选择实例列表。
在实例列表页面的顶部菜单栏选择地域,然后在实例列表中,单击目标实例名称。
在左侧导航栏,单击静态用户名密码。
在静态用户名密码页面,单击创建用户名密码。
在创建用户名密码面板,输入AccessKey ID和AccessKey Secret,然后单击确定。
AccessKey ID和AccessKey Secret需要在阿里云RAM控制台获取,具体获取方式,请参见创建AccessKey。
静态用户名密码页面,显示创建的静态用户名与密码,密码处于隐藏状态。
在创建的静态用户名密码的密码列,单击显示密码,可查看用户名的密码。
创建客户端连接
创建连接管理工厂ConnectionFactory.java
,用于启动开源客户端和云消息队列 RabbitMQ 版服务端的连接。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.TimeoutException;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
public class ConnectionFactory {
private final String hostName;
private final int port;
private final String userName;
private final String password;
private final String virtualHost;
private final boolean enableSSL;
public ConnectionFactory(String hostName, int port, String userName,
String password, String virtualHost, boolean enableSSL) {
this.hostName = hostName;
this.port = port;
this.userName = userName;
this.password = password;
this.virtualHost = virtualHost;
this.enableSSL = enableSSL;
}
public Channel createChannel() throws IOException, TimeoutException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
//create a new con
Connection con = createCon();
//create a new channel
return con.createChannel();
}
private Connection createCon() throws IOException, TimeoutException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
com.rabbitmq.client.ConnectionFactory factory = new com.rabbitmq.client.ConnectionFactory();
factory.setHost(hostName);
factory.setUsername(userName);
factory.setPassword(password);
//设置为true,开启Connection自动恢复功能;设置为false,关闭Connection自动恢复功能。
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(5000);
factory.setVirtualHost(virtualHost);
// 默认端口。
factory.setPort(port);
if (enableSSL) {
setSSL(factory);
}
// 基于网络环境合理设置超时时间。
factory.setConnectionTimeout(30 * 1000);
factory.setHandshakeTimeout(30 * 1000);
factory.setShutdownTimeout(0);
return factory.newConnection();
}
private void setSSL(com.rabbitmq.client.ConnectionFactory factory) throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
SSLContext sslContext = SSLContext.getInstance("TLSv1.2");
TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
trustManagerFactory.init((KeyStore) null);
sslContext.init(null, trustManagerFactory.getTrustManagers(), null);
factory.useSslProtocol(sslContext);
}
public void closeCon(Channel channel) {
if (channel != null && channel.getConnection() != null) {
try {
channel.getConnection().close();
} catch (Throwable t) {
}
}
}
}
生产消息
在已创建的Java工程中,创建消息发送程序Producer.java
,按照SDK参数填写说明配置相关参数并运行。
示例代码如下:
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AlreadyClosedException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.UUID;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeoutException;
public class Producer {
//设置为云消息队列 RabbitMQ 版实例的接入点。
public static final String hostName = "1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com";
//设置为云消息队列 RabbitMQ 版实例的静态用户名。
public static final String userName = "MjoxODgwNzcwODY5MD****";
//设置为云消息队列 RabbitMQ 版实例的静态用户名密码。
public static final String password = "NDAxREVDQzI2MjA0OT****";
//设置为云消息队列 RabbitMQ 版实例的Vhost名称。
public static final String virtualHost = "vhost_test";
//如果使用5671端口,需要enableSSL设置为true。
public static final int port = 5672;
public static final boolean enableSSL = false;
private Channel channel;
private final ConcurrentNavigableMap<Long/*deliveryTag*/, String/*msgId*/> outstandingConfirms;
private final ConnectionFactory factory;
private final String exchangeName;
private final String queueName;
private final String routingKey;
public Producer(ConnectionFactory factory, String exchangeName, String queueName, String routingKey) throws IOException, TimeoutException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
this.factory = factory;
this.outstandingConfirms = new ConcurrentSkipListMap<>();
this.channel = factory.createChannel();
this.exchangeName = exchangeName;
this.queueName = queueName;
this.routingKey = routingKey;
}
public static void main(String[] args) throws IOException, TimeoutException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
//构建连接工厂。
ConnectionFactory factory = new ConnectionFactory(hostName, port, userName, password, virtualHost, enableSSL);
//初始化生产者。
Producer producer = new Producer(factory, "ExchangeTest", "QueueTest", "RoutingKeyTest");
//declare。
producer.declare();
producer.initChannel();
//发送消息。
producer.doSend("hello,amqp");
}
private void initChannel() throws IOException {
channel.confirmSelect();
ConfirmCallback cleanOutstandingConfirms = (deliveryTag, multiple) -> {
if (multiple) {
ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(deliveryTag, true);
for (Long tag : confirmed.keySet()) {
String msgId = confirmed.get(tag);
System.out.format("Message with msgId %s has been ack-ed. deliveryTag: %d, multiple: %b%n", msgId, tag, true);
}
confirmed.clear();
} else {
String msgId = outstandingConfirms.remove(deliveryTag);
System.out.format("Message with msgId %s has been ack-ed. deliveryTag: %d, multiple: %b%n", msgId, deliveryTag, false);
}
};
channel.addConfirmListener(cleanOutstandingConfirms, (deliveryTag, multiple) -> {
String msgId = outstandingConfirms.get(deliveryTag);
System.err.format("Message with msgId %s has been nack-ed. deliveryTag: %d, multiple: %b%n", msgId, deliveryTag, multiple);
// send msg failed, re-publish
});
channel.addReturnListener(returnMessage -> System.out.println("return msgId=" + returnMessage.getProperties().getMessageId()));
}
private void declare() throws IOException {
channel.exchangeDeclare(exchangeName, "direct", true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
}
private void doSend(String content) throws IOException, TimeoutException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
try {
String msgId = UUID.randomUUID().toString();
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(msgId).build();
channel.basicPublish(exchangeName, routingKey, true, props, content.getBytes(StandardCharsets.UTF_8));
outstandingConfirms.put(channel.getNextPublishSeqNo(), msgId);
} catch (AlreadyClosedException e) {
//need reconnect if channel is closed.
String message = e.getMessage();
System.out.println(message);
if (channelClosedByServer(message)) {
factory.closeCon(channel);
channel = factory.createChannel();
this.initChannel();
doSend(content);
} else {
throw e;
}
}
}
private boolean channelClosedByServer(String errorMsg) {
if (errorMsg != null
&& errorMsg.contains("channel.close")
&& errorMsg.contains("reply-code=541")
&& errorMsg.contains("reply-text=InternalError")) {
return true;
} else {
return false;
}
}
}
云消息队列 RabbitMQ 版会对单实例的TPS流量峰值进行限流,更多限流信息,请参见实例限流最佳实践。
订阅消息
在已创建的Java工程中,创建消息订阅程序Consumer.java
,按照SDK参数填写说明配置相关参数并运行。
示例代码如下:
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;
public class Consumer {
//设置为云消息队列 RabbitMQ 版实例的接入点。
public static final String hostName = "1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com";
//设置为云消息队列 RabbitMQ 版实例的静态用户名。
public static final String userName = "MjoxODgwNzcwODY5MD****";
//设置为云消息队列 RabbitMQ 版实例的静态用户名密码。
public static final String password = "NDAxREVDQzI2MjA0OT****";
//设置为云消息队列 RabbitMQ 版实例的Vhost名称。
public static final String virtualHost = "vhost_test";
//如果使用5671端口,需要enableSSL设置为true。
public static final int port = 5672;
public static final boolean enableSSL = false;
private final Channel channel;
private final String queue;
public Consumer(Channel channel, String queue) {
this.channel = channel;
this.queue = queue;
}
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
ConnectionFactory factory = new ConnectionFactory(hostName, port, userName, password, virtualHost, enableSSL);
Channel channel = factory.createChannel();
channel.basicQos(50);
//设置为云消息队列 RabbitMQ 版实例的Queue名称。需要和生产者中设置的Queue名称一致。
Consumer consumer = new Consumer(channel, "queue-1");
consumer.consume();
}
public void consume() throws IOException, InterruptedException {
channel.basicConsume(queue, false, "yourConsumerTag", new DefaultConsumer(channel) {
@Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
//业务处理。
System.out.println("receive: msgId=" + properties.getMessageId());
//消费者需要在有效时间内提交ack,否则消息会重新推送,最多推送16次。
//若推送16次还未成功,则消息被丢弃或者进入死信Exchange。
//专业版实例的有效时间为1分钟,企业版和Serverless实例为5分钟,铂金版实例为30分钟。
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
CountDownLatch latch = new CountDownLatch(1);
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
channel.getConnection().close();
} catch (IOException e) {
System.out.println("close connection error." + e);
}
latch.countDown();
}));
latch.await();
}
}
SDK参数填写说明
参数 | 示例值 | 描述 |
hostName | XXX.net.mq.amqp.aliyuncs.com | 云消息队列 RabbitMQ 版实例接入点。获取方式,请参见获取实例接入点。 |
Port | 5672 | 默认端口。非加密端口为5672,加密端口为5671。 |
userName | MjoxODgwNzcwODY5MD**** | 客户端接入云消息队列 RabbitMQ 版服务端用于权限认证的静态用户名。 需要提前在云消息队列 RabbitMQ 版控制台创建。 具体操作,请参见生成用户名密码。 |
passWord | NDAxREVDQzI2MjA0OT**** | 客户端接入云消息队列 RabbitMQ 版服务端用于权限认证的静态用户密码。 需要提前在云消息队列 RabbitMQ 版控制台创建。 具体操作,请参见生成用户名密码。 |
virtualHost | amqp_vhost | 云消息队列 RabbitMQ 版实例的Vhost。需要提前在云消息队列 RabbitMQ 版控制台创建。 具体操作,请参见步骤二:创建资源。 |
exchangeName | ExchangeTest | 云消息队列 RabbitMQ 版的Exchange。 需要提前在云消息队列 RabbitMQ 版控制台创建。 具体操作,请参见步骤二:创建资源。 |
queueName | QueueTest | 云消息队列 RabbitMQ 版的Queue。 需要提前在云消息队列 RabbitMQ 版控制台创建。 具体操作,请参见步骤二:创建资源。 |
routingKey | RoutingKeyTest | 云消息队列 RabbitMQ 版Exchange与Queue绑定的Routing Key。 需要提前在云消息队列 RabbitMQ 版控制台创建绑定关系。 具体操作,请参见步骤二:创建资源。 |
exchangeType | topic | Exchange的类型。云消息队列 RabbitMQ 版支持的类型如下,更多信息,请参见Exchange。
请确保填写的Exchange类型和您创建Exchange时选择的类型一致。 |
相关文档
云消息队列 RabbitMQ 版与开源RabbitMQ完全兼容,支持多语言SDK。更多语言SDK,请参见开源RabbitMQ AMQP协议支持的多语言或框架SDK,更多参数说明,请参见开源RabbitMQ客户端文档。
客户端运行时若返回异常报错,您可以参考错误码说明查看异常原因和解决方案。
您可以在云消息队列 RabbitMQ 版控制台通过消息查询或轨迹查询验证消息的收发状态和消息轨迹。具体操作,请参见查询消息和消息轨迹。
- 本页导读 (1)
- 前提条件
- 获取实例接入点
- 安装Java依赖库
- 生成用户名密码
- 创建客户端连接
- 生产消息
- 订阅消息
- SDK参数填写说明
- 相关文档