全部产品
存储与CDN 数据库 安全 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网 钉钉智能硬件
消息队列 MQ

收发事务消息

更新时间:2017-12-15 20:41:41

目前支持的域包括公网、华东1、华北2、华东2、华南1。

交互流程

MQ 事务消息的交互流程如下图所示:

MQ 事务消息交互流程

发送事务消息

发送事务消息包含以下两个步骤:

1.发送半消息(Half Message)及执行本地事务。示例代码如下:


#include "ONSFactory.h"
#include "ONSClientException.h"
using namespace ons;

    class MyLocalTransactionExecuter : LocalTransactionExecuter
    {
        MyLocalTransactionExecuter()
        {
        }

        ~MyLocalTransactionExecuter()
        {
        }
        virtual TransactionStatus execute(Message &value)
        {
                // 消息 ID(有可能消息体一样,但消息 id 不一样, 当前消息 ID 在 console 控制不可能查询)
                string msgId = value.getMsgID();
                // 消息体内容进行 crc32, 也可以使用其它的如 MD5
                // 消息 ID 和 crc32id 主要是用来防止消息重复
                // 如果业务本身是幂等的, 可以忽略, 否则需要利用 msgId 或 crc32Id 来做幂等
                // 如果要求消息绝对不重复, 推荐做法是对消息体 body 使用 crc32或 md5来防止重复消息. 
                TransactionStatus transactionStatus = Unknow;
                try {
                    boolean isCommit = 本地事务执行结果;
                    if (isCommit) {
                        // 本地事务成功、提交消息
                        transactionStatus = CommitTransaction;
                    } else {
                        // 本地事务失败、回滚消息
                        transactionStatus = RollbackTransaction;
                    }
                } catch (...) {
                    //exception handle
                }
                return transactionStatus;
        }
    }

    int main(int argc, char* argv[])
    {
        ONSFactoryProperty factoryInfo;
        factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "XXX");//您在控制台创建的 Producer ID
        factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );//输入您在控制台创建的 Topic
        factoryInfo.setFactoryProperty(ONSFactoryProperty::MsgContent, "XXX");//msg content
        factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "xxxxxxxxx");//阿里云身份验证,在阿里云服务器管理控制台创建
        factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "xxxxxxxxxxxxxxxxxxxx" );//阿里云身份验证,在阿里云服务器管理控制台创建

        //创建 producer,MQ 不负责 pChecker 的释放,需要业务方自行释放资源
        MyLocalTransactionChecker *pChecker = new MyLocalTransactionChecker();
        g_producer = ONSFactory::getInstance()->createTransactionProducer(factoryInfo,pChecker);

        //在发送消息前,必须调用 start 方法来启动 Producer,只需调用一次即可;
        pProducer->start();

        Message msg(
            //Message Topic
            factoryInfo.getPublishTopics(),
            //Message Tag,可理解为 Gmail 中的标签,对消息进行再归类,方便 Consumer 指定过滤条件在 ONS 服务器过滤       
            "TagA",
            //Message Body,不能为空,MQ 不做任何干预,需要 Producer 与 Consumer 协商好一致的序列化和反序列化方式
            factoryInfo.getMessageContent()
        );

        // 设置代表消息的业务关键属性,请尽可能全局唯一。
        // 以方便您在无法正常收到消息情况下,可通过 MQ Console 查询消息并补发。
        // 注意:不设置也不会影响消息正常收发
        msg.setKey("ORDERID_100");

        //发送消息,只要不抛出异常,就代表发送成功     
        try
        {
            //MQ 不负责 pExecuter 的释放,需要业务方自行释放资源
            MyLocalTransactionExecuter pExecuter = new MyLocalTransactionExecuter();
            SendResultONS sendResult = pProducer->send(msg,pExecuter);
        }
        catch(ONSClientException & e)
        {
            //自定义处理 exception 的细节
        }
        // 在应用退出前,必须销毁 Producer 对象,否则会导致内存泄露等问题
        pProducer->shutdown();

        return 0;

    }

2.提交事务消息状态

当本地事务执行完成(执行成功或执行失败),需要通知服务器当前消息的事务状态。通知方式有以下两种:

  • 执行本地事务完成后提交;
  • 执行本地事务一直没提交状态,等待服务器回查消息的事务状态。

事务状态有以下三种:

  • TransactionStatus.CommitTransaction 提交事务,允许订阅方消费该消息;
  • TransactionStatus.RollbackTransaction 回滚事务,消息将被丢弃不允许消费;
  • TransactionStatus.Unknow 无法判断状态,期待 MQ Broker 向发送方再次询问该消息对应的本地事务的状态。

     class MyLocalTransactionChecker : LocalTransactionChecker
     {
         MyLocalTransactionChecker()
         {
         }
    
         ~MyLocalTransactionChecker()
         {
         }
    
         virtual TransactionStatus check(Message &value)
         {
             // 消息 ID(有可能消息体一样,但消息 id 不一样, 当前消息 ID 在 console 控制不可能查询)
             string msgId = value.getMsgID();
             // 消息体内容进行 crc32, 也可以使用其它的如 MD5
             // 消息 ID 和 crc32id 主要是用来防止消息重复
             // 如果业务本身是幂等的, 可以忽略, 否则需要利用 msgId 或 crc32Id 来做幂等
             // 如果要求消息绝对不重复, 推荐做法是对消息体 body 使用 crc32或 md5来防止重复消息. 
             TransactionStatus transactionStatus = Unknow;
             try {
                 boolean isCommit = 本地事务执行结果;
                 if (isCommit) {
                     // 本地事务成功、提交消息
                     transactionStatus = CommitTransaction;
                 } else {
                     // 本地事务失败、回滚消息
                     transactionStatus = RollbackTransaction;
                 }
             } catch(...) {
                 //exception error
             }
             return transactionStatus;
         }
     }
    

事务回查机制说明

1、发送事务消息为什么必须要实现 check 机制?

当步骤1发送半消息完成,但本地事务返回状态为 TransactionStatus.Unknow 时,亦或是应用退出导致本地事务未提交任何状态时,从 MQ Broker 的角度看,这条半状态的消息的状态是未知的,因此 MQ Broker 会定期要求发送方能 check 该半状态消息,并上报其最终状态。

2、Check 被回调时,业务逻辑都需要做些什么?

MQ 事务消息的 check 方法里面,应该写一些检查事务一致性的逻辑。MQ 发送事务消息时需要实现 LocalTransactionChecker 接口,用来处理 MQ Broker 主动发起的本地事务状态回查请求;因此在事务消息的 check 方法中,需要完成两件事情:

(1) 检查该半消息对应的本地事务的状态(commited or rollback);

(2) 向 MQ Broker 提交该半消息本地事务的状态。

3、本地事务的不同状态对 Half 消息的影响?

  • TransactionStatus.CommitTransaction 提交事务,允许订阅方消费该消息。

  • TransactionStatus.RollbackTransaction 回滚事务,消息将被丢弃不允许消费。

  • TransactionStatus.Unknow 无法判断状态,期待 MQ Broker 向发送方再次询问该消息对应的本地事务的状态。

    具体代码详见 MyLocalTransactionChecker 的实现。

订阅事务消息

关于订阅普通消息的说明和示例代码,详见订阅消息

本文导读目录