全部产品
云市场

订阅关系一致

更新时间:2020-01-14 16:30:22

订阅关系一致指的是同一个消费者 Group ID 下所有消费者的处理逻辑必须完全一致。一旦订阅关系不一致,消息消费的逻辑就会混乱,甚至导致消息丢失。本文提供订阅关系不一致的示例代码,帮助您顺畅地订阅消息。

背景信息

SOFAStack 消息队列里的一个消费者 Group ID 代表一个消费者群组。对于大多数分布式应用来说,一个消费者 Group ID 下通常会挂载多个消费者。

由于消息队列的订阅关系主要由 Topic + Tag 共同组成,因此,保持订阅关系一致意味着同一个消费者 Group ID 下所有的消费者需在以下两方面均保持一致:

  • 订阅的 Topic 必须一致
  • 订阅的 Topic 中的 Tag 必须一致

正确订阅关系图片示例

多个 Group ID 订阅了多个 Topic,并且每个 Group ID 里的多个消费者的订阅关系保持了一致。

正确订阅关系

错误订阅关系图片示例

单个 Group ID 订阅了多个 Topic,但是该 Group ID 里的多个消费者的订阅关系并没有保持一致。

错误订阅关系

错误订阅关系代码示例一

以下例子中,同一个 Group ID 下的两个消费者订阅的 Topic 不一致。

  • 消费者 1-1:
    1. Properties properties = new Properties();
    2. properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_1");
    3. Consumer consumer = OMS.builder().driver("sofamq").createConsumer(properties);
    4. consumer.subscribe("jodie_test_A", "*", new MessageListener() {
    5. public Action consume(Message message, ConsumeContext context) {
    6. System.out.println(message.getMsgID());
    7. return Action.CommitMessage;
    8. }
    9. });
  • 消费者 1-2:
    1. Properties properties = new Properties();
    2. properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_1");
    3. Consumer consumer = OMS.builder().driver("sofamq").createConsumer(properties);
    4. consumer.subscribe("jodie_test_B ", "*", new MessageListener() {
    5. public Action consume(Message message, ConsumeContext context) {
    6. System.out.println(message.getMsgID());
    7. return Action.CommitMessage;
    8. }
    9. });

错误订阅关系代码示例二

以下例子中,同一个 Group ID 下订阅 Topic 的 Tag 不一致。消费者 2-1 订阅了 TagA,而消费者 2-2 未指定 Tag。

  • 消费者 2-1:
    1. Properties properties = new Properties();
    2. properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_2");
    3. Consumer consumer = OMS.builder().driver("sofamq").createConsumer(properties);
    4. consumer.subscribe("jodie_test_A", "TagA", new MessageListener() {
    5. public Action consume(Message message, ConsumeContext context) {
    6. System.out.println(message.getMsgID());
    7. return Action.CommitMessage;
    8. }
    9. });
  • 消费者 2-2:
    1. Properties properties = new Properties();
    2. properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_2");
    3. Consumer consumer = OMS.builder().driver("sofamq").createConsumer(properties);
    4. consumer.subscribe("jodie_test_A", "*", new MessageListener() {
    5. public Action consume(Message message, ConsumeContext context) {
    6. System.out.println(message.getMsgID());
    7. return Action.CommitMessage;
    8. }
    9. });

错误订阅关系代码示例三

此例中,错误的原因如下所述:

  • 同一个 Group ID 下订阅 Topic 个数不一致。
  • 同一个 Group ID 下订阅 Topic 的 Tag 不一致。

  • 消费者 3-1:

    1. Properties properties = new Properties();
    2. properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_3");
    3. Consumer consumer = OMS.builder().driver("sofamq").createConsumer(properties);
    4. consumer.subscribe("jodie_test_A", "TagA", new MessageListener() {
    5. public Action consume(Message message, ConsumeContext context) {
    6. System.out.println(message.getMsgID());
    7. return Action.CommitMessage;
    8. }
    9. });
    10. consumer.subscribe("jodie_test_B", "TagB", new MessageListener() {
    11. public Action consume(Message message, ConsumeContext context) {
    12. System.out.println(message.getMsgID());
    13. return Action.CommitMessage;
    14. }
    15. });
  • 消费者 3-2:
    1. Properties properties = new Properties();
    2. properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_3");
    3. Consumer consumer = OMS.builder().driver("sofamq").createConsumer(properties);
    4. consumer.subscribe("jodie_test_A", "TagB", new MessageListener() {
    5. public Action consume(Message message, ConsumeContext context) {
    6. System.out.println(message.getMsgID());
    7. return Action.CommitMessage;
    8. }
    9. });