本文介绍迭代状态及其相关使用示例。
基本概念
迭代状态遍历输入中的某个数组类型参数,对于数组中的每个元素,并行执行其处理器(Processor)状态。迭代状态类似于编程语言中的foreach
,不同之处是这里的迭代是并行执行的。
执行并行迭代步骤会并发执行输入参数中的每个元素的处理器状态。当所有分支执行结束后,默认将输出一个包含所有分支结果的map[string]any,然后通过输出构造器将结果进行进一步处理。
迭代状态最大并发数限制为 100,具体能够支持的并发,与实际业务的IO Size有很大的关系。
迭代状态包含以下属性。
字段 | 类型 | 是否必选 | 描述 | 示例值 |
Name | string | 是 | 状态名称。 | my-state-name |
Description | string | 否 | 状态描述。 | describe it here |
Type | string | 是 | 状态类型。 | Map |
InputConstructor | map[string]any | 否 | 输入构造器。 | 请参见输入和输出 |
ItemsPath | string | 是 | 用于提取输入数组的表达式。 | 请参见ItemsPath |
Processor | ItemProcessor | 是 | 迭代处理器。 | |
OutputConstructor | map[string]any | 否 | 输出构造器。 | 请参见状态输出构造器 |
Next | string | 否 | 当前状态的下一状态。当End取值为true时,无需指定。 | my-next-state |
End | bool | 否 | 是否为当前作用域的终结节点。 | true |
Retry | Retry | 否 | 重试配置 | 请参见 错误处理 |
Catch | Catch | 否 | 捕获配置 | 请参见 错误处理 |
MaxConcuccency | int | 否 | 并发配置 | 40 |
ItemConstructor | map[string]any | 否 | 子级输入构造 |
ItemsPath
该表达式执行后返回JSON Array,则可以进行迭代,将其中每个元素传入ItemProcessor进行处理;可使用表达式变量$Context和$Input,示例如下。
$Input.FieldA
ItemProcessor
字段 | 类型 | 是否可选 | 描述 | 示例值 |
States | array | 是 | 内部嵌套的状态数组。 | 详见示例一 |
StartAt | string | 是 | 内部嵌套状态数组的执行起点。 | my start task |
ItemConstructor
语法类似输入输出构造器,启用了新的保留字 $item
- Type: Map
Name: MapReduce
ItemsPath: $Input.formap
ItemConstructor:
origItem.$: $Item
outterFieldA.$: $Input.fieldA
staticValue: 123qwe
其中$Item代表每次迭代出的元素,$Input依然代表表达式所处上下文,即MapReduce的输入
在多种配置都存在的情形下,执行顺序为InputConstructor->ItemsPath->ItemConstructor.
使用示例
示例一
以下示例流程定义一个迭代状态,这个迭代步骤处理器包含一个通过状态。
Type: StateMachine
Name: my-wkfl
SpecVersion: v1
StartAt: Map1
States:
- Type: Map
Name: Map1
End: true
InputConstructor:
FieldA:
- a : b
- c : d
- e : f
ItemsPath: $Input.FieldA
Processor:
StartAt: Pass1
States:
- Type: Pass
Name: Pass1
End: true
流程的输入构造结果如下所示。
{
"FieldA": [
{
"a": "b"
},
{
"c": "d"
},
{
"e": "f"
}
]
}
针对Map,系统会对所有迭代Processor的输出结果进行合并,默认以关键字 "Items" 作为Key,合并后的结果作为Value;Map1
的输出结果如下所示, 自动添加此输出也是最后流程执行的输出。
{
"Items": [
{
"a": "b"
},
{
"c": "d"
},
{
"e": "f"
}
]
}
一个更加复杂的例子:
示例二
Type: Workflow
Name: my-wkfl
Description: test workflow definition
SpecVersion: V3
StartAt: MapReduce
States:
- Type: Map
Name: MapReduce
ItemsPath: $Input.formap
ItemConstructor:
origItem.$: $Item
outterFieldA.$: $Input.fieldA
staticValue: 123qwe
ItemBatcher:
MaxItemsPerBatch: 2
BatchInput:
apd: tt
End: true
Processor:
StartAt: Succeed
States:
- Type: Succeed
Name: Succeed
输入为:
{
"fieldA": {
"orig-fieldA-1": 1,
"orig-fieldA-2": "qwe1"
},
"formap": [
{
"k0": "v"
},
{
"k1": "v"
},
{
"k2": "v"
},
{
"k3": "v"
},
{
"k4": "v"
}
]
}
其中将输入进行2个一批的分批,每批附加了一个名为apd的数据。
另外对每批的输入提前进行了构造,内含 origItem,outterFieldA,staticValue三个属性。
最终获得这样的输出
{
"Items": [
{
"origItem": {
"BatchInput": {
"apd": "tt"
},
"Items": [
{
"k0": "v"
},
{
"k1": "v"
}
]
},
"outterFieldA": {
"orig-fieldA-1": 1,
"orig-fieldA-2": "qwe1"
},
"staticValue": "123qwe"
},
{
"origItem": {
"BatchInput": {
"apd": "tt"
},
"Items": [
{
"k2": "v"
},
{
"k3": "v"
}
]
},
"outterFieldA": {
"orig-fieldA-1": 1,
"orig-fieldA-2": "qwe1"
},
"staticValue": "123qwe"
},
{
"origItem": {
"BatchInput": {
"apd": "tt"
},
"Items": [
{
"k4": "v"
}
]
},
"outterFieldA": {
"orig-fieldA-1": 1,
"orig-fieldA-2": "qwe1"
},
"staticValue": "123qwe"
}
]
}