文档

步骤三:调用SDK收发消息

更新时间:
一键部署

本文以Java SDK为例,说明如何将开源SDK客户端接入云消息队列 RabbitMQ 版服务端,并完成消息收发。

前提条件

安装Java依赖库

  1. 在IDEA中创建一个Java工程。

  2. pom.xml文件中添加以下依赖引入Java依赖库。

    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.5.0</version> <!-- 支持开源所有版本 -->
    </dependency>

生产消息

在已创建的Java工程中,创建消息发送程序,按照SDK参数填写说明配置相关参数并运行。

示例代码如下:

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

public class ProducerTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        //设置实例的接入点。
        String hostName = "xxx.xxx.aliyuncs.com";
        //设置实例的静态用户名密码。
        String userName = "${UserName}";
        String passWord = "${PassWord}";
        //设置实例的Vhost。
        String virtualHost = "${VirtualHost}";

        //在生产环境中,建议提前创建好Connection,并在需要时重复使用,避免频繁创建和关闭Connection,以提高性能、复用连接资源,以及保证系统的稳定性。
        Connection connection = createConnection(hostName, userName, passWord, virtualHost);
        Channel channel = connection.createChannel();

        //设置Exchange、Queue和绑定关系。
        String exchangeName = "${ExchangeName}";
        String queueName = "${QueueName}";
        String routingKey = "${RoutingKey}";
        //设置Exchange类型。
        String exchangeType = "${ExchangeType}";

        //此处为了体验流畅,确保了Exchange和Queue的创建过程。
        //在生产环境中,建议在控制台提前创建,尽量避免在代码中直接声明,否则可能触发单API调用的限流。
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        channel.queueDeclare(queueName, true, false, false, new HashMap<String, Object>());
        channel.queueBind(queueName, exchangeName, routingKey);
        //开始发送消息。
        for (int i = 0; i < 10; i++  ) {
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
            channel.basicPublish(exchangeName, routingKey, true, props,
                    ("消息发送示例Body-"  + i).getBytes(StandardCharsets.UTF_8));
            System.out.println("[SendResult] Message sent successfully, messageId: " + props.getMessageId() + ", exchange: " + exchangeName + ", routingKey: " + routingKey);
        }
        connection.close();
    }

    public static Connection createConnection(String hostName, String userName, String passWord, String virtualHost) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(hostName);
        factory.setUsername(userName);
        factory.setPassword(passWord);
        //设置为true,开启Connection自动恢复功能;设置为false,关闭Connection自动恢复功能。
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        factory.setVirtualHost(virtualHost);
        //默认端口,非加密端口5672,加密端口5671。
        factory.setPort(5672);
        //基于网络环境合理设置超时时间。
        factory.setConnectionTimeout(30 * 1000);
        factory.setHandshakeTimeout(30 * 1000);
        factory.setShutdownTimeout(0);
        Connection connection = factory.newConnection();
        return connection;
    }
}

订阅消息

在已创建的Java工程中,创建消息订阅程序,按照SDK参数填写说明配置相关参数并运行。

示例代码如下:

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;

public class ConsumerTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        //设置实例的接入点。
        String hostName = "xxx.xxx.aliyuncs.com";
        //设置实例的静态用户名密码。
        String userName = "${UserName}";
        String passWord = "${PassWord}";
        //设置实例的Vhost。
        String virtualHost = "${VirtualHost}";

        Connection connection = createConnection(hostName, userName, passWord, virtualHost);
        final Channel channel = connection.createChannel();

        //声明Queue。
        String queueName = "${QueueName}";
        //创建${QueueName} ,该Queue必须在云消息队列RabbitMQ版控制台上已存在。
        AMQP.Queue.DeclareOk queueDeclareOk = channel.queueDeclare(queueName, true, false, false, new HashMap<String, Object>());

        //开始消费消息。
        channel.basicConsume(queueName, false, "ConsumerTag", new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
            throws IOException {
                //接收到的消息,进行业务逻辑处理。
                System.out.println("Received: "  + new String(body, StandardCharsets.UTF_8) +  ", deliveryTag: "  + envelope.getDeliveryTag()  + ", messageId: " +  properties.getMessageId());
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }

    public static Connection createConnection(String hostName, String userName, String passWord, String virtualHost) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(hostName);
        factory.setUsername(userName);
        factory.setPassword(passWord);
        //设置为true,开启Connection自动恢复功能;设置为false,关闭Connection自动恢复功能。
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        factory.setVirtualHost(virtualHost);
        // 默认端口,非加密端口5672,加密端口5671。
        factory.setPort(5672);
        factory.setConnectionTimeout(300 * 1000);
        factory.setHandshakeTimeout(300 * 1000);
        factory.setShutdownTimeout(0);
        Connection connection = factory.newConnection();
        return connection;
    };
}

SDK参数填写说明

参数

示例值

描述

hostName

XXX.net.mq.amqp.aliyuncs.com

云消息队列 RabbitMQ 版实例接入点。获取方式,请参见步骤二:创建资源

Port

5672

默认端口。非加密端口为5672,加密端口为5671。

userName

MjoxODgwNzcwODY5MD****

客户端接入云消息队列 RabbitMQ 版服务端用于权限认证的静态用户名。

需要提前在云消息队列 RabbitMQ 版控制台创建。

具体操作,请参见创建用户名密码

passWord

NDAxREVDQzI2MjA0OT****

客户端接入云消息队列 RabbitMQ 版服务端用于权限认证的静态用户密码。

需要提前在云消息队列 RabbitMQ 版控制台创建。

具体操作,请参见创建用户名密码

virtualHost

amqp_vhost

云消息队列 RabbitMQ 版实例的Vhost。需要提前在云消息队列 RabbitMQ 版控制台创建。

具体操作,请参见创建Vhost

exchangeName

ExchangeTest

云消息队列 RabbitMQ 版的Exchange。

需要提前在云消息队列 RabbitMQ 版控制台创建。

具体操作,请参见创建Exchange

queueName

QueueTest

云消息队列 RabbitMQ 版的Queue。

需要提前在云消息队列 RabbitMQ 版控制台创建。

具体操作,请参见创建Queue

routingKey

RoutingKeyTest

云消息队列 RabbitMQ 版Exchange与Queue绑定的Routing Key。

需要提前在云消息队列 RabbitMQ 版控制台创建绑定关系。

具体操作,请参见创建绑定关系

exchangeType

topic

Exchange的类型。云消息队列 RabbitMQ 版支持的类型如下,更多信息,请参见Exchange

  • direct

  • topic

  • fanout

  • headers

  • x-delayed-message

  • x-consistent-hash

重要

请确保填写的Exchange类型和您创建Exchange时选择的类型一致。

相关文档