跨云账号授权场景

更新时间:

本文以调用Java SDK为例,介绍在RAM角色跨账号授权场景,通过开源SDK实现消息收发的操作过程,帮助您更好地理解消息收发的完整过程,其他语言或框架的SDK消息收发过程相似。

前提条件

背景信息

当您需要通过RAM STS角色授权的方式访问云消息队列 RabbitMQ 版服务时,需要通过阿里云提供的权限认证类(AliyunCredentialsProvider)设置 AccessKeyIDAccessKeySecretSecurityToken进行权限认证才能访问。

借助访问控制RAM的RAM角色,您可以跨云账号授权,使某个企业访问另一个企业的云消息队列 RabbitMQ 版

  • 企业A希望能专注于业务系统,仅作为云消息队列 RabbitMQ 版所有者。企业A希望可以授权企业B来操作部分业务,例如:云消息队列 RabbitMQ 版的运维、监控以及管理等。

  • 企业A希望当企业B的员工加入或离职时,无需做任何权限变更。企业B可以进一步将企业A的资源访问权限分配给企业B的RAM用户(员工或应用),并可以精细控制其员工或应用对资源的访问和操作权限。

  • 企业A希望如果双方合同终止,企业A随时可以撤销企业B的授权。

更多信息,请参见RAM跨云账号授权

收发消息流程(以Java语言为例)

开源SDK消息收发流程

说明

云消息队列 RabbitMQ 版与开源RabbitMQ完全兼容。更多语言SDK,请参见开源RabbitMQ AMQP协议支持的多语言或框架SDK

安装Java依赖库

pom.xml中添加以下依赖。

<dependency>            
    <groupId>com.rabbitmq</groupId>            
    <artifactId>amqp-client</artifactId>            
    <version>5.5.0</version> <!-- 支持开源所有版本 -->       
</dependency>        
<dependency>          
    <groupId>com.alibaba.mq-amqp</groupId>          
    <artifactId>mq-amqp-client</artifactId>         
    <version>1.0.5</version>       
</dependency>       
<dependency>          
    <groupId>com.aliyun</groupId>          
    <artifactId>alibabacloud-sts20150401</artifactId>          
    <version>1.0.4</version>       
</dependency>

配置权限认证类(AliyunCredentialsProvider)

  1. 创建权限认证类AliyunCredentialsProvider.java,根据代码提示信息,设置相关参数。具体信息,请参见参数列表

    import com.alibaba.mq.amqp.utils.UserUtils;
    import com.aliyun.auth.credentials.Credential;
    import com.aliyun.auth.credentials.provider.StaticCredentialProvider;
    import com.aliyun.sdk.service.sts20150401.AsyncClient;
    import com.aliyun.sdk.service.sts20150401.models.AssumeRoleRequest;
    import com.aliyun.sdk.service.sts20150401.models.AssumeRoleResponse;
    import com.rabbitmq.client.impl.CredentialsProvider;
    import darabonba.core.client.ClientOverrideConfiguration;
    import org.apache.commons.lang3.StringUtils;
    
    import java.security.InvalidKeyException;
    import java.security.NoSuchAlgorithmException;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    
    public class AliyunCredentialsProvider implements CredentialsProvider {
        /**
         * 默认过期时间,单位毫秒。可以根据业务实际情况设置。
         */
        private final long STS_TIMEOUT_DEFAULT = 1800 * 1000;
        /**
         * 实例ID,从云消息队列 RabbitMQ 版控制台获取。
         */
        private final String instanceId;
        /**
         * Access Key ID。
         */
        private String accessKeyId;
        /**
         * Access Key Secret。
         */
        private String accessKeySecret;
        /**
         *  (可选)security temp token。
         */
        private String securityToken;
        /**
         * STS过期时间, 记录后可提前更新STS token。
         */
        private Long timeStampLimit;
    
        // 阿里云账号AccessKey拥有所有API的访问权限,建议您使用RAM用户进行API访问或日常运维。
        // 强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。
        public AliyunCredentialsProvider(final String instanceId) {
            this.instanceId = instanceId;
        }
        public void updateProperties(String alibabaAccessKeyId, String alibabaAccessKeySecret, String region, String roleARN) throws ExecutionException, InterruptedException {
            this.timeStampLimit = System.currentTimeMillis() + STS_TIMEOUT_DEFAULT;
            // 自行调用AssumeRole接口实现,进行身份信息获取。
            StaticCredentialProvider provider = StaticCredentialProvider.create(Credential.builder()
                    .accessKeyId(alibabaAccessKeyId)
                    .accessKeySecret(alibabaAccessKeySecret)
                    .build());
            AsyncClient client = AsyncClient.builder()
                    .region(region) // 请设置Region ID, 例如cn-hangzhou。
                    .credentialsProvider(provider)
                    .overrideConfiguration(
                            ClientOverrideConfiguration.create()
                                    // Endpoint请参考https://api.aliyun.com/product/Sts。
                                    .setEndpointOverride("sts." + region + ".aliyuncs.com")
                            //.setConnectTimeout(Duration.ofSeconds(30))
                    )
                    .build();
            AssumeRoleRequest assumeRoleRequest = AssumeRoleRequest.builder()
                    .roleArn(roleARN) // 从控制台获取得到的角色ARN。
                    .roleSessionName("testRoleName") // 当前角色Session的名称,可自定义。
                    .durationSeconds(STS_TIMEOUT_DEFAULT / 1000)
                    .build();
            CompletableFuture<AssumeRoleResponse> response = client.assumeRole(assumeRoleRequest);
            // Synchronously get the return value of the API request
            AssumeRoleResponse resp = response.get();
            if (resp.getBody().getCredentials() != null) {
                System.out.println("[INFO] Update AK, SK, Token successfully.");
                this.accessKeyId = resp.getBody().getCredentials().getAccessKeyId();
                this.securityToken = resp.getBody().getCredentials().getSecurityToken();
                this.accessKeySecret = resp.getBody().getCredentials().getAccessKeySecret();
            }
            client.close();
        }
    
        // 检测当前该token是否快要过期。
        public boolean isNearlyExpired() {
            // 提前30秒判断。
            return System.currentTimeMillis() > timeStampLimit - 30 * 1000L;
        }
    
        @Override
        public String getUsername() {
            if(StringUtils.isNotEmpty(securityToken)) {
                return UserUtils.getUserName(accessKeyId, instanceId, securityToken);
            } else {
                return UserUtils.getUserName(accessKeyId, instanceId);
            }
        }
    
        @Override
        public String getPassword() {
            try {
                return UserUtils.getPassord(accessKeySecret);
            } catch (InvalidKeyException e) {
                //todo
            } catch (NoSuchAlgorithmException e) {
                //todo
            }
            return null;
        }
    }

表 1. 参数列表

参数

示例值

描述

hostName

1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com

云消息队列 RabbitMQ 版实例接入点。

Port

5672

默认端口。非加密端口5672,加密端口5671。

AccessKeyID

LTAI5tJQKnB9zVvQ****

阿里云账号或RAM用户的AccessKey ID。您可以登录RAM访问控制台,创建RAM角色,并赋予角色AliyunAMQPFullAccess权限,获取角色的ARN,调用AssumeRole接口获取一个扮演该角色的临时身份。AssumeRole执行成功会返回RAM角色的 AccessKeyIDAccessKeySecret以及SecurityToken。角色ARN的概念,请参见RAM角色概览

AccessKeySecret

jw6MawfKOVBveRr84u****

阿里云账号或RAM用户的AccessKey Secret。

region

cn-hangzhou

调用对应地域的AssumeRole接口,详情请参见AssumeRole

roleARN

acs:ram::125xxxxxxx223:role/xxx

RAM角色的ARN。格式为acs:ram::<account_id>:role/<role_name>。详情请参见AssumeRole

instanceId

amqp-cn-v0h1kb9nu***

云消息队列 RabbitMQ 版的实例ID。您可以在云消息队列 RabbitMQ 版控制台实例详情页面查看。如何查看实例ID,请参见查看实例详情

virtualHost

Test

云消息队列 RabbitMQ 版实例的Vhost。您可以在云消息队列 RabbitMQ 版控制台Vhost 列表页面查看。如何查看Vhost,请参见查看Vhost连接详情

ExchangeName

ExchangeTest

云消息队列 RabbitMQ 版的Exchange。您可以在云消息队列 RabbitMQ 版控制台Exchange 列表页面获取。

RoutingKey

RoutingKeyTest

云消息队列 RabbitMQ 版Exchange与Queue的Routing Key。您可以在云消息队列 RabbitMQ 版控制台Exchange 列表页面查看Exchange的绑定关系,获取Routing Key。

QueueName

QueueTest

云消息队列 RabbitMQ 版的Queue。仅在订阅消息时候需要配置,您可以在云消息队列 RabbitMQ 版控制台Exchange 列表页面,查看Exchange的绑定关系,获取Exchange綁定的Queue。

生产消息

创建并编译运行ProducerTest.java

重要

编译运行ProducerTest.java生产消息之前,您需要根据代码提示信息配置参数列表中所列举的参数。

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

public class ProducerTest {
    // 推荐将AK/SK/ARN等信息在环境变量中配置,若将其明文保存在工程代码中,将带来不必要的数据泄露风险。
    // 阿里云账号的AccessKey ID。
    private static final String alibabaAccessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    // 阿里云账号的AccessKey Secret。
    private static final String alibabaAccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
    // 阿里云服务所在的Region。
    private static final String region = System.getenv("ALIBABA_CLOUD_REGION");
    // 阿里云角色ARN,从控制台获取。
    private static final String roleARN = System.getenv("ALIBABA_CLOUD_ROLE_ARN");
    //设置实例的接入点。
    private static final String hostName = "xxx.xxx.aliyuncs.com";
    private static final String instanceId = "${InstanceId}";
    //设置实例的Vhost。
    private static final String virtualHost = "${VirtualHost}";
    //设置Exchange、Queue和绑定关系。
    private static final String exchangeName = "${ExchangeName}";
    private static final String queueName = "${QueueName}";
    private static final String routingKey = "${RoutingKey}";
    //设置Exchange类型。
    private static final String exchangeType = "${ExchangeType}";

    public static void main(String[] args) throws InterruptedException, IOException, TimeoutException, ExecutionException {
        ConnectionFactory factory = new ConnectionFactory();
        // 设置接入点,在云消息队列 RabbitMQ 版控制台实例详情页面查看。
        factory.setHost(hostName);
        // ${instanceId}为实例ID,在云消息队列 RabbitMQ 版控制台概览页面查看。
        AliyunCredentialsProvider aliyunCredentialsProvider =
                new AliyunCredentialsProvider(instanceId);
        updateSTSProperties(aliyunCredentialsProvider);
        // ${instanceId}为实例ID,在云消息队列 RabbitMQ 版控制台实例详情页面查看。
        factory.setCredentialsProvider(aliyunCredentialsProvider);
        //设置为true,开启Connection自动恢复功能;设置为false,关闭Connection自动恢复功能。
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        // 设置Vhost名称,请确保已在云消息队列 RabbitMQ 版控制台上创建完成。
        factory.setVirtualHost(virtualHost);
        // 默认端口,非加密端口5672,加密端口5671。
        factory.setPort(5672);
        // 基于网络环境合理设置超时时间。
        factory.setConnectionTimeout(30 * 1000);
        factory.setHandshakeTimeout(30 * 1000);
        factory.setShutdownTimeout(0);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        channel.queueDeclare(queueName, true, false, false, new HashMap<String, Object>());
        channel.queueBind(queueName, exchangeName, routingKey);
        // 开始发送消息,3600条消息,每条发送后暂停1秒,将持续1小时。
        for (int i = 0; i < 3600; i++) {
            try {
                if (aliyunCredentialsProvider.isNearlyExpired()) {
                    // 认证可能过期,重新认证
                    System.out.println("[WARN] Token maybe expired, so try to update it.");
                    updateSTSProperties(aliyunCredentialsProvider);
                    factory.setCredentialsProvider(aliyunCredentialsProvider);
                    // 当配置更新后,需要重新建立连接。
                    connection = factory.newConnection();
                    channel = connection.createChannel();
                }
                // ${ExchangeName}必须在云消息队列 RabbitMQ 版控制台上已存在,并且Exchange的类型与控制台上的类型一致。
                // ${RoutingKey}根据业务需求填入相应的RoutingKey。
                AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
                channel.basicPublish(exchangeName, routingKey, true, props,
                        ("消息发送Body-"  + i).getBytes(StandardCharsets.UTF_8));
                System.out.println("[SendResult] Message sent successfully, messageId: " + props.getMessageId());
                Thread.sleep(1000L);
            } catch (Exception e) {
                System.out.println("[ERROR] Send fail, error: " + e.getMessage());
                Thread.sleep(5000L);
            }
        }
        connection.close();
    }

    public static void updateSTSProperties(AliyunCredentialsProvider aliyunCredentialsProvider) throws ExecutionException, InterruptedException {
        System.out.println("Try to update STS properties");
        // 推荐将AK/SK在环境变量中配置,若将其明文保存在工程代码中,将带来不必要的数据泄露风险。
        aliyunCredentialsProvider.updateProperties(alibabaAccessKeyId, alibabaAccessKeySecret, region, roleARN);
    }
}
说明

云消息队列 RabbitMQ 版会对单实例的TPS流量峰值进行限流,更多信息,请参见实例限流最佳实践

订阅消息

创建并编译运行ConsumerTest.java订阅消息。

重要

编译运行ConsumerTest.java订阅消息之前,您需要根据代码提示信息配置参数列表中所列举的参数。

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

public class ConsumerTest {
    // 推荐将AK/SK/ARN等信息在环境变量中配置,若将其明文保存在工程代码中,将带来不必要的数据泄露风险。
    // 阿里云账号的AccessKey ID。
    private static final String alibabaAccessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    // 阿里云账号的AccessKey Secret。
    private static final String alibabaAccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
    // 阿里云服务所在的Region。
    private static final String region = System.getenv("ALIBABA_CLOUD_REGION");
    // 阿里云角色ARN,从控制台获取。
    private static final String roleARN = System.getenv("ALIBABA_CLOUD_ROLE_ARN");
    //设置实例的接入点。
    private static final String hostName = "xxx.xxx.aliyuncs.com";
    private static final String instanceId = "${InstanceId}";
    //设置实例的Vhost。
    private static final String virtualHost = "${VirtualHost}";
    //设置Queue。
    private static final String queueName = "${QueueName}";

    public static void main(String[] args) throws IOException, TimeoutException, ExecutionException, InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        // 设置接入点,在云消息队列 RabbitMQ 版控制台实例详情页面查看。
        factory.setHost(hostName);
        // ${instanceId}为实例ID,在云消息队列 RabbitMQ 版控制台概览页面查看。
        AliyunCredentialsProvider aliyunCredentialsProvider =
                new AliyunCredentialsProvider(instanceId);
        updateSTSProperties(aliyunCredentialsProvider);
        factory.setCredentialsProvider(aliyunCredentialsProvider);
        //设置为true,开启Connection自动恢复功能;设置为false,关闭Connection自动恢复功能。
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        // 设置Vhost名称,请确保已在云消息队列 RabbitMQ 版控制台上创建完成。
        factory.setVirtualHost(virtualHost);
        // 默认端口,非加密端口5672,加密端口5671。
        factory.setPort(5672);
        factory.setConnectionTimeout(300 * 1000);
        factory.setHandshakeTimeout(300 * 1000);
        factory.setShutdownTimeout(0);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // 创建${QueueName} ,该Queue必须在云消息队列 RabbitMQ 版控制台上已存在。
        AMQP.Queue.DeclareOk queueDeclareOk = channel.queueDeclare(queueName, true, false, false, new HashMap<String, Object>());
        consume(channel, queueName);
        System.out.println("Consumer started.");

        // 循环检测sts是否即将过期,若过期则更新connection,重新消费。
        // 这里为了方便理解,使用while循环检测认证是否接近过期。
        // 可以使用定时任务,更优雅地实现定时检查、更新操作。
        while (true) {
            // 每次处理完消息后,可以判断是否接近过期。
            // 如果接近过期,则更新一次认证类,
            // 该过程需要重新创建连接,以确保业务持续运行。
            if (aliyunCredentialsProvider.isNearlyExpired()) {
                System.out.println("token maybe expired, so try to update it.");
                updateSTSProperties(aliyunCredentialsProvider);
                factory.setCredentialsProvider(aliyunCredentialsProvider);
                connection.close();
                connection = factory.newConnection();
                channel = connection.createChannel();
                // 重新开始消费消息。
                consume(channel, queueName);
                System.out.println("Consumer started.");
            } else {
                // 每秒检测一次。
                Thread.sleep(1000);
            }
        }
    }

    public static void consume(Channel channel, String queueName) throws IOException {

        channel.basicConsume(queueName, false, "ConsumerTag", new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) {
                try {
                    //接收到的消息,进行业务逻辑处理。
                    System.out.println("Received: " + new String(body, StandardCharsets.UTF_8) + ", deliveryTag: " + envelope.getDeliveryTag() + ", messageId: " + properties.getMessageId());
                    channel.basicAck(envelope.getDeliveryTag(), false);
                } catch (Exception e) {
                    System.out.println("Exception, cause:" + e.getMessage());
                }
            }
        });
    }

    public static void updateSTSProperties(AliyunCredentialsProvider aliyunCredentialsProvider) throws ExecutionException, InterruptedException {
        System.out.println("Try to update STS properties");
        aliyunCredentialsProvider.updateProperties(alibabaAccessKeyId, alibabaAccessKeySecret, region, roleARN);
    }
}

查询消息

如果您想确认消息是否成功发送至云消息队列 RabbitMQ 版,可以在云消息队列 RabbitMQ 版控制台查询消息。具体操作,请参见查询消息