如果不使用Routing Key去做绑定,而是根据消息Headers属性和Binding Headers属性的匹配规则路由消息,需要使用Headers Exchange。本文介绍Headers Exchange的使用示例。

背景信息

  • 向Headers Exchange发送消息时,可以在Headers中定义键值对。Headers Exchange将根据消息Headers属性键值对和绑定属性键值对的匹配情况路由消息。

    匹配算法由一个特殊的绑定属性键值对控制。该属性为x-match,只有以下两种取值:

    • all:所有除x-match以外的绑定属性键值对必须和消息Headers属性键值对匹配才会路由消息。
    • any:只要有一组除x-match以外的绑定属性键值对和消息Headers属性键值对匹配就会路由消息。

    更多信息,请参见Headers Exchange

  • 绑定成功后,您可以在云消息队列 RabbitMQ 版控制台消息查询页面,按照Queue查询消息,验证绑定结果。具体操作,请参见查询消息

示例代码

Headers Exchange绑定Java示例代码如下:

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ReturnListener;


public class headersTestNew {
    public static void main(String[] args)
        throws IOException, TimeoutException, NoSuchAlgorithmException, InvalidKeyException, InterruptedException {

        ConnectionFactory factory = new ConnectionFactory();
        // 设置接入点,在云消息队列 RabbitMQ 版控制台实例详情页面查看。
        factory.setHost("xxx.xxx.aliyuncs.com");
        // ${instanceId}为实例ID,在云消息队列 RabbitMQ 版控制台获取。
        // ${AccessKey}阿里云身份验证,在阿里云RAM访问控制台创建。
        // ${SecretKey}阿里云身份验证,在阿里云RAM访问控制台创建。
        // 注意:开启Connection才能自动恢复。
        factory.setCredentialsProvider(new AliyunCredentialsProvider("${AccessKey}", "${SecretKey}", "${instanceId}"));
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        // 设置Vhost名称,请确保已在云消息队列 RabbitMQ 版控制台创建。
        factory.setVirtualHost("${VhostName}");
        // 默认端口,非加密端口5672,加密端口5671。
        factory.setPort(5672);
        // 基于网络环境合理设置超时时间。
        factory.setConnectionTimeout(30 * 1000);
        factory.setHandshakeTimeout(30 * 1000);
        factory.setShutdownTimeout(0);
        // 请尽可能使用长期存活的Connection,以免每次发送消息都创建新的Connection,导致大量的网络资源和服务端资源消耗,甚至引起服务端SYN Flood防护。
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        String exchangeName = "${ExchangeName}";
        String exchangeType = "headers";
        String queueName = "${QueueName}";
        String routingKey = "${RoutingKey}";


        Map<String, Object> argument = new HashMap<>();
        argument.put("format", "pdf");
        argument.put("type", "log");
        argument.put("x-match", "all");

        channel.queueDeclare(queueName, true, false, false, null);
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey, argument);


        // 当mandatory=true,消息没有路由时,将会返回给客户端。
        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey,
                                     BasicProperties properties, byte[] body) throws IOException {
                System.out.println("no route, msgId=" + properties.getMessageId());
            }
        });

        // 设置消息Headers属性键值对。
        // 1.当注释(type, log)键值对,仅(format, pdf)一组键值对与argument匹配,执行该代码后,消息将无法接收到。
        // 2.当注释(type, log)键值对被取消,即(format, pdf)和(type, log)两组键值对与argument完全匹配,执行该代码后,将接收到消息。
        Map<String, Object> headers = new HashMap<>();
        headers.put("format", "pdf");
        //headers.put("type", "log");

        BasicProperties props = new BasicProperties.Builder().messageId(UUID.randomUUID().toString()).headers(headers).build();
        channel.basicPublish(exchangeName, routingKey, true, props, ("消息发送Body").getBytes(StandardCharsets.UTF_8));

        Thread.sleep(10000);
        connection.close();
    }
}