本文说明JMS客户端如何在主账号与RAM账号场景接入消息队列RabbitMQ版并实现Pub/Sub消息收发。

背景信息

  • 一条消息可以被多个订阅者消费。当某个发布者向某个Topic(消息主题)发布某条消息后,订阅了该Topic的所有订阅者都可以消费该条消息。JMS_pub_sub_model
  • 发布者和订阅者之间存在时间依赖性。
    • 如果订阅者创建的是非持久订阅,则订阅者必须在发布者向Topic发布消息前订阅Topic,且在订阅后必须始终保持活跃状态,才能从订阅的Topic消费全部的消息。在订阅者处于非活跃状态时发布的消息不会在订阅者重新处于活跃状态时被消费。示例代码请参见非持久订阅消息
    • 如果订阅者创建的是持久订阅,则订阅者必须在发布者向Topic发布消息前订阅Topic,但在订阅后不必始终保持活跃状态,也能从订阅的Topic消费全部的消息。在订阅者处于非活跃状态时发布的消息会在订阅者重新处于活跃状态时被消费。示例代码请参见持久订阅消息
  • 开源RabbitMQ客户端接入云上服务时,需要先通过AccessKey和AccessKey Secret生成用户名和密码,将用户名和密码设置到开源客户端SDK的userNamepassWord参数中。消息队列RabbitMQ版会通过用户名和密码进行权限认证。
    注意 您的客户端在调用SDK收发消息时,请尽可能使用长期存活的Connection,以免每次收发消息时都需要创建新的Connection,消耗大量的网络资源和服务端资源,甚至引起服务端SYN Flood防护。更多信息,请参见Connection

收发消息流程

消息收发

获取接入点

您需要在消息队列RabbitMQ版控制台获取实例的接入点。在收发消息时,您需要为发布端和订阅端配置该接入点,通过接入点接入消息队列RabbitMQ版实例。

  1. 登录消息队列RabbitMQ版控制台
  2. 在顶部菜单栏,选择地域。
  3. 在左侧导航栏,单击实例详情
  4. 实例详情页面,选择实例,在基本信息区域,将鼠标指针移动到目标类型的接入点,单击该接入点右侧的复制图标,复制该接入点。
    类型 说明 示例值
    公网接入点 公网环境可读写。按量付费实例默认支持,预付费实例需在购买时选择才支持。 XXX.mq-amqp.cn-hangzhou-a.aliyuncs.com
    VPC接入点 VPC环境可读写。按量付费实例和预付费实例默认都支持。 XXX.mq-amqp.cn-hangzhou-a-internal.aliyuncs.com

安装JMS依赖库

pom.xml中添加以下依赖。

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.5.0</version> <!-- 支持开源所有版本 -->
</dependency>
<dependency>
    <groupId>com.alibaba.mq-amqp.jms</groupId>
    <artifactId>mq-amqp-jms-client</artifactId>
    <version>1.11.2-1.0.0</version>
    <exclusions>
        <exclusion>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.4</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-simple</artifactId>
    <version>1.7.25</version>
    <scope>compile</scope>
</dependency>

生成用户名密码

具体操作,请参见创建静态用户名密码

非持久订阅消息

创建并编译运行Subscriber.java

注意 编译运行Subscriber.java订阅消息之前,您需要根据代码提示信息配置表 1中所列举的参数。
表 1. 配置参数列表
参数 示例值 描述
hostName 1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com 消息队列RabbitMQ版实例接入点。您可以在消息队列RabbitMQ版控制台实例详情页面获取。
Port 5672 默认端口。非加密端口为5672,加密端口为5671。
userName MjoxODgwNzcwODY5MD**** 消息队列RabbitMQ版控制台将阿里云账号或RAM用户的AccessKey ID、AccessKey Secret和消息队列RabbitMQ版实例ID通过Base64编码后生成的静态用户名。您可以在消息队列RabbitMQ版控制台用户名密码管理页面,根据实例ID搜索已创建的用户名。
passWord NDAxREVDQzI2MjA0OT**** 消息队列RabbitMQ版控制台将阿里云账号或RAM用户的AccessKey Secret和timestamp参数(系统当前时间)通过HMAC-SHA1生成一个签名后,再将这个签名和timestamp参数(系统当前时间)通过Base64编码后生成的静态密码。您可以在消息队列RabbitMQ版控制台用户名密码管理页面,根据实例ID搜索已创建的用户名以及对应密码。
virtualHost Test 消息队列RabbitMQ版实例的Vhost。您可以在消息队列RabbitMQ版控制台Vhost详情页面查看。如何查看Vhost,请参见查看Vhost详情
import com.rabbitmq.jms.admin.RMQConnectionFactory;
import com.rabbitmq.jms.client.AmqpConnectionFactoryPostProcessor;
import javax.jms.*;

public class Subscriber {
    public static String DESTINATION = "systemA.systemB.Price.*";

    public static RMQConnectionFactory getRMQConnectionFactory() {
        RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
        connectionFactory.setHost("xxx.xxx.aliyuncs.com");
        connectionFactory.setUsername("${Username}");
        connectionFactory.setPassword("${Password}");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("VhostName");
        connectionFactory.setAmqpConnectionFactoryPostProcessor(new AmqpConnectionFactoryPostProcessor() {
            @Override
            public void postProcess(com.rabbitmq.client.ConnectionFactory cf) {
                cf.setAutomaticRecoveryEnabled(true);
                cf.setNetworkRecoveryInterval(5000);
                System.out.println(cf.getPassword());
            }
        });
        return connectionFactory;
    }

    public static void main(String[] args) throws Exception {
        RMQConnectionFactory connectionFactory = getRMQConnectionFactory();
        TopicConnection connection = connectionFactory.createTopicConnection();
        TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic destTopic = session.createTopic(DESTINATION);
        TopicSubscriber subscriber = session.createSubscriber(destTopic);
        connection.start();
        while (true) {
            Message msg = subscriber.receive(1000);
            if (msg == null) {
                System.out.println("No new message, sleeping 5 secs");
                Thread.sleep(5 * 1000);
                continue;
            }
            if (msg instanceof TextMessage) {
                String body = ((TextMessage) msg).getText();
                System.out.println(body);
            } else {
                System.out.println("Unexpected message type: " + msg.getClass());
            }
        }
    }
}

持久订阅消息

创建并编译运行DurableSubscriber.java

注意 编译运行DurableSubscriber.java订阅消息之前,您需要根据代码提示信息配置表 1中所列举的参数。
import com.rabbitmq.jms.admin.RMQConnectionFactory;
import com.rabbitmq.jms.client.AmqpConnectionFactoryPostProcessor;
import javax.jms.*;

public class DurableSubscriber {
    public static String DESTINATION = "systemA.systemB.Price.*";
    public static String CLIENT_ID = "client_id";
    public static String SUBSCRIBER_NAME = "subscriber_name";

    public static RMQConnectionFactory getRMQConnectionFactory() {
        RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
        connectionFactory.setHost("xxx.xxx.aliyuncs.com");
        connectionFactory.setUsername("${Username}");
        connectionFactory.setPassword("${Password}");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("${VhostName}");
        connectionFactory.setAmqpConnectionFactoryPostProcessor(new AmqpConnectionFactoryPostProcessor() {
            @Override
            public void postProcess(com.rabbitmq.client.ConnectionFactory cf) {
                cf.setAutomaticRecoveryEnabled(true);
                cf.setNetworkRecoveryInterval(5000);
                System.out.println(cf.getPassword());
            }
        });
        return connectionFactory;
    }


    public static void main(String[] args) throws Exception {
        RMQConnectionFactory connectionFactory = getRMQConnectionFactory();
        TopicConnection connection = connectionFactory.createTopicConnection();
        connection.setClientID(CLIENT_ID);
        TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic Topic = session.createTopic(DESTINATION);
        TopicSubscriber subscriber = session.createDurableSubscriber(Topic,SUBSCRIBER_NAME);
        connection.start();
        while (true) {
            Message msg = subscriber.receive(1000);
            if (msg == null) {
                System.out.println("No new message, sleeping 5 secs");
                Thread.sleep(5 * 1000);
                continue;
            }
            if (msg instanceof TextMessage) {
                String body = ((TextMessage) msg).getText();
                System.out.println(body);
            } else {
                System.out.println("Unexpected message type: " + msg.getClass());
            }
        }
    }
}

发布消息

创建并编译运行Publisher.java

注意 编译运行Publisher.java发布消息之前,您需要根据代码提示信息配置表 1中所列举的参数。
import com.rabbitmq.jms.admin.RMQConnectionFactory;
import com.rabbitmq.jms.client.AmqpConnectionFactoryPostProcessor;
import javax.jms.*;

public class Publisher {

    public static String DESTINATION = "systemA.systemB.Price.aaa";

    public static RMQConnectionFactory getRMQConnectionFactory() {
        RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
        connectionFactory.setHost("xxx.xxx.aliyuncs.com");
        connectionFactory.setUsername("${Username}");
        connectionFactory.setPassword("${Password}");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("${VhostName}");
        connectionFactory.setAmqpConnectionFactoryPostProcessor(new AmqpConnectionFactoryPostProcessor() {
            @Override
            public void postProcess(com.rabbitmq.client.ConnectionFactory cf) {
                cf.setAutomaticRecoveryEnabled(true);
                cf.setNetworkRecoveryInterval(5000);
                System.out.println(cf.getPassword());
            }
        });
        return connectionFactory;
    }

    public static void main(String[] args) throws JMSException {
        RMQConnectionFactory factory = getRMQConnectionFactory();
        TopicConnection connection = factory.createTopicConnection();
        TopicSession session = connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
        TextMessage msg = session.createTextMessage("hello topic test1");
        Topic topic = session.createTopic(DESTINATION);
        TopicPublisher publisher = session.createPublisher(topic);
        publisher.send(msg);
        System.out.println("消息已发布!");
        session.close();
        connection.close();
    }
}