如何排查RocketMQ订阅关系不一致的问题?

背景知识

什么是订阅关系一致

订阅关系一致指的是消费者组(Group)内所有消费者订阅关系(Topic+TAG)必须保持一致,消费者可以订阅多个Topic或多个Tag,但其订阅关系必须一致。

详见订阅关系一致

消费者如何订阅多个TopicTag

商业版JAVA SDK

maven坐标样例:

<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-client</artifactId>
<version>x.x.x.Final</version>

示例代码:

Properties properties = new Properties();
// 您在消息队列RocketMQ版控制台创建的Group ID。
properties.setProperty(PropertyKeyConst.GROUP_ID, "GID-xxxxx");
/*
* 一些属性配置
*/
Consumer consumer = ONSFactory.createConsumer(properties);
//订阅多个Tag。
consumer.subscribe("TopicTestMQ1", "TagA||TagB", new MessageListener() {...});
//订阅多个Topic,同一个consumer实例继续调用subscribe增加订阅。
consumer.subscribe("TopicTestMQ2", "TagA||TagB", new MessageListener() {...});

社区版JAVA SDK(artifactIdrocketmq-client)

maven坐标样例:

<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>x.x.x</version>

示例代码:

// 您在消息队列RocketMQ版控制台创建的Group ID。
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(GROUP_ID, getAclRPCHook(), new AllocateMessageQueueAveragely(), true, null);
/*
* 一些属性配置
*/
//订阅多个Tag
consumer.subscribe("java-test", "TagA||TagB");
//订阅第二个Topic,需要注意consumer只能注册一个消费逻辑,即多个订阅的消费逻辑都是一样的
consumer.subscribe("cxtest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {...});

社区版JAVA SDK(artifactIdrocketmq-client-java)

maven坐标样例:

<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
<version>5.0.8</version>

示例代码:

final ClientServiceProvider provider = ClientServiceProvider.loadService();
/*
* 一些配置
*/
HashMap<String, FilterExpression> sub = new HashMap<>();
//订阅多个Tag
sub.put("Topic1", new FilterExpression("TagA||TagB", FilterExpressionType.TAG));
//订阅第二个topic,需要注意consumer只能注册一个消费逻辑,即多个订阅的消费逻辑都是一样的
sub.put("Topic2", new FilterExpression("TagA||TagB", FilterExpressionType.TAG));
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                //设置消费者分组,消息队列RocketMQ版控制台创建的Group ID。
                .setConsumerGroup(consumerGroup)
                //设置预绑定的订阅关系。
                .setSubscriptionExpressions(sub)
                //设置消费监听器。
                .setMessageListener(messageView -> {...})
                .build();

SpringBoot框架(使用RocketMQMessageListener注解)

若使用RocketMQMessageListener注解来启动消费者,需要留意每个注解都会启动一个独立的consumer实例,若代码中使用了多次注解,且每处注解订阅的Topic不同,就会导致订阅关系不一致。

比如,下面这段代码实现了两次RocketMQListener接口,在同一个Group中分别订阅了TopicATopicB,实际运行时客户端会创建两个consumer实例,最终导致在同一个消费者组内有两个消费者分别订阅TopicATopicB,导致订阅关系不一致。

// 业务逻辑1:TestMessageListener1.java
@Component
@RocketMQMessageListener(consumerGroup="GID_A", topic = "TopicA")
public class TestMessageListener1 implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt messageExt) {
        /*
        * 一些配置
        */
    }
}
// 业务逻辑2:TestMessageListener2.java
@Component
@RocketMQMessageListener(consumerGroup="GID_A", topic = "TopicB")
public class TestMessageListener2 implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt messageExt) {
        /*
        * 一些配置
        */
    }
}

若要实现多订阅,可通过重写RocketMQPushConsumerLifecycleListener接口的prepareStart方法来实现,参考代码如下:

@Component
@RocketMQMessageListener(consumerGroup = "GID_A", topic = "")
public class TestMessageListener implements RocketMQListener<MessageExt>, RocketMQPushConsumerLifecycleListener {
    @Override
    public void onMessage(MessageExt messageExt) {}
    @Override
    public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
        try{
            // 订阅多个tag
            defaultMQPushConsumer.subscribe("TopicA", "TagA||TagB");
            // 订阅第二个topic
            defaultMQPushConsumer.subscribe("TopicB", "*");
            // 注册消费逻辑
            defaultMQPushConsumer.registerMessageListener((List<MessageExt> messages, ConsumeConcurrentlyContext context) -> {......});
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

SpringBoot框架(使用Bean注解)

此方式通过手动注入ConsumerBean,需要按照原生SDK的方式进行多订阅。

@Configuration
public class ConsumerClient {
  
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public ConsumerBean buildConsumer() {
        //参考商业版以及社区版SDK多订阅的方式在此处注入对应的ConsumerBean
    }
}

场景描述

云消息队列 RocketMQ 版控制台提示订阅关系不一致。

简要排查思路

  1. 在控制台中找到对应的RocketMQ实例。

  2. Group详情页查看订阅关系不一致的客户端分布情况。

  3. 根据使用的SDK、框架修正为一致的订阅关系。

排查详情

首先确认使用的实例版本是5.0还是4.0,两者在控制台上显示订阅关系的样式略有区别。

RocketMQ 5.0实例

  1. 进入Group详情页,查看客户端以及对应的订阅关系列表。

  2. 订阅关系是以Topic为单位呈现的,过滤表达式一列显示了该Group内所有消费者订阅对应Topic的所有表达式。

  3. 为了快速排查具体是哪些消费者的订阅关系不一致,可以通过点开查看分布来详细获取消费者的订阅情况。点击后重点关注宿主机 IP/公网 IP一列,一般情况下<IP>:<端口>可以唯一确定一个消费者实例(即初始化Consumer类之后的运行实体)。可以分为几种情况来分析:

    1. 若页面上只显示了单个Topic,且该Topic内存在多个过滤表达式,说明不同的消费者对于该Topic设置的Tag不一致,此时只需点击右侧的查看分布,页面上会显示该Topic下所有消费者订阅情况,根据实际情况修改业务代码,确保所有消费者订阅的Tag保持一致即可。

    2. 若页面上显示了多个Topic,但每个Topic内只有一个过滤表达式,则说明有消费者订阅了多个Topic,此时只需点击每个Topic查看分布信息,以宿主机 IP/公网 IP为维度梳理每个消费者的订阅情况。

    3. 若页面上显示了多个Topic,且每个Topic内存在多个过滤表达式,可点击查看分布,以宿主机 IP/公网 IP为维度重新整理消费者的订阅关系,以便以消费者的视角快速理清订阅关系。

梳理订阅关系举例

  1. 下图显示了两个Topic:my-topicmy-topic2,前者包含两个过滤表达式,说明至少有一个消费者分别订阅了my-topic + TagAmy-topic + TagB

    image

  2. 点击my-topic查看分布,可以看到如下内容,其中显示了有三个客户端订阅了三种过滤表达式。

    image

  3. 点击my-topic2查看分布,显示其中有两个客户端订阅,特别留意140.x.11.235:17713my-topic的分布中也出现过,说明这个客户端同时订阅了这两个Topic。

    image

  4. 经过整理,可以得到以消费者客户端视角的订阅关系情况:

    客户端

    订阅关系

    140.x.11.235:47913

    my-topic TagB

    140.x.11.235:17713

    my-topic TagA

    my-topic2 TagA

    140.x.11.235:11665

    my-topic TagA

    140.x.11.235:5629

    my-topic2 TagA

自此消费者的订阅关系整理完毕,根据业务实际情况调整,确保所有消费者的订阅关系保持一致即可。

RocketMQ 4.0实例

进入Group详情页,在订阅关系区域会以客户端为单位,显示该Group内的订阅情况。

image