消息队列RocketMQ版支持消息重试功能,即Consumer消费某条消息失败后,消息队列RocketMQ版会根据消息重试机制重新投递消息。TCP协议和HTTP协议的重试机制有所差异,本文介绍消息队列RocketMQ版分别在HTTP协议和TCP协议下的消息重试机制和配置方式。

消息重试机制概述

消息队列RocketMQ版消息收发过程中,若Consumer消费某条消息失败,则消息队列RocketMQ版会在重试间隔时间后,将消息重新投递给Consumer消费,若达到最大重试次数后消息还没有成功被消费,则消息将被投递至死信队列

  • 重试间隔:消息消费失败后再次被消息队列RocketMQ版投递给Consumer消费的间隔时间。
  • 最大重试次数:消息消费失败后,可被消息队列RocketMQ版重复投递的最大次数。
说明 一条消息无论重试多少次,这些重试消息的Message ID不会改变。
消息队列RocketMQ版中,顺序消息和无序消息(包括普通消息、延时消息、定时消息和事务消息)的重试机制如下:
协议 消息类型 重试间隔 最大重试次数 配置方式
TCP协议 顺序消息 间隔时间可通过自定义参数suspendTimeMillis取值进行配置。参数取值范围:10~30000,单位:毫秒,默认值:1000毫秒,即1秒。 最大重试次数可通过自定义参数MaxReconsumeTimes取值进行配置。该参数取值无最大限制。若未设置参数值,默认最大重试次数为Integer.MAX 请参见配置方式(TCP协议)
无序消息 间隔时间根据重试次数阶梯变化,取值范围:1秒~2小时。不支持自定义配置。
  • 若最大重试次数小于等于16次,则间隔时间按照无序消息重试间隔时间阶梯变化。
  • 若最大重试次数大于16次,则超过16次的间隔时间均为2小时。
最大重试次数可通过自定义参数MaxReconsumeTimes取值进行配置。默认值为16次,该参数取值无最大限制,建议使用默认值。
HTTP协议 顺序消息 1分钟 288次 不涉及
无序消息 5分钟 288次 不涉及

TCP协议消息重试

顺序消息

对于顺序消息,当Consumer消费消息失败后,消息队列RocketMQ版会自动不断地进行消息重试,最大重试次数以MaxReconsumeTimes参数取值为准,若参数未设置,则默认最大次数为Integer.MAX。每次重试间隔时间以suspendTimeMillis参数值为准,默认为1秒。
  • 若设置了MaxReconsumeTimes参数:消息重试次数超过了参数值后将不再重试,直接被投递至死信队列。
  • 若未设置MaxReconsumeTimes参数:最大重试次数为Integer.MAX,消息将会无限次重试,直到消费成功。

无序消息

注意 无序消息的重试只针对集群消费模式生效;广播消费模式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。
对于无序消息,当Consumer消费消息失败时,消息队列RocketMQ版会不断进行消息重试,最大重试次数以MaxReconsumeTimes参数取值为准,默认值为16次,即允许每条消息最多重试16次,如果消息重试16次后仍然失败,消息将被投递至死信队列。消息重试间隔时间如下表所示,若重试次数设置为大于16次,则超过16次的间隔时间均为2小时。
表 1. 无序消息重试间隔时间
第几次重试 与上次重试的间隔时间 第几次重试 与上次重试的间隔时间
1 10秒 9 7分钟
2 30秒 10 8分钟
3 1分钟 11 9分钟
4 2分钟 12 10分钟
5 3分钟 13 20分钟
6 4分钟 14 30分钟
7 5分钟 15 1小时
8 6分钟 16 2小时

如果严格按照上述重试时间间隔计算且重试次数默认为16次,某条消息在一直消费失败的前提下,将会在接下来的4小时46分钟之内进行16次重试。超过这个时间范围消息将不再重试投递,而被投递至死信队列。

HTTP协议消息重试

顺序消息

对于顺序消息,当Consumer消费消息失败后,消息队列RocketMQ版会不断进行消息重试,每次重试间隔时间为1分钟,最多重试288次。如果消息重试288次后仍然失败,消息将被投递至死信队列。

无序消息

对于无序消息,当Consumer消费消息失败后,消息队列RocketMQ版会不断进行消息重试,每次重试间隔时间为5分钟,最多重试288次。如果消息重试288次后仍然失败,消息将被投递至死信队列。

配置方式(TCP协议)

注意 以下配置方式仅适用于TCP协议,HTTP协议不涉及。
  • 消息投递失败后需要重试

    集群消费模式下,消息消费失败后期望消息重试,需要在消息监听器接口的实现中明确进行配置(三种方式任选一种):

    • 方式1:返回Action.ReconsumeLater(推荐)
    • 方式2:返回Null
    • 方式3:抛出异常

    示例代码

    public class MessageListenerImpl implements MessageListener {
    
        @Override
        public Action consume(Message message, ConsumeContext context) {
            //消息处理逻辑抛出异常,消息将重试。
            doConsumeMessage(message);
            //方式1:返回Action.ReconsumeLater,消息将重试。
            return Action.ReconsumeLater;
            //方式2:返回null,消息将重试。
            return null;
            //方式3:直接抛出异常,消息将重试。
            throw new RuntimeException("Consumer Message exception");
        }
    }
  • 消费投递失败后无需重试

    集群消费模式下,消息失败后期望消息不重试,需要捕获消费逻辑中可能抛出的异常,最终返回Action.CommitMessage,此后这条消息将不会再重试。

    示例代码

    public class MessageListenerImpl implements MessageListener {
    
        @Override
        public Action consume(Message message, ConsumeContext context) {
            try {
                doConsumeMessage(message);
            } catch (Throwable e) {
                //捕获消费逻辑中的所有异常,并返回Action.CommitMessage;
                return Action.CommitMessage;
            }
            //消息处理正常,直接返回Action.CommitMessage;
            return Action.CommitMessage;
        }
    }
  • 自定义消息最大重试次数和重试间隔
    说明 自定义消息队列RocketMQ版的客户端日志配置,请升级TCP Java SDK到1.2.2或以上版本。更多信息,请参见版本说明

    消息队列RocketMQ版允许Consumer实例启动的时候设置最大重试次数和重试间隔,无序消息重试间隔时间不支持自定义,以无序消息重试间隔时间为准。

    配置方式如下:

    Properties properties = new Properties();
    //配置对应Group ID的最大消息重试次数为20次,最大重试次数为字符串类型。
    properties.put(PropertyKeyConst.MaxReconsumeTimes,"20");
    //配置对应Group ID的消息重试间隔时间为3000毫秒,重试间隔时间为字符串类型。
    properties.put(PropertyKeyConst.suspendTimeMillis,"3000");
    Consumer consumer = ONSFactory.createConsumer(properties);
    注意 配置采用覆盖的方式生效,即最后启动的Consumer实例会覆盖之前启动的实例的配置。因此,请确保同一Group ID下的所有Consumer实例设置的最大重试次数和重试间隔相同,否则各实例间的配置将会互相覆盖。
  • 获取消息重试次数

    Consumer收到消息后,可按照以下方式获取消息的重试次数,消息重试间隔时间一般不需要获取。

    public class MessageListenerImpl implements MessageListener {
    
        @Override
        public Action consume(Message message, ConsumeContext context) {
            //获取消息的重试次数。
            System.out.println(message.getReconsumeTimes());
            return Action.CommitMessage;
        }
    }