消费者订阅了某个Topic后,消息队列RocketMQ版会将该Topic中的所有消息投递给消费端进行消费。若消费者只需要关注部分消息,可通过设置过滤条件在消息队列RocketMQ版服务端完成消息过滤,只消费需要关注的消息。本文介绍消息过滤的功能描述、应用场景、使用限制、配置方式及示例代码 。

功能描述

消息过滤功能指消息生产者向Topic中发送消息时,设置消息属性对消息进行分类,消费者订阅Topic时,根据消息属性设置过滤条件对消息进行过滤,只有符合过滤条件的消息才会被投递到消费端进行消费。

消费者订阅Topic时若未设置过滤条件,无论消息发送时是否有设置过滤属性,Topic中的所有消息都将被投递到消费端进行消费。

消息队列RocketMQ版支持的消息过滤方式如下:
过滤方式 说明 场景 实例限制 协议限制
Tag过滤(默认过滤方式)
  • 发送者:设置消息Tag。
  • 订阅者:订阅消息Tag。
订阅的Tag和发送者设置的消息Tag一致,则消息被投递给消费端进行消费。
简单过滤场景。

一条消息支持设置一个Tag,仅需要对Topic中的消息进行一级分类并过滤时可以使用此方式。

无。 无。
SQL属性过滤
  • 发送者:设置消息的自定义属性。
  • 订阅者:订阅时自定义SQL过滤表达式,根据自定义属性过滤消息。
满足过滤表达式的消息被投递给消费端进行消费。
复杂过滤场景。

一条消息支持设置多个自定义属性,可根据SQL语法自定义组合多种类型的表达式对消息进行多及分类并实现多维度的过滤。

仅企业铂金版实例支持该功能。 仅商业版TCP协议的SDK支持该功能。

Tag过滤

Tag,即消息标签,用于对某个Topic下的消息进行分类。消息队列RocketMQ版的生产者在发送消息时,指定消息的Tag,消费者需根据已经指定的Tag来进行订阅。

场景示例

以下图电商交易场景为例,从客户下单到收到商品这一过程会生产一系列消息,以以下消息为例:
  • 订单消息
  • 支付消息
  • 物流消息
这些消息会发送到Trade_TopicTopic中,被各个不同的系统所订阅,以以下系统为例:
  • 支付系统:只需订阅支付消息。
  • 物流系统:只需订阅物流消息。
  • 交易成功率分析系统:需订阅订单和支付消息。
  • 实时计算系统:需要订阅所有和交易相关的消息。
过滤示意图如下所示。filtermessage

配置方式

消息队列RocketMQ版支持通过SDK配置Tag过滤功能,分别在消息发送和订阅代码中设置消息Tag和订阅消息Tag。SDK详细信息,请参见SDK参考概述。消息发送端和消费端的代码配置方法如下:
  • 发送消息

    发送消息时,每条消息必须指明Tag。

        Message msg = new Message("MQ_TOPIC","TagA","Hello MQ".getBytes());                
  • 订阅所有Tag

    消费者如需订阅某Topic下所有类型的消息,Tag用星号(*)表示。

        consumer.subscribe("MQ_TOPIC", "*", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });                
  • 订阅单个Tag

    消费者如需订阅某Topic下某一种类型的消息,请明确标明Tag。

        consumer.subscribe("MQ_TOPIC", "TagA", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });                
  • 订阅多个Tag

    消费者如需订阅某Topic下多种类型的消息,请在多个Tag之间用两个竖线(||)分隔。

        consumer.subscribe("MQ_TOPIC", "TagA||TagB", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });                
  • 错误示例

    同一个消费者多次订阅某个Topic下的Tag,以最后一次订阅的Tag为准。

        //如下错误代码中,Consumer只能订阅到MQ_TOPIC下TagB的消息,而不能订阅TagA的消息。
        consumer.subscribe("MQ_TOPIC", "TagA", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });
        consumer.subscribe("MQ_TOPIC", "TagB", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });                

SQL属性过滤

SQL属性过滤是在消息发送时设置消息的自定义属性,消费者订阅时使用SQL语法设置过滤表达式,根据自定义属性过滤消息,消息队列RocketMQ版根据表法式的逻辑进行计算,将符合条件的消息投递到消费端。

说明 Tag属于一种特殊的消息属性,所以SQL过滤方式也兼容Tag过滤方法,支持通过Tag属性过滤消息。在SQL语法中,Tag的属性值为TAGS

使用限制

使用SQL属性过滤消息时,有以下限制:
  • 只有企业铂金版实例支持SQL属性过滤,标准版实例不支持该功能。
  • 只有TCP协议的客户端支持SQL属性过滤,HTTP协议的客户端不支持该功能。
  • 若服务端不支持SQL过滤时,客户端使用SQL过滤消息,则客户端启动会报错或收不到消息。

场景示例

以下图电商交易场景为例,从客户下单到收到商品这一过程会生产一系列消息,按照类型将消息分为订单消息和物流消息,其中给物流消息定义地域属性,按照地域分为杭州和上海:
  • 订单消息
  • 物流消息
    • 物流消息且地域为杭州
    • 物流消息且地域为上海
这些消息会发送到Trade_TopicTopic中,被各个不同的系统所订阅,以以下系统为例:
  • 物流系统1:只需订阅物流消息且消息地域为杭州。
  • 物流系统2:只需订阅物流消息且消息地域为杭州或上海。
  • 订单跟踪系统:只需订阅订单消息。
  • 实时计算系统:需要订阅所有和交易相关的消息。
过滤示意图如下所示。sql过滤场景

配置方式

消息队列RocketMQ版支持通过SDK配置SQL属性过滤。发送端需要在发送消息的代码中设置消息的自定义属性;消费端需要在订阅消息代码中设置SQL语法的过滤表达式。SDK详细信息,请参见SDK参考概述。消息发送端和消费端的代码配置方法如下:
  • 消息发送端:

    设置消息的自定义属性。

    Message msg = new Message("topic", "tagA", "Hello MQ".getBytes());
    // 设置自定义属性A,属性值为1。
    msg.putUserProperties("A", "1");
  • 消息消费端

    使用SQL语法设置过滤表达式,并根据自定义属性过滤消息。

    注意 使用属性时,需要先判断属性是否存在。若属性不存在则过滤表达式的计算结果为NULL,消息不会被投递到消费端。
    // 订阅自定义属性A存在且属性值为1的消息。
    consumer.subscribe("topic", MessageSelector.bySql("A IS NOT NULL AND TAGS IS NOT NULL AND A = '1'"), new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.printf("Receive New Messages: %s %n", message);
            return Action.CommitMessage;
        }
    });
SQL语法如下:
语法 说明 示例
IS NULL 判断属性不存在。 a IS NULL :属性a不存在。
IS NOT NULL 判断属性存在。 a IS NOT NULL:属性a存在。
  • >
  • >=
  • <
  • <=
用于比较数字,不能用于比较字符串,否则Consumer客户端启动会报错。
说明 可转化为数字的字符串也被认为是数字。
  • a IS NOT NULL AND a > 100:属性a存在且属性a的值大于100。
  • a IS NOT NULL AND a > 'abc':错误示例,abc为字符串,不能用于比较大小。
BETWEEN xxx AND xxx 用于比较数字,不能用于比较字符串,否则Consumer客户端启动会报错。等价于>= xxx AND <= xxx。表示属性值在两个数字之间。 a IS NOT NULL AND (a BETWEEN 10 AND 100):属性a存在且属性a的值大于等于10且小于等于100。
NOT BETWEEN xxx AND xxx 用于比较数字,不能用于比较字符串,否则Consumer客户端启动会报错。等价于< xxx OR > xxx,表示属性值在两个值的区间之外。 a IS NOT NULL AND (a NOT BETWEEN 10 AND 100):属性a存在且属性a的值小于10或大于100。
IN (xxx, xxx) 表示属性的值在某个集合内。集合的元素只能是字符串。 a IS NOT NULL AND (a IN ('abc', 'def')):属性a存在且属性a的值为abc或def。
  • =
  • <>
等于和不等于。可用于比较数字和字符串。 a IS NOT NULL AND (a = 'abc' OR a<>'def'):属性a存在且属性a的值为abc或a的值不为def。
  • AND
  • OR
逻辑与和逻辑或。可用于组合任意简单的逻辑判断,需要将每个逻辑判断内容放入括号内。 a IS NOT NULL AND a > 100) OR (b IS NULL):属性a存在且属性a的值大于100或属性b不存在。
由于SQL属性过滤是发送端定义消息属性,消费端设置SQL过滤条件,因此过滤条件的计算结果具有不确定性,服务端的处理方式为:
  • 如果过滤条件的表达式计算抛异常,消息默认被过滤,不会被投递给消费端。例如比较数字和非数字类型的值。
  • 如果过滤条件的表达式计算值为null或不是布尔类型(true和false),则消息默认被过滤,不会被投递给消费端。例如发送消息时未定义某个属性,在订阅时过滤条件中直接使用该属性,则过滤条件的表达式计算结果为null。
  • 如果消息自定义属性为浮点型,但过滤条件中使用整数进行判断,则消息默认被过滤,不会被投递给消费端。

示例代码

  • 发送消息

    同时设置消息Tag和自定义属性。

    Producer producer = ONSFactory.createProducer(properties);
    // 设置Tag的值为tagA。
    Message msg = new Message("topicA", "tagA", "Hello MQ".getBytes());
    // 设置自定义属性region为hangzhou。
    msg.putUserProperties("region", "hangzhou");
    // 设置自定义属性price为50。
    msg.putUserProperties("price", "50");
    SendResult sendResult = producer.send(msg);
  • 根据单个自定义属性订阅消息。
    Consumer consumer = ONSFactory.createConsumer(properties);
    // 只订阅属性region为hangzhou的消息,若消息中未定义属性region或属性值不是hangzhou,则消息不会被投递到消费端。
    consumer.subscribe("topicA", MessageSelector.bySql("region IS NOT NULL AND region = 'hangzhou'"), new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.printf("Receive New Messages: %s %n", message);
            return Action.CommitMessage;
        }
    });

    预期结果:示例中发送的消息属性符合订阅的过滤条件,消息被投递到消费端。

  • 同时根据Tag和自定义属性订阅消息。
    Consumer consumer = ONSFactory.createConsumer(properties);
    // 只订阅Tag的值为tagA且属性price大于30的消息。
    consumer.subscribe("topicA", MessageSelector.bySql("TAGS IS NOT NULL AND price IS NOT NULL AND TAGS = 'tagA' AND price > 30 "), new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.printf("Receive New Messages: %s %n", message);
            return Action.CommitMessage;
        }
    });

    预期结果:示例中发送的消息Tag和自定义属性符合订阅的过滤条件,消息被投递到消费端。

  • 同时根据多个自定义属性订阅消息。
    Consumer consumer = ONSFactory.createConsumer(properties);
    // 只订阅region为hangzhou且属性price小于20的消息。
    consumer.subscribe("topicA", MessageSelector.bySql("region IS NOT NULL AND price IS NOT NULL AND region = 'hangzhou' AND price < 20"), new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.printf("Receive New Messages: %s %n", message);
            return Action.CommitMessage;
        }
    });

    预期结果:消息不会被投递到消费端。订阅的过滤条件中price小于20,发送的消息中price属性值为50,不符合订阅过滤条件。

  • 订阅Topic中的所有消息,不进行过滤。
    Consumer consumer = ONSFactory.createConsumer(properties);
    // 若需要订阅Topic中的所有消息,需要将SQL表达式的值设置为“TRUE”。
    consumer.subscribe("topicA", MessageSelector.bySql("TRUE"), new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.printf("Receive New Messages: %s %n", message);
            return Action.CommitMessage;
        }
    });

    预期结果:Topic中的所有消息都将被投递到消费端进行消费。

  • 错误示例

    消息发送时未自定义某属性,消费端在订阅时未判断该属性是否存在直接使用,则消息不会被投递给消费端。

    Consumer consumer = ONSFactory.createConsumer(properties);
    // 属性product在发送消息时未定义,过滤失败,消息不会被投递至消费端。
    consumer.subscribe("topicA", MessageSelector.bySql("product = 'MQ'"), new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.printf("Receive New Messages: %s %n", message);
            return Action.CommitMessage;
        }
    });               

更多信息

  • 同一个Group ID下的消费者实例与Topic的订阅关系需保持一致,更多信息,请参见订阅关系一致
  • 合理使用Topic和Tag来过滤消息可以让业务更清晰,更多信息,请参见Topic与Tag最佳实践