更新时间:2021-01-05 14:54
本文描述 SOFAStack 消息队列的消费者如何根据 Tag 在消息队列服务端完成消息过滤,以确保消费者最终只消费到其关注的消息类型。
Tag,即消息标签,用于对某个 Topic 下的消息进行分类。消息队列的生产者在发送消息时,已经指定消息的 Tag,消费者需根据已经指定的 Tag 来进行订阅。
发送消息时,每条消息必须指明 Tag:
Message msg = new Message("MQ_TOPIC","TagA","Hello MQ".getBytes());
消费者如需订阅某 Topic 下所有类型的消息,Tag 用符号 * 表示:
consumer.subscribe("MQ_TOPIC", "*", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
System.out.println(message.getMsgID());
return Action.CommitMessage;
}
});
消费者如需订阅某 Topic 下某一种类型的消息,请明确标明 Tag:
consumer.subscribe("MQ_TOPIC", "TagA", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
System.out.println(message.getMsgID());
return Action.CommitMessage;
}
});
消费者如需订阅某 Topic 下多种类型的消息,请在多个 Tag 之间用 || 分隔:
consumer.subscribe("MQ_TOPIC", "TagA||TagB", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
System.out.println(message.getMsgID());
return Action.CommitMessage;
}
});
同一个消费者多次订阅某个 Topic 下的 Tag,以最后一次订阅的 Tag 为准:
//如下错误代码中,Consumer 只能订阅到 MQ_TOPIC 下 TagB 的消息,而不能订阅 TagA 的消息。
consumer.subscribe("MQ_TOPIC", "TagA", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
System.out.println(message.getMsgID());
return Action.CommitMessage;
}
});
consumer.subscribe("MQ_TOPIC", "TagB", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
System.out.println(message.getMsgID());
return Action.CommitMessage;
}
});
在文档使用中是否遇到以下问题
更多建议
匿名提交