开源SDK接入指南(数据加密传输场景)
云消息队列 RabbitMQ 版支持TLS加密传输。本文以调用Java SDK为例,介绍如何在使用开源SDK实现消息收发的操作过程中,使用TLS实现数据传输加密,其他语言或框架的SDK数据传输加密相似。
前提条件
背景信息
开源RabbitMQ客户端支持通过用户名/密码和AK/SK两种方式接入云上服务,当您需要对传输的数据进行加密时,需要修改连接端口为5671加密端口,并启用TLS加密传输(默认使用TLSv1.2版本)。
加密收发消息流程(以Java语言为例)
获取接入点
您需要在云消息队列 RabbitMQ 版控制台获取实例的接入点。在收发消息时,您需要为发布端和订阅端配置该接入点,通过接入点接入云消息队列 RabbitMQ 版实例。
登录云消息队列 RabbitMQ 版控制台,然后在左侧导航栏选择实例列表。
在实例列表页面的顶部菜单栏选择地域,然后在实例列表中,单击目标实例名称。
在实例详情页面的接入点信息页签,将鼠标指针移动到目标类型的接入点,单击该接入点右侧的图标,复制该接入点。
类型
说明
示例值
公网接入点
公网环境可读写。按量付费实例默认支持,预付费实例需在购买时选择才支持。
XXX.mq-amqp.cn-hangzhou-a.aliyuncs.com
VPC接入点
VPC环境可读写。按量付费实例和预付费实例默认都支持。
XXX.mq-amqp.cn-hangzhou-a-internal.aliyuncs.com
安装Java依赖库
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.5.0</version> <!-- 支持开源所有版本 -->
</dependency>
生成用户名密码
登录云消息队列 RabbitMQ 版控制台,然后在左侧导航栏选择实例列表。
在实例列表页面的顶部菜单栏选择地域,然后在实例列表中,单击目标实例名称。
在左侧导航栏,单击静态用户名密码。
在静态用户名密码页面,单击创建用户名密码。
在创建用户名密码面板,输入AccessKey ID和AccessKey Secret,然后单击确定。
说明AccessKey ID和AccessKey Secret需要在阿里云RAM控制台获取,具体获取方式,请参见创建AccessKey。
静态用户名密码页面,显示创建的静态用户名与密码,密码处于隐藏状态。
在创建的静态用户名密码的密码列,单击显示密码,可查看用户名的密码。
生产消息
创建并编译运行ProducerTest.java
编译运行ProducerTest.java生产消息之前,您需要根据代码提示信息修改表1 配置参数列表中所列举的参数。
表1 配置参数列表
参数 | 示例值 | 描述 |
hostName | XXX.net.mq.amqp.aliyuncs.com | 云消息队列 RabbitMQ 版实例接入点。获取方式,请参见步骤二:创建资源。 |
Port | 5671 | 默认端口。非加密端口为5672,加密端口为5671。 |
userName | MjoxODgwNzcwODY5MD**** | 客户端接入云消息队列 RabbitMQ 版服务端用于权限认证的静态用户名。 需要提前在云消息队列 RabbitMQ 版控制台创建。 具体操作,请参见创建用户名密码。 |
passWord | NDAxREVDQzI2MjA0OT**** | 客户端接入云消息队列 RabbitMQ 版服务端用于权限认证的静态用户密码。 需要提前在云消息队列 RabbitMQ 版控制台创建。 具体操作,请参见创建用户名密码。 |
virtualHost | amqp_vhost | 云消息队列 RabbitMQ 版实例的Vhost。需要提前在云消息队列 RabbitMQ 版控制台创建。 具体操作,请参见创建Vhost。 |
exchangeName | ExchangeTest | 云消息队列 RabbitMQ 版的Exchange。 需要提前在云消息队列 RabbitMQ 版控制台创建。 具体操作,请参见创建Exchange。 |
queueName | QueueTest | 云消息队列 RabbitMQ 版的Queue。 需要提前在云消息队列 RabbitMQ 版控制台创建。 具体操作,请参见创建Queue。 |
bindingKey | BindingKeyTest | 云消息队列 RabbitMQ 版Exchange与Queue绑定的Binding Key。 需要提前在云消息队列 RabbitMQ 版控制台创建绑定关系。 具体操作,请参见创建绑定关系。 |
exchangeType | topic | Exchange的类型。云消息队列 RabbitMQ 版支持的类型如下,更多信息,请参见Exchange。
重要 请确保填写的Exchange类型和您创建Exchange时选择的类型一致。 |
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
public class ProducerTest {
public static void main(String[] args) throws IOException, TimeoutException, NoSuchAlgorithmException, KeyManagementException {
//设置实例的接入点。
String hostName = "xxx.xxx.aliyuncs.com";
//设置实例的静态用户名密码。
String userName = "${UserName}";
String passWord = "${PassWord}";
//设置实例的Vhost。
String virtualHost = "${VirtualHost}";
//在生产环境中,建议提前创建好Connection,并在需要时重复使用,避免频繁创建和关闭Connection,以提高性能、复用连接资源,以及保证系统的稳定性。
Connection connection = createConnection(hostName, userName, passWord, virtualHost);
Channel channel = connection.createChannel();
//设置Exchange、Queue和绑定关系。
String exchangeName = "${ExchangeName}";
String queueName = "${QueueName}";
String bindingKey = "${BindingKey}";
//设置Exchange类型。
String exchangeType = "${ExchangeType}";
//此处为了体验流畅,确保了Exchange和Queue的创建过程。
//在生产环境中,建议在控制台提前创建,尽量避免在代码中直接声明,否则可能触发单API调用的限流。
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueDeclare(queueName, true, false, false, new HashMap<String, Object>());
channel.queueBind(queueName, exchangeName, bindingKey);
//开始发送消息。
for (int i = 0; i < 10; i++) {
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
channel.basicPublish(exchangeName, bindingKey, true, props,
("消息发送示例Body-" + i).getBytes(StandardCharsets.UTF_8));
System.out.println("[SendResult] Message sent successfully, messageId: " + props.getMessageId() + ", exchange: " + exchangeName + ", routingKey: " + bindingKey);
}
connection.close();
}
public static Connection createConnection(String hostName, String userName, String passWord, String virtualHost) throws IOException, TimeoutException, NoSuchAlgorithmException, KeyManagementException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(hostName);
factory.setUsername(userName);
factory.setPassword(passWord);
//设置为true,开启Connection自动恢复功能;设置为false,关闭Connection自动恢复功能。
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(5000);
factory.setVirtualHost(virtualHost);
//默认端口,非加密端口5672,加密端口5671。
factory.setPort(5671);
factory.useSslProtocol();
//基于网络环境合理设置超时时间。
factory.setConnectionTimeout(30 * 1000);
factory.setHandshakeTimeout(30 * 1000);
factory.setShutdownTimeout(0);
Connection connection = factory.newConnection();
return connection;
}
}
订阅消息
创建并编译运行ConsumerTest.java
。
编译运行ConsumerTest.java
订阅消息之前,您需要根据代码提示信息修改表1 配置参数列表中所列举的参数。
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;
public class ConsumerTest {
public static void main(String[] args) throws IOException, TimeoutException, NoSuchAlgorithmException, KeyManagementException {
//设置实例的接入点。
String hostName = "xxx.xxx.aliyuncs.com";
//设置实例的静态用户名密码。
String userName = "${UserName}";
String passWord = "${PassWord}";
//设置实例的Vhost。
String virtualHost = "${VirtualHost}";
Connection connection = createConnection(hostName, userName, passWord, virtualHost);
final Channel channel = connection.createChannel();
//声明Queue。
String queueName = "${QueueName}";
//创建${QueueName} ,该Queue必须在云消息队列RabbitMQ版控制台上已存在。
AMQP.Queue.DeclareOk queueDeclareOk = channel.queueDeclare(queueName, true, false, false, new HashMap<String, Object>());
//开始消费消息。
channel.basicConsume(queueName, false, "ConsumerTag", new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
//接收到的消息,进行业务逻辑处理。
System.out.println("Received: " + new String(body, StandardCharsets.UTF_8) + ", deliveryTag: " + envelope.getDeliveryTag() + ", messageId: " + properties.getMessageId());
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
public static Connection createConnection(String hostName, String userName, String passWord, String virtualHost) throws IOException, TimeoutException, NoSuchAlgorithmException, KeyManagementException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(hostName);
factory.setUsername(userName);
factory.setPassword(passWord);
//设置为true,开启Connection自动恢复功能;设置为false,关闭Connection自动恢复功能。
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(5000);
factory.setVirtualHost(virtualHost);
// 默认端口,非加密端口5672,加密端口5671。
factory.setPort(5671);
factory.useSslProtocol();
factory.setConnectionTimeout(300 * 1000);
factory.setHandshakeTimeout(300 * 1000);
factory.setShutdownTimeout(0);
Connection connection = factory.newConnection();
return connection;
};
}