本文介绍如何使用消息服务MNS的延时消息功能,实现本地操作和消息发送的事务一致性。

前提条件

您已创建以下队列,具体操作,请参见创建队列

  • 事务消息队列
    消息存活时间小于消息延时时间。
    • 当消息发送成功、事务操作成功时,生产者修改消息延迟时间,消息对消费者可见。
    • 当消息发送成功、事务操作失败时,生产者不修改消息延迟时间,消息对消费者不可见。
  • 操作日志队列

    记录事务消息的操作记录信息。消息延时时间为事务消息操作超时时间。日志队列中的消息确认后将对消费者不可见。

原理介绍

一些业务场景需要保证本地操作和消息发送的事务一致性,即消息发送成功,本地操作成功。如果消息发送成功,本地操作失败,那么发送成功的消息需要回滚。操作流程如下图所示。

事务消息

消息发送成功,事务操作成功时操作步骤如下所示:

  1. 生产者发送一条事务准备消息到事务消息队列。
  2. 生产者发送操作日志消息到操作日志队列,日志中包含步骤1消息的消息句柄。
  3. 生产者执行本地事务操作成功。
  4. 生产者请求修改消息延迟时间,使消息对消费者可见。
  5. 生产者向操作日志队列确认操作日志,删除日志消息。
  6. 消费者从事务消息队列中接收事务消息。
  7. 消费者处理事务消息。
  8. 消费者请求删除事务消息。

消息发送成功,事务操作失败时操作步骤如下所示:

  1. 生产者发送一条事务准备消息到事务消息队列。
  2. 生产者发送操作日志信息到操作日志队列,日志中包含步骤1消息的消息句柄。
  3. 生产者执行本地事务操作失败。
  4. (对应上图中步骤A)操作日志队列向生产者发送消息,请求读取超时未确认操作日志。
  5. (对应上图中步骤B)生产者检查事务结果,发现操作失败。
  6. (对应上图中步骤C)生产者提交回滚消息请求,不修改消息延迟时间,消息对消费者不可见。
  7. (对应上图中步骤D)生产者向操作日志队列确认操作日志,删除日志消息。

示例代码

消息服务MNS最新的Java SDK(1.1.8)中的TransactionQueue支持上述事务消息方案。在TransactionOperationsTransactionChecker两个接口添加业务操作和检查逻辑,您就可以方便地实现事务消息。

示例代码如下:

public class TransactionMessageDemo {

    public class MyTransactionChecker implements TransactionChecker {
        @Override
        public boolean checkTransactionStatus(Message message) {
            boolean checkResult = false;
            String messageHandler = message.getReceiptHandle();
            try {
                //检查messageHandler事务是否成功。
                checkResult = true;
            } catch (Exception e) {
                checkResult = false;
            }
            return checkResult;
        }
    }

    public class MyTransactionOperations implements TransactionOperations {
        @Override
        public boolean doTransaction(Message message) {
            boolean transactionResult = false;
            String messageHandler = message.getReceiptHandle();
            String messageBody = message.getMessageBody();
            try {
                //根据messageHandler和messageBody执行本地事务。
                transactionResult = true;
            } catch (Exception e) {
                transactionResult = false;
            }
            return transactionResult;
        }
    }

    public static void main(String[] args) {
        System.out.println("Start TransactionMessageDemo");
        String transQueueName = "transQueueName";
        String accessKeyId = ServiceSettings.getMNSAccessKeyId();
        String accessKeySecret = ServiceSettings.getMNSAccessKeySecret();
        String endpoint = ServiceSettings.getMNSAccountEndpoint();

        CloudAccount account = new CloudAccount(accessKeyId, accessKeySecret, endpoint);
        MNSClient client = account.getMNSClient(); //初始化客户端。

        //为事务队列创建队列。
        QueueMeta queueMeta = new QueueMeta();
        queueMeta.setQueueName(transQueueName);
        queueMeta.setPollingWaitSeconds(15);

        TransactionMessageDemo demo = new TransactionMessageDemo();
        TransactionChecker transChecker = demo.new MyTransactionChecker();
        TransactionOperations transOperations = demo.new MyTransactionOperations();

        TransactionQueue transQueue = client.createTransQueue(queueMeta, transChecker);

        //执行事务。
        Message msg = new Message();
        String messageBody = "TransactionMessageDemo";
        msg.setMessageBody(messageBody);
        transQueue.sendTransMessage(msg, transOperations);

        //如果不使用队列,请删除队列并关闭客户端。
        transQueue.delete();

        //关闭客户端。
        client.close();
        System.out.println("End TransactionMessageDemo");
    }

}

异常分析

  • 生产者异常(例如进程重启)
    1. 读取操作日志队列中超时未确认日志。
    2. 检查事务结果。
    3. 如果检查到事务执行成功,则提交消息。
      说明 重复提交不会对流程产生影响,同一句柄的消息只能成功提交一次。
    4. 确认操作日志。
  • 消费者异常(例如进程重启)

    消息服务MNS提供至少保证消费一次的特性,如果当前消费者没有成功消费并删除消息,消息在不可见时间后将继续可见,被当前消费者或者其他消费者处理。

  • 消息服务MNS服务不可达(例如断网)

    消息发送和接收处理状态及操作日志都在消息服务MNS服务端,消息服务MNS本身具备高可靠和高可用的特点,所以只要网络恢复正常,事务就可以继续进行。只要生产者操作成功,消费者就能收到消息并成功处理;如果生产者操作失败,则消费者无法收到消息。