全部产品
弹性计算 会员服务 网络 安全 移动云 数加·大数据分析及展现 数加·大数据应用 管理与监控 云通信 阿里云办公 培训与认证 智能硬件
存储与CDN 数据库 域名与网站(万网) 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网 更多
消息队列 MQ

订阅关系一致

更新时间:2018-07-12 17:04:32

MQ 里的一个 Consumer ID 代表一个 Consumer 实例群组。 对于大多数分布式应用来说,一个 Consumer ID 下通常会挂载多个 Consumer 实例。 订阅关系一致指的是同一个 Consumer ID 下所有 Consumer 实例的处理逻辑必须完全一致。 一旦订阅关系不一致,消息消费的逻辑就会混乱,甚至导致消息丢失。

由于 MQ 的订阅关系主要由 Topic+Tag 共同组成,因此,保持订阅关系一致意味着同一个 Consumer ID 下所有的实例需在以下两方面均保持一致:

  1. 订阅的 Topic 必须一致;

  2. 订阅的 Topic 中的 Tag 必须一致。

订阅关系图片示例

正确订阅关系图片示例

在下图中,多个 Consumer ID 订阅了多个 Topic,并且每个 Consumer ID 里的多个消费者实例的订阅关系保持了一致。

订阅关系一致

错误订阅关系图片示例

在下图中,单个 Consumer ID 订阅了多个 Topic,但是该 Consumer ID 里的多个消费者实例的订阅关系并没有保持一致。

订阅关系不一致

订阅关系代码示例

错误订阅关系代码示例

【例一】

以下例子中,同一个 Consumer ID 下的两个实例订阅的 Topic 不一致。

Consumer 实例 1-1:

  1. Properties properties = new Properties();
  2. properties.put(PropertyKeyConst.ConsumerId, "CID_jodie_test_1");
  3. Consumer consumer = ONSFactory.createConsumer(properties);
  4. consumer.subscribe("jodie_test_A", "*", new MessageListener() {
  5. public Action consume(Message message, ConsumeContext context) {
  6. System.out.println(message.getMsgID());
  7. return Action.CommitMessage;
  8. }
  9. });

Consumer 实例1-2:

  1. Properties properties = new Properties();
  2. properties.put(PropertyKeyConst.ConsumerId, " CID_jodie_test_1");
  3. Consumer consumer = ONSFactory.createConsumer(properties);
  4. consumer.subscribe("jodie_test_B ", "*", new MessageListener() {
  5. public Action consume(Message message, ConsumeContext context) {
  6. System.out.println(message.getMsgID());
  7. return Action.CommitMessage;
  8. }
  9. });

【例二】

以下例子中,同一个 Consumer ID 下订阅 Topic 的 Tag 不一致。 Consumer 实例2-1 订阅了 TagA,而 Consumer 实例2-2 未指定 Tag。

Consumer 实例2-1:

  1. Properties properties = new Properties();
  2. properties.put(PropertyKeyConst.ConsumerId, "CID_jodie_test_2");
  3. Consumer consumer = ONSFactory.createConsumer(properties);
  4. consumer.subscribe("jodie_test_A", "TagA", new MessageListener() {
  5. public Action consume(Message message, ConsumeContext context) {
  6. System.out.println(message.getMsgID());
  7. return Action.CommitMessage;
  8. }
  9. });

Consumer 实例2-2:

  1. Properties properties = new Properties();
  2. properties.put(PropertyKeyConst.ConsumerId, " CID_jodie_test_2");
  3. Consumer consumer = ONSFactory.createConsumer(properties);
  4. consumer.subscribe("jodie_test_A ", "*", new MessageListener() {
  5. public Action consume(Message message, ConsumeContext context) {
  6. System.out.println(message.getMsgID());
  7. return Action.CommitMessage;
  8. }
  9. });

【例三】

此例中,错误的原因有俩个:

  1. 同一个 Consumer ID 下订阅 Topic 个数不一致。
  2. 同一个 Consumer ID 下订阅 Topic 的 Tag 不一致。

Consumer 实例3-1:

  1. Properties properties = new Properties();
  2. properties.put(PropertyKeyConst.ConsumerId, "CID_jodie_test_3");
  3. Consumer consumer = ONSFactory.createConsumer(properties);
  4. consumer.subscribe("jodie_test_A", "TagA", new MessageListener() {
  5. public Action consume(Message message, ConsumeContext context) {
  6. System.out.println(message.getMsgID());
  7. return Action.CommitMessage;
  8. }
  9. });
  10. consumer.subscribe("jodie_test_B", "TagB", new MessageListener() {
  11. public Action consume(Message message, ConsumeContext context) {
  12. System.out.println(message.getMsgID());
  13. return Action.CommitMessage;
  14. }
  15. });

Consumer 实例3-2:

  1. Properties properties = new Properties();
  2. properties.put(PropertyKeyConst.ConsumerId, " CID_jodie_test_3");
  3. Consumer consumer = ONSFactory.createConsumer(properties);
  4. consumer.subscribe("jodie_test_A ", "TagB", new MessageListener() {
  5. public Action consume(Message message, ConsumeContext context) {
  6. System.out.println(message.getMsgID());
  7. return Action.CommitMessage;
  8. }
  9. });
本文导读目录