使用Java SDK收发消息

更新时间:

本文为您介绍快速使用云消息队列 RabbitMQ 版进行消息收发的操作流程。

操作流程

image

步骤一:(可选)RAM用户授权

RAM用户默认没有云消息队列 RabbitMQ 版资源的操作权限。如果您的账号为RAM用户,必须先为RAM用户进行授权,若您的账号为阿里云账号,则默认拥有云消息队列 RabbitMQ 版服务的所有权限,无需进行授权操作。

如果您需要为RAM用户授权,具体操作,请参见为RAM用户授权

步骤二:创建资源

创建实例

实例是一个独立的云消息队列 RabbitMQ 版资源实体,包含Vhost、Exchange、Queue等基本的资源要素。

  1. 登录云消息队列 RabbitMQ 版控制台,然后在左侧导航栏选择实例列表

  2. 在顶部菜单栏选择地域,然后在实例列表页面,单击创建实例

  3. 在购买页完成基本配置并勾选服务协议,然后单击立即购买

  4. 根据提示完成支付。

    实例列表页面的顶部菜单栏,选择地域,您可以看到创建的实例。

    说明
    • 专业版实例和企业版实例购买后,立即进入服务中状态。

    • 铂金版实例购买后,首先进入部署中状态,待集群分配后,再进入服务中状态。

获取实例接入点

在收发消息时,您需要为发布端和订阅端配置该接入点,客户端通过接入点接入云消息队列 RabbitMQ 版实例。

  1. 登录云消息队列 RabbitMQ 版控制台,然后在左侧导航栏选择实例列表

  2. 实例列表页面的顶部菜单栏选择地域,然后在实例列表中,单击目标实例名称。

  3. 实例详情页面的接入点信息页签,将鼠标指针移动到目标类型的接入点,单击该接入点右侧的复制图标,复制该接入点。

    类型

    说明

    示例值

    公网接入点

    公网环境可读写。按量付费实例默认支持,预付费实例需在购买时选择才支持。

    XXX.net.mq.amqp.aliyuncs.com

    VPC接入点

    VPC环境可读写。按量付费实例和预付费实例默认都支持。

    XXX.vpc.mq.amqp.aliyuncs.com

创建Vhost

Vhost是指虚拟主机,用作逻辑隔离,分别管理各自的Exchange、Queue和Binding,使得应用安全地运行在不同的Vhost上,相互之间不会干扰。一个实例下可以有多个Vhost,一个Vhost里面可以有若干个Exchange和Queue。Producer和Consumer连接云消息队列 RabbitMQ 版需要指定一个Vhost。

  1. 登录云消息队列 RabbitMQ 版控制台,然后在左侧导航栏选择实例列表

  2. 实例列表页面的顶部菜单栏选择地域,然后在实例列表中,单击目标实例名称。

  3. 在左侧导航栏,单击Vhost 列表

  4. Vhost 列表页面,单击创建 Vhost

  5. 创建 Vhost面板的Vhost名称文本框,输入Vhost名称,然后单击确定

创建Exchange

Producer将消息发送到Exchange,Exchange根据Routing Key将消息路由到一个或多个Queue中(或者丢弃)。不同类型的Exchange的路由规则不同。更多信息,请参见Exchange

  1. 登录云消息队列 RabbitMQ 版控制台,然后在左侧导航栏选择实例列表

  2. 实例列表页面的顶部菜单栏选择地域,然后在实例列表中,单击目标实例名称。

  3. 在左侧导航栏,单击Exchange 列表

  4. Exchange 列表页面,在当前 Vhost右侧的切换下拉列表中,选择Vhost,然后单击创建 Exchange

  5. 创建 Exchange面板,输入Exchange名称,选择Exchange类型,设置是否为Internal类型,然后单击确定

    参数

    描述

    Exchange 名称

    Exchange名称。以amq.开头的为保留字段,因此不能使用。例如:amq.test。

    类型

    Exchange类型。取值:

    • direct:该类型的路由规则会将消息路由到Routing Key完全匹配的Queue中。

    • topic:该类型与direct类型相似。Topic Exchange路由规则没有Direct Exchange那么严格,支持模糊匹配和多条件匹配,即该类型Exchange使用Routing Key模式匹配和字符串比较的方式将消息路由至绑定的Queue中。

    • fanout:该类型的路由规则非常简单,会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中,相当于广播功能。

    • headers:该类型与direct类型相似。Headers Exchange使用Headers属性代替Routing Key进行路由匹配,在绑定Headers Exchange和Queue时,设置绑定属性的键值对;在向Headers Exchange发送消息时,设置消息的Headers属性键值对,使用消息Headers属性键值对和绑定属性键值对比较的方式将消息路由至绑定的Queue。

    • x-delayed-message:通过声明该类Exchange,您可以自定义消息的Header属性x-delay来指定消息延时投递的时间段,单位为毫秒。消息将在x-delay中定义的时间段后,根据路由规则被投递到对应的Queue。路由规则取决于x-delayed-type中指定的Exchange路由类型。

    • x-consistent-hash:x-consistent-hash Exchange支持将Routing Key或Header值进行Hash计算,使用一致性哈希算法将消息路由到不同的Queue上。

    x-delayed-type

    当Exchange类型为x-delayed-message时,需要配置此参数,以指定Exchange的路由类型。

    哈希取值

    当Exchange类型为x-consistent-hash时,需要配置此参数,以指定Hash计算的输入值为哪种类型。取值如下:

    • RoutingKey

    • Header 值:使用Header方式作为Hash计算输入值时,您需要定义hash-header参数的取值。

    hash-header

    当Exchange类型为x-consistent-hash哈希取值Header 值时,需要配置此参数,作为指定Hash计算的输入值。

    Internal

    是否为Internal类型,默认值为。取值:

    • :内建类型,用于Exchange和Exchange之间的绑定。

    • :非内建类型,用于Exchange和Queue之间的绑定。

创建Queue

Queue是指队列,云消息队列 RabbitMQ 版的消息都会被投入到一个或多个Queue中。

  1. 登录云消息队列 RabbitMQ 版控制台,然后在左侧导航栏选择实例列表

  2. 实例列表页面的顶部菜单栏选择地域,然后在实例列表中,单击目标实例名称。

  3. 在左侧导航栏,单击Queue 列表

  4. Queue 列表页面,在当前 Vhost右侧的切换下拉列表中,选择Vhost,单击创建 Queue

  5. 创建 Queue面板,在Queue 名称文本框输入Queue的名称,选择是否为Auto Delete类型,单击高级选项,设置Queue的参数,然后单击确定

    参数

    描述

    说明

    Queue 名称

    Queue的名称

    • 只能包含字母、数字、短划线(-)、下划线(_)、半角句号(.)、井号(#)、正斜线(/)、at符号(@)。

    • 长度限制在1~255字符。

    • 创建后无法修改,只能删除重建。

    • 以amq.开头的为保留字段,因此不能使用。例如:amq.test。

    Auto Delete

    最后一个Consumer取消订阅后,Queue是否自动删除。

    • true:在订阅该Queue消息的最后一个Consumer取消订阅该Queue的消息后,自动删除该Queue。

    • false:在订阅该Queue消息的最后一个Consumer取消订阅该Queue的消息后,不自动删除该Queue。

    高级选项

    Queue的参数设置,可用于设置死信Exchange、死信Routing Key和消息存活时间。

    • DeadLetterExchange:指定死信消息发送的目标Exchange。

    • DeadLetterRoutingKey:指定死信消息的Routing Key,即死信Exchange会将消息发送至匹配该死信Routing Key所对应的Queue。

    • MessageTTL:消息存活时间,单位毫秒(ms)。在指定时间内未被成功消费的消息会变成死信消息,该消息将会被发送到死信Exchange。更多信息,请参见消息存活时间

创建绑定关系

将Queue和指定的Exchange进行绑定,消息将按照对应的转发规则从Exchange转发到Queue中。

  1. Queue 列表页面,选择指定Queue,在其操作列单击详情

  2. Queue 详情页面单击被绑定信息页签,单击添加被绑定

  3. 添加被绑定面板,选择源Exchange,在Routing Key文本框输入Routing Key,然后单击确定

    说明

    若被绑定的Exchange的类型为x-consistent-hash时,Routing Key表示绑定的Queue的权重,只能设置为整数,取值范围为[1~20]。

创建用户名密码

开源客户端访问云消息队列 RabbitMQ 版服务端时,需要传入用户名和密码进行权限认证,认证通过才允许访问服务端。

  1. 登录云消息队列 RabbitMQ 版控制台,然后在左侧导航栏选择实例列表

  2. 实例列表页面的顶部菜单栏选择地域,然后在实例列表中,单击目标实例名称。

  3. 在左侧导航栏,单击静态用户名密码

  4. 静态用户名密码页面,单击创建用户名密码

  5. 创建用户名密码面板,输入AccessKey IDAccessKey Secret,然后单击确定

    说明

    AccessKey IDAccessKey Secret需要在阿里云RAM控制台获取,具体获取方式,请参见创建AccessKey

    静态用户名密码页面,显示创建的静态用户名与密码,密码处于隐藏状态。用户名密码

  6. 在创建的静态用户名密码的密码列,单击显示密码,可查看用户名的密码。

步骤三:调用SDK收发消息

说明

您需要提前安装IDE安装JDK安装Maven。您可以使用IntelliJ IDEA或者Eclipse,本文以IntelliJ IDEA Ultimate为例。

安装Java依赖库

  1. 在IDEA中创建一个Java工程。

  2. pom.xml文件中添加以下依赖引入Java依赖库。

    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.5.0</version> <!-- 支持开源所有版本 -->
    </dependency>

生产消息

在已创建的Java工程中,创建消息发送程序,按照SDK参数填写说明配置相关参数并运行。

示例代码如下:

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

public class ProducerTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        //设置实例的接入点。
        String hostName = "xxx.xxx.aliyuncs.com";
        //设置实例的静态用户名密码。
        String userName = "${UserName}";
        String passWord = "${PassWord}";
        //设置实例的Vhost。
        String virtualHost = "${VirtualHost}";

        //在生产环境中,建议提前创建好Connection,并在需要时重复使用,避免频繁创建和关闭Connection,以提高性能、复用连接资源,以及保证系统的稳定性。
        Connection connection = createConnection(hostName, userName, passWord, virtualHost);
        Channel channel = connection.createChannel();

        //设置Exchange、Queue和绑定关系。
        String exchangeName = "${ExchangeName}";
        String queueName = "${QueueName}";
        String bindingKey = "${BindingKey}";
        //设置Exchange类型。
        String exchangeType = "${ExchangeType}";

        //此处为了体验流畅,确保了Exchange和Queue的创建过程。
        //在生产环境中,建议在控制台提前创建,尽量避免在代码中直接声明,否则可能触发单API调用的限流。
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        channel.queueDeclare(queueName, true, false, false, new HashMap<String, Object>());
        channel.queueBind(queueName, exchangeName, bindingKey);
        //开始发送消息。
        for (int i = 0; i < 10; i++  ) {
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
            channel.basicPublish(exchangeName, bindingKey, true, props,
                    ("消息发送示例Body-"  + i).getBytes(StandardCharsets.UTF_8));
            System.out.println("[SendResult] Message sent successfully, messageId: " + props.getMessageId() + ", exchange: " + exchangeName + ", routingKey: " + bindingKey);
        }
        channel.close();
        connection.close();
    }

    public static Connection createConnection(String hostName, String userName, String passWord, String virtualHost) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(hostName);
        factory.setUsername(userName);
        factory.setPassword(passWord);
        //设置为true,开启Connection自动恢复功能;设置为false,关闭Connection自动恢复功能。
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        factory.setVirtualHost(virtualHost);
        //默认端口,非加密端口5672,加密端口5671。
        factory.setPort(5672);
        //基于网络环境合理设置超时时间。
        factory.setConnectionTimeout(30 * 1000);
        factory.setHandshakeTimeout(30 * 1000);
        factory.setShutdownTimeout(0);
        Connection connection = factory.newConnection();
        return connection;
    }
}
说明

云消息队列 RabbitMQ 版会对单实例的TPS流量峰值进行限流,更多信息,请参见实例限流最佳实践

订阅消息

在已创建的Java工程中,创建消息订阅程序,按照SDK参数填写说明配置相关参数并运行。

示例代码如下:

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;

public class ConsumerTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        //设置实例的接入点。
        String hostName = "xxx.xxx.aliyuncs.com";
        //设置实例的静态用户名密码。
        String userName = "${UserName}";
        String passWord = "${PassWord}";
        //设置实例的Vhost。
        String virtualHost = "${VirtualHost}";

        Connection connection = createConnection(hostName, userName, passWord, virtualHost);
        final Channel channel = connection.createChannel();

        //声明Queue。
        String queueName = "${QueueName}";
        //创建${QueueName} ,该Queue必须在云消息队列RabbitMQ版控制台上已存在。
        AMQP.Queue.DeclareOk queueDeclareOk = channel.queueDeclare(queueName, true, false, false, new HashMap<String, Object>());

        //开始消费消息。
        channel.basicConsume(queueName, false, "ConsumerTag", new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
            throws IOException {
                //接收到的消息,进行业务逻辑处理。
                System.out.println("Received: "  + new String(body, StandardCharsets.UTF_8) +  ", deliveryTag: "  + envelope.getDeliveryTag()  + ", messageId: " +  properties.getMessageId());
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }

    public static Connection createConnection(String hostName, String userName, String passWord, String virtualHost) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(hostName);
        factory.setUsername(userName);
        factory.setPassword(passWord);
        //设置为true,开启Connection自动恢复功能;设置为false,关闭Connection自动恢复功能。
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        factory.setVirtualHost(virtualHost);
        // 默认端口,非加密端口5672,加密端口5671。
        factory.setPort(5672);
        factory.setConnectionTimeout(300 * 1000);
        factory.setHandshakeTimeout(300 * 1000);
        factory.setShutdownTimeout(0);
        Connection connection = factory.newConnection();
        return connection;
    };
}

SDK参数填写说明

参数

示例值

描述

hostName

XXX.net.mq.amqp.aliyuncs.com

云消息队列 RabbitMQ 版实例接入点。获取方式,请参见步骤二:创建资源

Port

5672

默认端口。非加密端口为5672,加密端口为5671。

userName

MjoxODgwNzcwODY5MD****

客户端接入云消息队列 RabbitMQ 版服务端用于权限认证的静态用户名。

需要提前在云消息队列 RabbitMQ 版控制台创建。

具体操作,请参见步骤二:创建资源

passWord

NDAxREVDQzI2MjA0OT****

客户端接入云消息队列 RabbitMQ 版服务端用于权限认证的静态用户密码。

需要提前在云消息队列 RabbitMQ 版控制台创建。

具体操作,请参见步骤二:创建资源

virtualHost

amqp_vhost

云消息队列 RabbitMQ 版实例的Vhost。需要提前在云消息队列 RabbitMQ 版控制台创建。

具体操作,请参见步骤二:创建资源

exchangeName

ExchangeTest

云消息队列 RabbitMQ 版的Exchange。

需要提前在云消息队列 RabbitMQ 版控制台创建。

具体操作,请参见步骤二:创建资源

queueName

QueueTest

云消息队列 RabbitMQ 版的Queue。

需要提前在云消息队列 RabbitMQ 版控制台创建。

具体操作,请参见步骤二:创建资源

bindingKey

BindingKeyTest

云消息队列 RabbitMQ 版Exchange与Queue绑定的Binding Key。

需要提前在云消息队列 RabbitMQ 版控制台创建绑定关系。

具体操作,请参见步骤二:创建资源

exchangeType

topic

Exchange的类型。云消息队列 RabbitMQ 版支持的类型如下,更多信息,请参见Exchange

  • direct

  • topic

  • fanout

  • headers

  • x-jms-topic

  • x-delayed-message

  • x-consistent-hash

重要

请确保填写的Exchange类型和您创建Exchange时选择的类型一致。

步骤四:配置监控告警

当您需要监控云消息队列 RabbitMQ 版的使用情况时,可以使用云监控创建报警规则。如果资源的监控指标达到报警条件,云监控自动发送报警通知,帮助您及时得知异常监控数据,并快速处理。具体操作请参见监控指标。如果您需要查看基于阿里云ARMS Prometheus监控服务和Grafana的指标信息,请参见Dashboard