Spring集成
更新时间:
云消息队列 RabbitMQ 版支持Spring框架的SDK,本文介绍如何集成Spring框架的SDK客户端收发消息。
前提条件
您已在云消息队列 RabbitMQ 版控制台创建实例、Vhost、Exchange、Queue等相关资源,具体操作,请参见步骤二:创建资源。
Demo工程示例
单击SpringBootDemo下载Demo工程示例。
步骤一:参数配置
在application.properties
或application.yml
文件中填写配置参数,以application.properties为例。
# 设置接入点,在云消息队列 RabbitMQ 版控制台实例详情⻚面获取。
spring.rabbitmq.host=XXXXXX.amqp.aliyuncs.com
# 云消息队列RabbitMQ连接端口。
spring.rabbitmq.port=5672
# 指定实例的静态用户名,在云消息队列 RabbitMQ 版控制台静态用户名密码管理⻚面查看。
spring.rabbitmq.username=******
# 指定实例的静态用户密码,在云消息队列 RabbitMQ 版控制台静态用户名密码管理⻚面查看。
spring.rabbitmq.password=******
# 虚拟主机,提供逻辑隔离,在云消息队列 RabbitMQ 版控制台Vhost列表⻚面查看。
spring.rabbitmq.virtual-host=test_vhost
# 消息确认(Ack)模式。
# 1. none:消费者接收消息后,无论消费成功与否,服务端均认为消息已成功处理,即RabbitMQ中的autoAck模式。
# 2. auto:客户端在消费成功后主动回复ack(处理失败则回复nack或抛出异常),不需要客户端显式调用Channel.basicAck()。
# 3. manual: 手动回复Ack,需要客户端在消费成功后显式调用Channel.basicAck()主动回复。
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 一个消费者最多可处理的未被确认(Ack)消息数量(QoS),RabbitMQ服务端取min{prefetch, 100}作为QoS。
spring.rabbitmq.listener.simple.prefetch=10
# RabbitMQ监听器(Listener)的最小并发消费者数量。
spring.rabbitmq.listener.simple.concurrency=2
# RabbitMQ监听器(Listener)的最大并发消费者数量,当消费速率足够快时,客户端会启动max-concurrency个消费者进行消费。
spring.rabbitmq.listener.simple.max-concurrency=5
以下其他配置您可以根据需要选择添加:
步骤二:调用SDK收发消息
创建连接
推荐缓存模式:
客户端和服务端建立连接时,推荐将缓存模式设置为CONNECTION模式。
该模式下支持创建多个Connection,程序会缓存一定数量的Connection,每个Connection中缓存一定的Channel。
云消息队列 RabbitMQ 版是集群分布式架构,在CONNECTION模式下,创建多个connection可以帮助客户端更好地和集群的多个服务节点连接,更高效地发送和消费消息。
参数设置如下:
// 缓存模式设置为CONNECTION。
connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);
// CONNECTION模式下,最大可缓存的connection数量。
connectionFactory.setConnectionCacheSize(10);
// CONNECTION模式下,最大可缓存的Channel数量。
connectionFactory.setChannelCacheSize(64);
完整的创建连接的代码示例如下:
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.port}")
private int port;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.virtual-host}")
private String virtualHost;
@Bean
public ConnectionFactory connectionFactory()
{
// 初始化RabbitMQ连接配置connectionFactory。
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
// VirtualHost可以在RabbitMQ控制台手动创建,也可以在这里自动创建。
connectionFactory.setVirtualHost(virtualHost);
// 请务必开启Connection自动重连功能,保证服务端发布时客户端可自动重新连接上服务端。
connectionFactory.getRabbitConnectionFactory().setAutomaticRecoveryEnabled(true);
// 缓存模式推荐设置为CONNECTION。
connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);
// CONNECTION模式下,最大可缓存的connection数量。
connectionFactory.setConnectionCacheSize(10);
// CONNECTION模式下,最大可缓存的Channel数量。
connectionFactory.setChannelCacheSize(64);
return connectionFactory;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory)
{
// RabbitMQ消息模板,该模板封装了多种常用的消息操作。
return new RabbitTemplate(connectionFactory);
}
}
消息生产
在RabbitMQService中通过依赖注入的方式获取RabbitTemplate,并调用其提供的send接口发送消息。
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
@Service
public class RabbitMQService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage(String exchange, String routingKey, String content) {
// 设置MessageId。
String msgId = UUID.randomUUID().toString();
MessageProperties messageProperties = new MessageProperties();
messageProperties.setMessageId(msgId);
// 创建Message。
Message message = new Message(content.getBytes(StandardCharsets.UTF_8), messageProperties);
/*
* 发送消息send()接口;
* exchange:交换机名称;
* routingKey:路由键;
* message:消息内容;
* correlationData用于关联发布者确认;
*/
rabbitTemplate.send(exchange, routingKey, message, null);
}
}
消息消费
使用@RabbitListener
注解消费消息:
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Arrays;
@Component
public class MessageListener {
/**
* 消息接收;
* @param message消息;
* @param channel通道;
* @throws IOException
*queues需要替换为您提前创建好的队列的名称;
*/
@RabbitListener(queues = "myQueue")
public void receiveFromMyQueue(Message message, Channel channel) throws IOException {
// 进入消息消费业务逻辑。
...
// 须在Ack有效期(消费超时时间)内返回Ack,否则为无效确认,消息还会被重复投递。
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
RabbitListener
常见可选配置如下:
文档内容是否对您有帮助?