步骤三:调用SDK收发消息
本文以Java SDK为例,说明如何将开源SDK客户端接入云消息队列 RabbitMQ 版服务端,并完成消息收发。
前提条件
获取实例接入点
在收发消息时,您需要为发布端和订阅端配置该接入点,客户端通过接入点接入云消息队列 RabbitMQ 版实例。
- 登录云消息队列 RabbitMQ 版控制台,然后在左侧导航栏选择实例列表。 
- 在实例列表页面的顶部菜单栏选择地域,然后在实例列表中,单击目标实例名称。 
- 在实例详情页面的接入点信息页签,将鼠标指针移动到目标类型的接入点,单击该接入点右侧的  图标,复制该接入点。 图标,复制该接入点。- 类型 - 说明 - 示例值 - 公网接入点 - 公网环境可读写。按量付费实例默认支持,预付费实例需在购买时选择才支持。 - XXX.net.mq.amqp.aliyuncs.com - VPC接入点 - VPC环境可读写。按量付费实例和预付费实例默认都支持。 - XXX.vpc.mq.amqp.aliyuncs.com 
安装Java依赖库
- 在IDEA中创建一个Java工程。 
- 在pom.xml文件中添加以下依赖引入Java依赖库。 - <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.5.0</version> <!-- 支持开源所有版本 --> </dependency>
创建用户名密码
开源RabbitMQ客户端接入云上服务时,创建用户名密码,将用户名和密码设置到开源客户端SDK的userName和passWord参数中。云消息队列 RabbitMQ 版会通过用户名和密码进行权限认证。
根据实例的身份权限管理模式,选择以下一种方式创建用户名和密码:
- 开源身份验证和权限管理方式 - 登录云消息队列 RabbitMQ 版控制台,然后在左侧导航栏选择实例列表。 
- 在实例列表页面的顶部菜单栏选择地域,然后在实例列表中,单击目标实例名称。 
- 在左侧导航栏,单击用户和权限管理。 
- 在用户和权限管理页面,单击创建用户名密码。 
- 在创建用户名密码面板,输入用户名、密码和确认密码等信息,单击确定。 
 说明- 创建用户名密码完成后,还需要对该用户进行授权,具体操作请参见权限管理。 
- 阿里云访问控制(RAM) - 登录云消息队列 RabbitMQ 版控制台,然后在左侧导航栏选择实例列表。 
- 在实例列表页面的顶部菜单栏选择地域,然后在实例列表中,单击目标实例名称。 
- 在左侧导航栏,单击用户和权限管理。 
- 在用户和权限管理页面,单击创建用户名密码。 
- 在创建用户名密码面板,输入AccessKey ID和AccessKey Secret,然后单击确定。 说明- AccessKey ID和AccessKey Secret需要在阿里云RAM控制台获取,具体获取方式,请参见创建AccessKey。 - 用户和权限管理页面,显示创建的静态用户名与密码,密码处于隐藏状态。  
- 在创建的静态用户名密码的密码列,单击显示密码,可查看用户名的密码。 
 
创建客户端连接
创建连接管理工厂ConnectionFactory.java,用于启动开源客户端和云消息队列 RabbitMQ 版服务端的连接。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.TimeoutException;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
public class ConnectionFactory {
    private final String hostName;
    private final int port;
    private final String userName;
    private final String password;
    private final String virtualHost;
    private final boolean enableSSL;
    public ConnectionFactory(String hostName, int port, String userName,
                            String password, String virtualHost, boolean enableSSL) {
        this.hostName = hostName;
        this.port = port;
        this.userName = userName;
        this.password = password;
        this.virtualHost = virtualHost;
        this.enableSSL = enableSSL;
    }
    public Channel createChannel() throws IOException, TimeoutException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
        //create a new con
        Connection con = createCon();
        //create a new channel
        return con.createChannel();
    }
    private Connection createCon() throws IOException, TimeoutException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
        com.rabbitmq.client.ConnectionFactory factory = new com.rabbitmq.client.ConnectionFactory();
        factory.setHost(hostName);
        factory.setUsername(userName);
        factory.setPassword(password);
        //设置为true,开启Connection自动恢复功能;设置为false,关闭Connection自动恢复功能。
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        factory.setVirtualHost(virtualHost);
        // 默认端口。
        factory.setPort(port);
        if (enableSSL) {
            setSSL(factory);
        }
        // 基于网络环境合理设置超时时间。
        factory.setConnectionTimeout(30 * 1000);
        factory.setHandshakeTimeout(30 * 1000);
        factory.setShutdownTimeout(0);
        return factory.newConnection();
    }
    private void setSSL(com.rabbitmq.client.ConnectionFactory factory) throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
        SSLContext sslContext = SSLContext.getInstance("TLSv1.2");
        TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        trustManagerFactory.init((KeyStore) null);
        sslContext.init(null, trustManagerFactory.getTrustManagers(), null);
        factory.useSslProtocol(sslContext);
    }
    public void closeCon(Channel channel) {
        if (channel != null && channel.getConnection() != null) {
            try {
                channel.getConnection().close();
            } catch (Throwable t) {
            }
        }
    }
}生产消息
在已创建的Java工程中,创建消息发送程序Producer.java,按照SDK参数填写说明配置相关参数并运行。有关发送消息的注意事项,请参见生产消息时需要注意什么?。
示例代码如下:
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AlreadyClosedException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.UUID;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeoutException;
public class Producer {
    //设置为云消息队列 RabbitMQ 版实例的接入点。
    public static final String hostName = "1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com";
    //设置为云消息队列 RabbitMQ 版实例的静态用户名。
    public static final String userName = "MjoxODgwNzcwODY5MD****";
    //设置为云消息队列 RabbitMQ 版实例的静态用户名密码。
    public static final String password = "NDAxREVDQzI2MjA0OT****";
    //设置为云消息队列 RabbitMQ 版实例的Vhost名称。
    public static final String virtualHost = "vhost_test";
    //如果使用5671端口,需要enableSSL设置为true。
    public static final int port = 5672;
    public static final boolean enableSSL = false;
    private Channel channel;
    private final ConcurrentNavigableMap<Long/*deliveryTag*/, String/*msgId*/> outstandingConfirms;
    private final ConnectionFactory factory;
    private final String exchangeName;
    private final String queueName;
    private final String routingKey;
    public Producer(ConnectionFactory factory, String exchangeName, String queueName, String routingKey) throws IOException, TimeoutException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
        this.factory = factory;
        this.outstandingConfirms = new ConcurrentSkipListMap<>();
        this.channel = factory.createChannel();
        this.exchangeName = exchangeName;
        this.queueName = queueName;
        this.routingKey = routingKey;
    }
    public static void main(String[] args) throws IOException, TimeoutException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
        //构建连接工厂。
        ConnectionFactory factory = new ConnectionFactory(hostName, port, userName, password, virtualHost, enableSSL);
        //初始化生产者。
        Producer producer = new Producer(factory, "ExchangeTest", "QueueTest", "RoutingKeyTest");
        //declare。
        producer.declare();
        producer.initChannel();
        //发送消息。
        producer.doSend("hello,amqp");
    }
    private void initChannel() throws IOException {
        channel.confirmSelect();
        ConfirmCallback cleanOutstandingConfirms = (deliveryTag, multiple) -> {
            if (multiple) {
                ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(deliveryTag, true);
                for (Long tag : confirmed.keySet()) {
                    String msgId = confirmed.get(tag);
                    System.out.format("Message with msgId %s has been ack-ed. deliveryTag: %d, multiple: %b%n", msgId, tag, true);
                }
                confirmed.clear();
            } else {
                String msgId = outstandingConfirms.remove(deliveryTag);
                System.out.format("Message with msgId %s has been ack-ed. deliveryTag: %d, multiple: %b%n", msgId, deliveryTag, false);
            }
        };
        channel.addConfirmListener(cleanOutstandingConfirms, (deliveryTag, multiple) -> {
            String msgId = outstandingConfirms.get(deliveryTag);
            System.err.format("Message with msgId %s has been nack-ed. deliveryTag: %d, multiple: %b%n", msgId, deliveryTag, multiple);
            // send msg failed, re-publish
        });
        channel.addReturnListener(returnMessage -> System.out.println("return msgId=" + returnMessage.getProperties().getMessageId()));
    }
    private void declare() throws IOException {
        channel.exchangeDeclare(exchangeName, "direct", true);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
    }
    
    private void doSend(String content) throws IOException, TimeoutException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
        try {
            String msgId = UUID.randomUUID().toString();
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(msgId).build();
            channel.basicPublish(exchangeName, routingKey, true, props, content.getBytes(StandardCharsets.UTF_8));
            outstandingConfirms.put(channel.getNextPublishSeqNo(), msgId);
        } catch (AlreadyClosedException e) {
            //need reconnect if channel is closed.
            String message = e.getMessage();
            System.out.println(message);
            if (channelClosedByServer(message)) {
                factory.closeCon(channel);
                channel = factory.createChannel();
                this.initChannel();
                doSend(content);
            } else {
                throw e;
            }
        }
    }
    private boolean channelClosedByServer(String errorMsg) {
        if (errorMsg != null
            && errorMsg.contains("channel.close")
            && errorMsg.contains("reply-code=541")
            && errorMsg.contains("reply-text=InternalError")) {
            return true;
        } else {
            return false;
        }
    }
}云消息队列 RabbitMQ 版会对单实例的TPS流量峰值进行限流,更多限流信息,请参见实例限流最佳实践。
订阅消息
在已创建的Java工程中,创建消息订阅程序Consumer.java,按照SDK参数填写说明配置相关参数并运行。
示例代码如下:
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;
public class Consumer {
    //设置为云消息队列 RabbitMQ 版实例的接入点。
    public static final String hostName = "1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com";
    //设置为云消息队列 RabbitMQ 版实例的静态用户名。
    public static final String userName = "MjoxODgwNzcwODY5MD****";
    //设置为云消息队列 RabbitMQ 版实例的静态用户名密码。
    public static final String password = "NDAxREVDQzI2MjA0OT****";
    //设置为云消息队列 RabbitMQ 版实例的Vhost名称。
    public static final String virtualHost = "vhost_test";
    
    //如果使用5671端口,需要enableSSL设置为true。
    public static final int port = 5672;
    public static final boolean enableSSL = false;
    private final Channel channel;
    private final String queue;
    public Consumer(Channel channel, String queue) {
        this.channel = channel;
        this.queue = queue;
    }
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
        ConnectionFactory factory = new ConnectionFactory(hostName, port, userName, password, virtualHost, enableSSL);
        Channel channel = factory.createChannel();
        channel.basicQos(50);
         
        //设置为云消息队列 RabbitMQ 版实例的Queue名称。需要和生产者中设置的Queue名称一致。
        Consumer consumer = new Consumer(channel, "queue-1");
        consumer.consume();
    }
    public void consume() throws IOException, InterruptedException {
        channel.basicConsume(queue, false, new DefaultConsumer(channel) {
            @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                byte[] body) throws IOException {
                //业务处理。
                System.out.println("receive: msgId=" + properties.getMessageId());
                //消费者需要在有效时间内提交ack,否则消息会重新推送,最多推送16次。
                //若推送16次还未成功,则消息被丢弃或者进入死信Exchange。
                //专业版实例的有效时间为1分钟,企业版和Serverless实例为5分钟,铂金版实例为30分钟。
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
        CountDownLatch latch = new CountDownLatch(1);
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                channel.getConnection().close();
            } catch (IOException e) {
                System.out.println("close connection error." + e);
            }
            latch.countDown();
        }));
        latch.await();
    }
}SDK参数填写说明
| 参数 | 示例值 | 描述 | 
| hostName | XXX.net.mq.amqp.aliyuncs.com | 云消息队列 RabbitMQ 版实例接入点。获取方式,请参见获取实例接入点。 | 
| Port | 5672 | 默认端口。非加密端口为5672,加密端口为5671。 | 
| userName | MjoxODgwNzcwODY5MD**** | 客户端接入云消息队列 RabbitMQ 版服务端用于权限认证的静态用户名。 需要提前在云消息队列 RabbitMQ 版控制台创建。 具体操作,请参见创建用户名密码。 | 
| passWord | NDAxREVDQzI2MjA0OT**** | 客户端接入云消息队列 RabbitMQ 版服务端用于权限认证的静态用户密码。 需要提前在云消息队列 RabbitMQ 版控制台创建。 具体操作,请参见创建用户名密码。 | 
| virtualHost | amqp_vhost | 云消息队列 RabbitMQ 版实例的Vhost。需要提前在云消息队列 RabbitMQ 版控制台创建。 具体操作,请参见步骤二:创建资源。 | 
| exchangeName | ExchangeTest | 云消息队列 RabbitMQ 版的Exchange。 需要提前在云消息队列 RabbitMQ 版控制台创建。 具体操作,请参见步骤二:创建资源。 | 
| queueName | QueueTest | 云消息队列 RabbitMQ 版的Queue。 需要提前在云消息队列 RabbitMQ 版控制台创建。 具体操作,请参见步骤二:创建资源。 | 
| routingKey | RoutingKeyTest | 云消息队列 RabbitMQ 版Exchange与Queue绑定的Routing Key。 需要提前在云消息队列 RabbitMQ 版控制台创建绑定关系。 具体操作,请参见步骤二:创建资源。 | 
| exchangeType | topic | Exchange的类型。云消息队列 RabbitMQ 版支持的类型如下,更多信息,请参见Exchange。 
 重要  请确保填写的Exchange类型和您创建Exchange时选择的类型一致。 | 
相关文档
- 云消息队列 RabbitMQ 版与开源RabbitMQ完全兼容,支持多语言SDK。更多语言SDK,请参见开源RabbitMQ AMQP协议支持的多语言或框架SDK,更多参数说明,请参见开源RabbitMQ客户端文档。 
- 客户端运行时若返回异常报错,您可以参考错误码说明查看异常原因和解决方案。 
- 您可以在云消息队列 RabbitMQ 版控制台通过消息查询或轨迹查询验证消息的收发状态和消息轨迹。具体操作,请参见查询消息和消息轨迹。