This topic describes how SOFAStack Message Queue consumers filter messages by tag on the server. This ensures that consumers receive only the messages they are interested in.
A tag is a label that classifies messages within a topic. The producer specifies a tag when it sends a message, and a consumer receives the message only if it subscribes to that tag.
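To make the matching rules concrete, here is a minimal sketch of how a tag subscription expression can be read. It covers the three forms used in the samples on this page: `*`, a single tag, and tags joined with `||`. The class `TagFilterSketch` and its methods are hypothetical names for illustration only. This is not the broker's implementation, and case-sensitive comparison is an assumption.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical model of tag matching; not the SOFAStack MQ broker implementation.
class TagFilterSketch {

    /** Returns true if a message carrying {@code tag} matches the subscription expression. */
    static boolean matches(String expression, String tag) {
        if ("*".equals(expression.trim())) {
            return true; // an asterisk subscribes to every tag in the topic
        }
        // "TagA||TagB" subscribes to either tag, so split on the literal "||".
        // Assumption: tags are compared case-sensitively.
        return Arrays.stream(expression.split("\\|\\|"))
                .map(String::trim)
                .anyMatch(candidate -> candidate.equals(tag));
    }

    /** Keeps only the tags that the subscription expression would accept. */
    static List<String> filter(String expression, List<String> tags) {
        return tags.stream()
                .filter(tag -> matches(expression, tag))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> sent = Arrays.asList("TagA", "TagB", "TagC");
        System.out.println(filter("*", sent));          // every tag is accepted
        System.out.println(filter("TagA", sent));       // only TagA is accepted
        System.out.println(filter("TagA||TagB", sent)); // TagA and TagB are accepted
    }
}
```

In the real service this decision is made on the server, so messages with non-matching tags are never delivered to the consumer.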
Sample code
Send a message
Specify a tag for each message that you send:
producer.messageBuilder().withTopic("TP_XXX").withTags("TAGA").withValue(orderPojo).build()
Subscribe to all tags
To subscribe to all message types in a topic, use an asterisk (*) for the tag:
SOFABOOT example
import com.alipay.sofa.sofamq.api.MessageConsumer;
import com.alipay.sofa.sofamq.api.Messaging;
// Configure this class as a bean using XML or an annotation. The @Messaging annotation alone is not scanned.
@Messaging
public class SomeClass {
    @MessageConsumer(group = "GID_XXX", topic = "TP_XXX", filter = "*")
    public void someMethodReceivePojo(OrderPojo somePojo) {
        // do something
    }
}
Non-SOFABOOT example
consumer.subscribe("TP_XXX", "*", new GenericMessageListener<OrderPojo>() {
    @Override
    public Class<OrderPojo> payloadClass() {
        return OrderPojo.class;
    }
    @Override
    public Action consume(GenericMessage<OrderPojo> message, MessageConsumeContext context) {
        System.out.println(new Date() + " Receive message: " + message.getMsgID() + " " + message.getValue());
        // To test the message redelivery feature, replace Action.CommitMessage with Action.ReconsumeLater.
        return Action.CommitMessage;
    }
});
Subscribe to a single tag
To subscribe to a specific message type in a topic, specify the tag:
SOFABOOT example
import com.alipay.sofa.sofamq.api.MessageConsumer;
import com.alipay.sofa.sofamq.api.Messaging;
// Configure this class as a bean using XML or an annotation. The @Messaging annotation alone is not scanned.
@Messaging
public class SomeClass {
    @MessageConsumer(group = "GID_XXX", topic = "TP_XXX", filter = "TAGA")
    public void someMethodReceivePojo(OrderPojo somePojo) {
        // do something
    }
}
Non-SOFABOOT example
consumer.subscribe("TP_XXX", "TAGA", new GenericMessageListener<OrderPojo>() {
    @Override
    public Class<OrderPojo> payloadClass() {
        return OrderPojo.class;
    }
    @Override
    public Action consume(GenericMessage<OrderPojo> message, MessageConsumeContext context) {
        System.out.println(new Date() + " Receive message: " + message.getMsgID() + " " + message.getValue());
        // To test the message redelivery feature, replace Action.CommitMessage with Action.ReconsumeLater.
        return Action.CommitMessage;
    }
});
Subscribe to multiple tags
To subscribe to multiple message types in a topic, separate the tags with `||`:
consumer.subscribe("MQ_TOPIC", "TagA||TagB", new GenericMessageListener<OrderPojo>() {
    @Override
    public Class<OrderPojo> payloadClass() {
        return OrderPojo.class;
    }
    @Override
    public Action consume(GenericMessage<OrderPojo> message, MessageConsumeContext context) {
        System.out.println(new Date() + " Receive message: " + message.getMsgID() + " " + message.getValue());
        // To test the message redelivery feature, replace Action.CommitMessage with Action.ReconsumeLater.
        return Action.CommitMessage;
    }
});
SQL92 filtering
In a SQL92 filter expression, `TAGS` refers to the message tag. All other variables are read from the message's user properties, which are set with `Message#putUserProperties`. In the examples below, `a` is such a property.
SOFABOOT example
import static com.alipay.sofa.sofamq.api.MessageConsumer.SQL_FILTER;
import com.alipay.sofa.sofamq.api.MessageConsumer;
import com.alipay.sofa.sofamq.api.Messaging;
// Configure this class as a bean using XML or an annotation. The @Messaging annotation alone is not scanned.
@Messaging
public class SomeClass {
    @MessageConsumer(group = "GID_XXX", topic = "TP_XXX", filter = "(TAGS in ('tag')) and a > 5", filterType = SQL_FILTER)
    public void someMethodReceivePojo(OrderPojo somePojo) {
        // do something
    }
}
Non-SOFABOOT example
consumer.subscribe(MqConfig.TOPIC, MessageSelector.bySql("(TAGS in ('tag')) and a > 5"), new MessageListenerImpl());
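The expression `(TAGS in ('tag')) and a > 5` used above can be read as an ordinary predicate over the message tag and its user properties. The following sketch hand-codes that one expression in plain Java to show which messages it would accept. The class `SqlFilterSketch` and the method `accepts` are hypothetical names, the real evaluation happens on the server, and two details are assumptions: property values are stored as strings and parsed as numbers, and a missing or non-numeric `a` is treated as no match.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, hand-coded equivalent of "(TAGS in ('tag')) and a > 5".
// It illustrates the semantics only; it is not a SQL92 parser or the broker's code.
class SqlFilterSketch {

    static boolean accepts(String tag, Map<String, String> userProperties) {
        // TAGS in ('tag'): the message tag must equal "tag".
        boolean tagMatches = "tag".equals(tag);

        // a > 5: "a" is looked up in the user properties.
        String raw = userProperties.get("a");
        if (raw == null) {
            return false; // assumption: a missing property never matches
        }
        try {
            return tagMatches && Double.parseDouble(raw) > 5;
        } catch (NumberFormatException e) {
            return false; // assumption: a non-numeric value never matches
        }
    }

    public static void main(String[] args) {
        Map<String, String> properties = new HashMap<>();
        properties.put("a", "7");
        System.out.println(accepts("tag", properties));   // tag matches and 7 > 5
        System.out.println(accepts("other", properties)); // tag does not match

        properties.put("a", "3");
        System.out.println(accepts("tag", properties));   // 3 is not greater than 5
    }
}
```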
Incorrect example
If a consumer subscribes to the same topic multiple times, only the last subscription takes effect.
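One way to picture this rule is a table that holds a single filter per topic, so that a second `subscribe` call for the same topic replaces the first. The sketch below models only that bookkeeping. `SubscriptionTableSketch` is a hypothetical class and the one-filter-per-topic map is an assumption made for illustration; the page states only that the last subscription takes effect.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model: one filter expression per topic, so a later
// subscription to the same topic overwrites the earlier one.
class SubscriptionTableSketch {

    private final Map<String, String> filterByTopic = new HashMap<>();

    void subscribe(String topic, String tagExpression) {
        filterByTopic.put(topic, tagExpression); // replaces any earlier entry for this topic
    }

    String effectiveFilter(String topic) {
        return filterByTopic.get(topic);
    }

    public static void main(String[] args) {
        SubscriptionTableSketch wrong = new SubscriptionTableSketch();
        wrong.subscribe("MQ_TOPIC", "TagA");
        wrong.subscribe("MQ_TOPIC", "TagB");
        System.out.println(wrong.effectiveFilter("MQ_TOPIC")); // prints "TagB"

        // A single subscription that joins the tags with "||" keeps both.
        SubscriptionTableSketch right = new SubscriptionTableSketch();
        right.subscribe("MQ_TOPIC", "TagA||TagB");
        System.out.println(right.effectiveFilter("MQ_TOPIC")); // prints "TagA||TagB"
    }
}
```

To receive both tags, use one subscription with `TagA||TagB`, as in the multiple-tag sample earlier on this page.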
// In the following incorrect code, the consumer subscribes only to messages with TagB under MQ_TOPIC. It does not subscribe to messages with TagA.
consumer.subscribe("MQ_TOPIC", "TagA", new GenericMessageListener<OrderPojo>() {
    @Override
    public Class<OrderPojo> payloadClass() {
        return OrderPojo.class;
    }
    @Override
    public Action consume(GenericMessage<OrderPojo> message, MessageConsumeContext context) {
        System.out.println(new Date() + " Receive message: " + message.getMsgID() + " " + message.getValue());
        // To test the message redelivery feature, replace Action.CommitMessage with Action.ReconsumeLater.
        return Action.CommitMessage;
    }
});
consumer.subscribe("MQ_TOPIC", "TagB", new GenericMessageListener<OrderPojo>() {
    @Override
    public Class<OrderPojo> payloadClass() {
        return OrderPojo.class;
    }
    @Override
    public Action consume(GenericMessage<OrderPojo> message, MessageConsumeContext context) {
        System.out.println(new Date() + " Receive message: " + message.getMsgID() + " " + message.getValue());
        // To test the message redelivery feature, replace Action.CommitMessage with Action.ReconsumeLater.
        return Action.CommitMessage;
    }
});
More information
Consumer instances within the same Group ID must have consistent subscription relationships for the topic. For more information, see Consistent subscription relationships.
Correctly using topics and tags to filter messages can help clarify your business logic. For more information, see Topics and tags.