本文说明如何调用SDK收发消息。

前提条件

创建资源

背景信息

客户端接入消息队列RabbitMQ版时,消息队列RabbitMQ版会通过用户名密码进行权限认证。消息队列RabbitMQ版支持以下方式为客户端生成用户名密码:
  • 动态用户名密码:通过阿里云提供的权限认证类生成动态用户名密码。
  • 静态用户名密码(推荐):通过消息队列RabbitMQ版控制台生成静态用户名密码。该方式和开源RabbitMQ保持一致。
注意 您的客户端在调用SDK收发消息时,请尽可能使用长期存活的Connection,以免每次收发消息时都需要创建新的Connection,消耗大量的网络资源和服务端资源,甚至引起服务端SYN Flood防护。更多信息,请参见Connection

本文以Java SDK为例进行说明。更多语言或框架的SDK,请参见SDK概述

收发消息流程

ft_quick_start

获取接入点

您需要在消息队列RabbitMQ版控制台获取实例的接入点。在收发消息时,您需要为生产端和消费端配置该接入点,以此接入消息队列RabbitMQ版实例。

  1. 登录消息队列RabbitMQ版控制台
  2. 在顶部菜单栏,选择地域。
  3. 在左侧导航栏,单击实例详情
  4. 实例详情页面,选择实例,在基本信息区域,将鼠标指针移动到目标类型的接入点,然后单击该接入点以复制该接入点。
    类型 说明 示例值
    公网接入点 公网环境可读写。按量付费实例默认支持,预付费实例需在购买时选择才支持。 XXX.mq-amqp.cn-hangzhou-a.aliyuncs.com
    VPC接入点 VPC环境可读写。按量付费实例和预付费实例默认都支持。 XXX.mq-amqp.cn-hangzhou-a-internal.aliyuncs.com

安装Java依赖库

在pom.xml中添加以下依赖。

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.5.0</version>
</dependency>
<dependency>
    <groupId>com.alibaba.mq-amqp</groupId>
    <artifactId>mq-amqp-client</artifactId>
    <version>1.0.5</version>
</dependency>
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.5.0</version>
</dependency>

生成用户名密码

创建用于生成动态用户名密码的AliyunCredentialsProvider.java。

import com.alibaba.mq.amqp.utils.UserUtils;
import com.rabbitmq.client.impl.CredentialsProvider;
import org.apache.commons.lang3.StringUtils;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
/**
 * 阿里云UserName、Password生成类(动态变化)。
 */
public class AliyunCredentialsProvider implements CredentialsProvider {
    /**
     * Access Key ID.
     */
    private final String accessKeyId;
    /**
     * Access Key Secret.
     */
    private final String accessKeySecret;
    /**
     * security temp token. (optional)
     */
    private final String securityToken;
    /**
     * 实例ID(从消息队列RabbitMQ版控制台获取)。
     */
    private final String instanceId;
    public AliyunCredentialsProvider(final String accessKeyId, final String accessKeySecret,
                                     final String instanceId) {
        this(accessKeyId, accessKeySecret, null, instanceId);
    }
    public AliyunCredentialsProvider(final String accessKeyId, final String accessKeySecret,
                                     final String securityToken, final String instanceId) {
        this.accessKeyId = accessKeyId;
        this.accessKeySecret = accessKeySecret;
        this.securityToken = securityToken;
        this.instanceId = instanceId;
    }
    @Override
    public String getUsername() {
        if(StringUtils.isNotEmpty(securityToken)) {
            return UserUtils.getUserName(accessKeyId, instanceId, securityToken);
        } else {
            return UserUtils.getUserName(accessKeyId, instanceId);
        }
    }

    @Override
    public String getPassword() {
        try {
            return UserUtils.getPassord(accessKeySecret);
        } catch (InvalidKeyException e) {
            //todo
        } catch (NoSuchAlgorithmException e) {
            //todo
        }
        return null;
    }
}
更多信息,请参见创建静态用户名密码

创建绑定

创建并编译运行BindingKeyTest.java。

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;

 public class BindingKeyTest {
     public static void main(String[] args) throws IOException, TimeoutException {
         ConnectionFactory factory = new ConnectionFactory();
         // 设置接入点,在RabbitMQ版控制台实例详情页面查看。
         factory.setHost("xxx.xxx.aliyuncs.com");
         // ${instanceId}为实例ID,在RabbitMQ版控制台实例详情页面查看。
         // ${AccessKey}阿里云身份验证,在阿里云控制台创建。
         // ${SecretKey}阿里云身份验证,在阿里云控制台创建。
         factory.setCredentialsProvider(new AliyunCredentialsProvider("${AccessKey}", "${SecretKey}", "${instanceId}"));
 	// 一定要这个才能自动恢复。
         factory.setAutomaticRecoveryEnabled(true);
         factory.setNetworkRecoveryInterval(5000);
         // 设置Vhost名称,请确保已在RabbitMQ版控制台上创建完成。
         factory.setVirtualHost("${VhostName}");
         // 默认端口,非加密端口5672,加密端口5671。
         factory.setPort(5672);
         factory.setConnectionTimeout(300 * 1000);
         factory.setHandshakeTimeout(300 * 1000);
         factory.setShutdownTimeout(0);
         Connection connection = factory.newConnection();
         Channel channel = connection.createChannel();
         // 创建 ${ExchangeName}。
         // Exchange可以在RabbitMQ版控制台创建,也可以用API创建。如果控制台已存在,则Exchange的类型必须和控制台一致。
         channel.exchangeDeclare("${ExchangeName}", "${ExchangeType}", true);
         // 创建 ${QueueName}。Queue可以在RabbitMQ版控制台创建,也可以用API创建。
         channel.queueDeclare("${QueueName}", true, false, false, new HashMap<String, Object>());
         // Queue与Exchange进行绑定,注册BindingKeyTest。
         channel.queueBind("${QueueName}", "${ExchangeName}", "${BindingKeyTest}");
         connection.close();
     }
 }
 
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;

 public class BindingKeyTest {
     public static void main(String[] args) throws IOException, TimeoutException {
         ConnectionFactory factory = new ConnectionFactory();
         // 设置接入点,在RabbitMQ版控制台实例详情页面查看。
         factory.setHost("xxx.xxx.aliyuncs.com");
         // 用户名,在RabbitMQ版控制台用户名密码管理页面查看。
         factory.setUsername("${Username}");
         // 密码,在RabbitMQ版控制台用户名密码管理页面查看。
         factory.setPassword("${Password}");
 	// 一定要这个才能自动恢复。
         factory.setAutomaticRecoveryEnabled(true);
         factory.setNetworkRecoveryInterval(5000);
         // 设置Vhost名称,请确保已在RabbitMQ版控制台上创建完成。
         factory.setVirtualHost("${VhostName}");
         // 默认端口,非加密端口5672,加密端口5671。
         factory.setPort(5672);
         factory.setConnectionTimeout(300 * 1000);
         factory.setHandshakeTimeout(300 * 1000);
         factory.setShutdownTimeout(0);
         Connection connection = factory.newConnection();
         Channel channel = connection.createChannel();
         // 创建 ${ExchangeName}。
         // Exchange可以在RabbitMQ版控制台创建,也可以用API创建。如果控制台已存在,则Exchange的类型必须和控制台一致。
         channel.exchangeDeclare("${ExchangeName}", "${ExchangeType}", true);
         // 创建 ${QueueName}。Queue可以在RabbitMQ版控制台创建,也可以用API创建。
         channel.queueDeclare("${QueueName}", true, false, false, new HashMap<String, Object>());
         // Queue与Exchange进行绑定,注册BindingKeyTest。
         channel.queueBind("${QueueName}", "${ExchangeName}", "${BindingKeyTest}");
         connection.close();
     }
 }
 

生产消息

创建并编译运行ProducerTest.java。

import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
import java.util.UUID;

public class ProducerTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        // 设置接入点,在RabbitMQ版控制台实例详情页面查看。
        factory.setHost("xxx.xxx.aliyuncs.com");
        // ${instanceId}为实例ID,在RabbitMQ版控制台实例详情页面查看。
        // ${AccessKey}阿里云身份验证,在阿里云控制台创建。
        // ${SecretKey}阿里云身份验证,在阿里云控制台创建。
        factory.setCredentialsProvider(new AliyunCredentialsProvider("${AccessKey}", "${SecretKey}", "${instanceId}"));
        //一定要这个才能自动恢复。
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        // 设置Vhost名称,请确保已在RabbitMQ版控制台上创建完成。
        factory.setVirtualHost("${VhostName}");
        // 默认端口,非加密端口5672,加密端口5671。
        factory.setPort(5672);
        // 基于网络环境合理设置超时时间。
        factory.setConnectionTimeout(30 * 1000);
        factory.setHandshakeTimeout(30 * 1000);
        factory.setShutdownTimeout(0);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // 开始发送消息。
        for (int i = 0; i < 100; i++  ) {
            // ${ExchangeName}必须在RabbitMQ版控制台上已存在,并且Exchange的类型与控制台上的类型一致。
            // BindingKey根据业务需求填入相应的BindingKey。
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
            channel.basicPublish("${ExchangeName}", "BindingKey", true, props,
                    ("消息发送Body"  + i).getBytes(StandardCharsets.UTF_8));
        }
        connection.close();
    }
}
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
import java.util.UUID;

public class ProducerTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        // 设置接入点,在RabbitMQ版控制台实例详情页面查看。
        factory.setHost("xxx.xxx.aliyuncs.com");
        // 用户名,在RabbitMQ版控制台用户名密码管理页面查看。
        factory.setUsername("${UserName}");
        // 密码,在RabbitMQ版控制台用户名密码管理页面查看。
        factory.setPassword("${PassWord}");
        //一定要这个才能自动恢复。
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        // 设置Vhost名称,请确保已在RabbitMQ版控制台上创建完成。
        factory.setVirtualHost("${VhostName}");
        // 默认端口,非加密端口5672,加密端口5671。
        factory.setPort(5672);
        // 基于网络环境合理设置超时时间。
        factory.setConnectionTimeout(30 * 1000);
        factory.setHandshakeTimeout(30 * 1000);
        factory.setShutdownTimeout(0);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // 开始发送消息。
        for (int i = 0; i < 100; i++  ) {
            // ${ExchangeName}必须在RabbitMQ版控制台上已存在,并且Exchange的类型与控制台上的类型一致。
            // BindingKey根据业务需求填入相应的BindingKey。
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
            channel.basicPublish("${ExchangeName}", "BindingKey", true, props,
                    ("消息发送Body"  + i).getBytes(StandardCharsets.UTF_8));
        }
        connection.close();
    }
}

消费消息

创建并编译运行ConsumerTest.java。

import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;

public class ConsumerTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        // 设置接入点,在RabbitMQ版控制台实例详情页面查看。
        factory.setHost("xxx.xxx.aliyuncs.com");
        // ${instanceId}为实例ID,在RabbitMQ版控制台概览页面查看。
        // ${AccessKey}阿里云身份验证,在阿里云控制台创建。
        // ${SecretKey}阿里云身份验证,在阿里云控制台创建。
        factory.setCredentialsProvider(new AliyunCredentialsProvider("${AccessKey}", "${SecretKey}", "${instanceId}"));
        //一定要这个才能自动恢复。
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        // 设置Vhost名称,请确保已在RabbitMQ版控制台上创建完成。
        factory.setVirtualHost("${VhostName}");
        // 默认端口,非加密端口5672,加密端口5671。
        factory.setPort(5672);
        factory.setConnectionTimeout(300 * 1000);
        factory.setHandshakeTimeout(300 * 1000);
        factory.setShutdownTimeout(0);
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        // 创建${ExchangeName},该Exchange必须在RabbitMQ版控制台上已存在,并且Exchange的类型与控制台上的类型一致。
        AMQP.Exchange.DeclareOk exchangeDeclareOk = channel.exchangeDeclare("${ExchangeName}", "${ExchangeType}", true, false, false, null);
        // 创建${QueueName} ,该Queue必须在RabbitMQ版控制台上已存在。
        AMQP.Queue.DeclareOk queueDeclareOk = channel.queueDeclare("${QueueName}", true, false, false, new HashMap<String, Object>());
        // Queue与Exchange进行绑定,并确认绑定的BindingKeyTest。
        AMQP.Queue.BindOk bindOk = channel.queueBind("${QueueName}", "${ExchangeName}", "BindingKeyTest");
        // 开始消费消息。
        channel.basicConsume("${QueueName}", false, "ConsumerTag", new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                //接收到的消息,进行业务逻辑处理。
                System.out.println("Received: "  + new String(body, StandardCharsets.UTF_8) +  ", deliveryTag: "  + envelope.getDeliveryTag()  + ", messageId: " +  properties.getMessageId());
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
        connection.close();
    }
}
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;

public class ConsumerTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        // 设置接入点,在RabbitMQ版控制台实例详情页面查看。
        factory.setHost("xxx.xxx.aliyuncs.com");
        // 用户名,在RabbitMQ版控制台用户名密码管理页面查看。
        factory.setUsername("${Username}");
        // 密码,在RabbitMQ版控制台用户名密码管理页面查看。
        factory.setPassword("${Password}");
        //一定要这个才能自动恢复。
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        // 设置Vhost名称,请确保已在RabbitMQ版控制台上创建完成。
        factory.setVirtualHost("${VhostName}");
        // 默认端口,非加密端口5672,加密端口5671。
        factory.setPort(5672);
        factory.setConnectionTimeout(300 * 1000);
        factory.setHandshakeTimeout(300 * 1000);
        factory.setShutdownTimeout(0);
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        // 创建${ExchangeName},该Exchange必须在RabbitMQ版控制台上已存在,并且Exchange的类型与控制台上的类型一致。
        AMQP.Exchange.DeclareOk exchangeDeclareOk = channel.exchangeDeclare("${ExchangeName}", "${ExchangeType}", true, false, false, null);
        // 创建${QueueName} ,该Queue必须在RabbitMQ版控制台上已存在。
        AMQP.Queue.DeclareOk queueDeclareOk = channel.queueDeclare("${QueueName}", true, false, false, new HashMap<String, Object>());
        // Queue与Exchange进行绑定,并确认绑定的BindingKeyTest。
        AMQP.Queue.BindOk bindOk = channel.queueBind("${QueueName}", "${ExchangeName}", "BindingKeyTest");
        // 开始消费消息。
        channel.basicConsume("${QueueName}", false, "ConsumerTag", new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                //接收到的消息,进行业务逻辑处理。
                System.out.println("Received: "  + new String(body, StandardCharsets.UTF_8) +  ", deliveryTag: "  + envelope.getDeliveryTag()  + ", messageId: " +  properties.getMessageId());
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
        connection.close();
    }
}