更新时间:2021-01-08 19:06
Saga 模式事务是基于 状态机引擎 来实现的。分布式事务控制台提供了一个状态设计器,您可以通过该设计器自编排设计状态图,定义各节点的详细信息与属性配置等。具体的实现机制如下:
本文详细介绍 状态机 与 状态 的各个属性,以便您进行业务流程的设计编排。
状态机的属性如下:
{
"Name": "reduceInventoryAndBalance",
"Comment": "reduce inventory then reduce balance in a transaction",
"StartState": "ReduceInventory",
"Version": "0.0.1",
"States": {
}
}
属性说明
参数 | 说明 |
---|---|
Name |
表示状态机的名称,必须唯一。 |
Comment |
状态机的描述。 |
Version |
状态机的版本。 |
StartState |
启动时运行的第一个 状态。 |
States |
状态列表,是一个 map 结构。key 是 状态 的名称,在状态机内必须唯一;value 是一个 map 结构,表示 状态 的属性列表。 |
"States": {
...
"ReduceBalance": {
"Type": "ServiceTask",
"ServiceName": "balanceAction",
"ServiceMethod": "reduce",
"CompensateState": "CompensateReduceBalance",
"IsForUpdate": true,
"IsPersist": true,
"IsAsync": false,
"Input": [
"$.[businessKey]",
"$.[amount]",
{
"throwException" : "$.[mockReduceBalanceFail]"
}
],
"Output": {
"compensateReduceBalanceResult": "$.#root"
},
"Status": {
"#root == true": "SU",
"#root == false": "FA",
"$Exception{java.lang.Throwable}": "UN"
},
"Retry": [
{
"Exceptions": ["io.seata.saga.engine.mock.DemoException"],
"IntervalSeconds": 1.5,
"MaxAttempts": 3,
"BackoffRate": 1.5
},
{
"IntervalSeconds": 1,
"MaxAttempts": 3,
"BackoffRate": 1.5
}
],
"Catch": [
{
"Exceptions": [
"java.lang.Throwable"
],
"Next": "CompensationTrigger"
}
],
"Next": "Succeed"
}
...
}
属性说明
参数 | 说明 |
---|---|
ServiceName |
服务名称,通常是服务的 beanId。 |
ServiceMethod |
服务方法名称。 |
CompensateState |
该状态的补偿状态。 |
IsForUpdate |
标识该服务是否会更新数据,默认是 false。如果配置了 CompensateState ,则默认是 true。有补偿服务的服务肯定是数据更新类服务。 |
IsPersist |
执行日志是否进行存储,默认是 true。有一些查询类的服务可以配置为 false,执行日志不进行存储提高性能,因为当异常恢复时可以重复执行。 |
IsAsync |
异步调用服务。因为异步调用服务会忽略服务的返回结果,所以用户定义的服务执行状态映射(下文的 status 属性)将被忽略,默认为服务调用成功。如果提交异步调用失败(比如线程池已满),则为服务执行状态失败。 |
Input |
调用服务的输入参数列表,是一个数组,对应于服务方法的参数列表, $. 表示使用表达式从状态机上下文中取参数,表达使用的 SpringEL。如果是常量,直接写值即可。关于复杂的参数如何传入,参见 复杂参数的 Input 定义。 |
Output |
将服务返回的参数赋值到状态机上下文中,是一个 map 结构。$. 是表示 SpringEL 表达式,表示从服务的返回参数中取值,#root 表示服务的整个返回参数。 |
Status |
服务执行状态映射,框架定义了三个状态:SU 成功、FA 失败、UN 未知。需要把服务执行的状态映射成这三个状态,帮助框架判断整个事务的一致性,是一个 map 结构。$Exception{ 开头表示判断异常类型。 |
Catch |
捕获到异常后的路由。 |
Retry |
捕获异常后的重试策略,是个数组,可以配置多个规则。Exceptions 为匹配的的异常列表。IntervalSeconds 为重试间隔。MaxAttempts 为最大重试次数。BackoffRate 为下一次重试间隔相对于上一次重试间隔的倍数。例如,如果上次一重试间隔是 2 秒,而 BackoffRate=1.5 ,则下一次重试间隔是 3 秒。Exceptions 属性可以不配置。不配置时,表示框架自动匹配网络超时异常。当在重试过程中发生了别的异常,框架会重新匹配规则,并按新规则进行重试,同一种规则的总重试次数不会超过该规则的 MaxAttempts。 |
Next |
服务执行完成后下一个执行的状态。 |
当没有配置 Status 对服务执行状态进行映射,系统会自动判断状态:
FA
。IsForUpdate=true
,则状态为 UN
,否则为 FA
。整个状态机的执行状态是由框架自己判断的,状态机有两个状态:status(正向执行状态)与 compensateStatus(补偿状态)。
"ChoiceState":{
"Type": "Choice",
"Choices":[
{
"Expression":"[reduceInventoryResult] == true",
"Next":"ReduceBalance"
}
],
"Default":"Fail"
}
Choice 类型的状态是单项选择路由。
Choices
:可选的分支列表,只会选择第一个条件成立的分支。Expression
:SpringEL 表达式。Next
:当 Expression 表达式成立时,执行的下一个状态。
"Succeed": {
"Type":"Succeed"
}
运行到 Succeed 状态,表示状态机正常结束。正常结束不代表成功结束,是否成功要看每个状态是否都成功。
"Fail": {
"Type":"Fail",
"ErrorCode": "PURCHASE_FAILED",
"Message": "purchase failed"
}
运行到 Fail 状态,表示状态机异常结束。异常结束时,可以配置 ErrorCode 和 Message,表示错误码和错误信息,可以用于给调用方返回错误码和消息。
"CompensationTrigger": {
"Type": "CompensationTrigger",
"Next": "Fail"
}
CompensationTrigger
类型的 state 是用于触发补偿事件,回滚分布式事务。Next
表示补偿成功后路由到的 state。
"CallSubStateMachine": {
"Type": "SubStateMachine",
"StateMachineName": "simpleCompensationStateMachine",
"CompensateState": "CompensateSubMachine",
"Input": [
{
"a": "$.1",
"barThrowException": "$.[barThrowException]",
"fooThrowException": "$.[fooThrowException]",
"compensateFooThrowException": "$.[compensateFooThrowException]"
}
],
"Output": {
"fooResult": "$.#root"
},
"Next": "Succeed"
}
SubStateMachine
类型的“状态”是调用子状态机。
StateMachineName
:要调用的子状态机名称。CompensateState
:子状态机的补偿 state,可以不配置,系统会自动创建它的补偿 state。子状态机的补偿实际就是调用子状态机的 compensate 方法。所以用户并不需要自己实现一个对子状态机的补偿服务。当配置这个属性时,可以利用 Input 属性自定义传入一些变量,参见下面的 CompensateSubMachine
。
"CompensateSubMachine": {
"Type": "CompensateSubMachine",
"Input": [
{
"compensateFooThrowException": "$.[compensateFooThrowException]"
}
]
}
CompensateSubMachine
类型的 state 是专门用于补偿一个子状态机的 state,它会调用子状态机的 compensate 方法,可以利用 Input 属性传入一些自定义的变量,Status 属性自定判断补偿是否成功。
"FirstState": {
"Type": "ServiceTask",
"ServiceName": "demoService",
"ServiceMethod": "complexParameterMethod",
"Next": "ChoiceState",
"ParameterTypes" : ["java.lang.String", "int", "io.seata.saga.engine.mock.DemoService$People", "[Lio.seata.saga.engine.mock.DemoService$People;", "java.util.List", "java.util.Map"],
"Input": [
"$.[people].name",
"$.[people].age",
{
"name": "$.[people].name",
"age": "$.[people].age",
"childrenArray": [
{
"name": "$.[people].name",
"age": "$.[people].age"
},
{
"name": "$.[people].name",
"age": "$.[people].age"
}
],
"childrenList": [
{
"name": "$.[people].name",
"age": "$.[people].age"
},
{
"name": "$.[people].name",
"age": "$.[people].age"
}
],
"childrenMap": {
"lilei": {
"name": "$.[people].name",
"age": "$.[people].age"
}
}
},
[
{
"name": "$.[people].name",
"age": "$.[people].age"
},
{
"name": "$.[people].name",
"age": "$.[people].age"
}
],
[
{
"@type": "io.seata.saga.engine.mock.DemoService$People",
"name": "$.[people].name",
"age": "$.[people].age"
}
],
{
"lilei": {
"@type": "io.seata.saga.engine.mock.DemoService$People",
"name": "$.[people].name",
"age": "$.[people].age"
}
}
],
"Output": {
"complexParameterMethodResult": "$.#root"
}
}
上述的 complexParameterMethod
方法定义如下:
People complexParameterMethod(String name, int age, People people, People[] peopleArrya, List<People> peopleList, Map<String, People> peopleMap)
class People {
private String name;
private int age;
private People[] childrenArray;
private List<People> childrenList;
private Map<String, People> childrenMap;
...
}
启动状态机时,需传入参数:
Map<String, Object> paramMap = new HashMap<>(1);
People people = new People();
people.setName("lilei");
people.setAge(18);
paramMap.put("people", people);
String stateMachineName = "simpleStateMachineWithComplexParams";
StateMachineInstance inst = stateMachineEngine.start(stateMachineName, null, paramMap);
ParameterTypes
属性是可以不用传的,调用的方法的参数列表中有 Map、List 这种可以带泛型的集合类型。因为 Java 编译会丢失泛型,所以需要使用这个属性,同时在 Input 的 JSON 中对应的对这个 JSON 加 @type
来申明泛型(集合的元素类型)。
在文档使用中是否遇到以下问题
更多建议
匿名提交