本文介绍如何使用消息服务MNS的延时消息功能来实现本地操作和消息发送的事务一致性。

背景信息

一些业务场景需要实现本地操作和消息发送的事务一致性,即消息发送成功,则本地操作成功;消息发送失败,本地操作失败(成功也需要回滚),以避免出现操作成功但消息发送失败,或者操作失败但消息发送成功的情况。

另外,消息在消费端一定被成功处理一次,不会因为消费端程序崩溃而导致消息没有成功处理,进而需要人工重置消费进度。

解决方案

利用消息服务MNS的延时消息功能来实现。

准备工作

创建以下队列:

  • 事务消息队列

    消息的有效期小于消息延时时间。即如果生产者不主动修改(提交)消息可见时间,消息对消费者不可见。

  • 操作日志队列

    记录事务消息的操作记录信息。消息延时时间为事务操作超时时间。日志队列中的消息确认(删除)后将对消费者不可见。

操作步骤

操作流程如下图所示。事务消息

事务消息的操作步骤如下:

  1. 发送一条事务准备消息到事务消息队列。
  2. 写操作日志信息到操作日志队列,日志中包含步骤1消息的消息句柄。
  3. 执行本地事务操作。
  4. 如果步骤3成功,提交消息(消息对消费者可见);反之,回滚消息。
  5. 确认步骤2中的操作日志(删除该日志消息)。
  6. 步骤4后,消费者可以接收到事务消息。
  7. 消费者处理消息。
  8. 消费者确认删除消息。

异常分析

  • 生产者异常(例如进程重启)
    1. 读取操作日志队列超时未确认日志。
    2. 检查事务结果。
    3. 如果检查得到事务已经成功,则提交消息(重复提交无副作用,同一句柄的消息只能成功提交一次)。
    4. 确认操作日志。
  • 消费者异常(例如进程重启)

    消息服务MNS提供至少保证消费一次的特性,只要步骤8不成功,消息在一段时间后可以继续可见,被当前消费者或者其他消费者处理。

  • 消息服务MNS服务不可达(例如断网)

    消息发送和接收处理状态以及操作日志都在消息服务MNS服务端,消息服务MNS本身具备高可靠和高可用的特点,所以只要网络恢复,事务可以继续。如果要生产者操作成功,消费者一定能够拿到消息并处理成功;如果生产者操作失败,消费者收不到消息。

示例代码

消息服务MNS最新的Java SDK(1.1.8)中的TransactionQueue支持上述事务消息方案。在TransactionOperationsTransactionChecker两个接口添加业务操作和检查逻辑,您就可以方便地实现事务消息。

示例代码如下:

public class TransactionMessageDemo{
    public class MyTransactionChecker implements TransactionChecker
    {
        public boolean checkTransactionStatus(Message message)
        {
            boolean checkResult = false;
            String messageHandler = message.getReceiptHandle();
            try{
                //检查messageHandler事务是否成功。
                checkResult = true;
            }catch(Exception e)
            {
                checkResult = false;
            }
            return checkResult;
        }
    }

    public class MyTransactionOperations implements TransactionOperations
    {
        public boolean doTransaction(Message message)
        {
            boolean transactionResult = false;
            String messageHandler = message.getReceiptHandle();
            String messageBody = message.getMessageBody();
            try{
                //根据messageHandler和messageBody执行本地事务。
                transactionResult = true;
            }catch(Exception e)
            {
                transactionResult = false;
            }
            return transactionResult;
        }
    }

    public static void main(String[] args) {
        System.out.println("Start TransactionMessageDemo");
        String transQueueName = "transQueueName";
        String accessKeyId = ServiceSettings.getMNSAccessKeyId();
        String accessKeySecret = ServiceSettings.getMNSAccessKeySecret();
        String endpoint = ServiceSettings.getMNSAccountEndpoint();

        CloudAccount account = new CloudAccount(accessKeyId, accessKeySecret, endpoint);
        MNSClient client = account.getMNSClient(); //初始化客户端。

        //为事务队列创建队列。
        QueueMeta queueMeta = new QueueMeta();
        queueMeta.setQueueName(transQueueName);
        queueMeta.setPollingWaitSeconds(15);

        TransactionMessageDemo demo = new TransactionMessageDemo();
        TransactionChecker transChecker = demo.new MyTransactionChecker();
        TransactionOperations transOperations = demo.new MyTransactionOperations();

        TransactionQueue transQueue = client.createTransQueue(queueMeta, transChecker);

        //执行事务。
        Message msg = new Message();
        String messageBody = "TransactionMessageDemo";
        msg.setMessageBody(messageBody); 
        transQueue.sendTransMessage(msg, transOperations);

        //如果不使用队列,请删除队列并关闭客户端。
        transQueue.delete();

        //关闭客户端。
        client.close();
        System.out.println("End TransactionMessageDemo");
    }

}