订阅关系(Subscription)

本文介绍云消息队列 RocketMQ 版中订阅关系(Subscription)的定义、模型关系、内部属性及使用建议。

定义

订阅关系是云消息队列 RocketMQ 版系统中消费者获取消息、处理消息的规则和状态配置。

订阅关系由消费者分组动态注册到服务端系统,并在后续的消息传输中按照订阅关系定义的过滤规则进行消息匹配和消费进度维护。

通过配置订阅关系,可控制如下传输行为:

  • 订阅消息的集合:用于控制消费者订阅哪些消息,选择主题内的哪些消息进行消费。

  • 消费进度:云消息队列 RocketMQ 版服务端默认提供订阅关系持久化的能力,即消费者分组在服务端注册订阅关系后,当消费者离线并再次上线后,可以获取离线前的消费进度并继续消费。

TopicConsumerGroup的映射关系

一个Topic可以被多个ConsumerGroup订阅

一对多订阅能力,每个ConsumerGroupTopic的订阅相互独立,都可以订阅Topic下的全部或部分消息。

image.png

一个ConsumerGroup可以订阅多个Topic

如下图所示,消费者分组Group A订阅了两个主题Topic ATopic B,对于Group A中的消费者来说,订阅的Topic A为一个订阅关系,订阅的Topic B为另外一个订阅关系,且这两个订阅关系互相独立,可以各自定义,不受影响。

image.png

内部属性

消费模式

  • 定义:消费者对消息的消费模式

  • 取值:

    • CLUSTERING:用于订阅非Lite类型的其它类型Topic。消息会按照负载均衡规则,投递ConsumerGroup下的其中一个消费者。

    • LITE_SELECTIVE:用于订阅Lite类型的Topic,一个ConsumerGroup下的不同的消费者可以订阅不同的LiteTopic集合。

消息过滤属性

当消费模式为CLUSTERING时,可以使用消息过滤相关配置。

过滤类型

  • 定义:消息过滤规则的类型。订阅关系中设置消息过滤规则后,系统将按照过滤规则匹配主题中的消息,只将符合条件的消息投递给消费者消费,实现消息的再次分类。

  • 取值:

    • TAG过滤:按照Tag字符串进行全文过滤匹配。

    • SQL92过滤:按照SQL语法对消息属性进行过滤匹配。

过滤表达式

行为约束

消费模式为CLUSTERING

当消费模式为CLUSTERING时,同一个ConsumerGroup下的每个消费者的订阅关系、消费逻辑需要保持一致,否则会出现消费冲突,导致部分消息消费异常。

  • 正确示例

    //Consumer c1
    Consumer c1 = ConsumerBuilder.build(groupA);
    c1.subscribe(topicA,"TagA");
    //Consumer c2
    Consumer c2 = ConsumerBuilder.build(groupA);
    c2.subscribe(topicA,"TagA");
  • 错误示例

    //Consumer c1
    Consumer c1 = ConsumerBuilder.build(groupA);
    c1.subscribe(topicA,"TagA");
    //Consumer c2
    Consumer c2 = ConsumerBuilder.build(groupA);
    c2.subscribe(topicA,"TagB");

消费模式为LITE_SELECTIVE

当消费模式为LITE_SELECTIVE时,同一个ConsumerGroup下的不同的消费者可以订阅不同的LiteTopic集合。如下图所示,消费者1订阅了 LiteTopic1LiteTopic2,消费者2订阅了 LiteTopic3LiteTopic4。

每个消费者订阅LiteTopic集合可以实时地动态增加和删除。

image.png