延时消息
如果您希望消息被投递后延迟一段时间被消费者消费,您可以使用云消息队列 RabbitMQ 版的延时消息。云消息队列 RabbitMQ 版原生支持延时消息,使用方式比开源RabbitMQ更简单。
什么是延时消息
延时消息是指在指定时间段之后才被消费者消费的消息。
应用场景
延时消息适用于以下场景:
对消息生产和消费有时间窗口要求的场景。例如,在电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条延时消息。这条消息将会在30分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。如支付未完成,则关闭订单。如已完成支付则忽略。
通过消息触发延时任务的场景。例如,在指定时间段之后向用户发送提醒消息。
延时时间设置规则
方案对比
云消息队列 RabbitMQ 版无需进行代码改造,即可实现开源RabbitMQ的所有延时消息方案,详情请参见下表:
项目 | 开源RabbitMQ | 云消息队列 RabbitMQ 版 |
死信Exchange+Queue的消息存活时间 | 支持 | 支持 |
死信Exchange+消息的消息存活时间 | 支持 | 支持 |
支持 | 支持 | |
不支持 | 支持 |
开源延时消息插件方案
为了减少与开源RabbitMQ的差别,云消息队列 RabbitMQ 版也基于原生的延时消息支持使用开源插件式的方式来使用延时消息,并免去插件的安装。具体使用流程如下:
声明
x-delayed-message
类型的Exchange,并填写该Exchange的扩展参数x-delayed-type以指定Exchange的路由类型。示例如下:Map<String, Object> args = new HashMap<String, Object>(); args.put("x-delayed-type", "direct"); channel.exchangeDeclare("ExchangeName", "x-delayed-message", true, false, args);
参数说明如下:
参数
说明
x-delayed-type
Exchange的类型,指定路由规则。取值说明如下:
direct
fanout
topic
headers
ExchangeName
Exchange的名称。
说明请确保声明的Exchange已存在。具体步骤,请参见Exchange管理。
x-delayed-message
指定Exchange类型,以支持投递延时消息。
发送延时消息。在消息的Header属性中增加一个键为x-delay,值为毫秒数的键值对,并且指定发送的目标Exchange为上一步已声明的Exchange。示例如下:
byte[] messageBodyBytes = "delayed payload".getBytes("UTF-8"); Map<String, Object> headers = new HashMap<String, Object>(); headers.put("x-delay", 5000);//表示消息延时5000毫秒。 AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers); channel.basicPublish("ExchangeName", "", props.build(), messageBodyBytes);
原生延时消息方案
云消息队列 RabbitMQ 版通过对消息设置delay来实现延时效果。云消息队列 RabbitMQ 版原生延时消息的流转过程如下:
生产者向Exchange发布设置了delay属性的消息。
Exchange将消息路由至Queue。
在设置的delay时间到期后,消费者才能从Queue消费消息。
原生延时消息最佳实践
生产者客户端
云消息队列 RabbitMQ 版原生延时消息的使用方式非常简单。您只需要在生产者客户端发布消息时,通过delay为消息设置一个延时时间。
发布延时消息的Java示例代码如下:
Map<String, Object> headers = new HashMap<>(); headers.put("delay", "5000");//表示消息延时5000毫秒。 AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).headers(headers).build();
更多语言示例代码,请参见AMQP Demos。
消费者客户端
为保证延时消息时效性,建议您在消费消息时使用push模式的
basic.consume
方法,而不要使用pull模式的basic.get
方法。因为云消息队列 RabbitMQ 版的消息是分布式存储的,如果您使用pull模式的basic.get
方法获取消息,并不能保证正好从存储的节点获取消息。
常见问题
为什么实际的延时时间大于设置的延时时间?
因为客户端使用了pull模式的basic.get方法消费消息。云消息队列 RabbitMQ 版的消息是集群存储的,使用pull模式的basic.get方法路由到一台Broker时,可能无法及时拉取存储在其他Broker上的消息。