订阅关系一致

订阅关系一致指的是同一个消费者分组Group ID下,所有Consumer实例所订阅的TopicTag必须完全一致。如果订阅关系不一致,可能导致消息消费逻辑混乱,消息被重复消费或遗漏。

什么是订阅关系一致?

云消息队列 RocketMQ 版里的一个消费者分组Group ID代表一个Consumer实例群组。对于大多数分布式应用来说,一个Group ID下通常会挂载多个Consumer实例。

云消息队列 RocketMQ 版的订阅关系指的是指定某个消费者分组Group ID对于某个主题Topic的订阅,包括订阅Topic时的过滤条件。

因此,订阅关系一致性的范围是同一Group ID下所有消费者对于指定Topic的订阅都相同。其中包括:

  • 订阅的Topic必须一致。

    例如:Consumer1订阅TopicATopicB,Consumer2也必须订阅TopicATopicB,不能只订阅TopicA、只订阅TopicB或订阅TopicATopicC。

  • 订阅的同一个Topic中的Tag必须一致,包括Tag的数量和Tag的顺序。

    例如:Consumer1订阅TopicBTagTag1||Tag2,Consumer2订阅TopicBTag也必须是Tag1||Tag2,不能只订阅Tag1、只订阅Tag2或者订阅Tag2||Tag1

  • 订阅多个TopicTopic的类型一致。

    例如,Consumer1Consumer2都同时订阅TopicATopicB,则这两个Topic的类型必须一致,必须都是普通消息或者都是顺序消息。云消息队列 RocketMQ 版支持的消息类型,请参见消息类型列表

正确的订阅关系如下,多个Group ID分别订阅了不同的Topic,但是同一个Group ID下的多个Consumer实例C1、C2、C3订阅的TopicTag都一致。正确订阅关系

重要

云消息队列 RocketMQ 版支持使用TCP协议和HTTP协议的SDK客户端收发消息,除了保证同一Group ID下的Consumer实例订阅关系一致,还必须保证订阅消息的Group ID的协议版本和SDK的协议版本一致,例如,使用TCP协议的SDK收发消息,订阅消息时也必须使用创建的TCP协议的Group ID,否则会导致消息消费失败。

如何查看订阅关系是否一致?

操作入口如下:

  1. 登录云消息队列 RocketMQ 版控制台,在实例列表中单击目标实例名称,进入实例详情页面。

  2. 在左侧导航栏单击Group 管理,然后在Group列表单击目标Group名称。

  3. Group 详情页面的订阅关系区域查看指定Group的订阅关系是否一致。

正确订阅关系一:订阅一个Topic且订阅一个Tag

如下图所示,同一Group ID下的三个Consumer实例C1、C2C3分别都订阅了TopicA,且订阅TopicATag也都是Tag1,符合订阅关系一致原则。

正确示例1正确示例代码一

C1、C2、C3的订阅关系一致,即C1、C2、C3订阅消息的代码必须完全一致,代码示例如下:

    Properties properties = new Properties();
    properties.put(PropertyKeyConst.GROUP_ID, "GID_test_1");
    Consumer consumer = ONSFactory.createConsumer(properties);
    consumer.subscribe("TopicA", "Tag1", new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.println(message.getMsgID());
            return Action.CommitMessage;
        }
    });                    

正确订阅关系二:订阅一个Topic且订阅多个Tag

如下图所示,同一Group ID下的三个Consumer实例C1、C2C3分别都订阅了TopicB,订阅TopicBTag也都是Tag1Tag2,表示订阅TopicB中所有TagTag1Tag2的消息,且顺序一致都是Tag1||Tag2,符合订阅关系一致性原则。

正确示例2正确示例代码二

C1、C2、C3的订阅关系一致,即C1、C2、C3订阅消息的代码必须完全一致,代码示例如下:

    Properties properties = new Properties();
    properties.put(PropertyKeyConst.GROUP_ID, "GID_test_2");
    Consumer consumer = ONSFactory.createConsumer(properties);
    consumer.subscribe("TopicB", "Tag1||Tag2", new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.println(message.getMsgID());
            return Action.CommitMessage;
        }
    });                    

正确订阅关系三:订阅多个Topic且订阅多个Tag

如下图所示,同一Group ID下的三个Consumer实例C1、C2C3分别都订阅了TopicATopicB,且订阅的TopicA都未指定Tag,即订阅TopicA中的所有消息,订阅的TopicBTag都是Tag1Tag2,表示订阅TopicB中所有TagTag1Tag2的消息,且顺序一致都是Tag1||Tag2,符合订阅关系一致原则。

正确示例3正确示例代码三

C1、C2、C3的订阅关系一致,即C1、C2、C3订阅消息的代码必须完全一致,代码示例如下:

    Properties properties = new Properties();
    properties.put(PropertyKeyConst.GROUP_ID, "GID_test_3");
    Consumer consumer = ONSFactory.createConsumer(properties);
    consumer.subscribe("TopicA", "*", new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.println(message.getMsgID());
            return Action.CommitMessage;
        }
    });     
    consumer.subscribe("TopicB", "Tag1||Tag2", new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.println(message.getMsgID());
            return Action.CommitMessage;
        }
    });                   

常见订阅关系不一致场景

使用云消息队列 RocketMQ 版收发消息时,Consumer收到的消息不符合预期并且在云消息队列 RocketMQ 版控制台查看到订阅关系不一致,则Consumer实例可能存在以下问题:

错误示例一:同一Group ID下的Consumer实例订阅的Topic不同

如下图所示,同一Group ID下的三个Consumer实例C1、C2C3分别订阅了TopicA、TopicBTopicC,订阅的Topic不一致,不符合订阅关系一致性原则。

错误示例1

错误示例代码一

  • Consumer实例1-1:

  •     Properties properties = new Properties();
        properties.put(PropertyKeyConst.GROUP_ID, "GID_test_1");
        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe("TopicA", "*", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });                    
  • Consumer实例1-2:

  •     Properties properties = new Properties();
        properties.put(PropertyKeyConst.GROUP_ID, "GID_test_1");
        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe("TopicC", "*", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });                    
  • Consumer实例1-3:

  •     Properties properties = new Properties();
        properties.put(PropertyKeyConst.GROUP_ID, "GID_test_1");
        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe("TopicB", "*", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });                    

错误示例二:同一Group ID下的Consumer实例订阅的Topic相同,但订阅TopicTag不同

如下图所示,同一Group ID下的三个Consumer实例C1、C2C3分别都订阅了TopicA,但是C1订阅TopicATagTag1,C2C3订阅的TopicATagTag2,订阅同一TopicTag不一致,不符合订阅关系一致性原则。

错误示例2

错误示例代码二

  • Consumer实例2-1:

     Properties properties = new Properties();
     properties.put(PropertyKeyConst.GROUP_ID, "GID_test_2");
     Consumer consumer = ONSFactory.createConsumer(properties);
     consumer.subscribe("TopicA", "Tag1", new MessageListener() {
     public Action consume(Message message, ConsumeContext context) {
     System.out.println(message.getMsgID());
     return Action.CommitMessage;
     }
     }); 
  • Consumer实例2-2:

     Properties properties = new Properties();
     properties.put(PropertyKeyConst.GROUP_ID, "GID_test_2");
     Consumer consumer = ONSFactory.createConsumer(properties);
     consumer.subscribe("TopicA", "Tag2", new MessageListener() {
     public Action consume(Message message, ConsumeContext context) {
     System.out.println(message.getMsgID());
     return Action.CommitMessage;
     }
     }); 
  • Consumer实例2-3:

     Properties properties = new Properties();
     properties.put(PropertyKeyConst.GROUP_ID, "GID_test_2");
     Consumer consumer = ONSFactory.createConsumer(properties);
     consumer.subscribe("TopicA", "Tag2", new MessageListener() {
     public Action consume(Message message, ConsumeContext context) {
     System.out.println(message.getMsgID());
     return Action.CommitMessage;
     }
     }); 

错误示例三:同一Group ID下的Consumer实例订阅的TopicTopicTag都相同,但订阅的Tag顺序不同

如下图所示,同一Group ID下的三个Consumer实例C1、C2C3分别都订阅了TopicATopicB,并且订阅的TopicA都没有指定Tag,订阅TopicBTag都是Tag1Tag2,但是C1订阅TopicBTagTag1||Tag2,C2C3订阅的TagTag2||Tag1,顺序不一致,不符合订阅关系一致性原则。

错误示例3

错误示例代码三

  • Consumer实例3-1:

     Properties properties = new Properties();
     properties.put(PropertyKeyConst.GROUP_ID, "GID_test_3");
     Consumer consumer = ONSFactory.createConsumer(properties);
     consumer.subscribe("TopicA", "*", new MessageListener() {
     public Action consume(Message message, ConsumeContext context) {
     System.out.println(message.getMsgID());
     return Action.CommitMessage;
     }
     }); 
     consumer.subscribe("TopicB", "Tag1||Tag2", new MessageListener() {
     public Action consume(Message message, ConsumeContext context) {
     System.out.println(message.getMsgID());
     return Action.CommitMessage;
     }
     }); 
  • Consumer实例3-2:

     Properties properties = new Properties();
     properties.put(PropertyKeyConst.GROUP_ID, "GID_test_3");
     Consumer consumer = ONSFactory.createConsumer(properties);
     consumer.subscribe("TopicA", "*", new MessageListener() {
     public Action consume(Message message, ConsumeContext context) {
     System.out.println(message.getMsgID());
     return Action.CommitMessage;
     }
     }); 
     consumer.subscribe("TopicB", "Tag2||Tag1", new MessageListener() {
     public Action consume(Message message, ConsumeContext context) {
     System.out.println(message.getMsgID());
     return Action.CommitMessage;
     }
     }); 
  • Consumer实例3-3:

     Properties properties = new Properties();
     properties.put(PropertyKeyConst.GROUP_ID, "GID_test_3");
     Consumer consumer = ONSFactory.createConsumer(properties);
     consumer.subscribe("TopicA", "*", new MessageListener() {
     public Action consume(Message message, ConsumeContext context) {
     System.out.println(message.getMsgID());
     return Action.CommitMessage;
     }
     }); 
     consumer.subscribe("TopicB", "Tag2||Tag1", new MessageListener() {
     public Action consume(Message message, ConsumeContext context) {
     System.out.println(message.getMsgID());
     return Action.CommitMessage;
     }
     }); 

订阅关系常见问题

一个Group ID是否可以和多个Topic存在订阅关系?

支持,一个Group可以订阅多个Topic,该Group对于指定一个Topic的订阅算1个订阅关系,您需要确保每一个订阅关系中,Group下所有Consumer对于指定Topic的订阅都一致。

订阅关系是否支持手动删除?

订阅关系需要在消费者客户端代码中设置。