跨云账号授权场景
本文以调用Java SDK为例,介绍在RAM角色跨账号授权场景,通过开源SDK实现消息收发的操作过程,帮助您更好地理解消息收发的完整过程,其他语言或框架的SDK消息收发过程相似。
前提条件
背景信息
当您需要通过RAM STS角色授权的方式访问云消息队列 RabbitMQ 版服务时,需要通过阿里云提供的权限认证类(AliyunCredentialsProvider)设置 AccessKeyID、AccessKeySecret与SecurityToken进行权限认证才能访问。
借助访问控制RAM的RAM角色,您可以跨云账号授权,使某个企业访问另一个企业的云消息队列 RabbitMQ 版。
企业A希望能专注于业务系统,仅作为云消息队列 RabbitMQ 版所有者。企业A希望可以授权企业B来操作部分业务,例如:云消息队列 RabbitMQ 版的运维、监控以及管理等。
企业A希望当企业B的员工加入或离职时,无需做任何权限变更。企业B可以进一步将企业A的资源访问权限分配给企业B的RAM用户(员工或应用),并可以精细控制其员工或应用对资源的访问和操作权限。
企业A希望如果双方合同终止,企业A随时可以撤销企业B的授权。
更多信息,请参见RAM跨云账号授权。
收发消息流程(以Java语言为例)
云消息队列 RabbitMQ 版与开源RabbitMQ完全兼容。更多语言SDK,请参见开源RabbitMQ AMQP协议支持的多语言或框架SDK。
安装Java依赖库
在pom.xml中添加以下依赖。
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.5.0</version> <!-- 支持开源所有版本 -->
</dependency>
<dependency>
<groupId>com.alibaba.mq-amqp</groupId>
<artifactId>mq-amqp-client</artifactId>
<version>1.0.5</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>alibabacloud-sts20150401</artifactId>
<version>1.0.4</version>
</dependency>
配置权限认证类(AliyunCredentialsProvider)
创建权限认证类AliyunCredentialsProvider.java,根据代码提示信息,设置相关参数。具体信息,请参见参数列表。
import com.alibaba.mq.amqp.utils.UserUtils; import com.aliyun.auth.credentials.Credential; import com.aliyun.auth.credentials.provider.StaticCredentialProvider; import com.aliyun.sdk.service.sts20150401.AsyncClient; import com.aliyun.sdk.service.sts20150401.models.AssumeRoleRequest; import com.aliyun.sdk.service.sts20150401.models.AssumeRoleResponse; import com.rabbitmq.client.impl.CredentialsProvider; import darabonba.core.client.ClientOverrideConfiguration; import org.apache.commons.lang3.StringUtils; import java.security.InvalidKeyException; import java.security.NoSuchAlgorithmException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; public class AliyunCredentialsProvider implements CredentialsProvider { /** * 默认过期时间,单位毫秒。可以根据业务实际情况设置。 */ private final long STS_TIMEOUT_DEFAULT = 1800 * 1000; /** * 实例ID,从云消息队列 RabbitMQ 版控制台获取。 */ private final String instanceId; /** * Access Key ID。 */ private String accessKeyId; /** * Access Key Secret。 */ private String accessKeySecret; /** * (可选)security temp token。 */ private String securityToken; /** * STS过期时间, 记录后可提前更新STS token。 */ private Long timeStampLimit; // 阿里云账号AccessKey拥有所有API的访问权限,建议您使用RAM用户进行API访问或日常运维。 // 强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。 public AliyunCredentialsProvider(final String instanceId) { this.instanceId = instanceId; } public void updateProperties(String alibabaAccessKeyId, String alibabaAccessKeySecret, String region, String roleARN) throws ExecutionException, InterruptedException { this.timeStampLimit = System.currentTimeMillis() + STS_TIMEOUT_DEFAULT; // 自行调用AssumeRole接口实现,进行身份信息获取。 StaticCredentialProvider provider = StaticCredentialProvider.create(Credential.builder() .accessKeyId(alibabaAccessKeyId) .accessKeySecret(alibabaAccessKeySecret) .build()); AsyncClient client = AsyncClient.builder() .region(region) // 请设置Region ID, 例如cn-hangzhou。 .credentialsProvider(provider) .overrideConfiguration( ClientOverrideConfiguration.create() // Endpoint请参考https://api.aliyun.com/product/Sts。 .setEndpointOverride("sts." + region + ".aliyuncs.com") //.setConnectTimeout(Duration.ofSeconds(30)) ) .build(); AssumeRoleRequest assumeRoleRequest = AssumeRoleRequest.builder() .roleArn(roleARN) // 从控制台获取得到的角色ARN。 .roleSessionName("testRoleName") // 当前角色Session的名称,可自定义。 .durationSeconds(STS_TIMEOUT_DEFAULT / 1000) .build(); CompletableFuture<AssumeRoleResponse> response = client.assumeRole(assumeRoleRequest); // Synchronously get the return value of the API request AssumeRoleResponse resp = response.get(); if (resp.getBody().getCredentials() != null) { System.out.println("[INFO] Update AK, SK, Token successfully."); this.accessKeyId = resp.getBody().getCredentials().getAccessKeyId(); this.securityToken = resp.getBody().getCredentials().getSecurityToken(); this.accessKeySecret = resp.getBody().getCredentials().getAccessKeySecret(); } client.close(); } // 检测当前该token是否快要过期。 public boolean isNearlyExpired() { // 提前30秒判断。 return System.currentTimeMillis() > timeStampLimit - 30 * 1000L; } @Override public String getUsername() { if(StringUtils.isNotEmpty(securityToken)) { return UserUtils.getUserName(accessKeyId, instanceId, securityToken); } else { return UserUtils.getUserName(accessKeyId, instanceId); } } @Override public String getPassword() { try { return UserUtils.getPassord(accessKeySecret); } catch (InvalidKeyException e) { //todo } catch (NoSuchAlgorithmException e) { //todo } return null; } }
表 1. 参数列表
参数 | 示例值 | 描述 |
hostName | 1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com | 云消息队列 RabbitMQ 版实例接入点。 |
Port | 5672 | 默认端口。非加密端口5672,加密端口5671。 |
AccessKeyID | LTAI5tJQKnB9zVvQ**** | 阿里云账号或RAM用户的AccessKey ID。您可以登录RAM访问控制台,创建RAM角色,并赋予角色AliyunAMQPFullAccess权限,获取角色的ARN,调用AssumeRole接口获取一个扮演该角色的临时身份。AssumeRole执行成功会返回RAM角色的 AccessKeyID、AccessKeySecret以及SecurityToken。角色ARN的概念,请参见RAM角色概览。 |
AccessKeySecret | jw6MawfKOVBveRr84u**** | 阿里云账号或RAM用户的AccessKey Secret。 |
region | cn-hangzhou | 调用对应地域的AssumeRole接口,详情请参见AssumeRole。 |
roleARN | acs:ram::125xxxxxxx223:role/xxx | RAM角色的ARN。格式为 |
instanceId | amqp-cn-v0h1kb9nu*** | 云消息队列 RabbitMQ 版的实例ID。您可以在云消息队列 RabbitMQ 版控制台的实例详情页面查看。如何查看实例ID,请参见查看实例详情。 |
virtualHost | Test | 云消息队列 RabbitMQ 版实例的Vhost。您可以在云消息队列 RabbitMQ 版控制台的Vhost 列表页面查看。如何查看Vhost,请参见查看Vhost连接详情。 |
ExchangeName | ExchangeTest | 云消息队列 RabbitMQ 版的Exchange。您可以在云消息队列 RabbitMQ 版控制台的Exchange 列表页面获取。 |
RoutingKey | RoutingKeyTest | 云消息队列 RabbitMQ 版Exchange与Queue的Routing Key。您可以在云消息队列 RabbitMQ 版控制台的Exchange 列表页面查看Exchange的绑定关系,获取Routing Key。 |
QueueName | QueueTest | 云消息队列 RabbitMQ 版的Queue。仅在订阅消息时候需要配置,您可以在云消息队列 RabbitMQ 版控制台的Exchange 列表页面,查看Exchange的绑定关系,获取Exchange綁定的Queue。 |
生产消息
创建并编译运行ProducerTest.java。
编译运行ProducerTest.java生产消息之前,您需要根据代码提示信息配置参数列表中所列举的参数。
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
public class ProducerTest {
// 推荐将AK/SK/ARN等信息在环境变量中配置,若将其明文保存在工程代码中,将带来不必要的数据泄露风险。
// 阿里云账号的AccessKey ID。
private static final String alibabaAccessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
// 阿里云账号的AccessKey Secret。
private static final String alibabaAccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
// 阿里云服务所在的Region。
private static final String region = System.getenv("ALIBABA_CLOUD_REGION");
// 阿里云角色ARN,从控制台获取。
private static final String roleARN = System.getenv("ALIBABA_CLOUD_ROLE_ARN");
//设置实例的接入点。
private static final String hostName = "xxx.xxx.aliyuncs.com";
private static final String instanceId = "${InstanceId}";
//设置实例的Vhost。
private static final String virtualHost = "${VirtualHost}";
//设置Exchange、Queue和绑定关系。
private static final String exchangeName = "${ExchangeName}";
private static final String queueName = "${QueueName}";
private static final String routingKey = "${RoutingKey}";
//设置Exchange类型。
private static final String exchangeType = "${ExchangeType}";
public static void main(String[] args) throws InterruptedException, IOException, TimeoutException, ExecutionException {
ConnectionFactory factory = new ConnectionFactory();
// 设置接入点,在云消息队列 RabbitMQ 版控制台实例详情页面查看。
factory.setHost(hostName);
// ${instanceId}为实例ID,在云消息队列 RabbitMQ 版控制台概览页面查看。
AliyunCredentialsProvider aliyunCredentialsProvider =
new AliyunCredentialsProvider(instanceId);
updateSTSProperties(aliyunCredentialsProvider);
// ${instanceId}为实例ID,在云消息队列 RabbitMQ 版控制台实例详情页面查看。
factory.setCredentialsProvider(aliyunCredentialsProvider);
//设置为true,开启Connection自动恢复功能;设置为false,关闭Connection自动恢复功能。
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(5000);
// 设置Vhost名称,请确保已在云消息队列 RabbitMQ 版控制台上创建完成。
factory.setVirtualHost(virtualHost);
// 默认端口,非加密端口5672,加密端口5671。
factory.setPort(5672);
// 基于网络环境合理设置超时时间。
factory.setConnectionTimeout(30 * 1000);
factory.setHandshakeTimeout(30 * 1000);
factory.setShutdownTimeout(0);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueDeclare(queueName, true, false, false, new HashMap<String, Object>());
channel.queueBind(queueName, exchangeName, routingKey);
// 开始发送消息,3600条消息,每条发送后暂停1秒,将持续1小时。
for (int i = 0; i < 3600; i++) {
try {
if (aliyunCredentialsProvider.isNearlyExpired()) {
// 认证可能过期,重新认证
System.out.println("[WARN] Token maybe expired, so try to update it.");
updateSTSProperties(aliyunCredentialsProvider);
factory.setCredentialsProvider(aliyunCredentialsProvider);
// 当配置更新后,需要重新建立连接。
connection = factory.newConnection();
channel = connection.createChannel();
}
// ${ExchangeName}必须在云消息队列 RabbitMQ 版控制台上已存在,并且Exchange的类型与控制台上的类型一致。
// ${RoutingKey}根据业务需求填入相应的RoutingKey。
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
channel.basicPublish(exchangeName, routingKey, true, props,
("消息发送Body-" + i).getBytes(StandardCharsets.UTF_8));
System.out.println("[SendResult] Message sent successfully, messageId: " + props.getMessageId());
Thread.sleep(1000L);
} catch (Exception e) {
System.out.println("[ERROR] Send fail, error: " + e.getMessage());
Thread.sleep(5000L);
}
}
connection.close();
}
public static void updateSTSProperties(AliyunCredentialsProvider aliyunCredentialsProvider) throws ExecutionException, InterruptedException {
System.out.println("Try to update STS properties");
// 推荐将AK/SK在环境变量中配置,若将其明文保存在工程代码中,将带来不必要的数据泄露风险。
aliyunCredentialsProvider.updateProperties(alibabaAccessKeyId, alibabaAccessKeySecret, region, roleARN);
}
}
云消息队列 RabbitMQ 版会对单实例的TPS流量峰值进行限流,更多信息,请参见实例限流最佳实践。
订阅消息
创建并编译运行ConsumerTest.java订阅消息。
编译运行ConsumerTest.java订阅消息之前,您需要根据代码提示信息配置参数列表中所列举的参数。
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
public class ConsumerTest {
// 推荐将AK/SK/ARN等信息在环境变量中配置,若将其明文保存在工程代码中,将带来不必要的数据泄露风险。
// 阿里云账号的AccessKey ID。
private static final String alibabaAccessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
// 阿里云账号的AccessKey Secret。
private static final String alibabaAccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
// 阿里云服务所在的Region。
private static final String region = System.getenv("ALIBABA_CLOUD_REGION");
// 阿里云角色ARN,从控制台获取。
private static final String roleARN = System.getenv("ALIBABA_CLOUD_ROLE_ARN");
//设置实例的接入点。
private static final String hostName = "xxx.xxx.aliyuncs.com";
private static final String instanceId = "${InstanceId}";
//设置实例的Vhost。
private static final String virtualHost = "${VirtualHost}";
//设置Queue。
private static final String queueName = "${QueueName}";
public static void main(String[] args) throws IOException, TimeoutException, ExecutionException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
// 设置接入点,在云消息队列 RabbitMQ 版控制台实例详情页面查看。
factory.setHost(hostName);
// ${instanceId}为实例ID,在云消息队列 RabbitMQ 版控制台概览页面查看。
AliyunCredentialsProvider aliyunCredentialsProvider =
new AliyunCredentialsProvider(instanceId);
updateSTSProperties(aliyunCredentialsProvider);
factory.setCredentialsProvider(aliyunCredentialsProvider);
//设置为true,开启Connection自动恢复功能;设置为false,关闭Connection自动恢复功能。
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(5000);
// 设置Vhost名称,请确保已在云消息队列 RabbitMQ 版控制台上创建完成。
factory.setVirtualHost(virtualHost);
// 默认端口,非加密端口5672,加密端口5671。
factory.setPort(5672);
factory.setConnectionTimeout(300 * 1000);
factory.setHandshakeTimeout(300 * 1000);
factory.setShutdownTimeout(0);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 创建${QueueName} ,该Queue必须在云消息队列 RabbitMQ 版控制台上已存在。
AMQP.Queue.DeclareOk queueDeclareOk = channel.queueDeclare(queueName, true, false, false, new HashMap<String, Object>());
consume(channel, queueName);
System.out.println("Consumer started.");
// 循环检测sts是否即将过期,若过期则更新connection,重新消费。
// 这里为了方便理解,使用while循环检测认证是否接近过期。
// 可以使用定时任务,更优雅地实现定时检查、更新操作。
while (true) {
// 每次处理完消息后,可以判断是否接近过期。
// 如果接近过期,则更新一次认证类,
// 该过程需要重新创建连接,以确保业务持续运行。
if (aliyunCredentialsProvider.isNearlyExpired()) {
System.out.println("token maybe expired, so try to update it.");
updateSTSProperties(aliyunCredentialsProvider);
factory.setCredentialsProvider(aliyunCredentialsProvider);
connection.close();
connection = factory.newConnection();
channel = connection.createChannel();
// 重新开始消费消息。
consume(channel, queueName);
System.out.println("Consumer started.");
} else {
// 每秒检测一次。
Thread.sleep(1000);
}
}
}
public static void consume(Channel channel, String queueName) throws IOException {
channel.basicConsume(queueName, false, "ConsumerTag", new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) {
try {
//接收到的消息,进行业务逻辑处理。
System.out.println("Received: " + new String(body, StandardCharsets.UTF_8) + ", deliveryTag: " + envelope.getDeliveryTag() + ", messageId: " + properties.getMessageId());
channel.basicAck(envelope.getDeliveryTag(), false);
} catch (Exception e) {
System.out.println("Exception, cause:" + e.getMessage());
}
}
});
}
public static void updateSTSProperties(AliyunCredentialsProvider aliyunCredentialsProvider) throws ExecutionException, InterruptedException {
System.out.println("Try to update STS properties");
aliyunCredentialsProvider.updateProperties(alibabaAccessKeyId, alibabaAccessKeySecret, region, roleARN);
}
}
查询消息
如果您想确认消息是否成功发送至云消息队列 RabbitMQ 版,可以在云消息队列 RabbitMQ 版控制台查询消息。具体操作,请参见查询消息。