文档

迭代(Map)

更新时间:

本文介绍迭代状态及其相关使用示例。

基本概念

迭代状态遍历输入中的某个数组类型参数,对于数组中的每个元素,并行执行其处理器(Processor)状态。迭代状态类似于编程语言中的foreach,不同之处是这里的迭代是并行执行的。

执行并行迭代步骤会并发执行输入参数中的每个元素的处理器状态。当所有分支执行结束后,默认将输出一个包含所有分支结果的map[string]any,然后通过输出构造器将结果进行进一步处理。

说明

迭代状态最大并发数限制为 100,具体能够支持的并发,与实际业务的IO Size有很大的关系。

迭代状态包含以下属性。

字段

类型

是否必选

描述

示例值

Name

string

状态名称。

my-state-name

Description

string

状态描述。

describe it here

Type

string

状态类型。

Map

InputConstructor

map[string]any

输入构造器。

请参见输入和输出

ItemsPath

string

用于提取输入数组的表达式。

请参见ItemsPath

Processor

ItemProcessor

迭代处理器。

请参见ItemProcessor

OutputConstructor

map[string]any

输出构造器。

请参见状态输出构造器

Next

string

当前状态的下一状态。当End取值为true时,无需指定。

my-next-state

End

bool

是否为当前作用域的终结节点。

true

Retry

Retry

重试配置

请参见 错误处理

Catch

Catch

捕获配置

请参见 错误处理

MaxConcuccency

int

并发配置

40

ItemConstructor

map[string]any

子级输入构造

请参见ItemConstructor

ItemsPath

该表达式执行后返回JSON Array,则可以进行迭代,将其中每个元素传入ItemProcessor进行处理;可使用表达式变量$Context和$Input,示例如下。

$Input.FieldA

ItemProcessor

字段

类型

是否可选

描述

示例值

States

array

内部嵌套的状态数组。

详见示例一

StartAt

string

内部嵌套状态数组的执行起点。

my start task

ItemConstructor

语法类似输入输出构造器,启用了新的保留字 $item

  - Type: Map
    Name: MapReduce
    ItemsPath: $Input.formap
    ItemConstructor:
      origItem.$: $Item
      outterFieldA.$: $Input.fieldA
      staticValue: 123qwe

其中$Item代表每次迭代出的元素,$Input依然代表表达式所处上下文,即MapReduce的输入

在多种配置都存在的情形下,执行顺序为InputConstructor->ItemsPath->ItemConstructor.

使用示例

示例一

以下示例流程定义一个迭代状态,这个迭代步骤处理器包含一个通过状态。

Type: StateMachine
Name: my-wkfl
SpecVersion: v1
StartAt: Map1
States:
  - Type: Map
    Name: Map1
    End: true
    InputConstructor:
      FieldA: 
        - a : b
        - c : d
        - e : f
    ItemsPath: $Input.FieldA
    Processor:
      StartAt: Pass1
      States:
        - Type: Pass
          Name: Pass1
          End: true

流程的输入构造结果如下所示。

{
    "FieldA": [
        {
            "a": "b"
        },
        {
            "c": "d"
        },
        {
            "e": "f"
        }
    ]
}

针对Map,系统会对所有迭代Processor的输出结果进行合并,默认以关键字 "Items" 作为Key,合并后的结果作为Value;Map1的输出结果如下所示, 自动添加此输出也是最后流程执行的输出。

{
    "Items": [
        {
            "a": "b"
        },
        {
            "c": "d"
        },
        {
            "e": "f"
        }
    ]
}

一个更加复杂的例子:

示例二

Type: Workflow
Name: my-wkfl
Description: test workflow definition
SpecVersion: V3
StartAt: MapReduce
States:
  - Type: Map
    Name: MapReduce
    ItemsPath: $Input.formap
    ItemConstructor:
      origItem.$: $Item
      outterFieldA.$: $Input.fieldA
      staticValue: 123qwe
    ItemBatcher:
      MaxItemsPerBatch: 2
      BatchInput: 
        apd: tt
    End: true
    Processor:
      StartAt: Succeed
      States:
        - Type: Succeed
          Name: Succeed

输入为:

{
    "fieldA": {
        "orig-fieldA-1": 1,
        "orig-fieldA-2": "qwe1"
    },
    "formap": [
        {
            "k0": "v"
        },
        {
            "k1": "v"
        },
        {
            "k2": "v"
        },
        {
            "k3": "v"
        },
        {
            "k4": "v"
        }
    ]
}

其中将输入进行2个一批的分批,每批附加了一个名为apd的数据。

另外对每批的输入提前进行了构造,内含 origItem,outterFieldA,staticValue三个属性。

最终获得这样的输出

{
    "Items": [
        {
            "origItem": {
                "BatchInput": {
                    "apd": "tt"
                },
                "Items": [
                    {
                        "k0": "v"
                    },
                    {
                        "k1": "v"
                    }
                ]
            },
            "outterFieldA": {
                "orig-fieldA-1": 1,
                "orig-fieldA-2": "qwe1"
            },
            "staticValue": "123qwe"
        },
        {
            "origItem": {
                "BatchInput": {
                    "apd": "tt"
                },
                "Items": [
                    {
                        "k2": "v"
                    },
                    {
                        "k3": "v"
                    }
                ]
            },
            "outterFieldA": {
                "orig-fieldA-1": 1,
                "orig-fieldA-2": "qwe1"
            },
            "staticValue": "123qwe"
        },
        {
            "origItem": {
                "BatchInput": {
                    "apd": "tt"
                },
                "Items": [
                    {
                        "k4": "v"
                    }
                ]
            },
            "outterFieldA": {
                "orig-fieldA-1": 1,
                "orig-fieldA-2": "qwe1"
            },
            "staticValue": "123qwe"
        }
    ]
}