本文提供使用TCP协议下的C/C++ SDK收发事务消息的示例代码供您参考。
消息队列RocketMQ版提供类似XA或Open XA的分布式事务功能,通过消息队列RocketMQ版事务消息,能达到分布式事务的最终一致。
交互流程
事务消息交互流程如下图所示。

更多信息,请参见事务消息。
前提条件
您已完成以下操作:
- 下载C/C++ SDK。更多信息,请参见版本说明。
- 准备环境。请根据您使用的SDK版本做好环境准备工作:
发送事务消息
发送事务消息包含以下两个步骤。
- 发送半事务消息(Half Message)及执行本地事务。示例代码如下。
#include "ONSFactory.h" #include "ONSClientException.h" using namespace ons; class MyLocalTransactionExecuter : LocalTransactionExecuter { MyLocalTransactionExecuter() { } ~MyLocalTransactionExecuter() { } virtual TransactionStatus execute(Message &value) { // 消息ID(有可能消息体一样,但消息ID不一样,当前消息ID在消息队列RocketMQ版控制台无法查询。) string msgId = value.getMsgID(); // 消息体内容进行crc32,也可以使用其它的如MD5。 // 消息ID和crc32id主要是用来防止消息重复。 // 如果业务本身是幂等的,可以忽略,否则需要利用msgId或crc32Id来做幂等。 // 如果要求消息绝对不重复,推荐做法是对消息体body使用crc32或MD5来防止重复消息。 TransactionStatus transactionStatus = Unknow; try { boolean isCommit = 本地事务执行结果; if (isCommit) { // 本地事务成功、提交消息。 transactionStatus = CommitTransaction; } else { // 本地事务失败、回滚消息。 transactionStatus = RollbackTransaction; } } catch (...) { //exception handle } return transactionStatus; } } int main(int argc, char* argv[]) { //创建Producer和发送消息所必需的信息。 ONSFactoryProperty factoryInfo; factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId,"XXX");//您在消息队列RocketMQ版控制台创建的Group ID。 factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "XXX"); //设置TCP接入域名,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。 factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );//输入您在消息队列RocketMQ版控制台创建的Topic。 factoryInfo.setFactoryProperty(ONSFactoryProperty::MsgContent, "XXX");//消息Content。 factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "xxxxxxxxx");//AccessKey ID阿里云身份验证,在阿里云服务器管理控制台创建。 factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "xxxxxxxxxxxxxxxxxxxx" );//AccessKey Secret阿里云身份验证,在阿里云服务器管理控制台创建。 //创建producer,消息队列RocketMQ版不负责pChecker的释放,需要业务方自行释放资源。 MyLocalTransactionChecker *pChecker = new MyLocalTransactionChecker(); g_producer = ONSFactory::getInstance()->createTransactionProducer(factoryInfo,pChecker); //在发送消息前,必须调用start方法来启动Producer,只需调用一次即可。 pProducer->start(); Message msg( //Message Topic factoryInfo.getPublishTopics(), //Message Tag,可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在消息队列RocketMQ版的服务器过滤。 "TagA", //Message Body,不能为空,消息队列RocketMQ版不做任何干预,需要Producer与Consumer协商好一致的序列化和反序列化方式。 factoryInfo.getMessageContent() ); // 设置代表消息的业务关键属性,请尽可能全局唯一。 // 以方便您在无法正常收到消息情况下,可通过消息队列RocketMQ版控制台查询消息并补发。 // 注意:不设置也不会影响消息正常收发。 msg.setKey("ORDERID_100"); //发送消息,只要不抛出异常,就代表发送成功。 try { //消息队列RocketMQ版不负责pExecuter的释放,需要业务方自行释放资源。 MyLocalTransactionExecuter pExecuter = new MyLocalTransactionExecuter(); SendResultONS sendResult = pProducer->send(msg,pExecuter); } catch(ONSClientException & e) { //自定义处理exception的细节。 } // 在应用退出前,必须销毁Producer对象,否则会导致内存泄露等问题。 pProducer->shutdown(); return 0; }
- 提交事务消息状态。
当本地事务执行完成(执行成功或执行失败),需要通知服务器当前消息的事务状态。通知方式有以下两种:
- 执行本地事务完成后提交。
- 执行本地事务一直没提交状态,等待服务器回查消息的事务状态。
事务状态有以下三种:
TransactionStatus.CommitTransaction
:提交事务,允许订阅方消费该消息。TransactionStatus.RollbackTransaction
:回滚事务,消息将被丢弃不允许消费。TransactionStatus.Unknow
:无法判断状态,期待消息队列RocketMQ版的Broker向发送方再次询问该消息对应的本地事务的状态。
提交事务消息状态的示例代码如下。
class MyLocalTransactionChecker : LocalTransactionChecker { MyLocalTransactionChecker() { } ~MyLocalTransactionChecker() { } virtual TransactionStatus check(Message &value) { // 消息ID(有可能消息体一样,但消息ID不一样,当前消息ID在消息队列RocketMQ版控制台无法查询。) string msgId = value.getMsgID(); // 消息体内容进行crc32,也可以使用其它的如MD5。 // 消息ID和crc32id主要是用来防止消息重复。 // 如果业务本身是幂等的,可以忽略,否则需要利用msgId或crc32Id来做幂等。 // 如果要求消息绝对不重复,推荐做法是对消息体body使用crc32或MD5来防止重复消息。 TransactionStatus transactionStatus = Unknow; try { boolean isCommit = 本地事务执行结果; if (isCommit) { // 本地事务成功、提交消息。 transactionStatus = CommitTransaction; } else { // 本地事务失败、回滚消息。 transactionStatus = RollbackTransaction; } } catch(...) { //exception error } return transactionStatus; } }
事务回查机制说明
- 发送事务消息为什么必须要实现回查Check机制?
当步骤1中半事务消息发送完成,但本地事务返回状态为
TransactionStatus.Unknow
,或者应用退出导致本地事务未提交任何状态时,从Broker的角度看,这条半事务消息的状态是未知的。因此Broker会定期向消息发送方即消息生产者集群中的任意一生产者实例发起消息回查,要求发送方回查该Half状态消息,并上报其最终状态。 - Check被回调时,业务逻辑都需要做些什么?
事务消息的Check方法里面,应该写一些检查事务一致性的逻辑。消息队列RocketMQ版发送事务消息时需要实现
LocalTransactionChecker
接口,用来处理Broker主动发起的本地事务状态回查请求,因此在事务消息的Check方法中,需要完成两件事情:- 检查该半事务消息对应的本地事务的状态(committed or rollback)。
- 向Broker提交该半事务消息本地事务的状态。
订阅事务消息
事务消息的订阅与普通消息订阅一致,更多信息,请参见订阅消息。