全部产品

Saga 模式事务开发

更新时间:2020-11-12 10:34:22

在 Saga 模式的业务流程中,每个参与者都会提交本地事务。当某一个参与者失败,则补偿之前已经成功的参与者。一阶段正向服务和二阶段补偿服务均由业务开发实现。

本文将基于 Saga 模式的示例工程 引导您快速体验 Saga 模式的分布式应用事务开发。

  1. 依赖配置:在本地工程项目中,添加分布式事务依赖与相关配置项等。
  2. 参与者开发:进行本地事务参与者开发,实现相应的方法与接口。
  3. 业务开发:设计编排实际业务流程,通过状态机引擎启动该业务。

依赖与配置项

引入依赖

基于 SOFABoot 框架

SOFABoot 框架中已经默认集成了分布式事务 SDK,不需要手动添加分布式事务的 maven 依赖。您只需确保在工程的主 pom.xml 文件中添加以下依赖配置:

  1. <parent>
  2. <groupId>com.alipay.sofa</groupId>
  3. <artifactId>sofaboot-enterprise-dependencies</artifactId>
  4. <version>3.3.2</version>
  5. </parent>
说明:SOFABoot 的最新版本信息,参见 SOFABoot 版本说明

基于 Spring 框架

在 Spring 框架中,您需要依次添加 Spring 依赖和分布式事务依赖。

  1. 在应用中添加如下 Spring Maven 依赖:
    1. <!-- spring boot and spring dependencies begin-->
    2. <dependency>
    3. <groupId>org.springframework.boot</groupId>
    4. <artifactId>spring-boot-autoconfigure</artifactId>
    5. </dependency>
    6. <!-- logback -->
    7. <dependency>
    8. <groupId>org.springframework.boot</groupId>
    9. <artifactId>spring-boot-starter-logging</artifactId>
    10. </dependency>
    11. <dependency>
    12. <groupId>org.springframework.boot</groupId>
    13. <artifactId>spring-boot</artifactId>
    14. </dependency>
    15. <!-- test -->
    16. <dependency>
    17. <groupId>org.springframework.boot</groupId>
    18. <artifactId>spring-boot-starter-test</artifactId>
    19. <scope>test</scope>
    20. </dependency>
    21. <!-- spring dependencies end -->
    说明:Spring 使用默认版本即可,也可自定义需要的版本,如 2.1.13.RELEASE
  2. 在工程中添加如下分布式事务依赖:
    1. <dependency>
    2. <groupId>com.alipay.dtx</groupId>
    3. <artifactId>dtx-sofaboot</artifactId>
    4. <version>${dtx.version}</version>
    5. </dependency>
    说明${dtx.version} 可自定义需要的版本或最新版本,如 2.3.0 版本。

配置工程属性

在 SOFAStack 云上环境中运行,需要将工程 application.properties 文件中 instanceidantvip.endpoint 等修改为对应环境的配置。详见 引入 SOFA 中间件 > properties 配置项

参与者开发

完成依赖与配置项操作后,即可进行参与者应用开发。此处提供两个参与者示例,分别是转账交易中的扣钱与加钱参与者。您可以根据实际业务,自定义正向和补偿操作方法名。

扣钱参与者

  1. 实现扣钱方法和失败补偿方法,示例如下:

    1. /**
    2. * Saga扣钱参与者
    3. */
    4. public interface FirstSagaAction {
    5. /**
    6. * 扣钱操作
    7. * @param businessKey
    8. * @param accountNo
    9. * @param amount
    10. * @return
    11. */
    12. boolean amountMinus(String businessKey, String accountNo, BigDecimal amount, Map<String, Object> extParams);
    13. /**
    14. * 补偿(冲正)扣钱操作
    15. * @param businessKey
    16. * @param accountNo
    17. * @return
    18. */
    19. boolean compensateAmountMinus(String businessKey, String accountNo);
    20. }
  2. 实现接口,示例如下:

    1. public class FirstSagaActionImpl implements FirstSagaAction {
    2. protected final static Logger logger = LoggerFactory.getLogger(FirstSagaActionImpl.class);
    3. //账户dao
    4. private AccountDAO firstAccountDAO;
    5. //业务流水dao
    6. private AccountTransactionDAO firstAccountTransactionDAO;
    7. //事务模版
    8. private TransactionTemplate firstActionTransactionTemplate;
    9. @Override
    10. public boolean amountMinus(final String businessKey, final String accountNo, final BigDecimal amount, final Map<String, Object> extParams) {
    11. try {
    12. if (amount.compareTo(BigDecimal.ZERO) < 0) {
    13. throw new RuntimeException("金额必须大小0");
    14. }
    15. AccountTransaction accountTransaction = firstAccountTransactionDAO.findTransaction(businessKey);
    16. if (accountTransaction != null && Status.SUCCESS.name().equals(accountTransaction.getStatus())) {
    17. //幂等控制: 交易已成功,直接返回成功
    18. return true;
    19. }
    20. return firstActionTransactionTemplate.execute(new TransactionCallback<Boolean>() {
    21. @Override
    22. public Boolean doInTransaction(TransactionStatus status) {
    23. try {
    24. //校验账户余额
    25. Account account = firstAccountDAO.getAccountForUpdate(accountNo);
    26. if (account.getAmount().compareTo(amount) < 0) {
    27. throw new RuntimeException("余额不足");
    28. }
    29. //记录账户操作流水
    30. AccountTransaction accountTransaction = new AccountTransaction();
    31. accountTransaction.setBusinessKey(businessKey);
    32. accountTransaction.setAccountNo(accountNo);
    33. accountTransaction.setAmount(amount);
    34. accountTransaction.setType(Type.MINUS.name());
    35. accountTransaction.setStatus(Status.SUCCESS.name());
    36. firstAccountTransactionDAO.addTransaction(accountTransaction);
    37. //扣钱
    38. BigDecimal amount = account.getAmount().subtract(accountTransaction.getAmount());
    39. if (amount.compareTo(BigDecimal.ZERO) < 0) {
    40. throw new RuntimeException("余额不足");
    41. }
    42. account.setAmount(amount);
    43. firstAccountDAO.updateAmount(account);
    44. } catch (Exception e) {
    45. logger.error("扣钱操作失败", e);
    46. throw new RuntimeException("扣钱操作失败", e);
    47. }
    48. return true;
    49. }
    50. });
    51. } catch (Exception e) {
    52. logger.error("扣钱操作失败", e);
    53. return false;
    54. }
    55. }
    56. @Override
    57. public boolean compensateAmountMinus(final String businessKey, final String accountNo) {
    58. AccountTransaction accountTransaction;
    59. try {
    60. accountTransaction = firstAccountTransactionDAO.findTransaction(businessKey);
    61. if (accountTransaction == null) {
    62. //原交易流水不存在, 记录防悬挂(后发先至)流水
    63. accountTransaction = new AccountTransaction();
    64. accountTransaction.setBusinessKey(businessKey);
    65. accountTransaction.setAccountNo(accountNo);
    66. accountTransaction.setType(Type.MINUS.name());
    67. accountTransaction.setStatus(Status.COMPENSATED.name());
    68. firstAccountTransactionDAO.addTransaction(accountTransaction);
    69. return true;
    70. }
    71. if (Status.COMPENSATED.name().equals(accountTransaction.getStatus())) {
    72. //幂等控制: 补偿已成功,直接返回成功
    73. return true;
    74. }
    75. final AccountTransaction accountTransactionFinal = accountTransaction;
    76. return firstActionTransactionTemplate.execute(new TransactionCallback<Boolean>() {
    77. @Override
    78. public Boolean doInTransaction(TransactionStatus status) {
    79. try {
    80. //补偿已扣减金额
    81. Account account = firstAccountDAO.getAccountForUpdate(accountTransactionFinal.getAccountNo());
    82. BigDecimal amount = account.getAmount().add(accountTransactionFinal.getAmount());
    83. account.setAmount(amount);
    84. firstAccountDAO.updateAmount(account);
    85. //更新流水状态
    86. accountTransactionFinal.setStatus(Status.COMPENSATED.name());
    87. firstAccountTransactionDAO.updateTransaction(accountTransactionFinal);
    88. return true;
    89. } catch (Exception e) {
    90. logger.error("扣钱操作补偿失败", e);
    91. status.setRollbackOnly();
    92. return false;
    93. }
    94. }
    95. });
    96. } catch (SQLException e) {
    97. logger.error("扣钱操作补偿失败", e);
    98. return false;
    99. }
    100. }
    101. public void setFirstAccountDAO(AccountDAO firstAccountDAO) {
    102. this.firstAccountDAO = firstAccountDAO;
    103. }
    104. public void setFirstAccountTransactionDAO(AccountTransactionDAO firstAccountTransactionDAO) {
    105. this.firstAccountTransactionDAO = firstAccountTransactionDAO;
    106. }
    107. public void setFirstActionTransactionTemplate(TransactionTemplate firstActionTransactionTemplate) {
    108. this.firstActionTransactionTemplate = firstActionTransactionTemplate;
    109. }
    110. }

加钱参与者

  1. 实现正向加钱方法和失败补偿方法,示例如下:

    1. /**
    2. * Saga加钱 参与者
    3. */
    4. public interface SecondSagaAction {
    5. /**
    6. * 加钱操作
    7. * @param businessKey
    8. * @param accountNo
    9. * @param amount
    10. * @return
    11. */
    12. boolean amountAdd(String businessKey, String accountNo, BigDecimal amount, Map<String, Object> extParams);
    13. /**
    14. * 补偿(冲正)加钱操作
    15. * @param businessKey
    16. * @param accountNo
    17. * @return
    18. */
    19. boolean compensateAmountAdd(String businessKey, String accountNo);
    20. }
  2. 实现接口,示例如下:

    1. public class SecondSagaActionImpl implements SecondSagaAction {
    2. protected final static Logger logger = LoggerFactory.getLogger(SecondSagaActionImpl.class);
    3. private AccountDAO secondAccountDAO;
    4. private AccountTransactionDAO secondAccountTransactionDAO;
    5. private TransactionTemplate secondActionTransactionTemplate;
    6. @Override
    7. public boolean amountAdd(final String businessKey, final String accountNo, final BigDecimal amount, final Map<String, Object> extParams) {
    8. try {
    9. if (amount.compareTo(BigDecimal.ZERO) < 0) {
    10. throw new RuntimeException("金额必须大于0");
    11. }
    12. AccountTransaction accountTransaction = secondAccountTransactionDAO.findTransaction(businessKey);
    13. if (accountTransaction != null && Status.SUCCESS.name().equals(accountTransaction.getStatus())) {
    14. //幂等控制: 交易已成功,直接返回成功
    15. return true;
    16. }
    17. return secondActionTransactionTemplate.execute(new TransactionCallback<Boolean>() {
    18. @Override
    19. public Boolean doInTransaction(TransactionStatus status) {
    20. try {
    21. //记录账户操作流水
    22. AccountTransaction accountTransaction = new AccountTransaction();
    23. accountTransaction.setBusinessKey(businessKey);
    24. accountTransaction.setAccountNo(accountNo);
    25. accountTransaction.setAmount(amount);
    26. accountTransaction.setType(Type.ADD.name());
    27. accountTransaction.setStatus(Status.SUCCESS.name());
    28. secondAccountTransactionDAO.addTransaction(accountTransaction);
    29. Account account = secondAccountDAO.getAccountForUpdate(accountNo);
    30. //加钱
    31. BigDecimal amount = account.getAmount().add(accountTransaction.getAmount());
    32. account.setAmount(amount);
    33. secondAccountDAO.updateAmount(account);
    34. } catch (Exception e) {
    35. logger.error("加钱操作失败", e);
    36. throw new RuntimeException("加钱操作失败", e);
    37. }
    38. return true;
    39. }
    40. });
    41. } catch (Exception e) {
    42. logger.error("加钱操作失败", e);
    43. return false;
    44. }
    45. }
    46. @Override
    47. public boolean compensateAmountAdd(final String businessKey, final String accountNo) {
    48. AccountTransaction accountTransaction;
    49. try {
    50. accountTransaction = secondAccountTransactionDAO.findTransaction(businessKey);
    51. if (accountTransaction == null) {
    52. //防悬挂: 原交流流水不存在, 记录防悬挂(后发先至)流水
    53. accountTransaction = new AccountTransaction();
    54. accountTransaction.setBusinessKey(businessKey);
    55. accountTransaction.setAccountNo(accountNo);
    56. accountTransaction.setType(Type.ADD.name());
    57. accountTransaction.setStatus(Status.COMPENSATED.name());
    58. secondAccountTransactionDAO.addTransaction(accountTransaction);
    59. //允许空补偿: 返回补偿成功
    60. return true;
    61. }
    62. if (Status.COMPENSATED.name().equals(accountTransaction.getStatus())) {
    63. //幂等控制: 补偿已成功,直接返回成功
    64. return true;
    65. }
    66. final AccountTransaction accountTransactionFinal = accountTransaction;
    67. return secondActionTransactionTemplate.execute(new TransactionCallback<Boolean>() {
    68. @Override
    69. public Boolean doInTransaction(TransactionStatus status) {
    70. try {
    71. //扣回已加金额
    72. Account account = secondAccountDAO.getAccountForUpdate(accountTransactionFinal.getAccountNo());
    73. BigDecimal amount = account.getAmount().subtract(accountTransactionFinal.getAmount());
    74. if(amount.compareTo(BigDecimal.ZERO) < 0){
    75. throw new RuntimeException("余额不足, 请工人核对");
    76. }
    77. account.setAmount(amount);
    78. secondAccountDAO.updateAmount(account);
    79. //更新流水状态
    80. accountTransactionFinal.setStatus(Status.COMPENSATED.name());
    81. secondAccountTransactionDAO.updateTransaction(accountTransactionFinal);
    82. return true;
    83. } catch (Exception e) {
    84. logger.error("加钱操作补偿失败", e);
    85. status.setRollbackOnly();
    86. return false;
    87. }
    88. }
    89. });
    90. } catch (SQLException e) {
    91. logger.error("加钱操作补偿失败", e);
    92. return false;
    93. }
    94. }
    95. public void setSecondAccountDAO(AccountDAO secondAccountDAO) {
    96. this.secondAccountDAO = secondAccountDAO;
    97. }
    98. public void setSecondAccountTransactionDAO(AccountTransactionDAO secondAccountTransactionDAO) {
    99. this.secondAccountTransactionDAO = secondAccountTransactionDAO;
    100. }
    101. public void setSecondActionTransactionTemplate(TransactionTemplate secondActionTransactionTemplate) {
    102. this.secondActionTransactionTemplate = secondActionTransactionTemplate;
    103. }
    104. }

业务开发

完成参与者开发后,您需要通过分布式事务控制台提供的状态设计器进行业务服务编排,如编排成转账交易服务。

操作步骤如下:

  1. 进入分布式事务控制台页面,在左侧导航栏中选择 应用事务 > 事务配置
  2. 在当前页面中,点击 创建应用事务 > Saga 模式
  3. 创建 Saga 事务 页面,根据提示配置以下信息:
    • 应用名称:可从下拉菜单中选择现有应用或新建一个应用。
    • BizType:填写要创建的 Saga 事务的业务流程名称。
    • 描述:对该事务的备注信息。
  4. 点击 下一步,进入设计器页面,开始编排业务流程。
    1. 以便您快速体验,点击此处直接下载一个已经编排好的 JSON 文件
    2. 点击编辑区上方的 JSON View 图标,并将下载的 JSON 语句粘贴到设计器的 JSON View 框中。
    3. 点击 Designer View 图标,切换回设计器模式,即可查看具体的业务流程图。
  5. 将刚刚设计器生成的 JSON 保存到工程中。如在示例工程将其保存至 account-demo-saga/account-demo-service/src/main/resources/statelang/transfer.json
  6. 在工程中配置状态机引擎,参见示例工程文件 account-demo-saga/account-demo-service/src/main/resources/META-INF/spring/transfer-bean.xml

    1. <bean id="stateMachineEngine" class="io.seata.saga.engine.impl.ProcessCtrlStateMachineEngine">
    2. <property name="stateMachineConfig" ref="dbStateMachineConfig"></property>
    3. </bean>
    4. <bean id="dbStateMachineConfig" class="com.alipay.dtx.client.saga.config.DtxSagaStateMachineConfig">
    5. <property name="dataSource" ref="transferDataSource"></property>
    6. <property name="resources" value="classpath:statelang/*.json"></property>
    7. <property name="enableAsync" value="true"></property>
    8. <property name="threadPoolExecutor" ref="threadExecutor"></property>
    9. </bean>
    10. <bean id="threadExecutor"
    11. class="org.springframework.scheduling.concurrent.ThreadPoolExecutorFactoryBean">
    12. <property name="threadNamePrefix" value="SAGA_ASYNC_EXE_" />
    13. <property name="corePoolSize" value="20" />
    14. <property name="maxPoolSize" value="20" />
    15. <property name="queueCapacity" value="100" />
    16. <property name="rejectedExecutionHandler" ref="callerRunsPolicy" />
    17. </bean>
    18. <bean name="callerRunsPolicy" class="java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy">
    19. </bean>
    20. <bean class="io.seata.saga.rm.StateMachineEngineHolder">
    21. <property name="stateMachineEngine" ref="stateMachineEngine"/>
    22. </bean>
    说明:本文的示例工程中是用的 h2 数据源,且自动建表。Saga 事务日志建表语句,可参考 Saga 模式建表语句示例
  7. 在 Java 代码中,通过状态机引擎启动业务流程:

    1. /**
    2. * 转账交易
    3. */
    4. public interface TransferService {
    5. /**
    6. * Saga转账
    7. *
    8. * @param from
    9. * @param to
    10. * @param amount
    11. * @return
    12. */
    13. boolean transferBySaga(final String from, final String to, final BigDecimal amount, Map<String, Object> extParams);
    14. }

    对应的接口实现代码示例如下:

    1. public class TransferServiceImpl implements TransferService {
    2. protected final static Logger logger = LoggerFactory.getLogger(TransferServiceImpl.class);
    3. @Autowired
    4. private StateMachineEngine stateMachineEngine;
    5. @Override
    6. public boolean transferBySaga(String from, String to, BigDecimal amount, Map<String, Object> extParams) {
    7. try {
    8. String businessKey = UUID.randomUUID().toString().replaceAll("-", "");
    9. Map<String, Object> params = new HashMap<>(4);
    10. params.put("from", from);
    11. params.put("to", to);
    12. params.put("amount", amount);
    13. if(extParams != null){
    14. params.put("extParams", extParams);
    15. }
    16. StateMachineInstance inst = stateMachineEngine.startWithBusinessKey("transferBySaga", null, businessKey, params);
    17. if (ExecutionStatus.SU.equals(inst.getStatus())
    18. && inst.getCompensationStatus() == null) {
    19. //正向状态为成功, 补偿状态为空(没为触发回滚),则交易成功
    20. return true;
    21. }
    22. else {
    23. //如果执行到了Fail节点会有errorCode和errorMessage
    24. String errorCode = (String) inst.getContext().get(DomainConstants.VAR_NAME_STATEMACHINE_ERROR_CODE);
    25. String errorMessage = (String) inst.getContext().get(DomainConstants.VAR_NAME_STATEMACHINE_ERROR_MSG);
    26. System.out.println("ErrorCode:" + errorCode + ", ErrorMsg:" + errorMessage + ", exception: "+ inst.getException());
    27. return false;
    28. }
    29. } catch (Throwable t) {
    30. logger.error("转账交易执行失败", t);
    31. throw new RuntimeException(t);
    32. }
    33. }
    34. public void setStateMachineEngine(StateMachineEngine stateMachineEngine) {
    35. this.stateMachineEngine = stateMachineEngine;
    36. }
    37. }

相关链接