本文说明JMS客户端如何接入消息队列AMQP版并实现Pub/Sub消息收发。

背景信息

  • JMS Pub/Sub
    JMS Pub/Sub(发布/订阅)消息收发模式具有以下特点:
    • 一条消息可以被多个订阅者消费。当某个发布者向某个Topic(消息主题)发布某条消息后,订阅了该Topic的所有订阅者都可以消费该条消息。JMS_pub_sub_model
    • 发布者和订阅者之间存在时间依赖性。
      • 如果订阅者创建的是非持久订阅,则订阅者必须在发布者向Topic发布消息前订阅Topic,且在订阅后必须始终保持活跃状态,才能从订阅的Topic消费全部的消息。在订阅者处于非活跃状态时发布的消息不会在订阅者重新处于活跃状态时被消费。示例代码请参见非持久订阅消息
      • 如果订阅者创建的是持久订阅,则订阅者必须在发布者向Topic发布消息前订阅Topic,但在订阅后不必始终保持活跃状态,也能从订阅的Topic消费全部的消息。在订阅者处于非活跃状态时发布的消息会在订阅者重新处于活跃状态时被消费。示例代码请参见持久订阅消息
  • 用户名密码
    客户端接入消息队列AMQP版时,消息队列AMQP版会通过用户名密码进行权限认证。消息队列AMQP版支持以下方式为客户端生成用户名密码:
    • 动态用户名密码:通过阿里云提供的权限认证类生成动态用户名密码。
    • 静态用户名密码:通过消息队列AMQP版控制台生成静态用户名密码。该方式和开源RabbitMQ保持一致。
    注意 您的客户端在调用SDK收发消息时,请尽可能使用长期存活的Connection,以免每次收发消息时都需要创建新的Connection,消耗大量的网络资源和服务端资源,甚至引起服务端SYN Flood防护。详情请参见Connection

收发消息流程

JMS Pub Sub

获取接入点

您需要在消息队列AMQP版控制台获取实例的接入点。在收发消息时,您需要为生产端和消费端配置该接入点,以此接入消息队列AMQP版实例。

  1. 登录消息队列AMQP版控制台
  2. 在顶部菜单栏,选择地域。
  3. 在左侧导航栏,单击实例详情
  4. 实例详情页面,选择实例,在基本信息区域,将鼠标指针移动到目标类型的接入点,然后单击该接入点以复制该接入点。
    类型 说明 示例值
    公网接入点 公网环境可读写。按量付费实例默认支持,预付费实例需在购买时选择才支持。 XXX.mq-amqp.cn-hangzhou-a.aliyuncs.com
    VPC接入点 VPC环境可读写。按量付费实例和预付费实例默认都支持。 XXX.mq-amqp.cn-hangzhou-a-internal.aliyuncs.com

安装JMS依赖库

pom.xml中添加以下依赖。
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.5.0</version>
</dependency>
<dependency>
    <groupId>com.alibaba.mq-amqp.jms</groupId>
    <artifactId>mq-amqp-jms-client</artifactId>
    <version>1.11.2-1.0.0</version>
    <exclusions>
        <exclusion>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>com.alibaba.mq-amqp</groupId>
    <artifactId>mq-amqp-client</artifactId>
    <version>1.0.5</version>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.4</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-simple</artifactId>
    <version>1.7.25</version>
    <scope>compile</scope>
</dependency>
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.5.0</version>
</dependency>
<dependency>
    <groupId>com.alibaba.mq-amqp.jms</groupId>
    <artifactId>mq-amqp-jms-client</artifactId>
    <version>1.11.2-1.0.0</version>
    <exclusions>
        <exclusion>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.4</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-simple</artifactId>
    <version>1.7.25</version>
    <scope>compile</scope>
</dependency>

生成用户名密码

创建用于生成动态用户名密码的AliyunCredentialsProvider.java。

import com.alibaba.mq.amqp.utils.UserUtils;
import com.rabbitmq.client.impl.CredentialsProvider;
import org.apache.commons.lang3.StringUtils;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
/**
 * 阿里云UserName、Password生成类(动态变化)。
 */
public class AliyunCredentialsProvider implements CredentialsProvider {
    /**
     * Access Key ID.
     */
    private final String accessKeyId;
    /**
     * Access Key Secret.
     */
    private final String accessKeySecret;
    /**
     * security temp token. (optional)
     */
    private final String securityToken;
    /**
     * 实例ID(从消息队列AMQP版控制台获取)。
     */
    private final String instanceId;
    public AliyunCredentialsProvider(final String accessKeyId, final String accessKeySecret,
                                     final String instanceId) {
        this(accessKeyId, accessKeySecret, null, instanceId);
    }
    public AliyunCredentialsProvider(final String accessKeyId, final String accessKeySecret,
                                     final String securityToken, final String instanceId) {
        this.accessKeyId = accessKeyId;
        this.accessKeySecret = accessKeySecret;
        this.securityToken = securityToken;
        this.instanceId = instanceId;
    }
    @Override
    public String getUsername() {
        if(StringUtils.isNotEmpty(securityToken)) {
            return UserUtils.getUserName(accessKeyId, instanceId, securityToken);
        } else {
            return UserUtils.getUserName(accessKeyId, instanceId);
        }
    }

    @Override
    public String getPassword() {
        try {
            return UserUtils.getPassord(accessKeySecret);
        } catch (InvalidKeyException e) {
            //todo
        } catch (NoSuchAlgorithmException e) {
            //todo
        }
        return null;
    }
}

在消息队列AMQP版控制台生成静态用户名密码。

详情请参见创建用户名密码

非持久订阅消息

创建并编译运行Subscriber.java
import com.rabbitmq.jms.admin.RMQConnectionFactory;
import com.rabbitmq.jms.client.AmqpConnectionFactoryPostProcessor;
import javax.jms.*;

public class Subscriber {
    public static String DESTINATION = "systemA.systemB.Price.*";

    public static RMQConnectionFactory getRMQConnectionFactory() {
        final AliyunCredentialsProvider provider = new AliyunCredentialsProvider("${AccessKeyID}", "${AccessKeySecret}","${InstanceID}");
        RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
        connectionFactory.setHost("xxx.xxx.aliyuncs.com");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("${VhostName}");
        connectionFactory.setAmqpConnectionFactoryPostProcessor(new AmqpConnectionFactoryPostProcessor() {
            @Override
            public void postProcess(com.rabbitmq.client.ConnectionFactory cf) {
                cf.setCredentialsProvider(provider);
                cf.setAutomaticRecoveryEnabled(true);
                cf.setNetworkRecoveryInterval(5000);
                System.out.println(cf.getPassword());
            }
        });
        return connectionFactory;
    }

    public static void main(String[] args) throws Exception {
        RMQConnectionFactory connectionFactory = getRMQConnectionFactory();
        TopicConnection connection = connectionFactory.createTopicConnection();
        TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic destTopic = session.createTopic(DESTINATION);
        TopicSubscriber subscriber = session.createSubscriber(destTopic);
        connection.start();
        while (true) {
            Message msg = subscriber.receive(1000);
            if (msg == null) {
                System.out.println("No new message, sleeping 5 secs");
                Thread.sleep(5 * 1000);
                continue;
            }
            if (msg instanceof TextMessage) {
                String body = ((TextMessage) msg).getText();
                System.out.println(body);
            } else {
                System.out.println("Unexpected message type: " + msg.getClass());
            }
        }
    }
}
import com.rabbitmq.jms.admin.RMQConnectionFactory;
import com.rabbitmq.jms.client.AmqpConnectionFactoryPostProcessor;
import javax.jms.*;

public class Subscriber {
    public static String DESTINATION = "systemA.systemB.Price.*";

    public static RMQConnectionFactory getRMQConnectionFactory() {
        RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
        connectionFactory.setHost("xxx.xxx.aliyuncs.com");
        connectionFactory.setUsername("${Username}");
        connectionFactory.setPassword("${Password}");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("VhostName");
        connectionFactory.setAmqpConnectionFactoryPostProcessor(new AmqpConnectionFactoryPostProcessor() {
            @Override
            public void postProcess(com.rabbitmq.client.ConnectionFactory cf) {
                cf.setAutomaticRecoveryEnabled(true);
                cf.setNetworkRecoveryInterval(5000);
                System.out.println(cf.getPassword());
            }
        });
        return connectionFactory;
    }

    public static void main(String[] args) throws Exception {
        RMQConnectionFactory connectionFactory = getRMQConnectionFactory();
        TopicConnection connection = connectionFactory.createTopicConnection();
        TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic destTopic = session.createTopic(DESTINATION);
        TopicSubscriber subscriber = session.createSubscriber(destTopic);
        connection.start();
        while (true) {
            Message msg = subscriber.receive(1000);
            if (msg == null) {
                System.out.println("No new message, sleeping 5 secs");
                Thread.sleep(5 * 1000);
                continue;
            }
            if (msg instanceof TextMessage) {
                String body = ((TextMessage) msg).getText();
                System.out.println(body);
            } else {
                System.out.println("Unexpected message type: " + msg.getClass());
            }
        }
    }
}

持久订阅消息

创建并编译运行DurableSubscriber.java
import com.rabbitmq.jms.admin.RMQConnectionFactory;
import com.rabbitmq.jms.client.AmqpConnectionFactoryPostProcessor;
import javax.jms.*;

public class DurableSubscriber {
    public static String DESTINATION = "systemA.systemB.Price.*";
    public static String CLIENT_ID = "client_id";
    public static String SUBSCRIBER_NAME = "subscriber_name";

    public static RMQConnectionFactory getRMQConnectionFactory() {
	final AliyunCredentialsProvider provider = new AliyunCredentialsProvider("${AccessKeyID}", "${SecretKeyID}","${InstanceID}");
        RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
        connectionFactory.setHost("xxx.xxx.aliyuncs.com");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("${VhostName}");
        connectionFactory.setAmqpConnectionFactoryPostProcessor(new AmqpConnectionFactoryPostProcessor() {
            @Override
            public void postProcess(com.rabbitmq.client.ConnectionFactory cf) {
                cf.setCredentialsProvider(provider);
                cf.setAutomaticRecoveryEnabled(true);
                cf.setNetworkRecoveryInterval(5000);
                System.out.println(cf.getPassword());
            }
        });
        return connectionFactory;
    }


    public static void main(String[] args) throws Exception {
        RMQConnectionFactory connectionFactory = getRMQConnectionFactory();
        TopicConnection connection = connectionFactory.createTopicConnection();
        connection.setClientID(CLIENT_ID);
        TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic Topic = session.createTopic(DESTINATION);
        TopicSubscriber subscriber = session.createDurableSubscriber(Topic,SUBSCRIBER_NAME);
        connection.start();
        while (true) {
            Message msg = subscriber.receive(1000);
            if (msg == null) {
                System.out.println("No new message, sleeping 5 secs");
                Thread.sleep(5 * 1000);
                continue;
            }
            if (msg instanceof TextMessage) {
                String body = ((TextMessage) msg).getText();
                System.out.println(body);
            } else {
                System.out.println("Unexpected message type: " + msg.getClass());
            }
        }
    }
}
import com.rabbitmq.jms.admin.RMQConnectionFactory;
import com.rabbitmq.jms.client.AmqpConnectionFactoryPostProcessor;
import javax.jms.*;

public class DurableSubscriber {
    public static String DESTINATION = "systemA.systemB.Price.*";
    public static String CLIENT_ID = "client_id";
    public static String SUBSCRIBER_NAME = "subscriber_name";

    public static RMQConnectionFactory getRMQConnectionFactory() {
        RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
        connectionFactory.setHost("xxx.xxx.aliyuncs.com");
        connectionFactory.setUsername("${Username}");
        connectionFactory.setPassword("${Password}");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("${VhostName}");
        connectionFactory.setAmqpConnectionFactoryPostProcessor(new AmqpConnectionFactoryPostProcessor() {
            @Override
            public void postProcess(com.rabbitmq.client.ConnectionFactory cf) {
                cf.setAutomaticRecoveryEnabled(true);
                cf.setNetworkRecoveryInterval(5000);
                System.out.println(cf.getPassword());
            }
        });
        return connectionFactory;
    }


    public static void main(String[] args) throws Exception {
        RMQConnectionFactory connectionFactory = getRMQConnectionFactory();
        TopicConnection connection = connectionFactory.createTopicConnection();
        connection.setClientID(CLIENT_ID);
        TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic Topic = session.createTopic(DESTINATION);
        TopicSubscriber subscriber = session.createDurableSubscriber(Topic,SUBSCRIBER_NAME);
        connection.start();
        while (true) {
            Message msg = subscriber.receive(1000);
            if (msg == null) {
                System.out.println("No new message, sleeping 5 secs");
                Thread.sleep(5 * 1000);
                continue;
            }
            if (msg instanceof TextMessage) {
                String body = ((TextMessage) msg).getText();
                System.out.println(body);
            } else {
                System.out.println("Unexpected message type: " + msg.getClass());
            }
        }
    }
}

发布消息

创建并编译运行Publisher.java
import com.rabbitmq.jms.admin.RMQConnectionFactory;
import com.rabbitmq.jms.client.AmqpConnectionFactoryPostProcessor;
import javax.jms.*;

public class Publisher {

    public static String DESTINATION = "systemA.systemB.Price.aaa";

    public static RMQConnectionFactory getRMQConnectionFactory() {
               final AliyunCredentialsProvider provider = new AliyunCredentialsProvider("${AccessKeyID}", "${SecretKeyID}","${InstanceID}");
        RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
        connectionFactory.setHost("xxx.xxx.aliyuncs.com");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("${VhostName}");
        connectionFactory.setAmqpConnectionFactoryPostProcessor(new AmqpConnectionFactoryPostProcessor() {
            @Override
            public void postProcess(com.rabbitmq.client.ConnectionFactory cf) {
                cf.setCredentialsProvider(provider);
                cf.setAutomaticRecoveryEnabled(true);
                cf.setNetworkRecoveryInterval(5000);
                System.out.println(cf.getPassword());
            }
        });
        return connectionFactory;
    }

    public static void main(String[] args) throws JMSException {
        RMQConnectionFactory factory = getRMQConnectionFactory();
        TopicConnection connection = factory.createTopicConnection();
        TopicSession session = connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
        TextMessage msg = session.createTextMessage("hello topic test");
        Topic topic = session.createTopic(DESTINATION);
        TopicPublisher publisher = session.createPublisher(topic);
        publisher.send(msg);
        System.out.println("消息已发送!");
        session.close();
        connection.close();
    }
}
import com.rabbitmq.jms.admin.RMQConnectionFactory;
import com.rabbitmq.jms.client.AmqpConnectionFactoryPostProcessor;
import javax.jms.*;

public class Publisher {

    public static String DESTINATION = "systemA.systemB.Price.aaa";

    public static RMQConnectionFactory getRMQConnectionFactory() {
        RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
        connectionFactory.setHost("xxx.xxx.aliyuncs.com");
        connectionFactory.setUsername("${Username}");
        connectionFactory.setPassword("${Password}");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("${VhostName}");
        connectionFactory.setAmqpConnectionFactoryPostProcessor(new AmqpConnectionFactoryPostProcessor() {
            @Override
            public void postProcess(com.rabbitmq.client.ConnectionFactory cf) {
                cf.setAutomaticRecoveryEnabled(true);
                cf.setNetworkRecoveryInterval(5000);
                System.out.println(cf.getPassword());
            }
        });
        return connectionFactory;
    }

    public static void main(String[] args) throws JMSException {
        RMQConnectionFactory factory = getRMQConnectionFactory();
        TopicConnection connection = factory.createTopicConnection();
        TopicSession session = connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
        TextMessage msg = session.createTextMessage("hello topic test1");
        Topic topic = session.createTopic(DESTINATION);
        TopicPublisher publisher = session.createPublisher(topic);
        publisher.send(msg);
        System.out.println("消息已发送!");
        session.close();
        connection.close();
    }
}