Message filtering

This topic describes how consumers of SOFAStack Message Queue filter messages by tag. Filtering takes place on the server, so a consumer receives only the messages it is interested in.

A tag is a label that classifies messages within a topic. The producer specifies a tag when it sends a message, and a consumer receives that message only if the filter in its subscription matches the tag.

Sample code

Send a message

Specify a tag for each message that you send:

producer.messageBuilder().withTopic("TP_XXX").withTags("TAGA").withValue(orderPojo).build()

Subscribe to all tags

To subscribe to all message types in a topic, use an asterisk (*) as the filter expression:

  • SOFABoot example

    import com.alipay.sofa.sofamq.api.MessageConsumer;
    import com.alipay.sofa.sofamq.api.Messaging;
    
    // Configure this class as a bean using XML or an annotation. The @Messaging annotation alone is not scanned.
    @Messaging
    public class SomeClass {
        @MessageConsumer(group = "GID_XXX", topic = "TP_XXX", filter = "*")
        public void someMethodReceivePojo(OrderPojo somePojo) {
            // do something
        }
    }
  • Non-SOFABoot example

    consumer.subscribe("TP_XXX", "*", new GenericMessageListener<OrderPojo>() {
        @Override
        public Class<OrderPojo> payloadClass() {
            return OrderPojo.class;
        }
    
        @Override
        public Action consume(GenericMessage<OrderPojo> message, MessageConsumeContext context) {
            System.out.println(new Date() + " Receive message: " + message.getMsgID() + " " + message.getValue());
            // To test the message redelivery feature, replace Action.CommitMessage with Action.ReconsumeLater.
            return Action.CommitMessage;
        }
    });

Subscribe to a single tag

To subscribe to a specific message type in a topic, specify the tag:

  • SOFABoot example

    import com.alipay.sofa.sofamq.api.MessageConsumer;
    import com.alipay.sofa.sofamq.api.Messaging;
    
    // Configure this class as a bean using XML or an annotation. The @Messaging annotation alone is not scanned.
    @Messaging
    public class SomeClass {
        @MessageConsumer(group = "GID_XXX", topic = "TP_XXX", filter = "TAGA")
        public void someMethodReceivePojo(OrderPojo somePojo) {
            // do something
        }
    }
  • Non-SOFABoot example

    consumer.subscribe("TP_XXX", "TAGA", new GenericMessageListener<OrderPojo>() {
        @Override
        public Class<OrderPojo> payloadClass() {
            return OrderPojo.class;
        }
    
        @Override
        public Action consume(GenericMessage<OrderPojo> message, MessageConsumeContext context) {
            System.out.println(new Date() + " Receive message: " + message.getMsgID() + " " + message.getValue());
            // To test the message redelivery feature, replace Action.CommitMessage with Action.ReconsumeLater.
            return Action.CommitMessage;
        }
    });

Subscribe to multiple tags

To subscribe to multiple message types in a topic, separate the tags with `||`:

consumer.subscribe("MQ_TOPIC", "TagA||TagB", new GenericMessageListener<OrderPojo>() {
    @Override
    public Class<OrderPojo> payloadClass() {
        return OrderPojo.class;
    }

    @Override
    public Action consume(GenericMessage<OrderPojo> message, MessageConsumeContext context) {
        System.out.println(new Date() + " Receive message: " + message.getMsgID() + " " + message.getValue());
        // To test the message redelivery feature, replace Action.CommitMessage with Action.ReconsumeLater.
        return Action.CommitMessage;
    }
});
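
The matching rule implied by the `*`, single-tag, and `||` forms above can be modeled in a few lines. The following is a minimal sketch, not the broker's implementation: the class `TagExpressionSketch` and its `matches` method are hypothetical names, and the sketch assumes that tags are compared case-sensitively and that surrounding whitespace is ignored.

```java
public class TagExpressionSketch {

    /**
     * Returns true if a message carrying messageTag would pass the given
     * tag filter expression. Assumption: "*" accepts every message, and
     * "||" separates alternative tags that are compared exactly.
     */
    public static boolean matches(String expression, String messageTag) {
        String trimmed = expression.trim();
        if (trimmed.equals("*")) {
            return true; // subscribe to all tags
        }
        if (messageTag == null) {
            return false; // an untagged message cannot match a specific tag
        }
        // "||" must be escaped because split() takes a regular expression.
        for (String candidate : trimmed.split("\\|\\|")) {
            if (candidate.trim().equals(messageTag)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("TagA||TagB", "TagA")); // true
        System.out.println(matches("TagA||TagB", "TagC")); // false
        System.out.println(matches("*", "TagC"));          // true
    }
}
```

In the real product this check runs on the server, so messages that do not match are never delivered to the consumer.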

SQL92 filtering

To filter on more than the tag, use a SQL92 filter expression. In the expression, `TAGS` refers to the message tag. All other variables are read from the message properties, which can be set using `Message#putUserProperties`.

  • SOFABoot example

    import static com.alipay.sofa.sofamq.api.MessageConsumer.SQL_FILTER;
    
    import com.alipay.sofa.sofamq.api.MessageConsumer;
    import com.alipay.sofa.sofamq.api.Messaging;
    
    // Configure this class as a bean using XML or an annotation. The @Messaging annotation alone is not scanned.
    @Messaging
    public class SomeClass {
        @MessageConsumer(group = "GID_XXX", topic = "TP_XXX", filter = "(TAGS in ('tag')) and a > 5", filterType = SQL_FILTER)
        public void someMethodReceivePojo(OrderPojo somePojo) {
            // do something
        }
    }
  • Non-SOFABoot example

    consumer.subscribe(MqConfig.TOPIC, MessageSelector.bySql("(TAGS in ('tag')) and a > 5"), new MessageListenerImpl());
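
To make the sample expression `(TAGS in ('tag')) and a > 5` concrete, the sketch below hand-codes the same condition against a message tag and a map of user properties. It is an illustration only, not a SQL92 parser and not the server's evaluation logic. The class `SqlFilterSketch` is a hypothetical name, and the sketch assumes that property values are stored as strings, that a numeric-looking string is compared as a number, and that a missing or non-numeric `a` causes the message to be rejected.

```java
import java.util.HashMap;
import java.util.Map;

public class SqlFilterSketch {

    /**
     * Hand-coded equivalent of: (TAGS in ('tag')) and a > 5
     * tag is the message tag; userProperties holds the message properties.
     */
    public static boolean matches(String tag, Map<String, String> userProperties) {
        boolean tagMatches = "tag".equals(tag); // TAGS in ('tag')
        String rawA = userProperties.get("a");
        if (!tagMatches || rawA == null) {
            return false; // assumption: a missing property never satisfies a comparison
        }
        try {
            return Double.parseDouble(rawA) > 5; // a > 5
        } catch (NumberFormatException e) {
            return false; // assumption: a non-numeric value does not match
        }
    }

    public static void main(String[] args) {
        Map<String, String> properties = new HashMap<>();
        properties.put("a", "6");
        System.out.println(matches("tag", properties));   // true
        System.out.println(matches("other", properties)); // false

        properties.put("a", "5");
        System.out.println(matches("tag", properties));   // false
    }
}
```

Both conditions must hold: a message with the right tag but with `a` equal to 5, or with no `a` property at all, is filtered out in this model.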

Incorrect example

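If a consumer subscribes to the same topic multiple times, only the last subscription takes effect. To receive messages with several tags, subscribe once with a combined expression such as `TagA||TagB`.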

// In the following incorrect code, the consumer subscribes only to messages with TagB under MQ_TOPIC. It does not subscribe to messages with TagA.
consumer.subscribe("MQ_TOPIC", "TagA", new GenericMessageListener<OrderPojo>() {
    @Override
    public Class<OrderPojo> payloadClass() {
        return OrderPojo.class;
    }

    @Override
    public Action consume(GenericMessage<OrderPojo> message, MessageConsumeContext context) {
        System.out.println(new Date() + " Receive message: " + message.getMsgID() + " " + message.getValue());
        // To test the message redelivery feature, replace Action.CommitMessage with Action.ReconsumeLater.
        return Action.CommitMessage;
    }
});

consumer.subscribe("MQ_TOPIC", "TagB", new GenericMessageListener<OrderPojo>() {
    @Override
    public Class<OrderPojo> payloadClass() {
        return OrderPojo.class;
    }

    @Override
    public Action consume(GenericMessage<OrderPojo> message, MessageConsumeContext context) {
        System.out.println(new Date() + " Receive message: " + message.getMsgID() + " " + message.getValue());
        // To test the message redelivery feature, replace Action.CommitMessage with Action.ReconsumeLater.
        return Action.CommitMessage;
    }
});
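
One way to picture why the second call wins is a table that holds a single filter expression per topic. The sketch below models that behavior. It is an assumption consistent with the rule stated above, not the client's actual data structure, and `SubscriptionTableSketch` with its methods is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;

public class SubscriptionTableSketch {

    // Assumption: one filter expression is kept per topic.
    private final Map<String, String> filterByTopic = new HashMap<>();

    /** Records a subscription; a later call for the same topic replaces the earlier one. */
    public void subscribe(String topic, String filterExpression) {
        filterByTopic.put(topic, filterExpression);
    }

    /** Returns the filter expression currently in effect for the topic, or null if none. */
    public String effectiveFilter(String topic) {
        return filterByTopic.get(topic);
    }

    public static void main(String[] args) {
        SubscriptionTableSketch incorrect = new SubscriptionTableSketch();
        incorrect.subscribe("MQ_TOPIC", "TagA");
        incorrect.subscribe("MQ_TOPIC", "TagB");
        System.out.println(incorrect.effectiveFilter("MQ_TOPIC")); // TagB

        SubscriptionTableSketch correct = new SubscriptionTableSketch();
        correct.subscribe("MQ_TOPIC", "TagA||TagB");
        System.out.println(correct.effectiveFilter("MQ_TOPIC"));   // TagA||TagB
    }
}
```

The second half of `main` shows the usual fix: a single subscription whose expression lists every tag the consumer needs.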

More information

  • Consumer instances within the same Group ID must have consistent subscription relationships for the topic. For more information, see Consistent subscription relationships.

  • Correctly using topics and tags to filter messages can help clarify your business logic. For more information, see Topics and tags.