本文描述消息队列RocketMQ版的消费者如何根据Tag在消息队列RocketMQ版服务端完成消息过滤,以确保消费者最终只消费到其关注的消息类型。

Tag,即消息标签,用于对某个Topic下的消息进行分类。消息队列RocketMQ版的生产者在发送消息时,已经指定消息的Tag,消费者需根据已经指定的Tag来进行订阅。

场景示例

以下图电商交易场景为例,从客户下单到收到商品这一过程会生产一系列消息,以以下消息为例:

  • 订单消息
  • 支付消息
  • 物流消息

这些消息会发送到Trade_TopicTopic中,被各个不同的系统所订阅,以以下系统为例:

  • 支付系统:只需订阅支付消息。
  • 物流系统:只需订阅物流消息。
  • 交易成功率分析系统:需订阅订单和支付消息。
  • 实时计算系统:需要订阅所有和交易相关的消息。
过滤示意图如下所示。filtermessage

示例代码

  • 发送消息

    发送消息时,每条消息必须指明Tag。

        Message msg = new Message("MQ_TOPIC","TagA","Hello MQ".getBytes());                
  • 订阅所有Tag

    消费者如需订阅某Topic下所有类型的消息,Tag用星号(*)表示。

        consumer.subscribe("MQ_TOPIC", "*", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });                
  • 订阅单个Tag

    消费者如需订阅某Topic下某一种类型的消息,请明确标明Tag。

        consumer.subscribe("MQ_TOPIC", "TagA", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });                
  • 订阅多个Tag

    消费者如需订阅某Topic下多种类型的消息,请在多个Tag之间用两个竖线(||)分隔。

        consumer.subscribe("MQ_TOPIC", "TagA||TagB", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });                
  • 错误示例

    同一个消费者多次订阅某个Topic下的Tag,以最后一次订阅的Tag为准。

        //如下错误代码中,Consumer只能订阅到MQ_TOPIC下TagB的消息,而不能订阅TagA的消息。
        consumer.subscribe("MQ_TOPIC", "TagA", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });
        consumer.subscribe("MQ_TOPIC", "TagB", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });                

更多信息

  • 同一个Group ID下的消费者实例与Topic的订阅关系需保持一致,更多信息,请参见订阅关系一致
  • 合理使用Topic和Tag来过滤消息可以让业务更清晰,更多信息,请参见Topic与Tag最佳实践