全部产品

收发事务消息

更新时间:2020-01-10 00:07:46

本文提供使用 TCP 协议下的 Java SDK 收发事务消息的示例代码供您参考。

消息队列提供类似 X/Open XA 的分布式事务功能,通过消息队列事务消息,能达到分布式事务的最终一致。

说明:对于新手用户,建议在正式收发消息前,阅读 Demo 工程 来了解搭建消息队列工程的具体步骤。

交互流程

事务消息交互流程如下图所示。

事务消息交互流程

详情请参见 消息类型 > 事务消息

前提条件

您已完成以下操作:

发送事务消息

说明:具体的示例代码,请以 消息队列代码库 为准。

发送事务消息包含以下两个步骤:

  1. 发送半事务消息(Half Message)及执行本地事务,示例代码如下。

    1. import java.util.Properties;
    2. import com.alipay.sofa.sofamq.client.PropertyKeyConst;
    3. import io.openmessaging.api.Message;
    4. import io.openmessaging.api.MessagingAccessPoint;
    5. import io.openmessaging.api.OMS;
    6. import io.openmessaging.api.OMSBuiltinKeys;
    7. import io.openmessaging.api.transaction.LocalTransactionChecker;
    8. import io.openmessaging.api.transaction.LocalTransactionExecuter;
    9. import io.openmessaging.api.transaction.TransactionProducer;
    10. import io.openmessaging.api.transaction.TransactionStatus;
    11. public class TransactionProducerTest {
    12. public static void main(String... args) {
    13. Properties credentials = new Properties();
    14. // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
    15. credentials.setProperty(OMSBuiltinKeys.ACCESS_KEY, "$accessKey");
    16. // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
    17. credentials.setProperty(OMSBuiltinKeys.SECRET_KEY, "xxxxx");
    18. // 设置 TCP 接入域名,进入控制台的概览页面查看接入点配置
    19. MessagingAccessPoint accessPoint = OMS.builder().driver("sofamq").endpoint("$endpoint") .withCredentials(credentials).build();
    20. Properties properties = new Properties();
    21. // 设置用户实例,进入控制台的概览页面查看接入点配置
    22. properties.setProperty(PropertyKeyConst.INSTANCE_ID, "$instanceId");
    23. properties.setProperty(PropertyKeyConst.GROUP_ID, "YOUR_GROUP");
    24. TransactionProducer producer = accessPoint.createTransactionProducer(properties, new LocalTransactionChecker() {
    25. @Override
    26. public TransactionStatus check(Message msg) {
    27. // check business commit status
    28. return TransactionStatus.CommitTransaction;
    29. }
    30. });
    31. producer.start();
    32. Message message = new Message("$topic", "YOUR_TAG", "hello world".getBytes());
    33. producer.send(message, new LocalTransactionExecuter() {
    34. @Override
    35. public TransactionStatus execute(Message msg, Object arg) {
    36. // if business success, then commit; else rollback
    37. return TransactionStatus.CommitTransaction;
    38. }
    39. }, null);
    40. }
    41. }
  2. 提交事务消息状态。当本地事务执行完成(执行成功或执行失败),需要通知服务器当前消息的事务状态。通知方式有以下两种:
    • 执行本地事务完成后提交。
    • 执行本地事务一直没提交状态,等待服务器回查消息的事务状态。
      事务状态有以下三种:
    • TransactionStatus.CommitTransaction 提交事务,允许订阅方消费该消息。
    • TransactionStatus.RollbackTransaction 回滚事务,消息将被丢弃不允许消费。
    • TransactionStatus.Unknow 无法判断状态,期待消息队列的 Broker 向发送方再次询问该消息对应的本地事务的状态。

事务回查机制说明

  • 发送事务消息为什么必须要实现回查 Check 机制?
    当步骤 1 中半事务消息发送完成,但本地事务返回状态为 TransactionStatus.Unknow,或者应用退出导致本地事务未提交任何状态时,从 Broker 的角度看,这条 Half 状态的消息的状态是未知的。因此 Broker 会定期要求发送方能 Check 该 Half 状态消息,并上报其最终状态。

  • Check 被回调时,业务逻辑都需要做些什么?
    事务消息的 Check 方法里面,应该写一些检查事务一致性的逻辑。消息队列发送事务消息时需要实现 LocalTransactionChecker 接口,用来处理 Broker 主动发起的本地事务状态回查请求;因此在事务消息的 Check 方法中,需要完成两件事情:

    1. 检查该半事务消息对应的本地事务的状态(committed or rollback)。
    2. 向 Broker 提交该半事务消息本地事务的状态。

订阅事务消息

事务消息的订阅与普通消息订阅一致,详情请参见 订阅消息