开源SDK接入指南(数据加密传输场景)

更新时间: 2023-09-06 15:08:32

云消息队列 RabbitMQ 版支持TLS加密传输。本文以调用Java SDK为例,介绍如何在使用开源SDK实现消息收发的操作过程中,使用TLS实现数据传输加密,其他语言或框架的SDK数据传输加密相似。

前提条件

背景信息

开源RabbitMQ客户端支持通过用户名/密码和AK/SK两种方式接入云上服务,当您需要对传输的数据进行加密时,需要修改连接端口为5671加密端口,并启用TLS加密传输(默认使用TLSv1.2版本)。

加密收发消息流程(以Java语言为例)

开源SDK收发流程

获取接入点

您需要在云消息队列 RabbitMQ 版控制台获取实例的接入点。在收发消息时,您需要为发布端和订阅端配置该接入点,通过接入点接入云消息队列 RabbitMQ 版实例。

  1. 登录云消息队列 RabbitMQ 版控制台,然后在左侧导航栏选择实例列表

  2. 实例列表页面的顶部菜单栏选择地域,然后在实例列表中,单击目标实例名称。

  3. 实例详情页面的接入点信息页签,将鼠标指针移动到目标类型的接入点,单击该接入点右侧的复制图标,复制该接入点。

    类型

    说明

    示例值

    公网接入点

    公网环境可读写。按量付费实例默认支持,预付费实例需在购买时选择才支持。

    XXX.mq-amqp.cn-hangzhou-a.aliyuncs.com

    VPC接入点

    VPC环境可读写。按量付费实例和预付费实例默认都支持。

    XXX.mq-amqp.cn-hangzhou-a-internal.aliyuncs.com

安装Java依赖库

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.5.0</version> <!-- 支持开源所有版本 -->
</dependency>

生成用户名密码

  1. 登录云消息队列 RabbitMQ 版控制台,然后在左侧导航栏选择实例列表

  2. 实例列表页面的顶部菜单栏选择地域,然后在实例列表中,单击目标实例名称。

  3. 在左侧导航栏,单击静态用户名密码

  4. 静态用户名密码页面,单击创建用户名密码

  5. 创建用户名密码面板,输入AccessKey IDAccessKey Secret,然后单击确定

    说明

    AccessKey IDAccessKey Secret需要在阿里云RAM控制台获取,具体获取方式,请参见创建AccessKey

    静态用户名密码页面,显示创建的静态用户名与密码,密码处于隐藏状态。用户名密码

  6. 在创建的静态用户名密码的密码列,单击显示密码,可查看用户名的密码。

生产消息

创建并编译运行ProducerTest.java

重要

编译运行ProducerTest.java生产消息之前,您需要根据代码提示信息修改表1 配置参数列表中所列举的参数。

表1 配置参数列表

参数

示例值

描述

hostName

1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com

云消息队列 RabbitMQ 版实例接入点。

Port

5671

加密端口。

userName

MjoxODgwNzcwODY5MD****

云消息队列 RabbitMQ 版控制台将阿里云账号或RAM用户的AccessKey ID、AccessKey Secret和云消息队列 RabbitMQ 版实例ID通过Base64编码后生成的静态用户名。您可以在云消息队列 RabbitMQ 版控制台静态用户名密码页面获取。

passWord

NDAxREVDQzI2MjA0OT****

云消息队列 RabbitMQ 版控制台将阿里云账号或RAM用户的AccessKey Secret和timestamp参数(系统当前时间)通过HMAC-SHA1生成一个签名后,再将这个签名和timestamp参数(系统当前时间)通过Base64编码后生成的静态密码。您可以在云消息队列 RabbitMQ 版控制台静态用户名密码获取。

virtualHost

Test

云消息队列 RabbitMQ 版实例的Vhost。您可以在云消息队列 RabbitMQ 版控制台Vhost 详情页面查看。如何查看Vhost,请参见查看Vhost连接详情

ExchangeName

ExchangeTest

云消息队列 RabbitMQ 版的Exchange。您可以在云消息队列 RabbitMQ 版控制台Exchange 列表页面,结合实例ID与Vhost模糊搜索已创建的Exchange。

BindingKey

BindingKeyTest

云消息队列 RabbitMQ 版Exchange与Queue的Binding Key。您可以在云消息队列 RabbitMQ 版控制台Exchange 列表页面查看Exchange的绑定关系,获取Binding Key。

QueueName

QueueTest

云消息队列 RabbitMQ 版的Queue。仅在订阅消息时候需要配置,您可以在云消息队列 RabbitMQ 版控制台Exchange 列表页面,查看Exchange的绑定关系,获取Exchange绑定的Queue。

import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
import java.util.HashMap;
import java.util.UUID;

public class ProducerTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        // 设置接入点,在云消息队列 RabbitMQ 版控制台实例详情页面查看。
        factory.setHost("xxx.xxx.aliyuncs.com");
        // 用户名,在云消息队列 RabbitMQ 版控制台用户名密码管理页面查看。
        factory.setUsername("${UserName}");
        // 密码,在云消息队列 RabbitMQ 版控制台用户名密码管理页面查看。
        factory.setPassword("${PassWord}");
        //设置为true,开启Connection自动恢复功能;设置为false,关闭Connection自动恢复功能。
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        // 设置Vhost名称,请确保已在云消息队列 RabbitMQ 版控制台上创建完成。
        factory.setVirtualHost("${VhostName}");
        // 加密端口5671。
        factory.setPort(5671);
        // 启用TLS加密传输。
      	factory.useSslProtocol();
        // 基于网络环境合理设置超时时间。
        factory.setConnectionTimeout(30 * 1000);
        factory.setHandshakeTimeout(30 * 1000);
        factory.setShutdownTimeout(0);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();        
        channel.exchangeDeclare("${ExchangeName}", "${ExchangeType}", true, false, false, null);
        channel.queueDeclare("${QueueName}", true, false, false, new HashMap<String, Object>());
        channel.queueBind("${QueueName}", "${ExchangeName}", "${BindingKey}");
        // 开始发送消息。
        for (int i = 0; i < 100; i++  ) {
            // ${ExchangeName}必须在云消息队列 RabbitMQ 版控制台上已存在,并且Exchange的类型与控制台上的类型一致。
            // BindingKey根据业务需求填入相应的BindingKey。
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
            channel.basicPublish("${ExchangeName}", "BindingKey", true, props,
                    ("消息发送Body"  + i).getBytes(StandardCharsets.UTF_8));
        }
        connection.close();
    }
}

订阅消息

创建并编译运行ConsumerTest.java

重要

编译运行ConsumerTest.java订阅消息之前,您需要根据代码提示信息修改表1 配置参数列表中所列举的参数。

import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;

public class ConsumerTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        // 设置接入点,在云消息队列 RabbitMQ 版控制台实例详情页面查看。
        factory.setHost("xxx.xxx.aliyuncs.com");
        // 用户名,在云消息队列 RabbitMQ 版控制台用户名密码管理页面查看。
        factory.setUsername("${Username}");
        // 密码,在云消息队列 RabbitMQ 版控制台用户名密码管理页面查看。
        factory.setPassword("${Password}");
        //设置为true,开启Connection自动恢复功能;设置为false,关闭Connection自动恢复功能。
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        // 设置Vhost名称,请确保已在云消息队列 RabbitMQ 版控制台上创建完成。
        factory.setVirtualHost("${VhostName}");
        // 加密端口5671。
        factory.setPort(5671);
        // 启用TLS加密传输。
      	factory.useSslProtocol();
        factory.setConnectionTimeout(300 * 1000);
        factory.setHandshakeTimeout(300 * 1000);
        factory.setShutdownTimeout(0);
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        // 创建${ExchangeName},该Exchange必须在云消息队列 RabbitMQ 版控制台上已存在,并且Exchange的类型与控制台上的类型一致。
        AMQP.Exchange.DeclareOk exchangeDeclareOk = channel.exchangeDeclare("${ExchangeName}", "${ExchangeType}", true, false, false, null);
        // 创建${QueueName} ,该Queue必须在云消息队列 RabbitMQ 版控制台上已存在。
        AMQP.Queue.DeclareOk queueDeclareOk = channel.queueDeclare("${QueueName}", true, false, false, new HashMap<String, Object>());
        // Queue与Exchange进行绑定,并确认绑定的BindingKeyTest。
        AMQP.Queue.BindOk bindOk = channel.queueBind("${QueueName}", "${ExchangeName}", "BindingKeyTest");
        // 开始消费消息。
        channel.basicConsume("${QueueName}", false, "ConsumerTag", new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                //接收到的消息,进行业务逻辑处理。
                System.out.println("Received: "  + new String(body, StandardCharsets.UTF_8) +  ", deliveryTag: "  + envelope.getDeliveryTag()  + ", messageId: " +  properties.getMessageId());
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
        connection.close();
    }
}

阿里云首页 云消息队列 MQ 相关技术圈