功能介绍
在实际业务场景中,为了提升大任务的执行效率,往往需要将一个大任务拆分成数千个子任务。为了保证这数千个子任务的同时运行,需要调度数万核的CPU资源。叠加多任务需要竞争资源,一般IDC离线任务难以满足需求。
例如,在自动驾驶仿真任务场景下,修改算法后进行回归测试时需要对所有驾驶场景进行仿真,每个驾驶场景为一个子任务运行。为了提高迭代速度,所有子场景测试需要并行执行。
Fan-out和Fan-in常用于构建高效的并发处理流程,通过拆分(Fan-out)和聚合(Fan-in)操作,能够充分利用多核、多机资源,实现大规模数据的高效处理。
如上图所示,工作流编排过程中,可以使用DAG(有向无环图)编排Fan-out Fan-in任务。子任务的拆分方式分为有静态(静态DAG)和动态(动态DAG)。
如上图所示,在数据处理场景中,任务A可以扫描待处理的数据集,为每个子数据集(例如:一个子目录)启动子任务Bn处理。当所有子任务Bn运行结束后,在子任务C中聚合结果。具体启动多少个子任务B取决于任务A的输出结果,您可以在任务A中自定义子任务的拆分规则。
操作步骤
本文将通过示例应用介绍具体操作流程:构建一个动态DAG Fan-out Fan-in工作流,读取阿里云OSS对象存储中的一个大日志文件,将其拆分为多个小文件(split),启动多个子任务分别计算每个小文件中的关键词数量(count),最后聚合结果(merge)。
将示例文件上传到PV对应的OSS路径下。
使用以下YAML创建一个工作流。具体操作,请参见创建工作流。
展开查看YAML示例
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: dynamic-dag-map-reduce-
spec:
entrypoint: main
# claim a OSS PVC, workflow can read/write file in OSS through PVC.
volumes:
- name: workdir
persistentVolumeClaim:
claimName: pvc-oss
# how many tasks to split, default is 5.
arguments:
parameters:
- name: numParts
value: "5"
templates:
- name: main
# DAG definition.
dag:
tasks:
# split log files to several small files, based on numParts.
- name: split
template: split
arguments:
parameters:
- name: numParts
value: "{{workflow.parameters.numParts}}"
# multiple map task to count words in each small file.
- name: map
template: map
arguments:
parameters:
- name: partId
value: '{{item}}'
depends: "split"
# run as a loop, partId from split task json outputs.
withParam: '{{tasks.split.outputs.result}}'
- name: reduce
template: reduce
arguments:
parameters:
- name: numParts
value: "{{workflow.parameters.numParts}}"
depends: "map"
# The `split` task split the big log file to several small files. Each file has a unique ID (partId).
# Finally, it dumps a list of partId to stdout as output parameters
- name: split
inputs:
parameters:
- name: numParts
container:
image: acr-multiple-clusters-registry.cn-hangzhou.cr.aliyuncs.com/ack-multiple-clusters/python-log-count
command: [python]
args: ["split.py"]
env:
- name: NUM_PARTS
value: "{{inputs.parameters.numParts}}"
volumeMounts:
- name: workdir
mountPath: /mnt/vol
# One `map` per partID is started. Finds its own "part file" and processes it.
- name: map
inputs:
parameters:
- name: partId
container:
image: acr-multiple-clusters-registry.cn-hangzhou.cr.aliyuncs.com/ack-multiple-clusters/python-log-count
command: [python]
args: ["count.py"]
env:
- name: PART_ID
value: "{{inputs.parameters.partId}}"
volumeMounts:
- name: workdir
mountPath: /mnt/vol
# The `reduce` task takes the "results directory" and returns a single result.
- name: reduce
inputs:
parameters:
- name: numParts
container:
image: acr-multiple-clusters-registry.cn-hangzhou.cr.aliyuncs.com/ack-multiple-clusters/python-log-count
command: [python]
args: ["merge.py"]
env:
- name: NUM_PARTS
value: "{{inputs.parameters.numParts}}"
volumeMounts:
- name: workdir
mountPath: /mnt/vol
使用动态DAG方式实现Fan-out Fan-in编排任务。
大文件拆分为多个split子任务后,会在标准输出中输出一个JSON字符串,包含子任务要处理的partId,例如:
["0", "1", "2", "3", "4"]
map任务使用withParam
引用split任务的输出,并解析JSON字符串获得所有{{item}}
,并使用每个{{item}}
作为输入参数启动多个map任务。
- name: map
template: map
arguments:
parameters:
- name: partId
value: '{{item}}'
depends: "split"
withParam: '{{tasks.split.outputs.result}}'
更多定义方式,请参见开源Argo Workflow。
工作流运行后,在Argo控制台查看任务DAG流程与运行结果。
在OSS Bucket的文件列表中,查看输出文件,其中log-count-data.txt为输入日志文件,split-output、count-output为中间结果目录、result.json为最终结果文件。
示例源代码请参见argo-workflow-examples。
联系我们
若您有任何产品建议或疑问,请加入钉钉群(钉钉群号:35688562)联系我们。