消息队列RocketMQ版里的一个Group ID代表一个Consumer实例群组。对大多数分布式应用来说,一个Group ID下通常会挂载多个Consumer实例。订阅关系一致指,同一个Group ID下所有的Consumer实例的Topic和Tag都必须完全一致。

问题描述

  • Group ID订阅的部分消息未收到。登录消息队列RocketMQ版控制台,单击消息查询 > 按Message ID查询消息,消息显示已至少被消费—次,但从消费逻辑判断未消费。
  • 登录消息队列RocketMQ版控制台,单击Group管理 > 消费者状态订阅关系是否一致栏显示

问题分析

若同一个Group ID下的Consumer实例配置了不同的Topic,或是Topic相同但Tag不同 ,订阅关系就会不一致。一旦订阅关系不一致,消息消费的逻辑就会混乱,导致消息丢失。

  • 可能原因一

    配置了相同Group ID的不同客户端订阅的Topic不一致。

    同一个Group ID(GID-MQ-FAQ)订阅的Topic不同(分别是MQ-FAQ-TOPIC-1、MQ-FAQ-TOPIC-2)。

    错误示例:JVM-1上的代码

    Properties properties = new Properties();
    properties.put(PropertyKeyConst.GROUP_ID, "GID-MQ-FAQ");
    Consumer consumer = ONSFactory.createConsumer(properties);
    consumer.subscribe("MQ-FAQ-TOPIC-1", "NM-MQ-FAQ", new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.println("Receive: " + message);
            return Action.CommitMessage;
        }
    });
    consumer.start();                   

    错误示例:JVM-2上的代码

     Properties properties = new Properties();
    properties.put(PropertyKeyConst.GROUP_ID, "GID-MQ-FAQ");
    Consumer consumer = ONSFactory.createConsumer(properties);
    consumer.subscribe("MQ-FAQ-TOPIC-2", "NM-MQ-FAQ", new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.println("Receive: " + message);
            return Action.CommitMessage;
        }
    });
    
    consumer.start();                   
  • 可能原因二

    同一个Group ID下的不同客户端订阅的Topic相同,但Tag不同。

    同一个Group ID(GID-MQ-FAQ)订阅的Tag不同(分别是NM-MQ-FAQ1、NM-MQ-FAQ2)。

    错误示例:JVM-1上的代码

    Properties properties = new Properties();
    properties.put(PropertyKeyConst.GROUP_ID, "GID-MQ-FAQ");
    Consumer consumer = ONSFactory.createConsumer(properties);
    consumer.subscribe("MQ-FAQ-TOPIC-1", "NM-MQ-FAQ1", new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.println("Receive: " + message);
            return Action.CommitMessage;
        }
    });
    consumer.start();                    

    错误示例:JVM-2上的代码

     Properties properties = new Properties();
    properties.put(PropertyKeyConst.GROUP_ID, "GID-MQ-FAQ");
    Consumer consumer = ONSFactory.createConsumer(properties);
    consumer.subscribe("MQ-FAQ-TOPIC-1", "NM-MQ-FAQ2", new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.println("Receive: " + message);
            return Action.CommitMessage;
        }
    });
    
    consumer.start();                    

解决方案

  1. 检查不同客户端的订阅代码。保持配置相同Group ID的所有客户端的订阅关系—致,包括订阅的Topic与Tag。
  2. 重启所有客户端应用。

结果验证

  • 重启所有客户端应用。
  • 登录消息队列RocketMQ版控制台,单击Group管理 > 消费者状态订阅关系是否一致栏显示