消息幂等

更新时间:

如果消息重复消费会影响您的业务处理,请对消息做幂等处理。本文介绍消息幂等的概念、适用场景以及处理方法。

什么是消息幂等

在数学与计算机学中,幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同。在消息领域,幂等是指Consumer重复消费某条消息时,重复消费的结果与消费一次的结果是相同的,并且多次消费并未对业务系统产生任何负面影响。

例如,在支付场景下,Consumer消费扣款消息,对一笔订单执行扣款操作,扣款金额为100。如果因网络不稳定等原因导致扣款消息重复投递,Consumer重复消费了该扣款消息,但最终的业务结果是只扣款一次,扣费100,且用户的扣款记录中对应的订单只有一条扣款流水,不会多次扣除费用。那么这次扣款操作是符合要求的,整个消费过程实现了消息幂等。

适用场景

在互联网应用中,尤其在网络不稳定的情况下,云消息队列 RabbitMQ 版的消息有可能会出现重复。如果消息重复消费会影响您的业务处理,请对消息做幂等处理。消息重复的可能原因如下:

  • 发送时消息重复

    当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。 如果此时Producer意识到消息发送失败并尝试再次发送消息,Consumer后续会收到两条内容相同并且Message ID也相同的消息。

  • 投递时消息重复

    消息消费的场景下,消息已投递到Consumer并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。为了保证消息至少被消费一次,云消息队列 RabbitMQ 版的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,Consumer后续会收到两条内容相同并且Message ID也相同的消息。

  • 负载均衡时消息重复(包括但不限于网络抖动、服务端重启以及Consumer应用重启)

    云消息队列 RabbitMQ 版的服务端或客户端重启、扩容或缩容时,会触发Rebalance,此时Consumer可能会收到重复消息。

处理方法

以Message ID为幂等键对消息进行幂等处理的步骤如下:

  1. 在数据库中创建一张unique key索引为唯一Message ID的表。

  2. 在Producer客户端为每条消息设置唯一Message ID。

    设置唯一Message ID的示例代码如下:

    AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
    channel.basicPublish("${ExchangeName}", "RoutingKey", true, props, ("消息发送Body" + i).getBytes(StandardCharsets.UTF_8));

    了解更多Message ID相关信息,请参见如何设置Message ID

  3. 在Consumer客户端根据唯一Message ID对消息进行幂等处理。

    根据唯一Message ID进行幂等处理的示例代码如下:

    channel.basicConsume(Producer.QueueName, false, "YourConsumerTag",
        new DefaultConsumer(channel) {
        @Override public void handleDelivery(String consumerTag, Envelope envelope,
                    AMQP.BasicProperties properties, byte[] body) throws IOException {
            // 1. 获取业务唯一性索引数据。
            try{
                String messageId = properties.getMessageId();
                // Message ID或者其他作为unique key的信息。
                // 2. 开启数据库事务。
                idempTable.insert(messageId);
                // 3. 对接收到的消息,进行业务逻辑处理。
                // 4. 提交或回滚事务。// 处理成功,则进行ACK,否则不要进行ACK。
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
            catch (数据库主键冲突异常 e){
                // 重复消息,直接确认掉。
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        }
    }
    );