本文以调用Java SDK为例,介绍在RAM角色跨账号授权场景,通过开源SDK实现消息收发的操作过程,帮助您更好地理解消息收发的完整过程,其他语言或框架的SDK消息收发过程相似。

前提条件

背景信息

当您需要通过RAM STS角色授权的方式访问云消息队列 RabbitMQ 版服务时,需要通过阿里云提供的权限认证类(AliyunCredentialsProvider)设置 AccessKeyIDAccessKeySecretSecurityToken进行权限认证才能访问。

收发消息流程(以Java语言为例)

开源SDK消息收发流程
说明 云消息队列 RabbitMQ 版与开源RabbitMQ完全兼容。更多语言SDK,请参见开源RabbitMQ AMQP协议支持的多语言或框架SDK

获取接入点

您需要在云消息队列 RabbitMQ 版控制台获取实例的接入点。在收发消息时,您需要为发布端和订阅端配置该接入点,通过接入点接入云消息队列 RabbitMQ 版实例。

  1. 登录云消息队列 RabbitMQ 版控制台,然后在左侧导航栏选择实例列表
  2. 实例列表页面的顶部菜单栏选择地域,然后在实例列表中,单击目标实例名称。
  3. 实例详情页面的接入点信息页签,将鼠标指针移动到目标类型的接入点,单击该接入点右侧的复制图标,复制该接入点。
    类型说明示例值
    公网接入点公网环境可读写。按量付费实例默认支持,预付费实例需在购买时选择才支持。XXX.mq-amqp.cn-hangzhou-a.aliyuncs.com
    VPC接入点VPC环境可读写。按量付费实例和预付费实例默认都支持。XXX.mq-amqp.cn-hangzhou-a-internal.aliyuncs.com

安装Java依赖库

pom.xml中添加以下依赖。

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.5.0</version> <!-- 支持开源所有版本 -->
</dependency>
<dependency>
    <groupId>com.alibaba.mq-amqp</groupId>
    <artifactId>mq-amqp-client</artifactId>
    <version>1.0.5</version>
</dependency>

配置权限认证类(AliyunCredentialsProvider)

  1. 新建一个.properties配置文件,将 AccessKeyIDAccessKeySecretSecurityToken信息写入。参数填写说明,请参见参数列表
    # Access Key ID.
    accessKeyId=${accessKeyId}
    # Access Key Secret.
    accessKeySecret=${accessKeySecret}
    # security temp token. (optional)
    securityToken=${securityToken}
  2. 创建权限认证类AliyunCredentialsProvider.java,根据代码提示信息,设置相关参数。具体信息,请参见参数列表
    import java.io.FileReader;
    import java.io.IOException;
    import java.util.Properties;
    
    import com.alibaba.mq.amqp.utils.UserUtils;
    import com.rabbitmq.client.impl.CredentialsProvider;
    import org.apache.commons.lang3.StringUtils;
    import java.security.InvalidKeyException;
    import java.security.NoSuchAlgorithmException;
    
    public class AliyunCredentialsProvider implements CredentialsProvider {
    
        /**
         * Access Key ID.
         */
        private static final String accessKeyId;
        /**
         * Access Key Secret.
         */
        private static final String accessKeySecret;
        /**
         * security temp token. (optional)
         */
        private static final String securityToken;
        /**
         * 实例ID,从云消息队列 RabbitMQ 版控制台获取。
         */
        private final String instanceId;
    
    
        // 阿里云账号AccessKey拥有所有API的访问权限,建议您使用RAM用户进行API访问或日常运维。
        // 强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。
        // 本示例以将AccessKey保存在配置文件中来实现身份验证为例。
        static {
            // 需要将${ConfigPath}修改为您创建的Properties文件的路径。
            String path = "${ConfigPath}";
            Properties properties = new Properties();
    
            try {
                properties.load(new FileReader(path));
            } catch (IOException e) {
                // 业务中自行处理读配置文件异常。
            }
    
            accessKeyId = properties.getProperty("accessKeyId");
            accessKeySecret = properties.getProperty("accessKeySecret");
            securityToken = properties.getProperty("securityToken");
    
        }
    
        public AliyunCredentialsProvider(final String instanceId) {
            this.instanceId = instanceId;
        }
    
        @Override
        public String getUsername() {
            if(StringUtils.isNotEmpty(securityToken)) {
                return UserUtils.getUserName(accessKeyId, instanceId, securityToken);
            } else {
                return UserUtils.getUserName(accessKeyId, instanceId);
            }
        }
    
        @Override
        public String getPassword() {
            try {
                return UserUtils.getPassord(accessKeySecret);
            } catch (InvalidKeyException e) {
                //todo
            } catch (NoSuchAlgorithmException e) {
                //todo
            }
            return null;
        }
    }
表 1. 参数列表
参数示例值描述
hostName1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com云消息队列 RabbitMQ 版实例接入点。获取方式,请参见获取接入点
Port5672默认端口。非加密端口5672,加密端口5671。
AccessKeyIDLTAI5tJQKnB9zVvQ****阿里云账号或RAM用户的AccessKey ID。您可以登录RAM访问控制台,创建RAM角色,并赋予角色AliyunAMQPFullAccess权限,获取角色的ARN,调用AssumeRole接口获取一个扮演该角色的临时身份。AssumeRole执行成功会返回RAM角色的 AccessKeyIDAccessKeySecret以及SecurityToken。角色ARN的概念,请参见RAM角色概览
AccessKeySecretjw6MawfKOVBveRr84u****阿里云账号或RAM用户的AccessKey Secret。
SecurityTokenCAIS9wF1q6Ft5B2yfSjIr5fkJNPkvK5tgqeScX+ElnkMXvhvgIPO0Dz2IHhNfXZuB************RAM角色的安全令牌(STS Token)。
instanceIdamqp-cn-v0h1kb9nu***云消息队列 RabbitMQ 版的实例ID。您可以在云消息队列 RabbitMQ 版控制台实例详情页面查看。如何查看实例ID,请参见查看实例详情
virtualHostTest云消息队列 RabbitMQ 版实例的Vhost。您可以在云消息队列 RabbitMQ 版控制台Vhost 列表页面查看。如何查看Vhost,请参见查看Vhost连接详情
ExchangeNameExchangeTest云消息队列 RabbitMQ 版的Exchange。您可以在云消息队列 RabbitMQ 版控制台Exchange 列表页面获取。
BindingKeyBindingKeyTest云消息队列 RabbitMQ 版Exchange与Queue的Binding Key。您可以在云消息队列 RabbitMQ 版控制台Exchange 列表页面查看Exchange的绑定关系,获取Binding Key。
QueueNameQueueTest云消息队列 RabbitMQ 版的Queue。仅在订阅消息时候需要配置,您可以在云消息队列 RabbitMQ 版控制台Exchange 列表页面,查看Exchange的绑定关系,获取Exchange綁定的Queue。

生产消息

创建并编译运行ProducerTest.java

重要 编译运行ProducerTest.java生产消息之前,您需要根据代码提示信息配置参数列表中所列举的参数。
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
import java.util.HashMap;
import java.util.UUID;

public class ProducerTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        // 设置接入点,在云消息队列 RabbitMQ 版控制台实例详情页面查看。
        factory.setHost("xxx.xxx.aliyuncs.com");
        // ${instanceId}为实例ID,在云消息队列 RabbitMQ 版控制台实例详情页面查看。
        factory.setCredentialsProvider(new AliyunCredentialsProvider("${instanceId}"));
        //设置为true,开启Connection自动恢复功能;设置为false,关闭Connection自动恢复功能。
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        // 设置Vhost名称,请确保已在云消息队列 RabbitMQ 版控制台上创建完成。
        factory.setVirtualHost("${VhostName}");
        // 默认端口,非加密端口5672,加密端口5671。
        factory.setPort(5672);
        // 基于网络环境合理设置超时时间。
        factory.setConnectionTimeout(30 * 1000);
        factory.setHandshakeTimeout(30 * 1000);
        factory.setShutdownTimeout(0);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("${ExchangeName}", "${ExchangeType}", true, false, false, null);
        channel.queueDeclare("${QueueName}", true, false, false, new HashMap<String, Object>());
        channel.queueBind("${QueueName}", "${ExchangeName}", "${BindingKey}");
        // 开始发送消息。
        for (int i = 0; i < 100; i++  ) {
            // ${ExchangeName}必须在云消息队列 RabbitMQ 版控制台上已存在,并且Exchange的类型与控制台上的类型一致。
            // ${BindingKey}根据业务需求填入相应的BindingKey。
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
            channel.basicPublish("${ExchangeName}", "${BindingKey}", true, props,
                    ("消息发送Body"  + i).getBytes(StandardCharsets.UTF_8));
        }
        connection.close();
    }
}

订阅消息

创建并编译运行ConsumerTest.java订阅消息。

重要 编译运行ConsumerTest.java订阅消息之前,您需要根据代码提示信息配置参数列表中所列举的参数。
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;

public class ConsumerTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        // 设置接入点,在云消息队列 RabbitMQ 版控制台实例详情页面查看。
        factory.setHost("xxx.xxx.aliyuncs.com");
        // ${instanceId}为实例ID,在云消息队列 RabbitMQ 版控制台概览页面查看。
        factory.setCredentialsProvider(new AliyunCredentialsProvider("${instanceId}"));
        //设置为true,开启Connection自动恢复功能;设置为false,关闭Connection自动恢复功能。
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        // 设置Vhost名称,请确保已在云消息队列 RabbitMQ 版控制台上创建完成。
        factory.setVirtualHost("${VhostName}");
        // 默认端口,非加密端口5672,加密端口5671。
        factory.setPort(5672);
        factory.setConnectionTimeout(300 * 1000);
        factory.setHandshakeTimeout(300 * 1000);
        factory.setShutdownTimeout(0);
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        // 创建${ExchangeName},该Exchange必须在云消息队列 RabbitMQ 版控制台上已存在,并且Exchange的类型与控制台上的类型一致。
        AMQP.Exchange.DeclareOk exchangeDeclareOk = channel.exchangeDeclare("${ExchangeName}", "${ExchangeType}", true, false, false, null);
        // 创建${QueueName} ,该Queue必须在云消息队列 RabbitMQ 版控制台上已存在。
        AMQP.Queue.DeclareOk queueDeclareOk = channel.queueDeclare("${QueueName}", true, false, false, new HashMap<String, Object>());
        // Queue与Exchange进行绑定,并确认绑定的BindingKeyTest。
        AMQP.Queue.BindOk bindOk = channel.queueBind("${QueueName}", "${ExchangeName}", "BindingKeyTest");
        // 开始消费消息。
        channel.basicConsume("${QueueName}", false, "ConsumerTag", new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                //接收到的消息,进行业务逻辑处理。
                System.out.println("Received: "  + new String(body, StandardCharsets.UTF_8) +  ", deliveryTag: "  + envelope.getDeliveryTag()  + ", messageId: " +  properties.getMessageId());
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}

查询消息

如果您想确认消息是否成功发送至云消息队列 RabbitMQ 版,可以在云消息队列 RabbitMQ 版控制台查询消息。具体操作,请参见查询消息