全部产品
弹性计算 会员服务 网络 安全 移动云 数加·大数据分析及展现 数加·大数据应用 管理与监控 云通信 阿里云办公 培训与认证 智能硬件
存储与CDN 数据库 域名与网站(万网) 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网 更多
消息队列 MQ

收发事务消息

更新时间:2018-06-19 15:55:17

目前支持的域包括公网、华东1、华北2、华东2、华南1。

交互流程

MQ 事务消息的交互流程如下图所示:

MQ 事务消息交互流程

发送事务消息

发送事务消息包含以下两个步骤:

  1. 发送半消息(Half Message)及执行本地事务。 示例代码如下:

    1. using System;
    2. using System.Collections.Generic;
    3. using System.Linq;
    4. using System.Text;
    5. using System.Runtime.InteropServices;
    6. using ons;
    7. namespace ons
    8. {
    9. public class MyLocalTransactionExecuter : LocalTransactionExecuter
    10. {
    11. public MyLocalTransactionExecuter()
    12. {
    13. }
    14. ~MyLocalTransactionExecuter()
    15. {
    16. }
    17. public override TransactionStatus execute(Message value)
    18. {
    19. Console.WriteLine("execute topic: {0}, tag:{1}, key:{2}, msgId:{3},msgbody:{4}, userProperty:{5}",
    20. value.getTopic(), value.getTag(), value.getKey(), value.getMsgID(), value.getBody(), value.getUserProperty("VincentNoUser"));
    21. // 消息 ID(有可能消息体一样,但消息 ID 不一样。当前消息 ID 在控制台无法查询)
    22. string msgId = value.getMsgID();
    23. // 消息体内容进行 crc32, 也可以使用其它的如 MD5
    24. // 消息 ID 和 crc32id 主要是用来防止消息重复
    25. // 如果要求消息绝对不重复,推荐做法是对消息体 body 使用 crc32或 md5来防止重复消息
    26. TransactionStatus transactionStatus = TransactionStatus.Unknow;
    27. try {
    28. boolean isCommit = 本地事务执行结果;
    29. if (isCommit) {
    30. // 本地事务成功则提交消息
    31. transactionStatus = TransactionStatus.CommitTransaction;
    32. } else {
    33. // 本地事务失败则回滚消息
    34. transactionStatus = TransactionStatus.RollbackTransaction;
    35. }
    36. } catch (Exception e) {
    37. //exception handle
    38. }
    39. return transactionStatus
    40. }
    41. }
    42. class onscsharp
    43. {
    44. static void Main(string[] args)
    45. {
    46. ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
    47. factoryInfo.setFactoryProperty(factoryInfo.ProducerId, "");//您在 MQ 控制台创建的 Producer ID
    48. factoryInfo.setFactoryProperty(factoryInfo.PublishTopics, "");// 您在 MQ 控制台创建的 Topic
    49. factoryInfo.setFactoryProperty(factoryInfo.MsgContent, "");//message body
    50. factoryInfo.setFactoryProperty(factoryInfo.AccessKey, "");//AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
    51. factoryInfo.setFactoryProperty(factoryInfo.SecretKey, "");//SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
    52. //create transaction producer
    53. ONSFactory onsfactory = new ONSFactory();
    54. LocalTransactionChecker myChecker = new MyLocalTransactionChecker();
    55. TransactionProducer pProducer = onsfactory.getInstance().createTransactionProducer(factoryInfo,ref myChecker);
    56. // 在发送消息前,必须调用 start 方法来启动 Producer,只需调用一次即可,启动之后可以多线程并发发送消息
    57. pProducer.start();
    58. Message msg = new Message(
    59. //Message Topic
    60. factoryInfo.getPublishTopics(),
    61. //Message Tag
    62. "TagA",
    63. //Message Body
    64. factoryInfo.getMessageContent()
    65. );
    66. // 设置代表消息的业务关键属性,请尽可能全局唯一
    67. // 以方便您在无法正常收到消息情况下,可通过 MQ 控制台查询消息并补发。
    68. // 注意:不设置也不会影响消息正常收发
    69. msg.setKey("ORDERID_100");
    70. // 发送消息,只要不抛出异常,就代表发送成功
    71. try
    72. {
    73. LocalTransactionExecuter myExecuter = new MyLocalTransactionExecuter();
    74. SendResultONS sendResult = pProducer.send(msg, ref myExecuter);
    75. }
    76. catch(ONSClientException e)
    77. {
    78. Console.WriteLine("\nexception of sendmsg:{0}",e.what() );
    79. }
    80. // 在应用退出前,必须销毁 Producer 对象,否则会导致内存泄露等问题;
    81. // shutdown 之后不能重新 start 此 producer
    82. pProducer.shutdown();
    83. }
    84. }
    85. }
  2. 提交事务消息状态

    当本地事务执行完成(执行成功或执行失败),需要通知服务器当前消息的事务状态。 通知方式有以下两种:

    • 执行本地事务完成后提交;
    • 执行本地事务一直没提交状态,等待服务器回查消息的事务状态。

    事务状态有以下三种:

    • TransactionStatus.CommitTransaction 提交事务,允许订阅方消费该消息;
    • TransactionStatus.RollbackTransaction 回滚事务,消息将被丢弃不允许消费;
    • TransactionStatus.Unknow 无法判断状态,期待 MQ Broker 向发送方再次询问该消息对应的本地事务的状态。
  1. public class MyLocalTransactionChecker : LocalTransactionChecker
  2. {
  3. public MyLocalTransactionChecker()
  4. {
  5. }
  6. ~MyLocalTransactionChecker()
  7. {
  8. }
  9. public override TransactionStatus check(Message value)
  10. {
  11. Console.WriteLine("check topic: {0}, tag:{1}, key:{2}, msgId:{3},msgbody:{4}, userProperty:{5}",
  12. value.getTopic(), value.getTag(), value.getKey(), value.getMsgID(), value.getBody(), value.getUserProperty("VincentNoUser"));
  13. // 消息 ID(有可能消息体一样,但消息 ID 不一样。当前消息 ID 在控制台无法查询)
  14. string msgId = value.getMsgID();
  15. // 消息体内容进行 crc32, 也可以使用其它的如 MD5
  16. // 消息 ID 和 crc32id 主要是用来防止消息重复
  17. // 如果业务本身是幂等的, 可以忽略,否则需要利用 msgId 或 crc32Id 来做幂等
  18. // 如果要求消息绝对不重复,推荐做法是对消息体 body 使用 crc32或 md5来防止重复消息
  19. TransactionStatus transactionStatus = TransactionStatus.Unknow;
  20. try {
  21. boolean isCommit = 本地事务执行结果;
  22. if (isCommit) {
  23. // 本地事务成功、提交消息
  24. transactionStatus = TransactionStatus.CommitTransaction;
  25. } else {
  26. // 本地事务失败、回滚消息
  27. transactionStatus = TransactionStatus.RollbackTransaction;
  28. }
  29. } catch (Exception e) {
  30. //exception handle
  31. }
  32. return transactionStatus
  33. }
  34. }

事务回查机制说明

  • 发送事务消息为什么必须要实现 check 机制?

    当步骤 1 半消息发送完成,但本地事务返回状态为 TransactionStatus.Unknow 时,亦或是应用退出导致本地事务未提交任何状态时,从 MQ Broker 的角度看,这条半状态的消息的状态是未知的,因此 MQ Broker 会定期要求发送方能 check 该半状态消息,并上报其最终状态。

  • Check 被回调时,业务逻辑都需要做些什么?

    MQ 事务消息的 check 方法里面,应该写一些检查事务一致性的逻辑。 MQ 发送事务消息时需要实现 LocalTransactionChecker 接口,用来处理 MQ Broker 主动发起的本地事务状态回查请求;因此在事务消息的 check 方法中,需要完成两件事情:

    (1)检查该半消息对应的本地事务的状态(commited or rollback);

    (2)向 MQ Broker 提交该半消息本地事务的状态。

  • 本地事务的不同状态对半消息的影响?

    • TransactionStatus.CommitTransaction 提交事务,允许订阅方消费该消息;

    • TransactionStatus.RollbackTransaction 回滚事务,消息将被丢弃不允许消费;

    • TransactionStatus.Unknow 无法判断状态,期待 MQ Broker 向发送方再次询问该消息对应的本地事务的状态。

    具体代码详见 MyLocalTransactionChecker 的实现。

订阅事务消息

关于订阅普通消息的说明和示例代码,详见订阅消息

本文导读目录