文档

使用Java SDK收发消息

更新时间:
一键部署

本文为您介绍快速使用云消息队列 RabbitMQ 版进行消息收发的操作流程。

操作流程

image

步骤一:(可选)RAM用户授权

RAM用户默认没有云消息队列 RabbitMQ 版资源的操作权限。如果您的账号为RAM用户,必须先为RAM用户进行授权,若您的账号为阿里云账号,则默认拥有云消息队列 RabbitMQ 版服务的所有权限,无需进行授权操作。

如果您需要为RAM用户授权,具体操作,请参见为RAM用户授权

步骤二:创建资源

创建实例

实例是一个独立的云消息队列 RabbitMQ 版资源实体,包含Vhost、Exchange、Queue等基本的资源要素。

  1. 登录云消息队列 RabbitMQ 版控制台,然后在左侧导航栏选择实例列表

  2. 在顶部菜单栏选择地域,然后在实例列表页面,单击创建实例

  3. 在购买页完成基本配置并勾选服务协议,然后单击立即购买

  4. 根据提示完成支付。

    实例列表页面的顶部菜单栏,选择地域,您可以看到创建的实例。

    说明
    • 专业版实例和企业版实例购买后,立即进入服务中状态。

    • 铂金版实例购买后,首先进入部署中状态,待集群分配后,再进入服务中状态。

获取实例接入点

在收发消息时,您需要为发布端和订阅端配置该接入点,客户端通过接入点接入云消息队列 RabbitMQ 版实例。

  1. 登录云消息队列 RabbitMQ 版控制台,然后在左侧导航栏选择实例列表

  2. 实例列表页面的顶部菜单栏选择地域,然后在实例列表中,单击目标实例名称。

  3. 实例详情页面的接入点信息页签,将鼠标指针移动到目标类型的接入点,单击该接入点右侧的复制图标,复制该接入点。

    类型

    说明

    示例值

    公网接入点

    公网环境可读写。按量付费实例默认支持,预付费实例需在购买时选择才支持。

    XXX.net.mq.amqp.aliyuncs.com

    VPC接入点

    VPC环境可读写。按量付费实例和预付费实例默认都支持。

    XXX.vpc.mq.amqp.aliyuncs.com

创建Vhost

Vhost是指虚拟主机,用作逻辑隔离,分别管理各自的Exchange、Queue和Binding,使得应用安全地运行在不同的Vhost上,相互之间不会干扰。一个实例下可以有多个Vhost,一个Vhost里面可以有若干个Exchange和Queue。Producer和Consumer连接云消息队列 RabbitMQ 版需要指定一个Vhost。

  1. 登录云消息队列 RabbitMQ 版控制台,然后在左侧导航栏选择实例列表

  2. 实例列表页面的顶部菜单栏选择地域,然后在实例列表中,单击目标实例名称。

  3. 在左侧导航栏,单击Vhost 列表

  4. Vhost 列表页面,单击创建 Vhost

  5. 创建 Vhost面板的Vhost名称文本框,输入Vhost名称,然后单击确定

创建Exchange

Producer将消息发送到Exchange,Exchange根据Routing Key将消息路由到一个或多个Queue中(或者丢弃)。不同类型的Exchange的路由规则不同。更多信息,请参见Exchange

  1. 登录云消息队列 RabbitMQ 版控制台,然后在左侧导航栏选择实例列表

  2. 实例列表页面的顶部菜单栏选择地域,然后在实例列表中,单击目标实例名称。

  3. 在左侧导航栏,单击Exchange 列表

  4. Exchange 列表页面,在当前 Vhost右侧的切换下拉列表中,选择Vhost,然后单击创建 Exchange

  5. 创建 Exchange面板,输入Exchange名称,选择Exchange类型,设置是否为Internal类型,然后单击确定

    参数

    描述

    Exchange 名称

    Exchange名称。以amq.开头的为保留字段,因此不能使用。例如:amq.test。

    类型

    Exchange类型。取值:

    • direct:该类型路由规则会将消息路由到Binding Key与Routing Key完全匹配的Queue中。

    • topic:该类型与direct类型相似。Topic Exchange路由规则没有Direct Exchange那么严格, 支持模糊匹配和多条件匹配,即该类型Exchange使用Routing Key模式匹配和字符串比较的方式将消息路由至绑定的Queue中。

    • fanout:该类型路由规则非常简单,会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中,相当于广播功能。

    • headers:该类型与direct类型相似。Headers Exchange使用Headers属性代替Routing Key进行路由匹配,在绑定Headers Exchange和Queue时,设置绑定属性的键值对;在向Headers Exchange发送消息时,设置消息的Headers属性键值对,使用消息Headers属性键值对和绑定属性键值对比较的方式将消息路由至绑定的Queue。

    • x-jms-topic:适用于通过云消息队列 RabbitMQ 版提供的JMS接口接入云消息队列 RabbitMQ 版的JMS应用,该类型路由规则会将消息路由到Binding Key与Routing Key通配符匹配的Queue中。更多信息,请参见JMS概述

    • x-delayed-message:通过声明该类Exchange,您可以自定义消息的Header属性x-delay来指定消息延时投递的时间段,单位为毫秒。消息将在x-delay中定义的时间段后,根据路由规则被投递到对应的Queue。路由规则取决于x-delayed-type中指定的Exchange路由类型。

    • x-consistent-hash:x-consistent-hash Exchange支持将Routing Key或Header值进行Hash计算,使用一致性哈希算法将消息路由到不同的Queue上。

    x-delayed-type

    当Exchange类型为x-delayed-message时,需要配置此参数,以指定Exchange的路由类型。

    哈希取值

    当Exchange类型为x-consistent-hash时,需要配置此参数,以指定Hash计算的输入值为哪种类型。取值如下:

    • RoutingKey

    • Header 值:使用Header方式作为Hash计算输入值时,您需要定义hash-header参数的取值。

    hash-header

    当Exchange类型为x-consistent-hash哈希取值Header 值时,需要配置此参数,作为指定Hash计算的输入值。

    Internal

    是否为Internal类型,默认值为。取值:

    • :内建类型,用于Exchange和Exchange之间的绑定。

    • :非内建类型,用于Exchange和Queue之间的绑定。

创建Queue

Queue是指队列,云消息队列 RabbitMQ 版的消息都会被投入到一个或多个Queue中。

  1. 登录云消息队列 RabbitMQ 版控制台,然后在左侧导航栏选择实例列表

  2. 实例列表页面的顶部菜单栏选择地域,然后在实例列表中,单击目标实例名称。

  3. 在左侧导航栏,单击Queue 列表

  4. Queue 列表页面,在当前 Vhost右侧的切换下拉列表中,选择Vhost,单击创建 Queue

  5. 创建 Queue面板,在Queue 名称文本框输入Queue的名称,选择是否为Auto Delete类型,单击高级选项,设置Queue的参数,然后单击确定

    参数描述说明
    Queue 名称Queue的名称
    • 只能包含字母、数字、短划线(-)、下划线(_)、半角句号(.)、井号(#)、正斜线(/)、at符号(@)。
    • 长度限制在1~255字符。
    • 创建后无法修改,只能删除重建。
    • 以amq.开头的为保留字段,因此不能使用。例如:amq.test。
    Auto Delete最后一个Consumer取消订阅后,Queue是否自动删除。
    • true:在订阅该Queue消息的最后一个Consumer取消订阅该Queue的消息后,自动删除该Queue。
    • false:在订阅该Queue消息的最后一个Consumer取消订阅该Queue的消息后,不自动删除该Queue。
    高级选项Queue的参数设置,可用于设置死信Exchange、死信Routing Key和消息存活时间。
    • DeadLetterExchange:指定死信消息发送的目标Exchange。
    • DeadLetterRoutingKey:指定死信消息的Routing Key,即死信Exchange会将消息发送至匹配该死信Routing Key的Binding Key所对应的Queue。
    • MessageTTL:消息存活时间,单位毫秒(ms)。超过指定时间消息还未被成功消费则变为死信消息,该消息将会被发送到死信Exchange。更多信息,请参见消息存活时间

创建绑定关系

将Queue和指定的Exchange进行绑定,消息将按照对应的转发规则从Exchange转发到Queue中。

  1. Queue 列表页面,选择指定Queue,在其操作列单击详情

  2. Queue 详情页面单击被绑定信息页签,单击添加被绑定

  3. 添加被绑定面板,选择源Exchange,在Binding Key文本框输入Binding Key,然后单击确定

    说明

    若被绑定的Exchange的类型为x-consistent-hash时,Routing Key表示绑定的Queue的权重,只能设置为整数,取值范围为[1~20]。

创建用户名密码

开源客户端访问云消息队列 RabbitMQ 版服务端时,需要传入用户名和密码进行权限认证,认证通过才允许访问服务端。

  1. 登录云消息队列 RabbitMQ 版控制台,然后在左侧导航栏选择实例列表

  2. 实例列表页面的顶部菜单栏选择地域,然后在实例列表中,单击目标实例名称。

  3. 在左侧导航栏,单击静态用户名密码

  4. 静态用户名密码页面,单击创建用户名密码

  5. 创建用户名密码面板,输入AccessKey IDAccessKey Secret,然后单击确定

    说明

    AccessKey IDAccessKey Secret需要在阿里云RAM控制台获取,具体获取方式,请参见创建AccessKey

    静态用户名密码页面,显示创建的静态用户名与密码,密码处于隐藏状态。用户名密码

  6. 在创建的静态用户名密码的密码列,单击显示密码,可查看用户名的密码。

步骤三:调用SDK收发消息

说明

您需要提前安装IDE安装JDK安装Maven。您可以使用IntelliJ IDEA或者Eclipse,本文以IntelliJ IDEA Ultimate为例。

安装Java依赖库

  1. 在IDEA中创建一个Java工程。

  2. pom.xml文件中添加以下依赖引入Java依赖库。

    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.5.0</version> <!-- 支持开源所有版本 -->
    </dependency>

生产消息

在已创建的Java工程中,创建消息发送程序,按照SDK参数填写说明配置相关参数并运行。

示例代码如下:

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

public class ProducerTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        //设置实例的接入点。
        String hostName = "xxx.xxx.aliyuncs.com";
        //设置实例的静态用户名密码。
        String userName = "${UserName}";
        String passWord = "${PassWord}";
        //设置实例的Vhost。
        String virtualHost = "${VirtualHost}";

        //在生产环境中,建议提前创建好Connection,并在需要时重复使用,避免频繁创建和关闭Connection,以提高性能、复用连接资源,以及保证系统的稳定性。
        Connection connection = createConnection(hostName, userName, passWord, virtualHost);
        Channel channel = connection.createChannel();

        //设置Exchange、Queue和绑定关系。
        String exchangeName = "${ExchangeName}";
        String queueName = "${QueueName}";
        String bindingKey = "${BindingKey}";
        //设置Exchange类型。
        String exchangeType = "${ExchangeType}";

        //此处为了体验流畅,确保了Exchange和Queue的创建过程。
        //在生产环境中,建议在控制台提前创建,尽量避免在代码中直接声明,否则可能触发单API调用的限流。
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        channel.queueDeclare(queueName, true, false, false, new HashMap<String, Object>());
        channel.queueBind(queueName, exchangeName, bindingKey);
        //开始发送消息。
        for (int i = 0; i < 10; i++  ) {
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
            channel.basicPublish(exchangeName, bindingKey, true, props,
                    ("消息发送示例Body-"  + i).getBytes(StandardCharsets.UTF_8));
            System.out.println("[SendResult] Message sent successfully, messageId: " + props.getMessageId() + ", exchange: " + exchangeName + ", routingKey: " + bindingKey);
        }
        channel.close();
        connection.close();
    }

    public static Connection createConnection(String hostName, String userName, String passWord, String virtualHost) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(hostName);
        factory.setUsername(userName);
        factory.setPassword(passWord);
        //设置为true,开启Connection自动恢复功能;设置为false,关闭Connection自动恢复功能。
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        factory.setVirtualHost(virtualHost);
        //默认端口,非加密端口5672,加密端口5671。
        factory.setPort(5672);
        //基于网络环境合理设置超时时间。
        factory.setConnectionTimeout(30 * 1000);
        factory.setHandshakeTimeout(30 * 1000);
        factory.setShutdownTimeout(0);
        Connection connection = factory.newConnection();
        return connection;
    }
}

订阅消息

在已创建的Java工程中,创建消息订阅程序,按照SDK参数填写说明配置相关参数并运行。

示例代码如下:

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;

public class ConsumerTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        //设置实例的接入点。
        String hostName = "xxx.xxx.aliyuncs.com";
        //设置实例的静态用户名密码。
        String userName = "${UserName}";
        String passWord = "${PassWord}";
        //设置实例的Vhost。
        String virtualHost = "${VirtualHost}";

        Connection connection = createConnection(hostName, userName, passWord, virtualHost);
        final Channel channel = connection.createChannel();

        //声明Queue。
        String queueName = "${QueueName}";
        //创建${QueueName} ,该Queue必须在云消息队列RabbitMQ版控制台上已存在。
        AMQP.Queue.DeclareOk queueDeclareOk = channel.queueDeclare(queueName, true, false, false, new HashMap<String, Object>());

        //开始消费消息。
        channel.basicConsume(queueName, false, "ConsumerTag", new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
            throws IOException {
                //接收到的消息,进行业务逻辑处理。
                System.out.println("Received: "  + new String(body, StandardCharsets.UTF_8) +  ", deliveryTag: "  + envelope.getDeliveryTag()  + ", messageId: " +  properties.getMessageId());
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }

    public static Connection createConnection(String hostName, String userName, String passWord, String virtualHost) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(hostName);
        factory.setUsername(userName);
        factory.setPassword(passWord);
        //设置为true,开启Connection自动恢复功能;设置为false,关闭Connection自动恢复功能。
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        factory.setVirtualHost(virtualHost);
        // 默认端口,非加密端口5672,加密端口5671。
        factory.setPort(5672);
        factory.setConnectionTimeout(300 * 1000);
        factory.setHandshakeTimeout(300 * 1000);
        factory.setShutdownTimeout(0);
        Connection connection = factory.newConnection();
        return connection;
    };
}

SDK参数填写说明

参数

示例值

描述

hostName

XXX.net.mq.amqp.aliyuncs.com

云消息队列 RabbitMQ 版实例接入点。获取方式,请参见步骤二:创建资源

Port

5672

默认端口。非加密端口为5672,加密端口为5671。

userName

MjoxODgwNzcwODY5MD****

客户端接入云消息队列 RabbitMQ 版服务端用于权限认证的静态用户名。

需要提前在云消息队列 RabbitMQ 版控制台创建。

具体操作,请参见创建用户名密码

passWord

NDAxREVDQzI2MjA0OT****

客户端接入云消息队列 RabbitMQ 版服务端用于权限认证的静态用户密码。

需要提前在云消息队列 RabbitMQ 版控制台创建。

具体操作,请参见创建用户名密码

virtualHost

amqp_vhost

云消息队列 RabbitMQ 版实例的Vhost。需要提前在云消息队列 RabbitMQ 版控制台创建。

具体操作,请参见创建Vhost

exchangeName

ExchangeTest

云消息队列 RabbitMQ 版的Exchange。

需要提前在云消息队列 RabbitMQ 版控制台创建。

具体操作,请参见创建Exchange

queueName

QueueTest

云消息队列 RabbitMQ 版的Queue。

需要提前在云消息队列 RabbitMQ 版控制台创建。

具体操作,请参见创建Queue

bindingKey

BindingKeyTest

云消息队列 RabbitMQ 版Exchange与Queue绑定的Binding Key。

需要提前在云消息队列 RabbitMQ 版控制台创建绑定关系。

具体操作,请参见创建绑定关系

exchangeType

topic

Exchange的类型。云消息队列 RabbitMQ 版支持的类型如下,更多信息,请参见Exchange

  • direct

  • topic

  • fanout

  • headers

  • x-jms-topic

  • x-delayed-message

  • x-consistent-hash

重要

请确保填写的Exchange类型和您创建Exchange时选择的类型一致。

步骤四:配置监控告警

当您需要监控云消息队列 RabbitMQ 版的使用情况时,可以使用云监控创建报警规则。如果资源的监控指标达到报警条件,云监控自动发送报警通知,帮助您及时得知异常监控数据,并快速处理。具体操作请参见监控指标。如果您需要查看基于阿里云ARMS Prometheus监控服务和Grafana的指标信息,请参见Dashboard