原因描述

在不同的JVM中启动了多个Consumer,并且给相同的Group ID配置了不同的Topic,或者是相同的Topic但Tag不同,最终导致订阅关系不一致,消息不符合预期。

错误代码示例

  • 错误示例一:同一个Group ID(GID-MQ-FAQ)订阅的Topic不同(分别是MQ-FAQ-TOPIC-1、MQ-FAQ-TOPIC-2)。

    JVM-1上的代码:

    
        Properties properties = new Properties();
        properties.put(PropertyKeyConst.GROUP_ID, "GID-MQ-FAQ");
        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe("MQ-FAQ-TOPIC-1", "NM-MQ-FAQ", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println("Receive: " + message);
                return Action.CommitMessage;
            }
        });
        consumer.start();            

    JVM-2上的代码

    
    
    
         Properties properties = new Properties();
        properties.put(PropertyKeyConst.GROUP_ID, "GID-MQ-FAQ");
        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe("MQ-FAQ-TOPIC-2", "NM-MQ-FAQ", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println("Receive: " + message);
                return Action.CommitMessage;
            }
        });
    
        consumer.start();
  • 错误示例二:同一Group ID(GID-MQ-FAQ)订阅的Topic相同,但Tag不同(分别NM-MQ-FAQ-1、NM-MQ-FAQ-2)。

    JVM-1上的代码:

    
    
    
        Properties properties = new Properties();
        properties.put(PropertyKeyConst.GROUP_ID, "GID-MQ-FAQ");
        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe("MQ-FAQ-TOPIC-1", "NM-MQ-FAQ-1", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println("Receive: " + message);
                return Action.CommitMessage;
            }
        });
        consumer.start();
                        

    JVM-2上的代码

    
    
    
         Properties properties = new Properties();
        properties.put(PropertyKeyConst.GROUP_ID, "GID-MQ-FAQ");
        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe("MQ-FAQ-TOPIC-1", "NM-MQ-FAQ-2", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println("Receive: " + message);
                return Action.CommitMessage;
            }
        });
    
        consumer.start();

建议解决方案

请确保在不同JVM中使用相同的Group ID启动多个Consumer时,配置的Topic和Tag是一致的。