全部产品
存储与CDN 数据库 安全 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 阿里云办公 培训与认证 物联网
消息队列 MQ

消息过滤

更新时间:2017-08-24 13:50:12   分享:   

本文描述 MQ 消费者如何根据 Tag 在 MQ 服务端完成消息过滤。

Tag,即消息标签、消息类型,用来区分某个 MQ 的 Topic 下的消息分类。MQ 允许消费者按照 Tag 对消息进行过滤,确保消费者最终只消费到他关心的消息类型。

以下图电商交易场景为例,从客户下单到收到商品这一过程会生产一系列消息,比如订单创建消息(order)、支付消息(pay)、物流消息(logistics)。这些消息会发送到 Topic 为 Trade_Topic 的队列中,被各个不同的系统所接收,比如支付系统、物流系统、交易成功率分析系统、实时计算系统等。其中,物流系统只需接收物流类型的消息(logistics),而实时计算系统需要接收所有和交易相关(order、pay、logistics)的消息。

消息过滤

说明:针对消息归类,您可以选择创建多个 Topic, 或者在同一个 Topic 下创建多个 Tag。但通常情况下,不同的 Topic 之间的消息没有必然的联系,而 Tag 则用来区分同一个 Topic 下相互关联的消息,比如全集和子集的关系,流程先后的关系。

参考示例

发送消息

发送消息时,每条消息必须指明消息类型 Tag:

  1. Message msg = new Message("MQ_TOPIC","TagA","Hello MQ".getBytes());

消费方式-1

消费者如需订阅某 Topic 下所有类型的消息,Tag 用符号 * 表示:

  1. consumer.subscribe("MQ_TOPIC", "*", new MessageListener() {
  2. public Action consume(Message message, ConsumeContext context) {
  3. System.out.println(message.getMsgID());
  4. return Action.CommitMessage;
  5. }
  6. });

消费方式-2

消费者如需订阅某 Topic 下某一种类型的消息,请明确标明 Tag:

  1. consumer.subscribe("MQ_TOPIC", "TagA", new MessageListener() {
  2. public Action consume(Message message, ConsumeContext context) {
  3. System.out.println(message.getMsgID());
  4. return Action.CommitMessage;
  5. }
  6. });

消费方式-3

消费者如需订阅某 Topic 下多种类型的消息,请在多个 Tag 之间用 || 分隔:

  1. consumer.subscribe("MQ_TOPIC", "TagA||TagB", new MessageListener() {
  2. public Action consume(Message message, ConsumeContext context) {
  3. System.out.println(message.getMsgID());
  4. return Action.CommitMessage;
  5. }
  6. });

消费方式-4(错误示例

同一个消费者多次订阅某 Topic 下的不同 Tag,后者会覆盖前者:

  1. //如下错误代码中,consumer只能接收到MQ_TOPIC下TagB的消息,而不能接收TagA的消息。
  2. consumer.subscribe("MQ_TOPIC", "TagA", new MessageListener() {
  3. public Action consume(Message message, ConsumeContext context) {
  4. System.out.println(message.getMsgID());
  5. return Action.CommitMessage;
  6. }
  7. });
  8. consumer.subscribe("MQ_TOPIC", "TagB", new MessageListener() {
  9. public Action consume(Message message, ConsumeContext context) {
  10. System.out.println(message.getMsgID());
  11. return Action.CommitMessage;
  12. }
  13. });
本文导读目录
本文导读目录
以上内容是否对您有帮助?