全部产品
云市场

订阅关系一致

更新时间:2019-09-25 22:07:24

订阅关系一致指的是同一个消费者 Group ID 下所有 Consumer 实例的处理逻辑必须完全一致。一旦订阅关系不一致,消息消费的逻辑就会混乱,甚至导致消息丢失。本文提供订阅关系不一致的示例代码,帮助您顺畅地订阅消息。

消息队列 MQ 里的一个消费者 Group ID 代表一个 Consumer 实例群组。对于大多数分布式应用来说,一个消费者 Group ID 下通常会挂载多个 Consumer 实例。

由于消息队列 MQ 的订阅关系主要由 Topic + Tag 共同组成,因此,保持订阅关系一致意味着同一个消费者 Group ID 下所有的实例需在以下两方面均保持一致:

  • 订阅的 Topic 必须一致

  • 订阅的 Topic 中的 Tag 必须一致

订阅关系图片示例

正确订阅关系图片示例

多个 Group ID 订阅了多个 Topic,并且每个 Group ID 里的多个消费者实例的订阅关系保持了一致。

订阅关系一致

错误订阅关系图片示例

单个 Group ID 订阅了多个 Topic,但是该 Group ID 里的多个消费者实例的订阅关系并没有保持一致。

订阅关系不一致

订阅关系代码示例

错误订阅关系代码示例

  • 例一

    以下例子中,同一个 Group ID 下的两个实例订阅的 Topic 不一致。

    • Consumer 实例 1-1:

      1. Properties properties = new Properties();
      2. properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_1");
      3. Consumer consumer = ONSFactory.createConsumer(properties);
      4. consumer.subscribe("jodie_test_A", "*", new MessageListener() {
      5. public Action consume(Message message, ConsumeContext context) {
      6. System.out.println(message.getMsgID());
      7. return Action.CommitMessage;
      8. }
      9. });
    • Consumer 实例 1-2:

      1. Properties properties = new Properties();
      2. properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_1");
      3. Consumer consumer = ONSFactory.createConsumer(properties);
      4. consumer.subscribe("jodie_test_B ", "*", new MessageListener() {
      5. public Action consume(Message message, ConsumeContext context) {
      6. System.out.println(message.getMsgID());
      7. return Action.CommitMessage;
      8. }
      9. });
  • 例二

    以下例子中,同一个 Group ID 下订阅 Topic 的 Tag 不一致。Consumer 实例 2-1 订阅了 TagA,而 Consumer 实例 2-2 未指定 Tag。

    • Consumer 实例 2-1:

      1. Properties properties = new Properties();
      2. properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_2");
      3. Consumer consumer = ONSFactory.createConsumer(properties);
      4. consumer.subscribe("jodie_test_A", "TagA", new MessageListener() {
      5. public Action consume(Message message, ConsumeContext context) {
      6. System.out.println(message.getMsgID());
      7. return Action.CommitMessage;
      8. }
      9. });
    • Consumer 实例 2-2:

      1. Properties properties = new Properties();
      2. properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_2");
      3. Consumer consumer = ONSFactory.createConsumer(properties);
      4. consumer.subscribe("jodie_test_A", "*", new MessageListener() {
      5. public Action consume(Message message, ConsumeContext context) {
      6. System.out.println(message.getMsgID());
      7. return Action.CommitMessage;
      8. }
      9. });
  • 例三

    此例中,错误的原因如下所述:

    1. 同一个 Group ID 下订阅 Topic 个数不一致。

    2. 同一个 Group ID 下订阅 Topic 的 Tag 不一致。

    • Consumer 实例 3-1:
      1. Properties properties = new Properties();
      2. properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_3");
      3. Consumer consumer = ONSFactory.createConsumer(properties);
      4. consumer.subscribe("jodie_test_A", "TagA", new MessageListener() {
      5. public Action consume(Message message, ConsumeContext context) {
      6. System.out.println(message.getMsgID());
      7. return Action.CommitMessage;
      8. }
      9. });
      10. consumer.subscribe("jodie_test_B", "TagB", new MessageListener() {
      11. public Action consume(Message message, ConsumeContext context) {
      12. System.out.println(message.getMsgID());
      13. return Action.CommitMessage;
      14. }
      15. });
    • Consumer 实例 3-2:
      1. Properties properties = new Properties();
      2. properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_3");
      3. Consumer consumer = ONSFactory.createConsumer(properties);
      4. consumer.subscribe("jodie_test_A", "TagB", new MessageListener() {
      5. public Action consume(Message message, ConsumeContext context) {
      6. System.out.println(message.getMsgID());
      7. return Action.CommitMessage;
      8. }
      9. });