本文说明JMS客户端如何在主账号与RAM账号场景接入云消息队列 RabbitMQ 版并实现Pub/Sub消息收发。

前提条件

背景信息

  • 一条消息可以被多个订阅者消费。当某个发布者向某个Topic(消息主题)发布某条消息后,订阅了该Topic的所有订阅者都可以消费该条消息。JMS_pub_sub_model
  • 发布者和订阅者之间存在时间依赖性。
    • 如果订阅者创建的是非持久订阅,则订阅者必须在发布者向Topic发布消息前订阅Topic,且在订阅后必须始终保持活跃状态,才能从订阅的Topic消费全部的消息。在订阅者处于非活跃状态时发布的消息不会在订阅者重新处于活跃状态时被消费。示例代码请参见非持久订阅消息
    • 如果订阅者创建的是持久订阅,则订阅者必须在发布者向Topic发布消息前订阅Topic,但在订阅后不必始终保持活跃状态,也能从订阅的Topic消费全部的消息。在订阅者处于非活跃状态时发布的消息会在订阅者重新处于活跃状态时被消费。示例代码请参见持久订阅消息
  • 开源RabbitMQ客户端接入云上服务时,需要先通过 AccessKey IDAccessKey Secret生成用户名和密码,将用户名和密码设置到开源客户端SDK的userNamepassWord参数中。云消息队列 RabbitMQ 版会通过用户名和密码进行权限认证。
    重要 您的客户端在调用SDK收发消息时,请尽可能使用长期存活的Connection,以免每次收发消息时都需要创建新的Connection,消耗大量的网络资源和服务端资源,甚至引起服务端SYN Flood防护。更多信息,请参见Connection

收发消息流程

消息收发

获取接入点

您需要在云消息队列 RabbitMQ 版控制台获取实例的接入点。在收发消息时,您需要为发布端和订阅端配置该接入点,通过接入点接入云消息队列 RabbitMQ 版实例。

  1. 登录云消息队列 RabbitMQ 版控制台,然后在左侧导航栏选择实例列表
  2. 实例列表页面的顶部菜单栏选择地域,然后在实例列表中,单击目标实例名称。
  3. 实例详情页面的接入点信息页签,将鼠标指针移动到目标类型的接入点,单击该接入点右侧的复制图标,复制该接入点。
    类型说明示例值
    公网接入点公网环境可读写。按量付费实例默认支持,预付费实例需在购买时选择才支持。XXX.mq-amqp.cn-hangzhou-a.aliyuncs.com
    VPC接入点VPC环境可读写。按量付费实例和预付费实例默认都支持。XXX.mq-amqp.cn-hangzhou-a-internal.aliyuncs.com

安装JMS依赖库

pom.xml中添加以下依赖。

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.5.0</version> <!-- 支持开源所有版本 -->
</dependency>
<dependency>
    <groupId>com.alibaba.mq-amqp.jms</groupId>
    <artifactId>mq-amqp-jms-client</artifactId>
    <version>1.11.2-1.0.0</version>
    <exclusions>
        <exclusion>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.4</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-simple</artifactId>
    <version>1.7.25</version>
    <scope>compile</scope>
</dependency>

生成用户名密码

具体操作,请参见静态用户名密码管理

非持久订阅消息

创建并编译运行Subscriber.java

重要 编译运行Subscriber.java订阅消息之前,您需要根据代码提示信息修改配置参数列表中所列举的参数。
表 1. 配置参数列表
参数示例值描述
hostName1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com云消息队列 RabbitMQ 版实例接入点。您可以在云消息队列 RabbitMQ 版控制台实例详情页面获取。
Port5672默认端口。非加密端口为5672,加密端口为5671。
userNameMjoxODgwNzcwODY5MD****云消息队列 RabbitMQ 版控制台将阿里云账号或RAM账号的AccessKey ID、AccessKey Secret和云消息队列 RabbitMQ 版实例ID通过Base64编码后生成的静态用户名。您可以在云消息队列 RabbitMQ 版控制台静态用户名密码页面获取。
passWordNDAxREVDQzI2MjA0OT****云消息队列 RabbitMQ 版控制台将阿里云账号或RAM账号的AccessKey Secret和timestamp参数(系统当前时间)通过HMAC-SHA1生成一个签名后,再将这个签名和timestamp参数(系统当前时间)通过Base64编码后生成的静态密码。您可以在云消息队列 RabbitMQ 版控制台静态用户名密码页面,根据实例ID搜索已创建的用户名以及对应密码。
virtualHostTest云消息队列 RabbitMQ 版实例的Vhost。您可以在云消息队列 RabbitMQ 版控制台Vhost 列表页面查看。如何查看Vhost,请参见查看Vhost连接详情
import com.rabbitmq.jms.admin.RMQConnectionFactory;
import com.rabbitmq.jms.client.AmqpConnectionFactoryPostProcessor;
import javax.jms.*;

public class Subscriber {
    public static String DESTINATION = "systemA.systemB.Price.*";

    public static RMQConnectionFactory getRMQConnectionFactory() {
        RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
        connectionFactory.setHost("xxx.xxx.aliyuncs.com");
        connectionFactory.setUsername("${Username}");
        connectionFactory.setPassword("${Password}");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("VhostName");
        connectionFactory.setAmqpConnectionFactoryPostProcessor(new AmqpConnectionFactoryPostProcessor() {
            @Override
            public void postProcess(com.rabbitmq.client.ConnectionFactory cf) {
                cf.setAutomaticRecoveryEnabled(true);
                cf.setNetworkRecoveryInterval(5000);
                System.out.println(cf.getPassword());
            }
        });
        return connectionFactory;
    }

    public static void main(String[] args) throws Exception {
        RMQConnectionFactory connectionFactory = getRMQConnectionFactory();
        TopicConnection connection = connectionFactory.createTopicConnection();
        TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic destTopic = session.createTopic(DESTINATION);
        TopicSubscriber subscriber = session.createSubscriber(destTopic);
        connection.start();
        while (true) {
            Message msg = subscriber.receive(1000);
            if (msg == null) {
                System.out.println("No new message, sleeping 5 secs");
                Thread.sleep(5 * 1000);
                continue;
            }
            if (msg instanceof TextMessage) {
                String body = ((TextMessage) msg).getText();
                System.out.println(body);
            } else {
                System.out.println("Unexpected message type: " + msg.getClass());
            }
        }
    }
}

持久订阅消息

创建并编译运行DurableSubscriber.java

重要 编译运行DurableSubscriber.java订阅消息之前,您需要根据代码提示信息修改配置参数列表中所列举的参数。
import com.rabbitmq.jms.admin.RMQConnectionFactory;
import com.rabbitmq.jms.client.AmqpConnectionFactoryPostProcessor;
import javax.jms.*;

public class DurableSubscriber {
    public static String DESTINATION = "systemA.systemB.Price.*";
    public static String CLIENT_ID = "client_id";
    public static String SUBSCRIBER_NAME = "subscriber_name";

    public static RMQConnectionFactory getRMQConnectionFactory() {
        RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
        connectionFactory.setHost("xxx.xxx.aliyuncs.com");
        connectionFactory.setUsername("${Username}");
        connectionFactory.setPassword("${Password}");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("${VhostName}");
        connectionFactory.setAmqpConnectionFactoryPostProcessor(new AmqpConnectionFactoryPostProcessor() {
            @Override
            public void postProcess(com.rabbitmq.client.ConnectionFactory cf) {
                cf.setAutomaticRecoveryEnabled(true);
                cf.setNetworkRecoveryInterval(5000);
                System.out.println(cf.getPassword());
            }
        });
        return connectionFactory;
    }


    public static void main(String[] args) throws Exception {
        RMQConnectionFactory connectionFactory = getRMQConnectionFactory();
        TopicConnection connection = connectionFactory.createTopicConnection();
        connection.setClientID(CLIENT_ID);
        TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic Topic = session.createTopic(DESTINATION);
        TopicSubscriber subscriber = session.createDurableSubscriber(Topic,SUBSCRIBER_NAME);
        connection.start();
        while (true) {
            Message msg = subscriber.receive(1000);
            if (msg == null) {
                System.out.println("No new message, sleeping 5 secs");
                Thread.sleep(5 * 1000);
                continue;
            }
            if (msg instanceof TextMessage) {
                String body = ((TextMessage) msg).getText();
                System.out.println(body);
            } else {
                System.out.println("Unexpected message type: " + msg.getClass());
            }
        }
    }
}

发布消息

创建并编译运行Publisher.java

重要 编译运行Publisher.java发布消息之前,您需要根据代码提示信息修改配置参数列表中所列举的参数。
import com.rabbitmq.jms.admin.RMQConnectionFactory;
import com.rabbitmq.jms.client.AmqpConnectionFactoryPostProcessor;
import javax.jms.*;

public class Publisher {

    public static String DESTINATION = "systemA.systemB.Price.aaa";

    public static RMQConnectionFactory getRMQConnectionFactory() {
        RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
        connectionFactory.setHost("xxx.xxx.aliyuncs.com");
        connectionFactory.setUsername("${Username}");
        connectionFactory.setPassword("${Password}");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("${VhostName}");
        connectionFactory.setAmqpConnectionFactoryPostProcessor(new AmqpConnectionFactoryPostProcessor() {
            @Override
            public void postProcess(com.rabbitmq.client.ConnectionFactory cf) {
                cf.setAutomaticRecoveryEnabled(true);
                cf.setNetworkRecoveryInterval(5000);
                System.out.println(cf.getPassword());
            }
        });
        return connectionFactory;
    }

    public static void main(String[] args) throws JMSException {
        RMQConnectionFactory factory = getRMQConnectionFactory();
        TopicConnection connection = factory.createTopicConnection();
        TopicSession session = connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
        TextMessage msg = session.createTextMessage("hello topic test1");
        Topic topic = session.createTopic(DESTINATION);
        TopicPublisher publisher = session.createPublisher(topic);
        publisher.send(msg);
        System.out.println("消息已发布!");
        session.close();
        connection.close();
    }
}