调用SDK收发消息

更新时间: 2023-08-08 14:53:00

本文以Java SDK为例说明如何调用SDK收发消息。

前提条件

创建资源

背景信息

客户端接入云消息队列 RabbitMQ 版时,云消息队列 RabbitMQ 版会通过用户名密码进行权限认证。云消息队列 RabbitMQ 版支持通过控制台为客户端生成静态用户名密码,该方式和开源RabbitMQ保持一致。

重要

您的客户端在调用SDK收发消息时,请尽可能使用长期存活的Connection,以免每次收发消息时都需要创建新的Connection,消耗大量的网络资源和服务端资源,甚至引起服务端SYN Flood防护。更多信息,请参见Connection

收发消息流程(以Java语言为例)

开源SDK收发流程
说明

云消息队列 RabbitMQ 版与开源RabbitMQ完全兼容。更多语言SDK,请参见开源RabbitMQ AMQP协议支持的多语言或框架SDK

获取接入点

您需要在云消息队列 RabbitMQ 版控制台获取实例的接入点。在收发消息时,您需要为发布端和订阅端配置该接入点,通过接入点接入云消息队列 RabbitMQ 版实例。

  1. 登录云消息队列 RabbitMQ 版控制台,然后在左侧导航栏选择实例列表

  2. 实例列表页面的顶部菜单栏选择地域,然后在实例列表中,单击目标实例名称。

  3. 实例详情页面的接入点信息页签,将鼠标指针移动到目标类型的接入点,单击该接入点右侧的复制图标,复制该接入点。

    类型

    说明

    示例值

    公网接入点

    公网环境可读写。按量付费实例默认支持,预付费实例需在购买时选择才支持。

    XXX.mq-amqp.cn-hangzhou-a.aliyuncs.com

    VPC接入点

    VPC环境可读写。按量付费实例和预付费实例默认都支持。

    XXX.mq-amqp.cn-hangzhou-a-internal.aliyuncs.com

安装Java依赖库

pom.xml添加以下依赖。

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.5.0</version> <!-- 支持开源所有版本 -->
</dependency>

生成用户名密码

  1. 登录云消息队列 RabbitMQ 版控制台,然后在左侧导航栏选择实例列表

  2. 实例列表页面的顶部菜单栏选择地域,然后在实例列表中,单击目标实例名称。

  3. 在左侧导航栏,单击静态用户名密码

  4. 静态用户名密码页面,单击创建用户名密码

  5. 创建用户名密码面板,输入AccessKey IDAccessKey Secret,然后单击确定

    说明

    AccessKey IDAccessKey Secret需要在阿里云RAM控制台获取,具体获取方式,请参见创建AccessKey

    静态用户名密码页面,显示创建的静态用户名与密码,密码处于隐藏状态。用户名密码

  6. 在创建的静态用户名密码的密码列,单击显示密码,可查看用户名的密码。

生产消息

创建并编译运行ProducerTest.java

重要

编译运行ProducerTest.java生产消息之前,您需要根据代码提示信息配置参数列表中所列举的参数。

表 1. 参数列表

参数

示例值

描述

hostName

1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com

云消息队列 RabbitMQ 版实例接入点。获取方式,请参见获取接入点

Port

5672

默认端口。非加密端口为5672,加密端口为5671。

userName

MjoxODgwNzcwODY5MD****

云消息队列 RabbitMQ 版控制台将阿里云账号或RAM用户的AccessKey ID、AccessKey Secret和云消息队列 RabbitMQ 版实例ID通过Base64编码后生成的静态用户名。您可以在云消息队列 RabbitMQ 版控制台静态用户名密码页面获取。

passWord

NDAxREVDQzI2MjA0OT****

云消息队列 RabbitMQ 版控制台将阿里云账号或RAM用户的AccessKey Secret和timestamp参数(系统当前时间)通过HMAC-SHA1生成一个签名后,再将这个签名和timestamp参数(系统当前时间)通过Base64编码后生成的静态密码。您可以在云消息队列 RabbitMQ 版控制台静态用户名密码获取。

virtualHost

Test

云消息队列 RabbitMQ 版实例的Vhost。您可以在云消息队列 RabbitMQ 版控制台Vhost 详情页面查看。如何查看Vhost,请参见查看Vhost连接详情

ExchangeName

ExchangeTest

云消息队列 RabbitMQ 版的Exchange。您可以在云消息队列 RabbitMQ 版控制台Exchange 列表页面,结合实例ID与Vhost模糊搜索已创建的Exchange。

BindingKey

BindingKeyTest

云消息队列 RabbitMQ 版Exchange与Queue的Binding Key。您可以在云消息队列 RabbitMQ 版控制台Exchange 列表页面查看Exchange的绑定关系,获取Binding Key。

QueueName

QueueTest

云消息队列 RabbitMQ 版的Queue。仅在订阅消息时候需要配置,您可以在云消息队列 RabbitMQ 版控制台Exchange 列表页面,查看Exchange的绑定关系,获取Exchange綁定的Queue。

import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
import java.util.HashMap;
import java.util.UUID;

public class ProducerTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        // 设置接入点,在消息队列RabbitMQ版控制台实例详情页面查看。
        factory.setHost("xxx.xxx.aliyuncs.com");
        // 用户名,在消息队列RabbitMQ版控制台静态用户名密码页面查看。
        factory.setUsername("${UserName}");
        // 密码,在消息队列RabbitMQ版控制台静态用户名密码页面查看。
        factory.setPassword("${PassWord}");
        //设置为true,开启Connection自动恢复功能;设置为false,关闭Connection自动恢复功能。
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        // 设置Vhost名称,请确保已在消息队列RabbitMQ版控制台上创建完成。
        factory.setVirtualHost("${VhostName}");
        // 默认端口,非加密端口5672,加密端口5671。
        factory.setPort(5672);
        // 基于网络环境合理设置超时时间。
        factory.setConnectionTimeout(30 * 1000);
        factory.setHandshakeTimeout(30 * 1000);
        factory.setShutdownTimeout(0);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();        
        channel.exchangeDeclare("${ExchangeName}", "${ExchangeType}", true, false, false, null);
        channel.queueDeclare("${QueueName}", true, false, false, new HashMap<String, Object>());
        channel.queueBind("${QueueName}", "${ExchangeName}", "${BindingKey}");
        // 开始发送消息。
        for (int i = 0; i < 100; i++  ) {
            // ${ExchangeName}必须在消息队列RabbitMQ版控制台上已存在,并且Exchange的类型与控制台上的类型一致。
            // BindingKey根据业务需求填入相应的BindingKey。
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
            channel.basicPublish("${ExchangeName}", "${BindingKey}", true, props,
                    ("消息发送Body"  + i).getBytes(StandardCharsets.UTF_8));
        }
        connection.close();
    }
}

订阅消息

创建并编译运行ConsumerTest.java

重要

编译运行ConsumerTest.java订阅消息之前,您需要根据代码提示信息配置参数列表中所列举的参数。

import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;

public class ConsumerTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        // 设置接入点,在消息队列RabbitMQ版控制台实例详情页面查看。
        factory.setHost("xxx.xxx.aliyuncs.com");
        // 用户名,在消息队列RabbitMQ版控制台静态用户名密码页面查看。
        factory.setUsername("${Username}");
        // 密码,在消息队列RabbitMQ版控制台静态用户名密码页面查看。
        factory.setPassword("${Password}");
        //设置为true,开启Connection自动恢复功能;设置为false,关闭Connection自动恢复功能。
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        // 设置Vhost名称,请确保已在消息队列RabbitMQ版控制台上创建完成。
        factory.setVirtualHost("${VhostName}");
        // 默认端口,非加密端口5672,加密端口5671。
        factory.setPort(5672);
        factory.setConnectionTimeout(300 * 1000);
        factory.setHandshakeTimeout(300 * 1000);
        factory.setShutdownTimeout(0);
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        // 创建${ExchangeName},该Exchange必须在消息队列RabbitMQ版控制台上已存在,并且Exchange的类型与控制台上的类型一致。
        AMQP.Exchange.DeclareOk exchangeDeclareOk = channel.exchangeDeclare("${ExchangeName}", "${ExchangeType}", true, false, false, null);
        // 创建${QueueName} ,该Queue必须在消息队列RabbitMQ版控制台上已存在。
        AMQP.Queue.DeclareOk queueDeclareOk = channel.queueDeclare("${QueueName}", true, false, false, new HashMap<String, Object>());
        // Queue与Exchange进行绑定,并确认绑定的BindingKeyTest。
        AMQP.Queue.BindOk bindOk = channel.queueBind("${QueueName}", "${ExchangeName}", "BindingKeyTest");
        // 开始消费消息。
        channel.basicConsume("${QueueName}", false, "ConsumerTag", new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                //接收到的消息,进行业务逻辑处理。
                System.out.println("Received: "  + new String(body, StandardCharsets.UTF_8) +  ", deliveryTag: "  + envelope.getDeliveryTag()  + ", messageId: " +  properties.getMessageId());
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}
说明

云消息队列 RabbitMQ 版与开源RabbitMQ完全兼容。更多参数说明,请参见开源RabbitMQ客户端文档

阿里云首页 云消息队列 RabbitMQ 版 相关技术圈