本文说明JMS客户端如何在主账号与RAM账号场景接入云消息队列 RabbitMQ 版并实现Pub/Sub消息收发。
前提条件
背景信息
- 一条消息可以被多个订阅者消费。当某个发布者向某个Topic(消息主题)发布某条消息后,订阅了该Topic的所有订阅者都可以消费该条消息。
- 发布者和订阅者之间存在时间依赖性。
- 开源RabbitMQ客户端接入云上服务时,需要先通过 AccessKey ID和AccessKey Secret生成用户名和密码,将用户名和密码设置到开源客户端SDK的userName和passWord参数中。云消息队列 RabbitMQ 版会通过用户名和密码进行权限认证。重要 您的客户端在调用SDK收发消息时,请尽可能使用长期存活的Connection,以免每次收发消息时都需要创建新的Connection,消耗大量的网络资源和服务端资源,甚至引起服务端SYN Flood防护。更多信息,请参见Connection。
收发消息流程

获取接入点
您需要在云消息队列 RabbitMQ 版控制台获取实例的接入点。在收发消息时,您需要为发布端和订阅端配置该接入点,通过接入点接入云消息队列 RabbitMQ 版实例。
- 登录云消息队列 RabbitMQ 版控制台,然后在左侧导航栏选择实例列表。
- 在实例列表页面的顶部菜单栏选择地域,然后在实例列表中,单击目标实例名称。
- 在实例详情页面的接入点信息页签,将鼠标指针移动到目标类型的接入点,单击该接入点右侧的
图标,复制该接入点。
类型 说明 示例值 公网接入点 公网环境可读写。按量付费实例默认支持,预付费实例需在购买时选择才支持。 XXX.mq-amqp.cn-hangzhou-a.aliyuncs.com VPC接入点 VPC环境可读写。按量付费实例和预付费实例默认都支持。 XXX.mq-amqp.cn-hangzhou-a-internal.aliyuncs.com
安装JMS依赖库
在pom.xml中添加以下依赖。
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.5.0</version> <!-- 支持开源所有版本 -->
</dependency>
<dependency>
<groupId>com.alibaba.mq-amqp.jms</groupId>
<artifactId>mq-amqp-jms-client</artifactId>
<version>1.11.2-1.0.0</version>
<exclusions>
<exclusion>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.4</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.25</version>
<scope>compile</scope>
</dependency>
生成用户名密码
具体操作,请参见静态用户名密码管理。
非持久订阅消息
创建并编译运行Subscriber.java。
重要 编译运行Subscriber.java订阅消息之前,您需要根据代码提示信息修改配置参数列表中所列举的参数。
参数 | 示例值 | 描述 |
---|---|---|
hostName | 1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com | 云消息队列 RabbitMQ 版实例接入点。您可以在云消息队列 RabbitMQ 版控制台的实例详情页面获取。 |
Port | 5672 | 默认端口。非加密端口为5672,加密端口为5671。 |
userName | MjoxODgwNzcwODY5MD**** | 在云消息队列 RabbitMQ 版控制台将阿里云账号或RAM账号的AccessKey ID、AccessKey Secret和云消息队列 RabbitMQ 版实例ID通过Base64编码后生成的静态用户名。您可以在云消息队列 RabbitMQ 版控制台的静态用户名密码页面获取。 |
passWord | NDAxREVDQzI2MjA0OT**** | 在云消息队列 RabbitMQ 版控制台将阿里云账号或RAM账号的AccessKey Secret和timestamp参数(系统当前时间)通过HMAC-SHA1生成一个签名后,再将这个签名和timestamp参数(系统当前时间)通过Base64编码后生成的静态密码。您可以在云消息队列 RabbitMQ 版控制台的静态用户名密码页面,根据实例ID搜索已创建的用户名以及对应密码。 |
virtualHost | Test | 云消息队列 RabbitMQ 版实例的Vhost。您可以在云消息队列 RabbitMQ 版控制台的Vhost 列表页面查看。如何查看Vhost,请参见查看Vhost连接详情。 |
import com.rabbitmq.jms.admin.RMQConnectionFactory;
import com.rabbitmq.jms.client.AmqpConnectionFactoryPostProcessor;
import javax.jms.*;
public class Subscriber {
public static String DESTINATION = "systemA.systemB.Price.*";
public static RMQConnectionFactory getRMQConnectionFactory() {
RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
connectionFactory.setHost("xxx.xxx.aliyuncs.com");
connectionFactory.setUsername("${Username}");
connectionFactory.setPassword("${Password}");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("VhostName");
connectionFactory.setAmqpConnectionFactoryPostProcessor(new AmqpConnectionFactoryPostProcessor() {
@Override
public void postProcess(com.rabbitmq.client.ConnectionFactory cf) {
cf.setAutomaticRecoveryEnabled(true);
cf.setNetworkRecoveryInterval(5000);
System.out.println(cf.getPassword());
}
});
return connectionFactory;
}
public static void main(String[] args) throws Exception {
RMQConnectionFactory connectionFactory = getRMQConnectionFactory();
TopicConnection connection = connectionFactory.createTopicConnection();
TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Topic destTopic = session.createTopic(DESTINATION);
TopicSubscriber subscriber = session.createSubscriber(destTopic);
connection.start();
while (true) {
Message msg = subscriber.receive(1000);
if (msg == null) {
System.out.println("No new message, sleeping 5 secs");
Thread.sleep(5 * 1000);
continue;
}
if (msg instanceof TextMessage) {
String body = ((TextMessage) msg).getText();
System.out.println(body);
} else {
System.out.println("Unexpected message type: " + msg.getClass());
}
}
}
}
持久订阅消息
创建并编译运行DurableSubscriber.java。
重要 编译运行DurableSubscriber.java订阅消息之前,您需要根据代码提示信息修改配置参数列表中所列举的参数。
import com.rabbitmq.jms.admin.RMQConnectionFactory;
import com.rabbitmq.jms.client.AmqpConnectionFactoryPostProcessor;
import javax.jms.*;
public class DurableSubscriber {
public static String DESTINATION = "systemA.systemB.Price.*";
public static String CLIENT_ID = "client_id";
public static String SUBSCRIBER_NAME = "subscriber_name";
public static RMQConnectionFactory getRMQConnectionFactory() {
RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
connectionFactory.setHost("xxx.xxx.aliyuncs.com");
connectionFactory.setUsername("${Username}");
connectionFactory.setPassword("${Password}");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("${VhostName}");
connectionFactory.setAmqpConnectionFactoryPostProcessor(new AmqpConnectionFactoryPostProcessor() {
@Override
public void postProcess(com.rabbitmq.client.ConnectionFactory cf) {
cf.setAutomaticRecoveryEnabled(true);
cf.setNetworkRecoveryInterval(5000);
System.out.println(cf.getPassword());
}
});
return connectionFactory;
}
public static void main(String[] args) throws Exception {
RMQConnectionFactory connectionFactory = getRMQConnectionFactory();
TopicConnection connection = connectionFactory.createTopicConnection();
connection.setClientID(CLIENT_ID);
TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Topic Topic = session.createTopic(DESTINATION);
TopicSubscriber subscriber = session.createDurableSubscriber(Topic,SUBSCRIBER_NAME);
connection.start();
while (true) {
Message msg = subscriber.receive(1000);
if (msg == null) {
System.out.println("No new message, sleeping 5 secs");
Thread.sleep(5 * 1000);
continue;
}
if (msg instanceof TextMessage) {
String body = ((TextMessage) msg).getText();
System.out.println(body);
} else {
System.out.println("Unexpected message type: " + msg.getClass());
}
}
}
}
发布消息
创建并编译运行Publisher.java。
重要 编译运行Publisher.java发布消息之前,您需要根据代码提示信息修改配置参数列表中所列举的参数。
import com.rabbitmq.jms.admin.RMQConnectionFactory;
import com.rabbitmq.jms.client.AmqpConnectionFactoryPostProcessor;
import javax.jms.*;
public class Publisher {
public static String DESTINATION = "systemA.systemB.Price.aaa";
public static RMQConnectionFactory getRMQConnectionFactory() {
RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
connectionFactory.setHost("xxx.xxx.aliyuncs.com");
connectionFactory.setUsername("${Username}");
connectionFactory.setPassword("${Password}");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("${VhostName}");
connectionFactory.setAmqpConnectionFactoryPostProcessor(new AmqpConnectionFactoryPostProcessor() {
@Override
public void postProcess(com.rabbitmq.client.ConnectionFactory cf) {
cf.setAutomaticRecoveryEnabled(true);
cf.setNetworkRecoveryInterval(5000);
System.out.println(cf.getPassword());
}
});
return connectionFactory;
}
public static void main(String[] args) throws JMSException {
RMQConnectionFactory factory = getRMQConnectionFactory();
TopicConnection connection = factory.createTopicConnection();
TopicSession session = connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
TextMessage msg = session.createTextMessage("hello topic test1");
Topic topic = session.createTopic(DESTINATION);
TopicPublisher publisher = session.createPublisher(topic);
publisher.send(msg);
System.out.println("消息已发布!");
session.close();
connection.close();
}
}