Saga 状态机配置

Saga 模式事务是基于 状态机引擎 来实现的。分布式事务控制台提供了一个状态设计器,您可以通过该设计器自编排设计状态图,定义各节点的详细信息与属性配置等。具体的实现机制如下:

  • 通过状态图定义服务调用的流程,并生成 JSON 状态语言定义文件。
  • 状态图中一个节点可以是调用一个服务,节点可以配置它的补偿节点。
  • JSON 状态定义文件由状态机引擎驱动执行。当出现异常时,状态引擎反向执行已成功节点对应的补偿节点将事务回滚。
说明:异常发生时,您也可自定义决定是否进行补偿。- 可以实现服务编排需求,支持单项选择、并发、子流程、参数转换、参数映射、服务执行状态判断、异常捕获等功能。

本文详细介绍 状态机状态 的各个属性,以便您进行业务流程的设计编排。

状态机的属性

状态机的属性如下:

  1. {
  2. "Name": "reduceInventoryAndBalance",
  3. "Comment": "reduce inventory then reduce balance in a transaction",
  4. "StartState": "ReduceInventory",
  5. "Version": "0.0.1",
  6. "States": {
  7. }
  8. }

属性说明

参数 说明
Name 表示状态机的名称,必须唯一。
Comment 状态机的描述。
Version 状态机的版本。
StartState 启动时运行的第一个 状态
States 状态列表,是一个 map 结构。key 是 状态 的名称,在状态机内必须唯一;value 是一个 map 结构,表示 状态 的属性列表。

状态的属性

ServiceTask

  1. "States": {
  2. ...
  3. "ReduceBalance": {
  4. "Type": "ServiceTask",
  5. "ServiceName": "balanceAction",
  6. "ServiceMethod": "reduce",
  7. "CompensateState": "CompensateReduceBalance",
  8. "IsForUpdate": true,
  9. "IsPersist": true,
  10. "IsAsync": false,
  11. "Input": [
  12. "$.[businessKey]",
  13. "$.[amount]",
  14. {
  15. "throwException" : "$.[mockReduceBalanceFail]"
  16. }
  17. ],
  18. "Output": {
  19. "compensateReduceBalanceResult": "$.#root"
  20. },
  21. "Status": {
  22. "#root == true": "SU",
  23. "#root == false": "FA",
  24. "$Exception{java.lang.Throwable}": "UN"
  25. },
  26. "Retry": [
  27. {
  28. "Exceptions": ["io.seata.saga.engine.mock.DemoException"],
  29. "IntervalSeconds": 1.5,
  30. "MaxAttempts": 3,
  31. "BackoffRate": 1.5
  32. },
  33. {
  34. "IntervalSeconds": 1,
  35. "MaxAttempts": 3,
  36. "BackoffRate": 1.5
  37. }
  38. ],
  39. "Catch": [
  40. {
  41. "Exceptions": [
  42. "java.lang.Throwable"
  43. ],
  44. "Next": "CompensationTrigger"
  45. }
  46. ],
  47. "Next": "Succeed"
  48. }
  49. ...
  50. }

属性说明

参数 说明
ServiceName 服务名称,通常是服务的 beanId。
ServiceMethod 服务方法名称。
CompensateState 该状态的补偿状态。
IsForUpdate 标识该服务是否会更新数据,默认是 false。如果配置了 CompensateState,则默认是 true。有补偿服务的服务肯定是数据更新类服务。
IsPersist 执行日志是否进行存储,默认是 true。有一些查询类的服务可以配置为 false,执行日志不进行存储提高性能,因为当异常恢复时可以重复执行。
IsAsync 异步调用服务。因为异步调用服务会忽略服务的返回结果,所以用户定义的服务执行状态映射(下文的 status 属性)将被忽略,默认为服务调用成功。如果提交异步调用失败(比如线程池已满),则为服务执行状态失败。
Input 调用服务的输入参数列表,是一个数组,对应于服务方法的参数列表, $. 表示使用表达式从状态机上下文中取参数,表达使用的 SpringEL。如果是常量,直接写值即可。关于复杂的参数如何传入,参见 复杂参数的 Input 定义
Output 将服务返回的参数赋值到状态机上下文中,是一个 map 结构。
  • key 为放入到状态机上文时的 key(状态机上下文也是一个 map)。
  • value 中 $. 是表示 SpringEL 表达式,表示从服务的返回参数中取值,#root 表示服务的整个返回参数。
  • Status 服务执行状态映射,框架定义了三个状态:SU 成功、FA 失败、UN 未知。需要把服务执行的状态映射成这三个状态,帮助框架判断整个事务的一致性,是一个 map 结构。
  • key 是条件表达式,一般是取服务的返回值或抛出的异常进行判断,默认是 SpringEL 表达式,判断服务返回参数,带 $Exception{ 开头表示判断异常类型。
  • value 是当这个条件表达式成立时则将服务执行状态映射成这个值。
  • Catch 捕获到异常后的路由。
    Retry 捕获异常后的重试策略,是个数组,可以配置多个规则。
  • Exceptions 为匹配的的异常列表。
  • IntervalSeconds 为重试间隔。
  • MaxAttempts 为最大重试次数。
  • BackoffRate 为下一次重试间隔相对于上一次重试间隔的倍数。例如,如果上次一重试间隔是 2 秒,而 BackoffRate=1.5,则下一次重试间隔是 3 秒。
  • Exceptions 属性可以不配置。不配置时,表示框架自动匹配网络超时异常。当在重试过程中发生了别的异常,框架会重新匹配规则,并按新规则进行重试,同一种规则的总重试次数不会超过该规则的 MaxAttempts。
  • Next 服务执行完成后下一个执行的状态。

    当没有配置 Status 对服务执行状态进行映射,系统会自动判断状态:

    • 没有异常,则认为执行成功。
    • 如果有异常,则判断异常是不是网络连接超时,如果是则认为是 FA
    • 如果是其它异常,且服务 IsForUpdate=true,则状态为 UN,否则为 FA

    整个状态机的执行状态是由框架自己判断的,状态机有两个状态:status(正向执行状态)与 compensateStatus(补偿状态)。

    • 如果所有服务执行成功(事务提交成功),则 status=SU,compensateStatus=null。
    • 如果有服务执行失败,且存在更新类服务执行成功,且没有进行补偿(事务提交失败),则 status=UN,compensateStatus=null。
    • 如果有服务执行失败,且不存在更新类服务执行成功,且没有进行补偿(事务提交失败),则 status=FA,compensateStatus=null。
    • 如果补偿成功(事务回滚成功),则 status=FA/UN,compensateStatus=SU。
    • 发生补偿且有未补偿成功的服务(回滚失败),则 status=FA/UN,compensateStatus=UN。
    • 存在事务提交或回滚失败的情况,Seata Sever 都会不断发起重试。

    Choice

    1. "ChoiceState":{
    2. "Type": "Choice",
    3. "Choices":[
    4. {
    5. "Expression":"[reduceInventoryResult] == true",
    6. "Next":"ReduceBalance"
    7. }
    8. ],
    9. "Default":"Fail"
    10. }

    Choice 类型的状态是单项选择路由。

    • Choices:可选的分支列表,只会选择第一个条件成立的分支。
    • Expression:SpringEL 表达式。
    • Next:当 Expression 表达式成立时,执行的下一个状态。

    Succeed

    1. "Succeed": {
    2. "Type":"Succeed"
    3. }

    运行到 Succeed 状态,表示状态机正常结束。正常结束不代表成功结束,是否成功要看每个状态是否都成功。

    Fail

    1. "Fail": {
    2. "Type":"Fail",
    3. "ErrorCode": "PURCHASE_FAILED",
    4. "Message": "purchase failed"
    5. }

    运行到 Fail 状态,表示状态机异常结束。异常结束时,可以配置 ErrorCode 和 Message,表示错误码和错误信息,可以用于给调用方返回错误码和消息。

    CompensationTrigger

    1. "CompensationTrigger": {
    2. "Type": "CompensationTrigger",
    3. "Next": "Fail"
    4. }

    CompensationTrigger 类型的 state 是用于触发补偿事件,回滚分布式事务。Next 表示补偿成功后路由到的 state。

    SubStateMachine

    1. "CallSubStateMachine": {
    2. "Type": "SubStateMachine",
    3. "StateMachineName": "simpleCompensationStateMachine",
    4. "CompensateState": "CompensateSubMachine",
    5. "Input": [
    6. {
    7. "a": "$.1",
    8. "barThrowException": "$.[barThrowException]",
    9. "fooThrowException": "$.[fooThrowException]",
    10. "compensateFooThrowException": "$.[compensateFooThrowException]"
    11. }
    12. ],
    13. "Output": {
    14. "fooResult": "$.#root"
    15. },
    16. "Next": "Succeed"
    17. }

    SubStateMachine 类型的“状态”是调用子状态机。

    • StateMachineName:要调用的子状态机名称。
    • CompensateState:子状态机的补偿 state,可以不配置,系统会自动创建它的补偿 state。子状态机的补偿实际就是调用子状态机的 compensate 方法。所以用户并不需要自己实现一个对子状态机的补偿服务。当配置这个属性时,可以利用 Input 属性自定义传入一些变量,参见下面的 CompensateSubMachine

    CompensateSubMachine

    1. "CompensateSubMachine": {
    2. "Type": "CompensateSubMachine",
    3. "Input": [
    4. {
    5. "compensateFooThrowException": "$.[compensateFooThrowException]"
    6. }
    7. ]
    8. }

    CompensateSubMachine 类型的 state 是专门用于补偿一个子状态机的 state,它会调用子状态机的 compensate 方法,可以利用 Input 属性传入一些自定义的变量,Status 属性自定判断补偿是否成功。

    复杂参数的 Input 定义

    1. "FirstState": {
    2. "Type": "ServiceTask",
    3. "ServiceName": "demoService",
    4. "ServiceMethod": "complexParameterMethod",
    5. "Next": "ChoiceState",
    6. "ParameterTypes" : ["java.lang.String", "int", "io.seata.saga.engine.mock.DemoService$People", "[Lio.seata.saga.engine.mock.DemoService$People;", "java.util.List", "java.util.Map"],
    7. "Input": [
    8. "$.[people].name",
    9. "$.[people].age",
    10. {
    11. "name": "$.[people].name",
    12. "age": "$.[people].age",
    13. "childrenArray": [
    14. {
    15. "name": "$.[people].name",
    16. "age": "$.[people].age"
    17. },
    18. {
    19. "name": "$.[people].name",
    20. "age": "$.[people].age"
    21. }
    22. ],
    23. "childrenList": [
    24. {
    25. "name": "$.[people].name",
    26. "age": "$.[people].age"
    27. },
    28. {
    29. "name": "$.[people].name",
    30. "age": "$.[people].age"
    31. }
    32. ],
    33. "childrenMap": {
    34. "lilei": {
    35. "name": "$.[people].name",
    36. "age": "$.[people].age"
    37. }
    38. }
    39. },
    40. [
    41. {
    42. "name": "$.[people].name",
    43. "age": "$.[people].age"
    44. },
    45. {
    46. "name": "$.[people].name",
    47. "age": "$.[people].age"
    48. }
    49. ],
    50. [
    51. {
    52. "@type": "io.seata.saga.engine.mock.DemoService$People",
    53. "name": "$.[people].name",
    54. "age": "$.[people].age"
    55. }
    56. ],
    57. {
    58. "lilei": {
    59. "@type": "io.seata.saga.engine.mock.DemoService$People",
    60. "name": "$.[people].name",
    61. "age": "$.[people].age"
    62. }
    63. }
    64. ],
    65. "Output": {
    66. "complexParameterMethodResult": "$.#root"
    67. }
    68. }

    上述的 complexParameterMethod 方法定义如下:

    1. People complexParameterMethod(String name, int age, People people, People[] peopleArrya, List<People> peopleList, Map<String, People> peopleMap)
    2. class People {
    3. private String name;
    4. private int age;
    5. private People[] childrenArray;
    6. private List<People> childrenList;
    7. private Map<String, People> childrenMap;
    8. ...
    9. }

    启动状态机时,需传入参数:

    1. Map<String, Object> paramMap = new HashMap<>(1);
    2. People people = new People();
    3. people.setName("lilei");
    4. people.setAge(18);
    5. paramMap.put("people", people);
    6. String stateMachineName = "simpleStateMachineWithComplexParams";
    7. StateMachineInstance inst = stateMachineEngine.start(stateMachineName, null, paramMap);
    说明ParameterTypes 属性是可以不用传的,调用的方法的参数列表中有 Map、List 这种可以带泛型的集合类型。因为 Java 编译会丢失泛型,所以需要使用这个属性,同时在 Input 的 JSON 中对应的对这个 JSON 加 @type 来申明泛型(集合的元素类型)。