本文描述消息队列RocketMQ版的消费者如何根据Tag在消息队列RocketMQ版服务端完成消息过滤,以确保消费者最终只消费到其关注的消息类型。
Tag,即消息标签,用于对某个Topic下的消息进行分类。消息队列RocketMQ版的生产者在发送消息时,已经指定消息的Tag,消费者需根据已经指定的Tag来进行订阅。
场景示例
以下图电商交易场景为例,从客户下单到收到商品这一过程会生产一系列消息,以以下消息为例:
- 订单消息
- 支付消息
- 物流消息
这些消息会发送到Trade_TopicTopic中,被各个不同的系统所订阅,以以下系统为例:
- 支付系统:只需订阅支付消息。
- 物流系统:只需订阅物流消息。
- 交易成功率分析系统:需订阅订单和支付消息。
- 实时计算系统:需要订阅所有和交易相关的消息。
过滤示意图如下所示。

示例代码
- 发送消息
发送消息时,每条消息必须指明Tag。
Message msg = new Message("MQ_TOPIC","TagA","Hello MQ".getBytes());
- 订阅所有Tag
消费者如需订阅某Topic下所有类型的消息,Tag用星号(*)表示。
consumer.subscribe("MQ_TOPIC", "*", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; } });
- 订阅单个Tag
消费者如需订阅某Topic下某一种类型的消息,请明确标明Tag。
consumer.subscribe("MQ_TOPIC", "TagA", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; } });
- 订阅多个Tag
消费者如需订阅某Topic下多种类型的消息,请在多个Tag之间用两个竖线(||)分隔。
consumer.subscribe("MQ_TOPIC", "TagA||TagB", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; } });
- 错误示例
同一个消费者多次订阅某个Topic下的Tag,以最后一次订阅的Tag为准。
//如下错误代码中,Consumer只能订阅到MQ_TOPIC下TagB的消息,而不能订阅TagA的消息。 consumer.subscribe("MQ_TOPIC", "TagA", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; } }); consumer.subscribe("MQ_TOPIC", "TagB", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; } });
更多信息
- 同一个Group ID下的消费者实例与Topic的订阅关系需保持一致,更多信息,请参见订阅关系一致。
- 合理使用Topic和Tag来过滤消息可以让业务更清晰,更多信息,请参见Topic与Tag最佳实践。
在文档使用中是否遇到以下问题
更多建议
匿名提交