使用PythonSDK构建大规模Argo Workflows

Argo Workflows广泛应用于定时任务、机器学习和ETL数据处理等场景,但当对Kubernetes不太熟练时,YAML定义工作流程可能会增加学习难度。Hera Python SDK提供了一种简洁易用的替代方案,允许以Python代码构建工作流,支持复杂任务场景,易于测试,并与Python生态无缝集成。

功能介绍

Argo Workflows主要依赖YAML来定义工作流程,以实现配置的清晰与简洁。但当数据科学家不熟悉YAML时,在复杂的工作流设计中,YAML的严格缩进要求及层次化的结构可能会增加配置难度。

Hera是一个专为构建和提交Argo工作流程设计的Python SDK框架,旨在简化工作流程的构建和提交。在处理复杂工作流时,使用Hera可以有效避免YAML可能产生的语法错误。使用Hera PythonSDK还具有以下优势。

  • 代码简洁性:Hera提供了易于理解和编写的代码,可提升开发效率。

  • Python生态集成简单:每个Function就是一个Template,与Python生态中的各种框架无缝集成,提供了丰富的Python库和工具。

  • 可测试性:可直接利用Python的测试框架,有助于提高代码的质量和可维护性。

前提条件

  • 已安装Argo组件和控制台,并获取访问凭证和Argo Server 访问IP。具体操作,请参见启用批量任务编排能力

  • 已安装Hera。

    pip install hera-workflows

场景一:Simple DAG Diamond

Argo Workflows中,DAG(有向无环图)常用于定义复杂的任务依赖关系,其中Diamond结构是一种常见的工作流模式,可以实现多个任务并行执行后,并将结果汇聚到一个共同的后续任务。这种结构适用于需要合并不同数据流或处理结果的场景。以下展示如何使用Hera定义一个具有Diamond结构的工作流,其中两个任务taskAtaskB并行运行,它们的输出共同作为输入传递给taskC。

  1. 使用以下内容,创建simpleDAG.py。

    # 导入相关包。
    from hera.workflows import DAG, Workflow, script
    from hera.shared import global_config
    import urllib3
    
    urllib3.disable_warnings()
    
    # 配置访问地址和Token。
    global_config.host = "https://${IP}:2746"
    global_config.token = "abcdefgxxxxxx"  # 填入之前获取的Token。
    global_config.verify_ssl = ""
    
    # 装饰器函数scriptHera实现近乎原生的Python函数编排的关键功能。
    # 它允许您在Hera上下文管理器(例如WorkflowSteps上下文)下调用该函数。
    # 该函数在任何Hera上下文之外仍将正常运行,这意味着您可以在给定函数上编写单元测试。
    # 该示例是打印输入的信息。
    @script()
    def echo(message: str):
        print(message)
    
    # 构建Workflow,WorkflowArgo中的主要资源,也是Hera的关键类,负责保存模板、设置入口点和运行模板。
    with Workflow(
        generate_name="dag-diamond-",
        entrypoint="diamond",
        namespace="argo",
    ) as w:
        with DAG(name="diamond"):
            A = echo(name="A", arguments={"message": "A"})  # 构建Template。
            B = echo(name="B", arguments={"message": "B"})
            C = echo(name="C", arguments={"message": "C"})
            D = echo(name="D", arguments={"message": "D"})
            A >> [B, C] >> D      # 构建依赖关系,B、C任务依赖A,D依赖BC。
    # 创建Workflow。
    w.create()
  2. 执行以下命令, 提交工作流。

    python simpleDAG.py
  3. 工作流运行后,在工作流控制台(Argo)查看任务DAG流程与运行结果。

    image

场景二:Map-Reduce

Argo Workflows中实现MapReduce风格的数据处理时,需要有效利用其DAG模板,以组织和协调多个任务,从而模拟MapReduce阶段。以下展示如何使用Hera构建一个简单的MapReduce工作流,用于处理文本文件的单词计数任务。每一步都是一个Python函数,便于和Python生态进行集成。

  1. 配置Artifacts,相关操作,请参见配置Artifacts

  2. 使用以下内容,创建map-reduce.py。

    展开查看代码内容

    from hera.workflows import DAG, Artifact, NoneArchiveStrategy, Parameter, OSSArtifact, Workflow, script
    from hera.shared import global_config
    import urllib3
    
    urllib3.disable_warnings()
    # 设置访问地址。
    global_config.host = "https://${IP}:2746"
    global_config.token = "abcdefgxxxxxx"  # 填入之前获取的Token。
    global_config.verify_ssl = ""
    
    # 使用script装饰函数时,将script参数传递给script装饰器。这包括image、inputs、outputs、resources等。
    @script(
        image="mirrors-ssl.aliyuncs.com/python:alpine",
        inputs=Parameter(name="num_parts"),
        outputs=OSSArtifact(name="parts", path="/mnt/out", archive=NoneArchiveStrategy(), key="{{workflow.name}}/parts"),
    )
    def split(num_parts: int) -> None:  # 根据输入参数num_parts创建多个文件,文件中写入foo字符和parts编号
        import json
        import os
        import sys
    
        os.mkdir("/mnt/out")
    
        part_ids = list(map(lambda x: str(x), range(num_parts)))
        for i, part_id in enumerate(part_ids, start=1):
            with open("/mnt/out/" + part_id + ".json", "w") as f:
                json.dump({"foo": i}, f)
        json.dump(part_ids, sys.stdout)
    
    # script中定义image、inputs、outputs
    @script(
        image="mirrors-ssl.aliyuncs.com/python:alpine",
        inputs=[Parameter(name="part_id", value="0"), Artifact(name="part", path="/mnt/in/part.json"),],
        outputs=OSSArtifact(
            name="part",
            path="/mnt/out/part.json",
            archive=NoneArchiveStrategy(),
            key="{{workflow.name}}/results/{{inputs.parameters.part_id}}.json",
        ),
    )
    def map_() -> None:  # 根据文件中foo字符的个数,生成新文件,将foo内容parts编号乘以2,写入bar内容
        import json
        import os
    
        os.mkdir("/mnt/out")
        with open("/mnt/in/part.json") as f:
            part = json.load(f)
        with open("/mnt/out/part.json", "w") as f:
            json.dump({"bar": part["foo"] * 2}, f)
    
    # script中定义image、inputs、outputs、resources
    @script(
        image="mirrors-ssl.aliyuncs.com/python:alpine",
        inputs=OSSArtifact(name="results", path="/mnt/in", key="{{workflow.name}}/results"),
        outputs=OSSArtifact(
            name="total", path="/mnt/out/total.json", archive=NoneArchiveStrategy(), key="{{workflow.name}}/total.json"
        ),
    )
    def reduce() -> None:   # 计算每个parts对应bar值的总和。
        import json
        import os
    
        os.mkdir("/mnt/out")
    
        total = 0
        for f in list(map(lambda x: open("/mnt/in/" + x), os.listdir("/mnt/in"))):
            result = json.load(f)
            total = total + result["bar"]
        with open("/mnt/out/total.json", "w") as f:
            json.dump({"total": total}, f)
    
    # 构建workflow,输入name、设置入口点、namespace、全局参数等。
    with Workflow(generate_name="map-reduce-", entrypoint="main", namespace="argo", arguments=Parameter(name="num_parts", value="4")) as w:
        with DAG(name="main"):
            s = split(arguments=Parameter(name="num_parts", value="{{workflow.parameters.num_parts}}")) # 构建Templetes。
            m = map_(
                with_param=s.result,
                arguments=[Parameter(name="part_id", value="{{item}}"), OSSArtifact(name="part", key="{{workflow.name}}/parts/{{item}}.json"),],
            )   # 输入参数并构建templetes。
            s >> m >> reduce()   # 构建任务依赖关系。
    # 创建工作流。
    w.create()
    
  3. 执行以下命令,提交工作流。

    python map-reduce.py
  4. 工作流运行后,您可以在工作流控制台(Argo)查看任务DAG流程与运行结果。image

相关文档

  • Hera相关文档。

    • 如果您需要详细了解Hera相关信息,请参见Hera概述

    • 若您想学习如何设置和使用Hera来进行LLM的训练过程,请参见Train LLM with Hera

  • YAML部署示例。

    • 如果您想了解以YAML的方式部署simple-diamond,请参见dag-diamond.yaml

    • 如果您想了解以YAML的方式部署map-reduce,请参见map-reduce.yaml

联系我们

若您有任何产品建议或疑问,请加入钉钉群(钉钉群号:35688562)联系我们。