背景知识
什么是订阅关系一致
订阅关系一致指的是消费者组(Group)内所有消费者订阅关系(Topic+TAG)必须保持一致,消费者可以订阅多个Topic或多个Tag,但其订阅关系必须一致。
详见订阅关系一致。
消费者如何订阅多个Topic与Tag
商业版JAVA SDK
maven坐标样例:
<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-client</artifactId>
<version>x.x.x.Final</version>示例代码:
Properties properties = new Properties();
// 您在消息队列RocketMQ版控制台创建的Group ID。
properties.setProperty(PropertyKeyConst.GROUP_ID, "GID-xxxxx");
/*
* 一些属性配置
*/
Consumer consumer = ONSFactory.createConsumer(properties);
//订阅多个Tag。
consumer.subscribe("TopicTestMQ1", "TagA||TagB", new MessageListener() {...});
//订阅多个Topic,同一个consumer实例继续调用subscribe增加订阅。
consumer.subscribe("TopicTestMQ2", "TagA||TagB", new MessageListener() {...});
社区版JAVA SDK(artifactId为rocketmq-client)
maven坐标样例:
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>x.x.x</version>示例代码:
// 您在消息队列RocketMQ版控制台创建的Group ID。
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(GROUP_ID, getAclRPCHook(), new AllocateMessageQueueAveragely(), true, null);
/*
* 一些属性配置
*/
//订阅多个Tag
consumer.subscribe("java-test", "TagA||TagB");
//订阅第二个Topic,需要注意consumer只能注册一个消费逻辑,即多个订阅的消费逻辑都是一样的
consumer.subscribe("cxtest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {...});社区版JAVA SDK(artifactId为rocketmq-client-java)
maven坐标样例:
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
<version>5.0.8</version>示例代码:
final ClientServiceProvider provider = ClientServiceProvider.loadService();
/*
* 一些配置
*/
HashMap<String, FilterExpression> sub = new HashMap<>();
//订阅多个Tag
sub.put("Topic1", new FilterExpression("TagA||TagB", FilterExpressionType.TAG));
//订阅第二个topic,需要注意consumer只能注册一个消费逻辑,即多个订阅的消费逻辑都是一样的
sub.put("Topic2", new FilterExpression("TagA||TagB", FilterExpressionType.TAG));
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
.setClientConfiguration(clientConfiguration)
//设置消费者分组,消息队列RocketMQ版控制台创建的Group ID。
.setConsumerGroup(consumerGroup)
//设置预绑定的订阅关系。
.setSubscriptionExpressions(sub)
//设置消费监听器。
.setMessageListener(messageView -> {...})
.build();SpringBoot框架(使用RocketMQMessageListener注解)
若使用RocketMQMessageListener注解来启动消费者,需要留意每个注解都会启动一个独立的consumer实例,若代码中使用了多次注解,且每处注解订阅的Topic不同,就会导致订阅关系不一致。
比如,下面这段代码实现了两次RocketMQListener接口,在同一个Group中分别订阅了TopicA和TopicB,实际运行时客户端会创建两个consumer实例,最终导致在同一个消费者组内有两个消费者分别订阅TopicA和TopicB,导致订阅关系不一致。
// 业务逻辑1:TestMessageListener1.java
@Component
@RocketMQMessageListener(consumerGroup="GID_A", topic = "TopicA")
public class TestMessageListener1 implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
/*
* 一些配置
*/
}
}
// 业务逻辑2:TestMessageListener2.java
@Component
@RocketMQMessageListener(consumerGroup="GID_A", topic = "TopicB")
public class TestMessageListener2 implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
/*
* 一些配置
*/
}
}若要实现多订阅,可通过重写RocketMQPushConsumerLifecycleListener接口的prepareStart方法来实现,参考代码如下:
@Component
@RocketMQMessageListener(consumerGroup = "GID_A", topic = "")
public class TestMessageListener implements RocketMQListener<MessageExt>, RocketMQPushConsumerLifecycleListener {
@Override
public void onMessage(MessageExt messageExt) {}
@Override
public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
try{
// 订阅多个tag
defaultMQPushConsumer.subscribe("TopicA", "TagA||TagB");
// 订阅第二个topic
defaultMQPushConsumer.subscribe("TopicB", "*");
// 注册消费逻辑
defaultMQPushConsumer.registerMessageListener((List<MessageExt> messages, ConsumeConcurrentlyContext context) -> {......});
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}SpringBoot框架(使用Bean注解)
此方式通过手动注入ConsumerBean,需要按照原生SDK的方式进行多订阅。
@Configuration
public class ConsumerClient {
@Bean(initMethod = "start", destroyMethod = "shutdown")
public ConsumerBean buildConsumer() {
//参考商业版以及社区版SDK多订阅的方式在此处注入对应的ConsumerBean
}
}场景描述
云消息队列 RocketMQ 版控制台提示订阅关系不一致。
简要排查思路
在控制台中找到对应的RocketMQ实例。
在Group详情页查看订阅关系不一致的客户端分布情况。
根据使用的SDK、框架修正为一致的订阅关系。
排查详情
首先确认使用的实例版本是5.0还是4.0,两者在控制台上显示订阅关系的样式略有区别。
RocketMQ 5.0实例
进入Group详情页,查看客户端以及对应的订阅关系列表。
订阅关系是以Topic为单位呈现的,过滤表达式一列显示了该Group内所有消费者订阅对应Topic的所有表达式。
为了快速排查具体是哪些消费者的订阅关系不一致,可以通过点开查看分布来详细获取消费者的订阅情况。点击后重点关注宿主机 IP/公网 IP一列,一般情况下
<IP>:<端口>可以唯一确定一个消费者实例(即初始化Consumer类之后的运行实体)。可以分为几种情况来分析:若页面上只显示了单个Topic,且该Topic内存在多个过滤表达式,说明不同的消费者对于该Topic设置的Tag不一致,此时只需点击右侧的查看分布,页面上会显示该Topic下所有消费者订阅情况,根据实际情况修改业务代码,确保所有消费者订阅的Tag保持一致即可。
若页面上显示了多个Topic,但每个Topic内只有一个过滤表达式,则说明有消费者订阅了多个Topic,此时只需点击每个Topic的查看分布信息,以宿主机 IP/公网 IP为维度梳理每个消费者的订阅情况。
若页面上显示了多个Topic,且每个Topic内存在多个过滤表达式,可点击查看分布,以宿主机 IP/公网 IP为维度重新整理消费者的订阅关系,以便以消费者的视角快速理清订阅关系。
梳理订阅关系举例
下图显示了两个Topic:
my-topic和my-topic2,前者包含两个过滤表达式,说明至少有一个消费者分别订阅了my-topic + TagA、my-topic + TagB。
点击my-topic的查看分布,可以看到如下内容,其中显示了有三个客户端订阅了三种过滤表达式。

点击my-topic2的查看分布,显示其中有两个客户端订阅,特别留意
140.x.11.235:17713在my-topic的分布中也出现过,说明这个客户端同时订阅了这两个Topic。
经过整理,可以得到以消费者客户端视角的订阅关系情况:
客户端
订阅关系
140.x.11.235:47913
my-topic TagB
140.x.11.235:17713
my-topic TagA
my-topic2 TagA
140.x.11.235:11665
my-topic TagA
140.x.11.235:5629
my-topic2 TagA
自此消费者的订阅关系整理完毕,根据业务实际情况调整,确保所有消费者的订阅关系保持一致即可。
RocketMQ 4.0实例
进入Group详情页,在订阅关系区域会以客户端为单位,显示该Group内的订阅情况。
