如果您希望消息被投递后延迟一段时间被消费者消费,您可以使用消息队列RabbitMQ版的延时消息。消息队列RabbitMQ版原生支持延时消息,使用方式比开源RabbitMQ更简单。

什么是延时消息

延时消息是指在指定时间段之后才被消费者消费的消息。

应用场景

延时消息适用于以下场景:

  • 对消息生产和消费有时间窗口要求的场景。例如,在电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条延时消息。这条消息将会在30分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。如支付未完成,则关闭订单。如已完成支付则忽略。
  • 通过消息触发延时任务的场景。例如,在指定时间段之后向用户发送提醒消息。

使用限制

  • 延时时间的值必须为非负整数。单位为毫秒。
  • 延时时间的最大值为86400000,即1天。若延时时间超过最大值,则当作普通消息处理。
  • 延时消息是消息队列RabbitMQ版的高级特性消息,此类消息将以普通消息价格的5倍计费。

    示例:一条延时消息,发布消息1次,订阅消息1次,按照5+5=10次API请求计费。

方案对比

消息队列RabbitMQ版和开源RabbitMQ支持的延时消息实现方案如下:

项目 开源RabbitMQ 消息队列RabbitMQ版
死信Exchange+Queue的消息存活时间 ✔️ ✔️
死信Exchange+消息的消息存活时间 ✔️ ✔️
注意 必须保证延时消息发布到的Queue有消费者。
rabbitmq-delayed-message-exchange插件 ✔️
原生延时消息方案(推荐) ✔️

原生延时消息方案

消息队列RabbitMQ版通过对消息设置delay来实现延时效果。消息队列RabbitMQ版原生延时消息的流转过程如下:
  1. 生产者向Exchange发布设置了delay的消息。
  2. Exchange将消息路由至Queue。
  3. 在设置的delay时间到期后,消费者才能从Queue消费消息。
注意 消息队列RabbitMQ版原生延时消息,则不要为Queue设置消息存活时间。如果您为Queue设置消息存活时间,则会导致延时消息失效。

原生延时消息最佳实践

  • 生产者客户端

    消息队列RabbitMQ版原生延时消息的使用方式非常简单。您只需要在生产者客户端发布消息时,通过delay为消息设置一个延时时间。

    发布延时消息的Java示例代码如下:

    Map<String, Object> headers = new HashMap<>();
    headers.put("delay", "xx");
    AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).headers(headers).build();

    更多语言示例代码,请参见AMQP Demos

  • 消费者客户端

    为保证延时消息时效性,建议您在消费消息时使用push模式的basic.consume方法,而不要使用pull模式的basic.get方法。因为消息队列RabbitMQ版的消息是分布式存储的,如果您使用pull模式的basic.get方法获取消息,并不能保证正好从存储的节点获取消息。

常见问题

  • 为什么发送延时消息后并没有出现延迟效果?

    因为设置了Queue的消息存活时间。Queue的消息存活时间和消息队列RabbitMQ版原生延时时间是冲突的。消息队列RabbitMQ版通过Queue实现延迟效果。如果您使用了Queue的消息存活时间,则消息队列RabbitMQ版默认不可同时使用原生延时消息。

  • 为什么实际的延时时间大于设置的延时时间?

    因为使用了pull模式的basic.get方法。消息队列RabbitMQ版的消息是集群存储的,使用pull模式的basic.get方法路由到一台消息队列RabbitMQ版Broker时,可能无法及时拉取存储在其他消息队列RabbitMQ版Broker上的消息。