本文以调用Java SDK为例,介绍在RAM角色跨账号授权场景,通过开源SDK实现消息收发的操作过程,帮助您更好地理解消息收发的完整过程,其他语言或框架的SDK消息收发过程相似。
前提条件
背景信息
当您需要通过RAM STS角色授权的方式访问云消息队列 RabbitMQ 版服务时,需要通过阿里云提供的权限认证类(AliyunCredentialsProvider)设置 AccessKeyID、AccessKeySecret与SecurityToken进行权限认证才能访问。
收发消息流程(以Java语言为例)

说明 云消息队列 RabbitMQ 版与开源RabbitMQ完全兼容。更多语言SDK,请参见开源RabbitMQ AMQP协议支持的多语言或框架SDK。
获取接入点
您需要在云消息队列 RabbitMQ 版控制台获取实例的接入点。在收发消息时,您需要为发布端和订阅端配置该接入点,通过接入点接入云消息队列 RabbitMQ 版实例。
- 登录云消息队列 RabbitMQ 版控制台,然后在左侧导航栏选择实例列表。
- 在实例列表页面的顶部菜单栏选择地域,然后在实例列表中,单击目标实例名称。
- 在实例详情页面的接入点信息页签,将鼠标指针移动到目标类型的接入点,单击该接入点右侧的
图标,复制该接入点。
类型 说明 示例值 公网接入点 公网环境可读写。按量付费实例默认支持,预付费实例需在购买时选择才支持。 XXX.mq-amqp.cn-hangzhou-a.aliyuncs.com VPC接入点 VPC环境可读写。按量付费实例和预付费实例默认都支持。 XXX.mq-amqp.cn-hangzhou-a-internal.aliyuncs.com
安装Java依赖库
在pom.xml中添加以下依赖。
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.5.0</version> <!-- 支持开源所有版本 -->
</dependency>
<dependency>
<groupId>com.alibaba.mq-amqp</groupId>
<artifactId>mq-amqp-client</artifactId>
<version>1.0.5</version>
</dependency>
配置权限认证类(AliyunCredentialsProvider)
- 新建一个.properties配置文件,将 AccessKeyID、AccessKeySecret和SecurityToken信息写入。参数填写说明,请参见参数列表。
# Access Key ID. accessKeyId=${accessKeyId} # Access Key Secret. accessKeySecret=${accessKeySecret} # security temp token. (optional) securityToken=${securityToken}
- 创建权限认证类AliyunCredentialsProvider.java,根据代码提示信息,设置相关参数。具体信息,请参见参数列表。
import java.io.FileReader; import java.io.IOException; import java.util.Properties; import com.alibaba.mq.amqp.utils.UserUtils; import com.rabbitmq.client.impl.CredentialsProvider; import org.apache.commons.lang3.StringUtils; import java.security.InvalidKeyException; import java.security.NoSuchAlgorithmException; public class AliyunCredentialsProvider implements CredentialsProvider { /** * Access Key ID. */ private static final String accessKeyId; /** * Access Key Secret. */ private static final String accessKeySecret; /** * security temp token. (optional) */ private static final String securityToken; /** * 实例ID,从云消息队列 RabbitMQ 版控制台获取。 */ private final String instanceId; // 阿里云账号AccessKey拥有所有API的访问权限,建议您使用RAM用户进行API访问或日常运维。 // 强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。 // 本示例以将AccessKey保存在配置文件中来实现身份验证为例。 static { // 需要将${ConfigPath}修改为您创建的Properties文件的路径。 String path = "${ConfigPath}"; Properties properties = new Properties(); try { properties.load(new FileReader(path)); } catch (IOException e) { // 业务中自行处理读配置文件异常。 } accessKeyId = properties.getProperty("accessKeyId"); accessKeySecret = properties.getProperty("accessKeySecret"); securityToken = properties.getProperty("securityToken"); } public AliyunCredentialsProvider(final String instanceId) { this.instanceId = instanceId; } @Override public String getUsername() { if(StringUtils.isNotEmpty(securityToken)) { return UserUtils.getUserName(accessKeyId, instanceId, securityToken); } else { return UserUtils.getUserName(accessKeyId, instanceId); } } @Override public String getPassword() { try { return UserUtils.getPassord(accessKeySecret); } catch (InvalidKeyException e) { //todo } catch (NoSuchAlgorithmException e) { //todo } return null; } }
参数 | 示例值 | 描述 |
---|---|---|
hostName | 1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com | 云消息队列 RabbitMQ 版实例接入点。获取方式,请参见获取接入点。 |
Port | 5672 | 默认端口。非加密端口5672,加密端口5671。 |
AccessKeyID | LTAI5tJQKnB9zVvQ**** | 阿里云账号或RAM用户的AccessKey ID。您可以登录RAM访问控制台,创建RAM角色,并赋予角色AliyunAMQPFullAccess权限,获取角色的ARN,调用AssumeRole接口获取一个扮演该角色的临时身份。AssumeRole执行成功会返回RAM角色的 AccessKeyID、AccessKeySecret以及SecurityToken。角色ARN的概念,请参见RAM角色概览。 |
AccessKeySecret | jw6MawfKOVBveRr84u**** | 阿里云账号或RAM用户的AccessKey Secret。 |
SecurityToken | CAIS9wF1q6Ft5B2yfSjIr5fkJNPkvK5tgqeScX+ElnkMXvhvgIPO0Dz2IHhNfXZuB************ | RAM角色的安全令牌(STS Token)。 |
instanceId | amqp-cn-v0h1kb9nu*** | 云消息队列 RabbitMQ 版的实例ID。您可以在云消息队列 RabbitMQ 版控制台的实例详情页面查看。如何查看实例ID,请参见查看实例详情。 |
virtualHost | Test | 云消息队列 RabbitMQ 版实例的Vhost。您可以在云消息队列 RabbitMQ 版控制台的Vhost 列表页面查看。如何查看Vhost,请参见查看Vhost连接详情。 |
ExchangeName | ExchangeTest | 云消息队列 RabbitMQ 版的Exchange。您可以在云消息队列 RabbitMQ 版控制台的Exchange 列表页面获取。 |
BindingKey | BindingKeyTest | 云消息队列 RabbitMQ 版Exchange与Queue的Binding Key。您可以在云消息队列 RabbitMQ 版控制台的Exchange 列表页面查看Exchange的绑定关系,获取Binding Key。 |
QueueName | QueueTest | 云消息队列 RabbitMQ 版的Queue。仅在订阅消息时候需要配置,您可以在云消息队列 RabbitMQ 版控制台的Exchange 列表页面,查看Exchange的绑定关系,获取Exchange綁定的Queue。 |
生产消息
创建并编译运行ProducerTest.java。
重要 编译运行ProducerTest.java生产消息之前,您需要根据代码提示信息配置参数列表中所列举的参数。
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
import java.util.HashMap;
import java.util.UUID;
public class ProducerTest {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
// 设置接入点,在云消息队列 RabbitMQ 版控制台实例详情页面查看。
factory.setHost("xxx.xxx.aliyuncs.com");
// ${instanceId}为实例ID,在云消息队列 RabbitMQ 版控制台实例详情页面查看。
factory.setCredentialsProvider(new AliyunCredentialsProvider("${instanceId}"));
//设置为true,开启Connection自动恢复功能;设置为false,关闭Connection自动恢复功能。
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(5000);
// 设置Vhost名称,请确保已在云消息队列 RabbitMQ 版控制台上创建完成。
factory.setVirtualHost("${VhostName}");
// 默认端口,非加密端口5672,加密端口5671。
factory.setPort(5672);
// 基于网络环境合理设置超时时间。
factory.setConnectionTimeout(30 * 1000);
factory.setHandshakeTimeout(30 * 1000);
factory.setShutdownTimeout(0);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("${ExchangeName}", "${ExchangeType}", true, false, false, null);
channel.queueDeclare("${QueueName}", true, false, false, new HashMap<String, Object>());
channel.queueBind("${QueueName}", "${ExchangeName}", "${BindingKey}");
// 开始发送消息。
for (int i = 0; i < 100; i++ ) {
// ${ExchangeName}必须在云消息队列 RabbitMQ 版控制台上已存在,并且Exchange的类型与控制台上的类型一致。
// ${BindingKey}根据业务需求填入相应的BindingKey。
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
channel.basicPublish("${ExchangeName}", "${BindingKey}", true, props,
("消息发送Body" + i).getBytes(StandardCharsets.UTF_8));
}
connection.close();
}
}
订阅消息
创建并编译运行ConsumerTest.java订阅消息。
重要 编译运行ConsumerTest.java订阅消息之前,您需要根据代码提示信息配置参数列表中所列举的参数。
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;
public class ConsumerTest {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
// 设置接入点,在云消息队列 RabbitMQ 版控制台实例详情页面查看。
factory.setHost("xxx.xxx.aliyuncs.com");
// ${instanceId}为实例ID,在云消息队列 RabbitMQ 版控制台概览页面查看。
factory.setCredentialsProvider(new AliyunCredentialsProvider("${instanceId}"));
//设置为true,开启Connection自动恢复功能;设置为false,关闭Connection自动恢复功能。
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(5000);
// 设置Vhost名称,请确保已在云消息队列 RabbitMQ 版控制台上创建完成。
factory.setVirtualHost("${VhostName}");
// 默认端口,非加密端口5672,加密端口5671。
factory.setPort(5672);
factory.setConnectionTimeout(300 * 1000);
factory.setHandshakeTimeout(300 * 1000);
factory.setShutdownTimeout(0);
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
// 创建${ExchangeName},该Exchange必须在云消息队列 RabbitMQ 版控制台上已存在,并且Exchange的类型与控制台上的类型一致。
AMQP.Exchange.DeclareOk exchangeDeclareOk = channel.exchangeDeclare("${ExchangeName}", "${ExchangeType}", true, false, false, null);
// 创建${QueueName} ,该Queue必须在云消息队列 RabbitMQ 版控制台上已存在。
AMQP.Queue.DeclareOk queueDeclareOk = channel.queueDeclare("${QueueName}", true, false, false, new HashMap<String, Object>());
// Queue与Exchange进行绑定,并确认绑定的BindingKeyTest。
AMQP.Queue.BindOk bindOk = channel.queueBind("${QueueName}", "${ExchangeName}", "BindingKeyTest");
// 开始消费消息。
channel.basicConsume("${QueueName}", false, "ConsumerTag", new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
//接收到的消息,进行业务逻辑处理。
System.out.println("Received: " + new String(body, StandardCharsets.UTF_8) + ", deliveryTag: " + envelope.getDeliveryTag() + ", messageId: " + properties.getMessageId());
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
查询消息
如果您想确认消息是否成功发送至云消息队列 RabbitMQ 版,可以在云消息队列 RabbitMQ 版控制台查询消息。具体操作,请参见查询消息。