文档

RocketMQ事务型消息

更新时间:
一键部署

消息队列RocketMQ提供的分布式事务消息适用于所有对数据最终一致性有强需求的场景。本文从RocketMQ事务型消息的价值、应用场景、消息原理和源码进行分析,给广大客户使用RocketMQ事务型消息提供参考。

事务型消息的价值

消息队列RocketMQ分布式事务消息不仅可以实现应用之间的解耦,又能保证数据的最终一致性。同时,传统的大事务可以被拆分为小事务,不仅能提升效率,还不会因为某一个关联应用的不可用导致整体回滚,从而最大限度保证核心系统的可用性。在极端情况下,如果关联的某一个应用始终无法处理成功,也只需对当前应用进行补偿或数据订正处理,而无需对整体业务进行回滚。

事务型消息的应用场景

消息系统我们一般作为系统间的解耦,异步通信,削峰。然而事务型的消息在消息系统优势的基础上,还能够达到上下游系统的数据或者行为的最终一致性。而且有了事务型消息我们可以减少对于异常场景的特殊处理。虽然我们有很多方式能达到最终一致性目的,但是事务型消息对于某些业务场景更合适。如下列这种场景:

有了事务型消息,我们可以先发送一个prepare的消息给到消息服务器(RocketMQ的概念就是broker),然后执行本地事务并提交,本地事务提交成功后,再发送一个commit的消息给到 broker。最后broker在收到commit消息之后,就可以向下游去投递消息了。

事务型消息的原理

事务型消息有下列三个关键内容:

  • 本地事务的状态

    本地事务的执行状态,一共有以下三种:

    • 事务提交:LocalTransactionState.COMMIT_MESSAGE。

    • 事务回滚:LocalTransactionState.ROLLBACK_MESSAGE。

    • 事务未决:LocalTransactionState.UNKNOW。

  • 事务消息Topic

    • topic1:RMQ_SYS_TRANS_HALF_TOPIC

      用于存放prepare消息的topic,只有一个message queue,消息体中会保存原始的消息信息。

    • topic2:RMQ_SYS_TRANS_OP_HALF_TOPIC

      用于存放prepare的操作消息,里面会放TAG为REMOVE的消息,其中消息体是指向prepare消息的commitLogOffset。

  • 事务型消息的执行流程

    • RocketMQ事务消息交互流程如下图所示:交互流程图

    • 事务消息发送步骤如下:

      1. 发送方将半事务消息发送至RocketMQ服务端。

      2. 服务端将消息持久化成功之后,向发送方返回ACK确认消息已经发送成功,此时消息为半事务消息。

      3. 发送方开始执行本地事务逻辑。

      4. 发送方根据本地事务执行结果向服务端提交二次确认(Commit或是Rollback),服务端收到Commit状态则将半事务消息标记为可投递,订阅方最终将收到该消息;服务端收到Rollback状态则删除半事务消息,订阅方将不会接受该消息。

    • 事务消息回查步骤如下:

      1. 在断网或者是应用重启的特殊情况下,上述发送步骤的步骤4提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查。

      2. 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。

      3. 发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照发送步骤的步骤4对半事务消息进行操作。

事务型消息的源码

下面的内容将对源码进行分析。

  1. 入口执行SendMessageInTransaction函数发送消息。

    SendResult sendResult = producer.sendMessageInTransaction(msg, null);
  2. 判断是否注册了监听器,如果没有注册,则直接显示异常。

    if (null == this.transactionListener) {
          throw new MQClientException("TransactionListener is null", null);
     }
    

    监听器代码如下:

    public interface TransactionListener {
        /**
         * When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
         * 执行本地事务
         * @param msg Half(prepare) message
         * @param arg Custom business parameter
         * @return Transaction state
         */
        LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);
    
        /**
         * When no response to prepare(half) message. broker will send check message to check the transaction status, and this
         * method will be invoked to get local transaction status.
         * 回传给broker事务执行的状态
         * @param msg Check message
         * @return Transaction state
         */
        LocalTransactionState checkLocalTransaction(final MessageExt msg);
    }
    

    事务监听器TransactionListener类解释有以下两种抽象方法。

    • 方法一:在Producer发送prepare消息之后执行本地事务,返回本地事务执行的状态信息(commit,rollback,unknow)

    • 方法二:当本地事务执行之后,回传消息失败,broker回查时回调用的方法。

      1. producer开始发送prepare消息给broker。

        发送消息会有四种状态,分别是SEND_OKFLUSH_DISK_TIMEOUTFLUSH_SLAVE_TIMEOUTSLAVE_NOT_AVAILABLE。

      2. 如果是SEND_OK状态,将开始执行本地事务。

        这里执行的方法也就是前面所说的事务监听器中的TransactionListener#executeLocalTransaction,需要用户用第三步的方法实现。

      3. 回传给broker本地事务的执行状态(第二次给broker发送消息,所以MQ采用的是二阶段提交来实现事务)。

        在执行本地事务结束后,又将会产生状态,因为本地事务可能执行成功,也有可能是执行失败,所以,本地事务执行后需要将事务执行状态返回给broker,这样broker才好根据这次的信息来决定之前存储的prepare消息是销毁还是正常下发给consumer。

        DefaultMQProducerImpl#endTransaction
        this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
                    this.defaultMQProducer.getSendMsgTimeout());
        

        可以看到,这里发送的消息类型是oneway类型,也就是说producer只是把事务执行的结果状态传给了broker,不考虑broker是否接收到。

        如果broker恰好因为一些原因没有收到这个回传的事务执行结果信息,那么它就不知道怎么处理之前的prepare消息了,同样的,如果producer回传的状态信息既不是commit也不是rollback,而是unknow,那它也同样不知道该怎么进行下一步。所以当回传的消息丢失,或者是unknow时,broker会定时回查producer

        而回传执行的代码就是第二步中的事务监听器类中的另一个方法TranscationListener#checkLocalTransaction。

总结

从RocketMQ事务型消息链路体现了面向失败的设计思路,也体现了事务型系统的严谨性,在第二阶段的消息没有送达的时候,broker会主动请求producer端去做check,producer做完check后会将事务的状态再次返回。虽然说实现最终一致的方案有很多,但是事务型消息是比较优雅实现方式之一。

  • 本页导读
文档反馈