文档

JMS Pub/Sub(RAM角色跨账号授权场景)

更新时间:
一键部署

本文说明JMS客户端如何在RAM角色跨账号授权场景接入云消息队列 RabbitMQ 版并实现Pub/Sub消息收发。

前提条件

背景信息

  • 一条消息可以被多个订阅者消费。当某个发布者向某个Topic(消息主题)发布某条消息后,订阅了该Topic的所有订阅者都可以消费该条消息。JMS_pub_sub_model

  • 发布者和订阅者之间存在时间依赖性。

    • 如果订阅者创建的是非持久订阅,则订阅者必须在发布者向Topic发布消息前订阅Topic,且在订阅后必须始终保持活跃状态,才能从订阅的Topic消费全部的消息。在订阅者处于非活跃状态时发布的消息不会在订阅者重新处于活跃状态时被消费。示例代码请参见非持久订阅消息

    • 如果订阅者创建的是持久订阅,则订阅者必须在发布者向Topic发布消息前订阅Topic,但在订阅后不必始终保持活跃状态,也能从订阅的Topic消费全部的消息。在订阅者处于非活跃状态时发布的消息会在订阅者重新处于活跃状态时被消费。示例代码请参见持久订阅消息

  • 当您需要通过RAM STS角色授权的方式访问消息队列RabbitMQ版服务时,需要通过阿里云提供的权限认证类(AliyunCredentialsProvider)设置 AccessKey IDAccessKey SecretSecurityToken进行权限认证才能访问。

    重要

    您的客户端在调用SDK收发消息时,请尽可能使用长期存活的Connection,以免每次收发消息时都需要创建新的Connection,消耗大量的网络资源和服务端资源,甚至引起服务端SYN Flood防护。更多信息,请参见Connection

收发消息流程

JMS Pub Sub

获取接入点

您需要在云消息队列 RabbitMQ 版控制台获取实例的接入点。在收发消息时,您需要为发布端和订阅端配置该接入点,通过接入点接入云消息队列 RabbitMQ 版实例。

  1. 登录云消息队列 RabbitMQ 版控制台,然后在左侧导航栏选择实例列表

  2. 实例列表页面的顶部菜单栏选择地域,然后在实例列表中,单击目标实例名称。

  3. 实例详情页面的接入点信息页签,将鼠标指针移动到目标类型的接入点,单击该接入点右侧的复制图标,复制该接入点。

    类型

    说明

    示例值

    公网接入点

    公网环境可读写。按量付费实例默认支持,预付费实例需在购买时选择才支持。

    XXX.mq-amqp.cn-hangzhou-a.aliyuncs.com

    VPC接入点

    VPC环境可读写。按量付费实例和预付费实例默认都支持。

    XXX.mq-amqp.cn-hangzhou-a-internal.aliyuncs.com

安装JMS依赖库

在pom.xml中添加以下依赖。

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.5.0</version> <!-- 支持开源所有版本 -->
</dependency>
<dependency>
    <groupId>com.alibaba.mq-amqp.jms</groupId>
    <artifactId>mq-amqp-jms-client</artifactId>
    <version>1.11.2-1.0.0</version>
    <exclusions>
        <exclusion>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>com.alibaba.mq-amqp</groupId>
    <artifactId>mq-amqp-client</artifactId>
    <version>1.0.5</version>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.4</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-simple</artifactId>
    <version>1.7.25</version>
    <scope>compile</scope>
</dependency>

配置权限认证类(AliyunCredentialsProvider)

  1. 新建一个.properties配置文件,将 AccessKeyIDAccessKeySecretSecurityToken信息写入。参数填写说明,请参见配置参数列表

    # Access Key ID.
    accessKeyId=${accessKeyId}
    # Access Key Secret.
    accessKeySecret=${accessKeySecret}
    # security temp token. (optional)
    securityToken=${securityToken}
  2. 创建权限认证类AliyunCredentialsProvider.java,根据代码提示信息,设置相关参数。具体信息,请参见配置参数列表

    import java.io.FileReader;
    import java.io.IOException;
    import java.util.Properties;
    
    import com.alibaba.mq.amqp.utils.UserUtils;
    import com.rabbitmq.client.impl.CredentialsProvider;
    import org.apache.commons.lang3.StringUtils;
    import java.security.InvalidKeyException;
    import java.security.NoSuchAlgorithmException;
    
    public class AliyunCredentialsProvider implements CredentialsProvider {
    
        /**
         * Access Key ID.
         */
        private static final String accessKeyId;
        /**
         * Access Key Secret.
         */
        private static final String accessKeySecret;
        /**
         * security temp token. (optional)
         */
        private static final String securityToken;
        /**
         * 实例ID,从云消息队列 RabbitMQ 版控制台获取。
         */
        private final String instanceId;
    
        // 阿里云账号AccessKey拥有所有API的访问权限,建议您使用RAM用户进行API访问或日常运维。
        // 强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。
        // 本示例以将AccessKey保存在配置文件中来实现身份验证为例。
        static {
            // 需要将${ConfigPath}修改为您创建的Properties文件的路径。
            String path = "${ConfigPath}";
            Properties properties = new Properties();
    
            try {
                properties.load(new FileReader(path));
            } catch (IOException e) {
                // 业务中自行处理读配置文件异常。
            }
    
            accessKeyId = properties.getProperty("accessKeyId");
            accessKeySecret = properties.getProperty("accessKeySecret");
            securityToken = properties.getProperty("securityToken");
    
        }
    
        public AliyunCredentialsProvider(final String instanceId) {
            this.instanceId = instanceId;
        }
    
        @Override
        public String getUsername() {
            if(StringUtils.isNotEmpty(securityToken)) {
                return UserUtils.getUserName(accessKeyId, instanceId, securityToken);
            } else {
                return UserUtils.getUserName(accessKeyId, instanceId);
            }
        }
    
        @Override
        public String getPassword() {
            try {
                return UserUtils.getPassord(accessKeySecret);
            } catch (InvalidKeyException e) {
                //todo
            } catch (NoSuchAlgorithmException e) {
                //todo
            }
            return null;
        }
    }

配置参数列表

参数

示例值

描述

hostName

1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com

云消息队列 RabbitMQ 版实例接入点。您可以在云消息队列 RabbitMQ 版控制台实例详情页面获取。

Port

5672

默认端口。非加密端口为5672,加密端口为5671。

AccessKeyID

LTAI5tJQKnB9zVvQ****

阿里云账号或RAM账号的AccessKey ID。您可以登录RAM访问控制台,创建RAM角色,并赋予角色AliyunAMQPFullAccess权限,获取角色的ARN,调用AssumeRole接口获取一个扮演该角色的临时身份。AssumeRole执行成功会返回RAM角色的 AccessKeyIDAccessKeySecret以及SecurityToken。角色ARN的概念,请参见RAM角色概述

AccessKeySecret

jw6MawfKOVBveRr84u****

阿里云账号或RAM账号的AccessKey Secret。

SecurityToken

CAIS9wF1q6Ft5B2yfSjIr5fkJNPkvK5tgqeScX+ElnkMXvhvgIPO0Dz2IHhNfXZuB************

RAM角色的安全令牌(STS Token)。

instanceId

amqp-cn-v0h1kb9nu***

云消息队列 RabbitMQ 版的实例ID。您可以在云消息队列 RabbitMQ 版控制台实例详情页面查看。如何查看实例ID,请参见查看实例详情

virtualHost

Test

云消息队列 RabbitMQ 版实例的Vhost。您可以在云消息队列 RabbitMQ 版控制台Vhost 列表页面查看。如何查看Vhost,请参见查看Vhost连接详情

非持久订阅消息

创建并编译运行Subscriber.java。

重要

编译运行Subscriber.java非持久订阅消息之前,您需要根据代码提示信息配置配置参数列表中所列举的参数。

import com.rabbitmq.jms.admin.RMQConnectionFactory;
import com.rabbitmq.jms.client.AmqpConnectionFactoryPostProcessor;
import javax.jms.*;

public class Subscriber {
    public static String DESTINATION = "systemA.systemB.Price.*";

    public static RMQConnectionFactory getRMQConnectionFactory() {
        final AliyunCredentialsProvider provider = new AliyunCredentialsProvider("${InstanceID}");
        RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
        connectionFactory.setHost("xxx.xxx.aliyuncs.com");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("${VhostName}");
        connectionFactory.setAmqpConnectionFactoryPostProcessor(new AmqpConnectionFactoryPostProcessor() {
            @Override
            public void postProcess(com.rabbitmq.client.ConnectionFactory cf) {
                cf.setCredentialsProvider(provider);
                cf.setAutomaticRecoveryEnabled(true);
                cf.setNetworkRecoveryInterval(5000);
                System.out.println(cf.getPassword());
            }
        });
        return connectionFactory;
    }

    public static void main(String[] args) throws Exception {
        RMQConnectionFactory connectionFactory = getRMQConnectionFactory();
        TopicConnection connection = connectionFactory.createTopicConnection();
        TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic destTopic = session.createTopic(DESTINATION);
        TopicSubscriber subscriber = session.createSubscriber(destTopic);
        connection.start();
        while (true) {
            Message msg = subscriber.receive(1000);
            if (msg == null) {
                System.out.println("No new message, sleeping 5 secs");
                Thread.sleep(5 * 1000);
                continue;
            }
            if (msg instanceof TextMessage) {
                String body = ((TextMessage) msg).getText();
                System.out.println(body);
            } else {
                System.out.println("Unexpected message type: " + msg.getClass());
            }
        }
    }
}

持久订阅消息

创建并编译运行DurableSubscriber.java。

重要

编译运行DurableSubscriber.java持久订阅消息之前,您需要根据代码提示信息配置配置参数列表中所列举的参数。

import com.rabbitmq.jms.admin.RMQConnectionFactory;
import com.rabbitmq.jms.client.AmqpConnectionFactoryPostProcessor;
import javax.jms.*;

public class DurableSubscriber {
    public static String DESTINATION = "systemA.systemB.Price.*";
    public static String CLIENT_ID = "client_id";
    public static String SUBSCRIBER_NAME = "subscriber_name";

    public static RMQConnectionFactory getRMQConnectionFactory() {
    final AliyunCredentialsProvider provider = new AliyunCredentialsProvider("${InstanceID}");
        RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
        connectionFactory.setHost("xxx.xxx.aliyuncs.com");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("${VhostName}");
        connectionFactory.setAmqpConnectionFactoryPostProcessor(new AmqpConnectionFactoryPostProcessor() {
            @Override
            public void postProcess(com.rabbitmq.client.ConnectionFactory cf) {
                cf.setCredentialsProvider(provider);
                cf.setAutomaticRecoveryEnabled(true);
                cf.setNetworkRecoveryInterval(5000);
                System.out.println(cf.getPassword());
            }
        });
        return connectionFactory;
    }


    public static void main(String[] args) throws Exception {
        RMQConnectionFactory connectionFactory = getRMQConnectionFactory();
        TopicConnection connection = connectionFactory.createTopicConnection();
        connection.setClientID(CLIENT_ID);
        TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic Topic = session.createTopic(DESTINATION);
        TopicSubscriber subscriber = session.createDurableSubscriber(Topic,SUBSCRIBER_NAME);
        connection.start();
        while (true) {
            Message msg = subscriber.receive(1000);
            if (msg == null) {
                System.out.println("No new message, sleeping 5 secs");
                Thread.sleep(5 * 1000);
                continue;
            }
            if (msg instanceof TextMessage) {
                String body = ((TextMessage) msg).getText();
                System.out.println(body);
            } else {
                System.out.println("Unexpected message type: " + msg.getClass());
            }
        }
    }
}

生产消息

创建并编译运行Publisher.java。

重要

编译运行Publisher.java生产消息之前,您需要根据代码提示信息配置配置参数列表中所列举的参数。

import com.rabbitmq.jms.admin.RMQConnectionFactory;
import com.rabbitmq.jms.client.AmqpConnectionFactoryPostProcessor;
import javax.jms.*;

public class Publisher {

    public static String DESTINATION = "systemA.systemB.Price.aaa";

    public static RMQConnectionFactory getRMQConnectionFactory() {
               final AliyunCredentialsProvider provider = new AliyunCredentialsProvider("${InstanceID}");
        RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
        connectionFactory.setHost("xxx.xxx.aliyuncs.com");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("${VhostName}");
        connectionFactory.setAmqpConnectionFactoryPostProcessor(new AmqpConnectionFactoryPostProcessor() {
            @Override
            public void postProcess(com.rabbitmq.client.ConnectionFactory cf) {
                cf.setCredentialsProvider(provider);
                cf.setAutomaticRecoveryEnabled(true);
                cf.setNetworkRecoveryInterval(5000);
                System.out.println(cf.getPassword());
            }
        });
        return connectionFactory;
    }

    public static void main(String[] args) throws JMSException {
        RMQConnectionFactory factory = getRMQConnectionFactory();
        TopicConnection connection = factory.createTopicConnection();
        TopicSession session = connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
        TextMessage msg = session.createTextMessage("hello topic test");
        Topic topic = session.createTopic(DESTINATION);
        TopicPublisher publisher = session.createPublisher(topic);
        publisher.send(msg);
        System.out.println("消息已发送!");
        session.close();
        connection.close();
    }
}
  • 本页导读 (1)
文档反馈