使用Java SDK收发消息
本文为您介绍快速使用云消息队列 RabbitMQ 版进行消息收发的操作流程。
操作流程
步骤一:(可选)RAM用户授权
RAM用户默认没有云消息队列 RabbitMQ 版资源的操作权限。如果您的账号为RAM用户,必须先为RAM用户进行授权,若您的账号为阿里云账号,则默认拥有云消息队列 RabbitMQ 版服务的所有权限,无需进行授权操作。
如果您需要为RAM用户授权,具体操作,请参见为RAM用户授权。
步骤二:创建资源
创建实例
实例是一个独立的云消息队列 RabbitMQ 版资源实体,包含Vhost、Exchange、Queue等基本的资源要素。
登录云消息队列 RabbitMQ 版控制台,然后在左侧导航栏选择实例列表。
在顶部菜单栏选择地域,然后在实例列表页面,单击创建实例。
在购买页完成基本配置并勾选服务协议,然后单击立即购买。
根据提示完成支付。
在实例列表页面的顶部菜单栏,选择地域,您可以看到创建的实例。
说明专业版实例和企业版实例购买后,立即进入服务中状态。
铂金版实例购买后,首先进入部署中状态,待集群分配后,再进入服务中状态。
获取实例接入点
在收发消息时,您需要为发布端和订阅端配置该接入点,客户端通过接入点接入云消息队列 RabbitMQ 版实例。
登录云消息队列 RabbitMQ 版控制台,然后在左侧导航栏选择实例列表。
在实例列表页面的顶部菜单栏选择地域,然后在实例列表中,单击目标实例名称。
在实例详情页面的接入点信息页签,将鼠标指针移动到目标类型的接入点,单击该接入点右侧的图标,复制该接入点。
类型
说明
示例值
公网接入点
公网环境可读写。按量付费实例默认支持,预付费实例需在购买时选择才支持。
XXX.net.mq.amqp.aliyuncs.com
VPC接入点
VPC环境可读写。按量付费实例和预付费实例默认都支持。
XXX.vpc.mq.amqp.aliyuncs.com
创建Vhost
Vhost是指虚拟主机,用作逻辑隔离,分别管理各自的Exchange、Queue和Binding,使得应用安全地运行在不同的Vhost上,相互之间不会干扰。一个实例下可以有多个Vhost,一个Vhost里面可以有若干个Exchange和Queue。Producer和Consumer连接云消息队列 RabbitMQ 版需要指定一个Vhost。
登录云消息队列 RabbitMQ 版控制台,然后在左侧导航栏选择实例列表。
在实例列表页面的顶部菜单栏选择地域,然后在实例列表中,单击目标实例名称。
在左侧导航栏,单击Vhost 列表。
在Vhost 列表页面,单击创建 Vhost。
在创建 Vhost面板的Vhost名称文本框,输入Vhost名称,然后单击确定。
创建Exchange
Producer将消息发送到Exchange,Exchange根据Routing Key将消息路由到一个或多个Queue中(或者丢弃)。不同类型的Exchange的路由规则不同。更多信息,请参见Exchange。
登录云消息队列 RabbitMQ 版控制台,然后在左侧导航栏选择实例列表。
在实例列表页面的顶部菜单栏选择地域,然后在实例列表中,单击目标实例名称。
在左侧导航栏,单击Exchange 列表。
在Exchange 列表页面,在当前 Vhost右侧的切换下拉列表中,选择Vhost,然后单击创建 Exchange。
在创建 Exchange面板,输入Exchange名称,选择Exchange类型,设置是否为Internal类型,然后单击确定。
参数
描述
Exchange 名称
Exchange名称。以amq.开头的为保留字段,因此不能使用。例如:amq.test。
类型
Exchange类型。取值:
direct:该类型的路由规则会将消息路由到Routing Key完全匹配的Queue中。
topic:该类型与direct类型相似。Topic Exchange路由规则没有Direct Exchange那么严格,支持模糊匹配和多条件匹配,即该类型Exchange使用Routing Key模式匹配和字符串比较的方式将消息路由至绑定的Queue中。
fanout:该类型的路由规则非常简单,会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中,相当于广播功能。
headers:该类型与direct类型相似。Headers Exchange使用Headers属性代替Routing Key进行路由匹配,在绑定Headers Exchange和Queue时,设置绑定属性的键值对;在向Headers Exchange发送消息时,设置消息的Headers属性键值对,使用消息Headers属性键值对和绑定属性键值对比较的方式将消息路由至绑定的Queue。
x-delayed-message:通过声明该类Exchange,您可以自定义消息的Header属性x-delay来指定消息延时投递的时间段,单位为毫秒。消息将在x-delay中定义的时间段后,根据路由规则被投递到对应的Queue。路由规则取决于x-delayed-type中指定的Exchange路由类型。
x-consistent-hash:x-consistent-hash Exchange支持将Routing Key或Header值进行Hash计算,使用一致性哈希算法将消息路由到不同的Queue上。
x-delayed-type
当Exchange类型为x-delayed-message时,需要配置此参数,以指定Exchange的路由类型。
哈希取值
当Exchange类型为x-consistent-hash时,需要配置此参数,以指定Hash计算的输入值为哪种类型。取值如下:
RoutingKey
Header 值:使用Header方式作为Hash计算输入值时,您需要定义hash-header参数的取值。
hash-header
当Exchange类型为x-consistent-hash且哈希取值为Header 值时,需要配置此参数,作为指定Hash计算的输入值。
Internal
是否为Internal类型,默认值为否。取值:
是:内建类型,用于Exchange和Exchange之间的绑定。
否:非内建类型,用于Exchange和Queue之间的绑定。
创建Queue
Queue是指队列,云消息队列 RabbitMQ 版的消息都会被投入到一个或多个Queue中。
登录云消息队列 RabbitMQ 版控制台,然后在左侧导航栏选择实例列表。
在实例列表页面的顶部菜单栏选择地域,然后在实例列表中,单击目标实例名称。
在左侧导航栏,单击Queue 列表。
在Queue 列表页面,在当前 Vhost右侧的切换下拉列表中,选择Vhost,单击创建 Queue。
在创建 Queue面板,在Queue 名称文本框输入Queue的名称,选择是否为Auto Delete类型,单击高级选项,设置Queue的参数,然后单击确定。
参数
描述
说明
Queue 名称
Queue的名称
只能包含字母、数字、短划线(-)、下划线(_)、半角句号(.)、井号(#)、正斜线(/)、at符号(@)。
长度限制在1~255字符。
创建后无法修改,只能删除重建。
以amq.开头的为保留字段,因此不能使用。例如:amq.test。
Auto Delete
最后一个Consumer取消订阅后,Queue是否自动删除。
true:在订阅该Queue消息的最后一个Consumer取消订阅该Queue的消息后,自动删除该Queue。
false:在订阅该Queue消息的最后一个Consumer取消订阅该Queue的消息后,不自动删除该Queue。
高级选项
Queue的参数设置,可用于设置死信Exchange、死信Routing Key和消息存活时间。
DeadLetterExchange:指定死信消息发送的目标Exchange。
DeadLetterRoutingKey:指定死信消息的Routing Key,即死信Exchange会将消息发送至匹配该死信Routing Key所对应的Queue。
MessageTTL:消息存活时间,单位毫秒(ms)。在指定时间内未被成功消费的消息会变成死信消息,该消息将会被发送到死信Exchange。更多信息,请参见消息存活时间。
创建绑定关系
将Queue和指定的Exchange进行绑定,消息将按照对应的转发规则从Exchange转发到Queue中。
在Queue 列表页面,选择指定Queue,在其操作列单击详情。
在Queue 详情页面单击被绑定信息页签,单击添加被绑定。
在添加被绑定面板,选择源Exchange,在Routing Key文本框输入Routing Key,然后单击确定。
说明若被绑定的Exchange的类型为x-consistent-hash时,Routing Key表示绑定的Queue的权重,只能设置为整数,取值范围为[1~20]。
创建用户名密码
开源客户端访问云消息队列 RabbitMQ 版服务端时,需要传入用户名和密码进行权限认证,认证通过才允许访问服务端。
登录云消息队列 RabbitMQ 版控制台,然后在左侧导航栏选择实例列表。
在实例列表页面的顶部菜单栏选择地域,然后在实例列表中,单击目标实例名称。
在左侧导航栏,单击静态用户名密码。
在静态用户名密码页面,单击创建用户名密码。
在创建用户名密码面板,输入AccessKey ID和AccessKey Secret,然后单击确定。
说明AccessKey ID和AccessKey Secret需要在阿里云RAM控制台获取,具体获取方式,请参见创建AccessKey。
静态用户名密码页面,显示创建的静态用户名与密码,密码处于隐藏状态。
在创建的静态用户名密码的密码列,单击显示密码,可查看用户名的密码。
步骤三:调用SDK收发消息
安装Java依赖库
在IDEA中创建一个Java工程。
在pom.xml文件中添加以下依赖引入Java依赖库。
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.5.0</version> <!-- 支持开源所有版本 --> </dependency>
生产消息
在已创建的Java工程中,创建消息发送程序,按照SDK参数填写说明配置相关参数并运行。
示例代码如下:
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
public class ProducerTest {
public static void main(String[] args) throws IOException, TimeoutException {
//设置实例的接入点。
String hostName = "xxx.xxx.aliyuncs.com";
//设置实例的静态用户名密码。
String userName = "${UserName}";
String passWord = "${PassWord}";
//设置实例的Vhost。
String virtualHost = "${VirtualHost}";
//在生产环境中,建议提前创建好Connection,并在需要时重复使用,避免频繁创建和关闭Connection,以提高性能、复用连接资源,以及保证系统的稳定性。
Connection connection = createConnection(hostName, userName, passWord, virtualHost);
Channel channel = connection.createChannel();
//设置Exchange、Queue和绑定关系。
String exchangeName = "${ExchangeName}";
String queueName = "${QueueName}";
String bindingKey = "${BindingKey}";
//设置Exchange类型。
String exchangeType = "${ExchangeType}";
//此处为了体验流畅,确保了Exchange和Queue的创建过程。
//在生产环境中,建议在控制台提前创建,尽量避免在代码中直接声明,否则可能触发单API调用的限流。
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueDeclare(queueName, true, false, false, new HashMap<String, Object>());
channel.queueBind(queueName, exchangeName, bindingKey);
//开始发送消息。
for (int i = 0; i < 10; i++ ) {
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
channel.basicPublish(exchangeName, bindingKey, true, props,
("消息发送示例Body-" + i).getBytes(StandardCharsets.UTF_8));
System.out.println("[SendResult] Message sent successfully, messageId: " + props.getMessageId() + ", exchange: " + exchangeName + ", routingKey: " + bindingKey);
}
channel.close();
connection.close();
}
public static Connection createConnection(String hostName, String userName, String passWord, String virtualHost) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(hostName);
factory.setUsername(userName);
factory.setPassword(passWord);
//设置为true,开启Connection自动恢复功能;设置为false,关闭Connection自动恢复功能。
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(5000);
factory.setVirtualHost(virtualHost);
//默认端口,非加密端口5672,加密端口5671。
factory.setPort(5672);
//基于网络环境合理设置超时时间。
factory.setConnectionTimeout(30 * 1000);
factory.setHandshakeTimeout(30 * 1000);
factory.setShutdownTimeout(0);
Connection connection = factory.newConnection();
return connection;
}
}
云消息队列 RabbitMQ 版会对单实例的TPS流量峰值进行限流,更多信息,请参见实例限流最佳实践。
订阅消息
在已创建的Java工程中,创建消息订阅程序,按照SDK参数填写说明配置相关参数并运行。
示例代码如下:
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;
public class ConsumerTest {
public static void main(String[] args) throws IOException, TimeoutException {
//设置实例的接入点。
String hostName = "xxx.xxx.aliyuncs.com";
//设置实例的静态用户名密码。
String userName = "${UserName}";
String passWord = "${PassWord}";
//设置实例的Vhost。
String virtualHost = "${VirtualHost}";
Connection connection = createConnection(hostName, userName, passWord, virtualHost);
final Channel channel = connection.createChannel();
//声明Queue。
String queueName = "${QueueName}";
//创建${QueueName} ,该Queue必须在云消息队列RabbitMQ版控制台上已存在。
AMQP.Queue.DeclareOk queueDeclareOk = channel.queueDeclare(queueName, true, false, false, new HashMap<String, Object>());
//开始消费消息。
channel.basicConsume(queueName, false, "ConsumerTag", new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
//接收到的消息,进行业务逻辑处理。
System.out.println("Received: " + new String(body, StandardCharsets.UTF_8) + ", deliveryTag: " + envelope.getDeliveryTag() + ", messageId: " + properties.getMessageId());
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
public static Connection createConnection(String hostName, String userName, String passWord, String virtualHost) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(hostName);
factory.setUsername(userName);
factory.setPassword(passWord);
//设置为true,开启Connection自动恢复功能;设置为false,关闭Connection自动恢复功能。
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(5000);
factory.setVirtualHost(virtualHost);
// 默认端口,非加密端口5672,加密端口5671。
factory.setPort(5672);
factory.setConnectionTimeout(300 * 1000);
factory.setHandshakeTimeout(300 * 1000);
factory.setShutdownTimeout(0);
Connection connection = factory.newConnection();
return connection;
};
}
SDK参数填写说明
参数 | 示例值 | 描述 |
hostName | XXX.net.mq.amqp.aliyuncs.com | 云消息队列 RabbitMQ 版实例接入点。获取方式,请参见步骤二:创建资源。 |
Port | 5672 | 默认端口。非加密端口为5672,加密端口为5671。 |
userName | MjoxODgwNzcwODY5MD**** | 客户端接入云消息队列 RabbitMQ 版服务端用于权限认证的静态用户名。 需要提前在云消息队列 RabbitMQ 版控制台创建。 具体操作,请参见步骤二:创建资源。 |
passWord | NDAxREVDQzI2MjA0OT**** | 客户端接入云消息队列 RabbitMQ 版服务端用于权限认证的静态用户密码。 需要提前在云消息队列 RabbitMQ 版控制台创建。 具体操作,请参见步骤二:创建资源。 |
virtualHost | amqp_vhost | 云消息队列 RabbitMQ 版实例的Vhost。需要提前在云消息队列 RabbitMQ 版控制台创建。 具体操作,请参见步骤二:创建资源。 |
exchangeName | ExchangeTest | 云消息队列 RabbitMQ 版的Exchange。 需要提前在云消息队列 RabbitMQ 版控制台创建。 具体操作,请参见步骤二:创建资源。 |
queueName | QueueTest | 云消息队列 RabbitMQ 版的Queue。 需要提前在云消息队列 RabbitMQ 版控制台创建。 具体操作,请参见步骤二:创建资源。 |
bindingKey | BindingKeyTest | 云消息队列 RabbitMQ 版Exchange与Queue绑定的Binding Key。 需要提前在云消息队列 RabbitMQ 版控制台创建绑定关系。 具体操作,请参见步骤二:创建资源。 |
exchangeType | topic | Exchange的类型。云消息队列 RabbitMQ 版支持的类型如下,更多信息,请参见Exchange。
重要 请确保填写的Exchange类型和您创建Exchange时选择的类型一致。 |
步骤四:配置监控告警
当您需要监控云消息队列 RabbitMQ 版的使用情况时,可以使用云监控创建报警规则。如果资源的监控指标达到报警条件,云监控自动发送报警通知,帮助您及时得知异常监控数据,并快速处理。具体操作请参见监控指标。如果您需要查看基于阿里云ARMS Prometheus监控服务和Grafana的指标信息,请参见Dashboard。