订阅关系一致指的是同一个消费者Group ID下所有Consumer实例所订阅的Topic、Tag必须完全一致。如果订阅关系不一致,消息消费的逻辑就会混乱,甚至导致消息丢失。本文提供订阅关系一致的正确示例代码以及订阅关系不一致的可能原因,帮助您顺畅地订阅消息。
背景信息
云消息队列 RocketMQ 版里的一个消费者Group ID代表一个Consumer实例群组。对于大多数分布式应用来说,一个消费者Group ID下通常会挂载多个Consumer实例。
由于云消息队列 RocketMQ 版的订阅关系主要由Topic+Tag共同组成,因此,保持订阅关系一致意味着同一个消费者Group ID下所有的Consumer实例需在以下方面均保持一致:
- 订阅的Topic必须一致,例如:Consumer1订阅TopicA和TopicB,Consumer2也必须订阅TopicA和TopicB,不能只订阅TopicA、只订阅TopicB或订阅TopicA和TopicC。
- 订阅的同一个Topic中的Tag必须一致,包括Tag的数量和Tag的顺序,例如:Consumer1订阅TopicB且Tag为Tag1||Tag2,Consumer2订阅TopicB的Tag也必须是Tag1||Tag2,不能只订阅Tag1、只订阅Tag2或者订阅Tag2||Tag1。

正确订阅关系一:订阅一个Topic且订阅一个Tag
如下图所示,同一Group ID下的三个Consumer实例C1、C2和C3分别都订阅了TopicA,且订阅TopicA的Tag也都是Tag1,符合订阅关系一致原则。

Properties properties = new Properties();
properties.put(PropertyKeyConst.GROUP_ID, "GID_test_1");
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe("TopicA", "Tag1", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
System.out.println(message.getMsgID());
return Action.CommitMessage;
}
});
正确订阅关系二:订阅一个Topic且订阅多个Tag
如下图所示,同一Group ID下的三个Consumer实例C1、C2和C3分别都订阅了TopicB,订阅TopicB的Tag也都是Tag1和Tag2,表示订阅TopicB中所有Tag为Tag1或Tag2的消息,且顺序一致都是Tag1||Tag2,符合订阅关系一致性原则。

Properties properties = new Properties();
properties.put(PropertyKeyConst.GROUP_ID, "GID_test_2");
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe("TopicB", "Tag1||Tag2", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
System.out.println(message.getMsgID());
return Action.CommitMessage;
}
});
正确订阅关系三:订阅多个Topic且订阅多个Tag
如下图所示,同一Group ID下的三个Consumer实例C1、C2和C3分别都订阅了TopicA和TopicB,且订阅的TopicA都未指定Tag,即订阅TopicA中的所有消息,订阅的TopicB的Tag都是Tag1和Tag2,表示订阅TopicB中所有Tag为Tag1或Tag2的消息,且顺序一致都是Tag1||Tag2,符合订阅关系一致原则。

Properties properties = new Properties();
properties.put(PropertyKeyConst.GROUP_ID, "GID_test_3");
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe("TopicA", "*", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
System.out.println(message.getMsgID());
return Action.CommitMessage;
}
});
consumer.subscribe("TopicB", "Tag1||Tag2", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
System.out.println(message.getMsgID());
return Action.CommitMessage;
}
});
查看订阅关系一致性
您可在云消息队列 RocketMQ 版控制台Group 详情页面查看指定Group的订阅关系是否一致。具体操作,请参见查看消费者状态。若查询结果不一致,请参见常见订阅关系不一致问题排查Consumer实例的消费代码。
常见订阅关系不一致问题
- 错误示例一:同一Group ID下的Consumer实例订阅的Topic不同。
如下图所示,同一Group ID下的三个Consumer实例C1、C2和C3分别订阅了TopicA、TopicB和TopicC,订阅的Topic不一致,不符合订阅关系一致性原则。
错误示例代码一
- Consumer实例1-1:
Properties properties = new Properties(); properties.put(PropertyKeyConst.GROUP_ID, "GID_test_1"); Consumer consumer = ONSFactory.createConsumer(properties); consumer.subscribe("TopicA", "*", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; } });
- Consumer实例1-2:
Properties properties = new Properties(); properties.put(PropertyKeyConst.GROUP_ID, "GID_test_1"); Consumer consumer = ONSFactory.createConsumer(properties); consumer.subscribe("TopicC", "*", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; } });
- Consumer实例1-3:
Properties properties = new Properties(); properties.put(PropertyKeyConst.GROUP_ID, "GID_test_1"); Consumer consumer = ONSFactory.createConsumer(properties); consumer.subscribe("TopicB", "*", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; } });
- Consumer实例1-1:
- 错误示例二:同一Group ID下的Consumer实例订阅的Topic相同,但订阅Topic的Tag不同。
如下图所示,同一Group ID下的三个Consumer实例C1、C2和C3分别都订阅了TopicA,但是C1订阅TopicA的Tag为Tag1,C2和C3订阅的TopicA的Tag为Tag2,订阅同一Topic的Tag不一致,不符合订阅关系一致性原则。
错误示例代码二
- Consumer实例2-1:
Properties properties = new Properties(); properties.put(PropertyKeyConst.GROUP_ID, "GID_test_2"); Consumer consumer = ONSFactory.createConsumer(properties); consumer.subscribe("TopicA", "Tag1", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; } });
- Consumer实例2-2:
Properties properties = new Properties(); properties.put(PropertyKeyConst.GROUP_ID, "GID_test_2"); Consumer consumer = ONSFactory.createConsumer(properties); consumer.subscribe("TopicA", "Tag2", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; } });
- Consumer实例2-3:
Properties properties = new Properties(); properties.put(PropertyKeyConst.GROUP_ID, "GID_test_2"); Consumer consumer = ONSFactory.createConsumer(properties); consumer.subscribe("TopicA", "Tag2", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; } });
- Consumer实例2-1:
- 错误示例三:同一Group ID下的Consumer实例订阅的Topic及Topic的Tag都相同,但订阅的Tag顺序不同。
如下图所示,同一Group ID下的三个Consumer实例C1、C2和C3分别都订阅了TopicA和TopicB,并且订阅的TopicA都没有指定Tag,订阅TopicB的Tag都是Tag1和Tag2,但是C1订阅TopicB的Tag为Tag1||Tag2,C2和C3订阅的Tag为Tag2||Tag1,顺序不一致,不符合订阅关系一致性原则。
错误示例代码三
- Consumer实例3-1:
Properties properties = new Properties(); properties.put(PropertyKeyConst.GROUP_ID, "GID_test_3"); Consumer consumer = ONSFactory.createConsumer(properties); consumer.subscribe("TopicA", "*", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; } }); consumer.subscribe("TopicB", "Tag1||Tag2", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; } });
- Consumer实例3-2:
Properties properties = new Properties(); properties.put(PropertyKeyConst.GROUP_ID, "GID_test_3"); Consumer consumer = ONSFactory.createConsumer(properties); consumer.subscribe("TopicA", "*", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; } }); consumer.subscribe("TopicB", "Tag2||Tag1", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; } });
- Consumer实例3-3:
Properties properties = new Properties(); properties.put(PropertyKeyConst.GROUP_ID, "GID_test_3"); Consumer consumer = ONSFactory.createConsumer(properties); consumer.subscribe("TopicA", "*", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; } }); consumer.subscribe("TopicB", "Tag2||Tag1", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; } });
- Consumer实例3-1: