您可以通过basicReject或basicNack方法否定应答消息,使消息重入队列,从而是其他Consumer可以消费消息。

使用basicReject方法否定应答单条消息并使消息重入队列

您可以使用basicReject方法实现Consumer否定应答单条消息并使其重入队列。

  • basicReject方法的参数说明如下:
    • deliveryTag:Channel的消息投递的唯一标识符。
    • requeue:被否定应答的消息是否重入队列。如果设置为true,则消息重入队列;如果设置为false,则消息被丢弃或发送到死信Exchange。更多信息,请参见死信Exchange
  • basicReject方法的示例代码如下:
    channel.basicConsume("test", false, "consumertag", new DefaultConsumer(channel) {
       @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                          AMQP.BasicProperties properties, byte[] body)
                throws IOException {
            System.out.println("Rejected: " + new String(body, "UTF-8") + ", deliveryTag: " + envelope.getDeliveryTag() + ", messageId: " + properties.getMessageId());
            channel.basicReject(envelope.getDeliveryTag(), true);
        }
    });

使用basicNack方法否定应答多条消息并使消息重入队列

您可以使用basicNack方法实现Consumer否定应答多条消息并使其重入队列。

  • basicNack方法的参数说明如下:
    • deliveryTag:Channel的消息投递的唯一标识符。
    • multiple:是否否定应答多条消息。如果设置为true,则否定应答带指定deliveryTag的消息及该deliveryTag之前的多条消息;如果设置为false,则仅否定应答带指定deliveryTag的单条消息。
    • requeue:被否定应答的消息是否重入队列。如果设置为true,则消息重入队列;如果设置为false,则消息被丢弃或发送到死信Exchange。更多信息,请参见死信Exchange
  • basicNack方法的示例代码如下:
    channel.basicConsume("test", false, "consumertag", new DefaultConsumer(channel) {
       @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                          AMQP.BasicProperties properties, byte[] body)
                throws IOException {
            System.out.println("Rejected: " + new String(body, "UTF-8") + ", deliveryTag: " + envelope.getDeliveryTag() + ", messageId: " + properties.getMessageId());
            channel.basicNack(envelope.getDeliveryTag(), true, true);
        }
    });