订阅关系一致指的是同一个消费者Group ID下所有Consumer实例所订阅的Topic、Tag必须完全一致。如果订阅关系不一致,消息消费的逻辑就会混乱,甚至导致消息丢失。本文提供订阅关系一致的正确示例代码以及订阅关系不一致的可能原因,帮助您顺畅地订阅消息。

背景信息

消息队列RocketMQ版里的一个消费者Group ID代表一个Consumer实例群组。对于大多数分布式应用来说,一个消费者Group ID下通常会挂载多个Consumer实例。

由于消息队列RocketMQ版的订阅关系主要由Topic+Tag共同组成,因此,保持订阅关系一致意味着同一个消费者Group ID下所有的Consumer实例需在以下方面均保持一致:

  • 订阅的Topic必须一致,例如:Consumer1订阅TopicA和TopicB,Consumer2也必须订阅TopicA和TopicB,不能只订阅TopicA、只订阅TopicB或订阅TopicA和TopicC。
  • 订阅的同一个Topic中的Tag必须一致,包括Tag的数量和Tag的顺序,例如:Consumer1订阅TopicB且Tag为Tag1||Tag2,Consumer2订阅TopicB的Tag也必须是Tag1||Tag2,不能只订阅Tag1、只订阅Tag2或者订阅Tag2||Tag1
正确的订阅关系如下,多个Group ID分别订阅了不同的Topic,但是同一个Group ID下的多个Consumer实例C1、C2、C3订阅的Topic和Tag都一致。正确订阅关系
注意 消息队列RocketMQ版支持使用TCP协议和HTTP协议的SDK客户端收发消息,除了保证同一Group ID下的Consumer实例订阅关系一致,还必须保证订阅消息的Group ID的协议版本和SDK的协议版本一致,例如,使用TCP协议的SDK收发消息,订阅消息时也必须使用创建的TCP协议的Group ID,否则会导致消息消费失败。

正确订阅关系一:订阅一个Topic且订阅一个Tag

如下图所示,同一Group ID下的三个Consumer实例C1、C2和C3分别都订阅了TopicA,且订阅TopicA的Tag也都是Tag1,符合订阅关系一致原则。

正确示例1正确示例代码一
C1、C2、C3的订阅关系一致,即C1、C2、C3订阅消息的代码必须完全一致,代码示例如下:
    Properties properties = new Properties();
    properties.put(PropertyKeyConst.GROUP_ID, "GID_test_1");
    Consumer consumer = ONSFactory.createConsumer(properties);
    consumer.subscribe("TopicA", "Tag1", new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.println(message.getMsgID());
            return Action.CommitMessage;
        }
    });                    

正确订阅关系二:订阅一个Topic且订阅多个Tag

如下图所示,同一Group ID下的三个Consumer实例C1、C2和C3分别都订阅了TopicB,订阅TopicB的Tag也都是Tag1Tag2,表示订阅TopicB中所有Tag为Tag1Tag2的消息,且顺序一致都是Tag1||Tag2,符合订阅关系一致性原则。

正确示例2正确示例代码二
C1、C2、C3的订阅关系一致,即C1、C2、C3订阅消息的代码必须完全一致,代码示例如下:
    Properties properties = new Properties();
    properties.put(PropertyKeyConst.GROUP_ID, "GID_test_2");
    Consumer consumer = ONSFactory.createConsumer(properties);
    consumer.subscribe("TopicB", "Tag1||Tag2", new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.println(message.getMsgID());
            return Action.CommitMessage;
        }
    });                    

正确订阅关系三:订阅多个Topic且订阅多个Tag

如下图所示,同一Group ID下的三个Consumer实例C1、C2和C3分别都订阅了TopicA和TopicB,且订阅的TopicA都未指定Tag,即订阅TopicA中的所有消息,订阅的TopicB的Tag都是Tag1Tag2,表示订阅TopicB中所有Tag为Tag1Tag2的消息,且顺序一致都是Tag||Tag2,符合订阅关系一致原则。

正确示例3正确示例代码三
C1、C2、C3的订阅关系一致,即C1、C2、C3订阅消息的代码必须完全一致,代码示例如下:
    Properties properties = new Properties();
    properties.put(PropertyKeyConst.GROUP_ID, "GID_test_3");
    Consumer consumer = ONSFactory.createConsumer(properties);
    consumer.subscribe("TopicA", "*", new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.println(message.getMsgID());
            return Action.CommitMessage;
        }
    });     
    consumer.subscribe("TopicB", "Tag1||Tag2", new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.println(message.getMsgID());
            return Action.CommitMessage;
        }
    });                   

查看订阅关系一致性

您可在消息消息队列RocketMQ版控制台Group 详情页面查看指定Group的订阅关系是否一致。具体操作,请参见查看消费者状态。若查询结果不一致,请参见常见订阅关系不一致问题排查Consumer实例的消费代码。

常见订阅关系不一致问题

使用消息队列RocketMQ版收发消息时,Consumer收到的消息不符合预期并且在消息队列RocketMQ版控制台查看到订阅关系不一致,则Consumer实例可能存在以下问题:
  • 错误示例一:同一Group ID下的Consumer实例订阅的Topic不同。

    如下图所示,同一Group ID下的三个Consumer实例C1、C2和C3分别订阅了TopicA、TopicB和TopicC,订阅的Topic不一致,不符合订阅关系一致性原则。

    错误示例1错误示例代码一
    • Consumer实例1-1:
          Properties properties = new Properties();
          properties.put(PropertyKeyConst.GROUP_ID, "GID_test_1");
          Consumer consumer = ONSFactory.createConsumer(properties);
          consumer.subscribe("TopicA", "*", new MessageListener() {
              public Action consume(Message message, ConsumeContext context) {
                  System.out.println(message.getMsgID());
                  return Action.CommitMessage;
              }
          });                    
    • Consumer实例1-2:
          Properties properties = new Properties();
          properties.put(PropertyKeyConst.GROUP_ID, "GID_test_1");
          Consumer consumer = ONSFactory.createConsumer(properties);
          consumer.subscribe("TopicC", "*", new MessageListener() {
              public Action consume(Message message, ConsumeContext context) {
                  System.out.println(message.getMsgID());
                  return Action.CommitMessage;
              }
          });                    
    • Consumer实例1-3:
          Properties properties = new Properties();
          properties.put(PropertyKeyConst.GROUP_ID, "GID_test_1");
          Consumer consumer = ONSFactory.createConsumer(properties);
          consumer.subscribe("TopicB", "*", new MessageListener() {
              public Action consume(Message message, ConsumeContext context) {
                  System.out.println(message.getMsgID());
                  return Action.CommitMessage;
              }
          });                    
  • 错误示例二:同一Group ID下的Consumer实例订阅的Topic相同,但订阅Topic的Tag不同。

    如下图所示,同一Group ID下的三个Consumer实例C1、C2和C3分别都订阅了TopicA,但是C1订阅TopicA的Tag为Tag1,C2和C3订阅的TopicA的Tag为Tag2,订阅同一Topic的Tag不一致,不符合订阅关系一致性原则。

    错误示例2

    错误示例代码二

    • Consumer实例2-1:
          Properties properties = new Properties();
          properties.put(PropertyKeyConst.GROUP_ID, "GID_test_2");
          Consumer consumer = ONSFactory.createConsumer(properties);
          consumer.subscribe("TopicA", "Tag1", new MessageListener() {
              public Action consume(Message message, ConsumeContext context) {
                  System.out.println(message.getMsgID());
                  return Action.CommitMessage;
              }
          });                    
    • Consumer实例2-2:
          Properties properties = new Properties();
          properties.put(PropertyKeyConst.GROUP_ID, "GID_test_2");
          Consumer consumer = ONSFactory.createConsumer(properties);
          consumer.subscribe("TopicA", "Tag2", new MessageListener() {
              public Action consume(Message message, ConsumeContext context) {
                  System.out.println(message.getMsgID());
                  return Action.CommitMessage;
              }
          });                   
    • Consumer实例2-3:
          Properties properties = new Properties();
          properties.put(PropertyKeyConst.GROUP_ID, "GID_test_2");
          Consumer consumer = ONSFactory.createConsumer(properties);
          consumer.subscribe("TopicA", "Tag2", new MessageListener() {
              public Action consume(Message message, ConsumeContext context) {
                  System.out.println(message.getMsgID());
                  return Action.CommitMessage;
              }
          });                   
  • 错误示例三:同一Group ID下的Consumer实例订阅的Topic及Topic的Tag都相同,但订阅的Tag顺序不同。

    如下图所示,同一Group ID下的三个Consumer实例C1、C2和C3分别都订阅了TopicA和TopicB,并且订阅的TopicA都没有指定Tag,订阅TopicB的Tag都是Tag1Tag2,但是C1订阅TopicB的Tag为Tag1||Tag2,C2和C3订阅的Tag为Tag2||Tag1,顺序不一致,不符合订阅关系一致性原则。

    错误示例3

    错误示例代码三

    • Consumer实例3-1:
          Properties properties = new Properties();
          properties.put(PropertyKeyConst.GROUP_ID, "GID_test_3");
          Consumer consumer = ONSFactory.createConsumer(properties);
          consumer.subscribe("TopicA", "*", new MessageListener() {
              public Action consume(Message message, ConsumeContext context) {
                  System.out.println(message.getMsgID());
                  return Action.CommitMessage;
              }
          });   
          consumer.subscribe("TopicB", "Tag1||Tag2", new MessageListener() {
              public Action consume(Message message, ConsumeContext context) {
                  System.out.println(message.getMsgID());
                  return Action.CommitMessage;
              }
          });                         
    • Consumer实例3-2:
          Properties properties = new Properties();
          properties.put(PropertyKeyConst.GROUP_ID, "GID_test_3");
          Consumer consumer = ONSFactory.createConsumer(properties);
          consumer.subscribe("TopicA", "*", new MessageListener() {
             public Action consume(Message message, ConsumeContext context) {
                 System.out.println(message.getMsgID());
                 return Action.CommitMessage;
             }
          }); 
          consumer.subscribe("TopicB", "Tag2||Tag1", new MessageListener() {
              public Action consume(Message message, ConsumeContext context) {
                  System.out.println(message.getMsgID());
                  return Action.CommitMessage;
              }
          });                   
    • Consumer实例3-3:
          Properties properties = new Properties();
          properties.put(PropertyKeyConst.GROUP_ID, "GID_test_3");
          Consumer consumer = ONSFactory.createConsumer(properties);
          consumer.subscribe("TopicA", "*", new MessageListener() {
             public Action consume(Message message, ConsumeContext context) {
                 System.out.println(message.getMsgID());
                 return Action.CommitMessage;
             }
          }); 
          consumer.subscribe("TopicB", "Tag2||Tag1", new MessageListener() {
              public Action consume(Message message, ConsumeContext context) {
                  System.out.println(message.getMsgID());
                  return Action.CommitMessage;
              }
          });