全部产品
云市场

收发事务消息

更新时间:2019-09-22 18:18:25

本文提供使用 TCP 协议下的 C/C++ SDK 收发事务消息的示例代码供您参考。目前支持的地域包括公网、华东1(杭州)、华北2(北京)、华东2(上海)、华南1(深圳)。

消息队列 MQ 提供类似 X/Open XA 的分布式事务功能,通过消息队列 MQ 事务消息,能达到分布式事务的最终一致。

交互流程

事务消息交互流程如下图所示。

事务消息

详情请参见事务消息

前提条件

发送事务消息

说明:具体的示例代码,请以消息队列 MQ 代码库为准。

发送事务消息包含以下两个步骤:

1.发送半事务消息(Half Message)及执行本地事务。示例代码如下:

  1. #include "ONSFactory.h"
  2. #include "ONSClientException.h"
  3. using namespace ons;
  4. class MyLocalTransactionExecuter : LocalTransactionExecuter
  5. {
  6. MyLocalTransactionExecuter()
  7. {
  8. }
  9. ~MyLocalTransactionExecuter()
  10. {
  11. }
  12. virtual TransactionStatus execute(Message &value)
  13. {
  14. // 消息 ID(有可能消息体一样,但消息 ID 不一样,当前消息 ID 在控制台无法查询)
  15. string msgId = value.getMsgID();
  16. // 消息体内容进行 crc32, 也可以使用其它的如 MD5
  17. // 消息 ID 和 crc32id 主要是用来防止消息重复
  18. // 如果业务本身是幂等的, 可以忽略, 否则需要利用 msgId 或 crc32Id 来做幂等
  19. // 如果要求消息绝对不重复, 推荐做法是对消息体 body 使用 crc32 或 MD5 来防止重复消息
  20. TransactionStatus transactionStatus = Unknow;
  21. try {
  22. boolean isCommit = 本地事务执行结果;
  23. if (isCommit) {
  24. // 本地事务成功、提交消息
  25. transactionStatus = CommitTransaction;
  26. } else {
  27. // 本地事务失败、回滚消息
  28. transactionStatus = RollbackTransaction;
  29. }
  30. } catch (...) {
  31. //exception handle
  32. }
  33. return transactionStatus
  34. }
  35. }
  36. int main(int argc, char* argv[])
  37. {
  38. //创建 Producer 和发送消息所必需的信息;
  39. ONSFactoryProperty factoryInfo;
  40. factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId,"XXX");//您在控制台创建的 Group ID
  41. factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "XXX"); //设置 TCP 接入域名,进入控制台的实例管理页面的“获取接入点信息”区域查看
  42. factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );//输入您在控制台创建的 Topic
  43. factoryInfo.setFactoryProperty(ONSFactoryProperty::MsgContent, "XXX");//消息 Content
  44. factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "xxxxxxxxx");//阿里云身份验证,在阿里云服务器管理控制台创建
  45. factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "xxxxxxxxxxxxxxxxxxxx" );//阿里云身份验证,在阿里云服务器管理控制台创建
  46. //创建 producer,消息队列 MQ 不负责 pChecker 的释放,需要业务方自行释放资源
  47. MyLocalTransactionChecker *pChecker = new MyLocalTransactionChecker();
  48. g_producer = ONSFactory::getInstance()->createTransactionProducer(factoryInfo,pChecker);
  49. //在发送消息前,必须调用 start 方法来启动 Producer,只需调用一次即可;
  50. pProducer->start();
  51. Message msg(
  52. //Message Topic
  53. factoryInfo.getPublishTopics(),
  54. //Message Tag,可理解为 Gmail 中的标签,对消息进行再归类,方便 Consumer 指定过滤条件在消息队列 MQ 的服务器过滤
  55. "TagA",
  56. //Message Body,不能为空,消息队列 MQ 不做任何干预,需要 Producer 与 Consumer 协商好一致的序列化和反序列化方式
  57. factoryInfo.getMessageContent()
  58. );
  59. // 设置代表消息的业务关键属性,请尽可能全局唯一。
  60. // 以方便您在无法正常收到消息情况下,可通过控制台查询消息并补发。
  61. // 注意:不设置也不会影响消息正常收发
  62. msg.setKey("ORDERID_100");
  63. //发送消息,只要不抛出异常,就代表发送成功
  64. try
  65. {
  66. //消息队列 MQ 不负责 pExecuter 的释放,需要业务方自行释放资源
  67. MyLocalTransactionExecuter pExecuter = new MyLocalTransactionExecuter();
  68. SendResultONS sendResult = pProducer->send(msg,pExecuter);
  69. }
  70. catch(ONSClientException & e)
  71. {
  72. //自定义处理 exception 的细节
  73. }
  74. // 在应用退出前,必须销毁 Producer 对象,否则会导致内存泄露等问题
  75. pProducer->shutdown();
  76. return 0;
  77. }

2.提交事务消息状态

当本地事务执行完成(执行成功或执行失败),需要通知服务器当前消息的事务状态。通知方式有以下两种:

  • 执行本地事务完成后提交。

  • 执行本地事务一直没提交状态,等待服务器回查消息的事务状态。

事务状态有以下三种:

  • TransactionStatus.CommitTransaction 提交事务,允许订阅方消费该消息。

  • TransactionStatus.RollbackTransaction 回滚事务,消息将被丢弃不允许消费。

  • TransactionStatus.Unknow 无法判断状态,期待消息队列 MQ 的 Broker 向发送方再次询问该消息对应的本地事务的状态。

    1. class MyLocalTransactionChecker : LocalTransactionChecker
    2. {
    3. MyLocalTransactionChecker()
    4. {
    5. }
    6. ~MyLocalTransactionChecker()
    7. {
    8. }
    9. virtual TransactionStatus check(Message &value)
    10. {
    11. // 消息 ID(有可能消息体一样,但消息 ID 不一样, 当前消息 ID 在 console 控制无法查询)
    12. string msgId = value.getMsgID();
    13. // 消息体内容进行 crc32, 也可以使用其它的如 MD5
    14. // 消息 ID 和 crc32id 主要是用来防止消息重复
    15. // 如果业务本身是幂等的, 可以忽略, 否则需要利用 msgId 或 crc32Id 来做幂等
    16. // 如果要求消息绝对不重复, 推荐做法是对消息体 body 使用 crc32 或 MD5 来防止重复消息.
    17. TransactionStatus transactionStatus = Unknow;
    18. try {
    19. boolean isCommit = 本地事务执行结果;
    20. if (isCommit) {
    21. // 本地事务成功、提交消息
    22. transactionStatus = CommitTransaction;
    23. } else {
    24. // 本地事务失败、回滚消息
    25. transactionStatus = RollbackTransaction;
    26. }
    27. } catch(...) {
    28. //exception error
    29. }
    30. return transactionStatus
    31. }
    32. }

事务回查机制说明

  • 发送事务消息为什么必须要实现 check 机制?

    当步骤 1 发送半事务消息完成,但本地事务返回状态为 TransactionStatus.Unknow 时,亦或是应用退出导致本地事务未提交任何状态时,从 Broker 的角度看,这条半状态的消息的状态是未知的,因此 Broker 会定期要求发送方能 check 该半状态消息,并上报其最终状态。

  • Check 被回调时,业务逻辑都需要做些什么?

    事务消息的 check 方法里面,应该写一些检查事务一致性的逻辑。消息队列 MQ 发送事务消息时需要实现 LocalTransactionChecker 接口,用来处理 Broker 主动发起的本地事务状态回查请求;因此在事务消息的 check 方法中,需要完成两件事情:

    (1) 检查该半事务消息对应的本地事务的状态(committed or rollback);

    (2) 向 Broker 提交该半事务消息本地事务的状态。

订阅事务消息

订阅普通消息的说明和示例代码,详情请参见订阅消息