全部产品
弹性计算 会员服务 网络 安全 移动云 数加·大数据分析及展现 数加·大数据应用 管理与监控 云通信 阿里云办公 培训与认证 智能硬件
存储与CDN 数据库 域名与网站(万网) 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网 更多
消息队列 MQ

收发事务消息

更新时间:2018-06-19 15:39:39

交互流程

MQ 的事务消息交互流程如下图所示:

MQ 事务消息交互流程

说明:关于 TCP 接入点域名,请参见 TCP 接入说明

发送事务消息

发送事务消息包含以下两个步骤:

  1. 发送半消息及执行本地事务。 示例代码如下:

    1. package com.alibaba.webx.TryHsf.app1;
    2. import com.aliyun.openservices.ons.api.Message;
    3. import com.aliyun.openservices.ons.api.PropertyKeyConst;
    4. import com.aliyun.openservices.ons.api.SendResult;
    5. import com.aliyun.openservices.ons.api.transaction.LocalTransactionExecuter;
    6. import com.aliyun.openservices.ons.api.transaction.TransactionProducer;
    7. import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
    8. import java.util.Properties;
    9. import java.util.concurrent.TimeUnit;
    10. public class TransactionProducerClient {
    11. private final static Logger log = ClientLogger.getLog(); // 您需要设置自己的日志,便于排查问题
    12. public static void main(String[] args) throws InterruptedException {
    13. final BusinessService businessService = new BusinessService(); // 本地业务 Service
    14. Properties properties = new Properties();
    15. // 您在控制台创建的 Producer ID 注意:事务消息的 Producer ID 不能与其他类型消息的 Producer ID 共用
    16. properties.put(PropertyKeyConst.ProducerId, "");
    17. // 阿里云身份验证,在阿里云服务器管理控制台创建
    18. properties.put(PropertyKeyConst.AccessKey, "");
    19. // 阿里云身份验证,在阿里云服务器管理控制台创建
    20. properties.put(PropertyKeyConst.SecretKey, "");
    21. // 设置 TCP 接入域名(此处以公共云生产环境为例)
    22. properties.put(PropertyKeyConst.ONSAddr,
    23. "http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal");
    24. TransactionProducer producer = ONSFactory.createTransactionProducer(properties,
    25. new LocalTransactionCheckerImpl());
    26. producer.start();
    27. Message msg = new Message("Topic", "TagA", "Hello MQ transaction===".getBytes());
    28. try {
    29. SendResult sendResult = producer.send(msg, new LocalTransactionExecuter() {
    30. @Override
    31. public TransactionStatus execute(Message msg, Object arg) {
    32. // 消息 ID(有可能消息体一样,但消息 ID 不一样,当前消息 ID 在控制台无法查询)
    33. String msgId = msg.getMsgID();
    34. // 消息体内容进行 crc32,也可以使用其它的如 MD5
    35. long crc32Id = HashUtil.crc32Code(msg.getBody());
    36. // 消息 ID 和 crc32id 主要是用来防止消息重复
    37. // 如果业务本身是幂等的,可以忽略,否则需要利用 msgId 或 crc32Id 来做幂等
    38. // 如果要求消息绝对不重复,推荐做法是对消息体 body 使用 crc32或 md5来防止重复消息
    39. Object businessServiceArgs = new Object();
    40. TransactionStatus transactionStatus = TransactionStatus.Unknow;
    41. try {
    42. boolean isCommit =
    43. businessService.execbusinessService(businessServiceArgs);
    44. if (isCommit) {
    45. // 本地事务成功则提交消息
    46. transactionStatus = TransactionStatus.CommitTransaction;
    47. } else {
    48. // 本地事务失败则回滚消息
    49. transactionStatus = TransactionStatus.RollbackTransaction;
    50. }
    51. } catch (Exception e) {
    52. log.error("Message Id:{}", msgId, e);
    53. }
    54. System.out.println(msg.getMsgID());
    55. log.warn("Message Id:{}transactionStatus:{}", msgId, transactionStatus.name());
    56. return transactionStatus;
    57. }
    58. }, null);
    59. }
    60. catch (Exception e) {
    61. // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
    62. System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
    63. e.printStackTrace();
    64. }
    65. // demo example 防止进程退出(实际使用不需要这样)
    66. TimeUnit.MILLISECONDS.sleep(Integer.MAX_VALUE);
    67. }
    68. }
  2. 提交事务消息状态

    当本地事务执行完成(执行成功或执行失败),需要通知服务器当前消息的事务状态。 通知方式有以下两种:

    • 执行本地事务完成后提交
    • 执行本地事务一直没提交状态,等待服务器回查消息的事务状态

    事务状态有以下三种:

    • TransactionStatus.CommitTransaction 提交事务,允许订阅方消费该消息。
    • TransactionStatus.RollbackTransaction 回滚事务,消息将被丢弃不允许消费。
    • TransactionStatus.Unknow 无法判断状态,期待 MQ Broker 向发送方再次询问该消息对应的本地事务的状态。
  1. public class LocalTransactionCheckerImpl implements LocalTransactionChecker {
  2. private final static Logger log = ClientLogger.getLog();
  3. final BusinessService businessService = new BusinessService();
  4. @Override
  5. public TransactionStatus check(Message msg) {
  6. //消息 ID(有可能消息体一样,但消息 ID 不一样,当前消息属于 Half 消息,所以消息 ID 在控制台无法查询)
  7. String msgId = msg.getMsgID();
  8. //消息体内容进行 crc32,也可以使用其它的方法如 MD5
  9. long crc32Id = HashUtil.crc32Code(msg.getBody());
  10. //消息 ID、消息本 crc32Id 主要是用来防止消息重复
  11. //如果业务本身是幂等的,可以忽略,否则需要利用 msgId 或 crc32Id 来做幂等
  12. //如果要求消息绝对不重复,推荐做法是对消息体使用 crc32 或 md5 来防止重复消息
  13. //业务自己的参数对象,这里只是一个示例,需要您根据实际情况来处理
  14. Object businessServiceArgs = new Object();
  15. TransactionStatus transactionStatus = TransactionStatus.Unknow;
  16. try {
  17. boolean isCommit = businessService.checkbusinessService(businessServiceArgs);
  18. if (isCommit) {
  19. //本地事务已成功则提交消息
  20. transactionStatus = TransactionStatus.CommitTransaction;
  21. } else {
  22. //本地事务已失败则回滚消息
  23. transactionStatus = TransactionStatus.RollbackTransaction;
  24. }
  25. } catch (Exception e) {
  26. log.error("Message Id:{}", msgId, e);
  27. }
  28. log.warn("Message Id:{}transactionStatus:{}", msgId, transactionStatus.name());
  29. return transactionStatus;
  30. }
  31. }

工具类

  1. import java.util.zip.CRC32;
  2. public class HashUtil {
  3. public static long crc32Code(byte[] bytes) {
  4. CRC32 crc32 = new CRC32();
  5. crc32.update(bytes);
  6. return crc32.getValue();
  7. }
  8. }

事务回查机制说明

  • 发送事务消息为什么必须要实现回查 Check 机制?

    当步骤(1)中 Half 消息发送完成,但本地事务返回状态为 TransactionStatus.Unknow,或者应用退出导致本地事务未提交任何状态时,从 MQ Broker 的角度看,这条 Half 状态的消息的状态是未知的。 因此 MQ Broker 会定期要求发送方能 Check 该 Half 状态消息,并上报其最终状态。

  • Check 被回调时,业务逻辑都需要做些什么?

    MQ 事务消息的 check 方法里面,应该写一些检查事务一致性的逻辑。 MQ 发送事务消息时需要实现 LocalTransactionChecker 接口,用来处理 MQ Broker 主动发起的本地事务状态回查请求;因此在事务消息的 Check 方法中,需要完成两件事情:

    (1) 检查该 Half 消息对应的本地事务的状态(commited or rollback);

    (2) 向 MQ Broker 提交该 Half 消息本地事务的状态。

订阅事务消息

事务消息的订阅与普通消息订阅一致,详见订阅消息

本文导读目录