开源SDK接入指南(数据加密传输场景)

更新时间:

云消息队列 RabbitMQ 版支持TLS加密传输。本文以调用Java SDK为例,介绍如何在使用开源SDK实现消息收发的操作过程中,使用TLS实现数据传输加密,其他语言或框架的SDK数据传输加密相似。

前提条件

背景信息

开源RabbitMQ客户端支持通过用户名/密码和AK/SK两种方式接入云上服务,当您需要对传输的数据进行加密时,需要修改连接端口为5671加密端口,并启用TLS加密传输(默认使用TLSv1.2版本)。

加密收发消息流程(以Java语言为例)

开源SDK收发流程

获取接入点

您需要在云消息队列 RabbitMQ 版控制台获取实例的接入点。在收发消息时,您需要为发布端和订阅端配置该接入点,通过接入点接入云消息队列 RabbitMQ 版实例。

  1. 登录云消息队列 RabbitMQ 版控制台,然后在左侧导航栏选择实例列表

  2. 实例列表页面的顶部菜单栏选择地域,然后在实例列表中,单击目标实例名称。

  3. 实例详情页面的接入点信息页签,将鼠标指针移动到目标类型的接入点,单击该接入点右侧的复制图标,复制该接入点。

    类型

    说明

    示例值

    公网接入点

    公网环境可读写。按量付费实例默认支持,预付费实例需在购买时选择才支持。

    XXX.mq-amqp.cn-hangzhou-a.aliyuncs.com

    VPC接入点

    VPC环境可读写。按量付费实例和预付费实例默认都支持。

    XXX.mq-amqp.cn-hangzhou-a-internal.aliyuncs.com

安装Java依赖库

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.5.0</version> <!-- 支持开源所有版本 -->
</dependency>

生成用户名密码

  1. 登录云消息队列 RabbitMQ 版控制台,然后在左侧导航栏选择实例列表

  2. 实例列表页面的顶部菜单栏选择地域,然后在实例列表中,单击目标实例名称。

  3. 在左侧导航栏,单击静态用户名密码

  4. 静态用户名密码页面,单击创建用户名密码

  5. 创建用户名密码面板,输入AccessKey IDAccessKey Secret,然后单击确定

    说明

    AccessKey IDAccessKey Secret需要在阿里云RAM控制台获取,具体获取方式,请参见创建AccessKey

    静态用户名密码页面,显示创建的静态用户名与密码,密码处于隐藏状态。用户名密码

  6. 在创建的静态用户名密码的密码列,单击显示密码,可查看用户名的密码。

生产消息

创建并编译运行ProducerTest.java

重要

编译运行ProducerTest.java生产消息之前,您需要根据代码提示信息修改表1 配置参数列表中所列举的参数。

表1 配置参数列表

参数

示例值

描述

hostName

XXX.net.mq.amqp.aliyuncs.com

云消息队列 RabbitMQ 版实例接入点。获取方式,请参见获取接入点

Port

5671

默认端口。非加密端口为5672,加密端口为5671。

userName

MjoxODgwNzcwODY5MD****

客户端接入云消息队列 RabbitMQ 版服务端用于权限认证的静态用户名。

需要提前在云消息队列 RabbitMQ 版控制台创建。

具体操作,请参见创建用户名密码

passWord

NDAxREVDQzI2MjA0OT****

客户端接入云消息队列 RabbitMQ 版服务端用于权限认证的静态用户密码。

需要提前在云消息队列 RabbitMQ 版控制台创建。

具体操作,请参见创建用户名密码

virtualHost

amqp_vhost

云消息队列 RabbitMQ 版实例的Vhost。需要提前在云消息队列 RabbitMQ 版控制台创建。

具体操作,请参见创建Vhost

exchangeName

ExchangeTest

云消息队列 RabbitMQ 版的Exchange。

需要提前在云消息队列 RabbitMQ 版控制台创建。

具体操作,请参见创建Exchange

queueName

QueueTest

云消息队列 RabbitMQ 版的Queue。

需要提前在云消息队列 RabbitMQ 版控制台创建。

具体操作,请参见创建Queue

bindingKey

BindingKeyTest

云消息队列 RabbitMQ 版Exchange与Queue绑定的Binding Key。

需要提前在云消息队列 RabbitMQ 版控制台创建绑定关系。

具体操作,请参见创建绑定关系

exchangeType

topic

Exchange的类型。云消息队列 RabbitMQ 版支持的类型如下,更多信息,请参见Exchange

  • direct

  • topic

  • fanout

  • headers

  • x-jms-topic

  • x-delayed-message

  • x-consistent-hash

重要

请确保填写的Exchange类型和您创建Exchange时选择的类型一致。

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

public class ProducerTest {
    public static void main(String[] args) throws IOException, TimeoutException, NoSuchAlgorithmException, KeyManagementException {
        //设置实例的接入点。
        String hostName = "xxx.xxx.aliyuncs.com";
        //设置实例的静态用户名密码。
        String userName = "${UserName}";
        String passWord = "${PassWord}";
        //设置实例的Vhost。
        String virtualHost = "${VirtualHost}";

        //在生产环境中,建议提前创建好Connection,并在需要时重复使用,避免频繁创建和关闭Connection,以提高性能、复用连接资源,以及保证系统的稳定性。
        Connection connection = createConnection(hostName, userName, passWord, virtualHost);
        Channel channel = connection.createChannel();

        //设置Exchange、Queue和绑定关系。
        String exchangeName = "${ExchangeName}";
        String queueName = "${QueueName}";
        String bindingKey = "${BindingKey}";
        //设置Exchange类型。
        String exchangeType = "${ExchangeType}";

        //此处为了体验流畅,确保了Exchange和Queue的创建过程。
        //在生产环境中,建议在控制台提前创建,尽量避免在代码中直接声明,否则可能触发单API调用的限流。
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        channel.queueDeclare(queueName, true, false, false, new HashMap<String, Object>());
        channel.queueBind(queueName, exchangeName, bindingKey);
        //开始发送消息。
        for (int i = 0; i < 10; i++) {
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
            channel.basicPublish(exchangeName, bindingKey, true, props,
                    ("消息发送示例Body-"  + i).getBytes(StandardCharsets.UTF_8));
            System.out.println("[SendResult] Message sent successfully, messageId: " + props.getMessageId() + ", exchange: " + exchangeName + ", routingKey: " + bindingKey);
        }
        connection.close();
    }

    public static Connection createConnection(String hostName, String userName, String passWord, String virtualHost) throws IOException, TimeoutException, NoSuchAlgorithmException, KeyManagementException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(hostName);
        factory.setUsername(userName);
        factory.setPassword(passWord);
        //设置为true,开启Connection自动恢复功能;设置为false,关闭Connection自动恢复功能。
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        factory.setVirtualHost(virtualHost);
        //默认端口,非加密端口5672,加密端口5671。
        factory.setPort(5671);
        factory.useSslProtocol();
        //基于网络环境合理设置超时时间。
        factory.setConnectionTimeout(30 * 1000);
        factory.setHandshakeTimeout(30 * 1000);
        factory.setShutdownTimeout(0);
        Connection connection = factory.newConnection();
        return connection;
    }
}

订阅消息

创建并编译运行ConsumerTest.java

重要

编译运行ConsumerTest.java订阅消息之前,您需要根据代码提示信息修改表1 配置参数列表中所列举的参数。

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;

public class ConsumerTest {
    public static void main(String[] args) throws IOException, TimeoutException, NoSuchAlgorithmException, KeyManagementException {
        //设置实例的接入点。
        String hostName = "xxx.xxx.aliyuncs.com";
        //设置实例的静态用户名密码。
        String userName = "${UserName}";
        String passWord = "${PassWord}";
        //设置实例的Vhost。
        String virtualHost = "${VirtualHost}";

        Connection connection = createConnection(hostName, userName, passWord, virtualHost);
        final Channel channel = connection.createChannel();

        //声明Queue。
        String queueName = "${QueueName}";
        //创建${QueueName} ,该Queue必须在云消息队列RabbitMQ版控制台上已存在。
        AMQP.Queue.DeclareOk queueDeclareOk = channel.queueDeclare(queueName, true, false, false, new HashMap<String, Object>());

        //开始消费消息。
        channel.basicConsume(queueName, false, "ConsumerTag", new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
            throws IOException {
                //接收到的消息,进行业务逻辑处理。
                System.out.println("Received: "  + new String(body, StandardCharsets.UTF_8) +  ", deliveryTag: "  + envelope.getDeliveryTag()  + ", messageId: " +  properties.getMessageId());
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }

    public static Connection createConnection(String hostName, String userName, String passWord, String virtualHost) throws IOException, TimeoutException, NoSuchAlgorithmException, KeyManagementException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(hostName);
        factory.setUsername(userName);
        factory.setPassword(passWord);
        //设置为true,开启Connection自动恢复功能;设置为false,关闭Connection自动恢复功能。
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        factory.setVirtualHost(virtualHost);
        // 默认端口,非加密端口5672,加密端口5671。
        factory.setPort(5671);
        factory.useSslProtocol();
        factory.setConnectionTimeout(300 * 1000);
        factory.setHandshakeTimeout(300 * 1000);
        factory.setShutdownTimeout(0);
        Connection connection = factory.newConnection();
        return connection;
    };
}