Python脚本V2

PAI-Designer提供自定义Python脚本的功能,您可以使用Python脚本V2组件自定义安装依赖包及运行自定义的Python函数。本文为您介绍Python脚本V2组件的配置方法及使用示例。

背景信息

Python脚本V2组件位于PAI-Designer组件的自定义算法组件文件夹下,当前版本为V2。

前提条件

  • 已完成DLC相关权限授权,授权方法详情请参见云产品依赖与授权:DLC

  • 由于Python脚本V2需要依赖于PAI-DLC作为底层计算资源,因此您需要在工作空间关联PAI-DLC计算资源,详情请参见添加PAI-DLC计算资源

  • 由于Python脚本V2依赖OSS作为代码存储环境,因此您需要先创建OSS Bucket,详情请参见创建存储空间

    重要

    创建的OSS Bucket必须与PAI-Designer和PAI-DLC在同一地域。

  • 已在工作空间中,为使用该组件的RAM账号添加算法开发角色,详情请参见管理成员。如果操作账号还需同时使用MaxCompute作为数据源,还需要同时添加MaxCompute开发角色。

可视化配置组件

  • 输入桩

    Python脚本V2组件共有4个输入端口,均可以连接OSS路径或MaxCompute表类型的数据。

    • OSS路径输入

      来自上游组件的OSS输入,会被挂载到Python脚本执行的节点上,系统会将挂载后的文件路径,以arguments的形式,传递给Python脚本,无需手工配置。arguments的规范如python main.py --input1 /ml/input/data/input1,代表第一个输入端口输入的OSS路径。在Python脚本中可以按照读本地文件的方式访问/ml/input/data/input1来读取挂载后的文件。

    • MaxCompute表输入

      MaxCompute表的输入不支持挂载,系统会将对应的表信息以URI的形式,作为arguments传递给Python脚本,无需手工配置。arguments的规范如python main.py --input1 odps://some-project-name/tables/table,代表第一个输入端口输入的MaxCompute表。对于ODPS URI形式的输入,您可以使用该组件代码模板内的parse_odps_url函数解析出对应的ProjectName、TableName和Partition等元信息,详情请参见使用示例

  • 输出桩

    Python脚本V2组件共有4个输出端口,其中输出端口1输出端口2是OSS路径输出端口,输出端口3输出端口4是MaxCompute表输出端口。

    • OSS路径输出

      该组件的脚本设置页签的任务输出路径参数配置的OSS路径,会被系统自动挂载到/ml/output/下。该组件的输出端口OSS输出-1OSS输出-2,分别对应子目录/ml/output/output1/ml/output/output2。在脚本中可以按照写本地文件的方式将需要传递给下游节点的文件写到这两个目录中。

    • MaxCompute表输出

      如果当前工作空间配置了MaxCompute项目,系统会自动传递一个临时表URI到Python脚本,例如: python main.py --output3 odps://<some-project-name>/tables/<output-table-name>,您可以通过PyODPS来创建临时表URI中指定的表,并将Python脚本处理完成的数据写出到这个表,最后通过组件连线将表传递给下游组件,详情可参考下文中的示例。

  • 组件参数

    页签

    参数

    描述

    脚本设置

    任务输出路径

    选择任务输出的OSS路径。

    • 配置好的OSS目录会挂载到作业容器的/ml/output/路径下,任务写出到/ml/output/路径下的数据,会被持久化保存到对应的OSS目录。

    • 组件的输出端口OSS输出-1OSS输出-2分别对应/ml/output/路径下的子路径output1和output2。当组件的OSS输出端口接入下游组件时,下游组件接收到的数据为对应子路径的数据。

    Python代码

    该组件的代码配置包括以下两部分:

    • 代码保存路径:选择代码保存的OSS路径,编辑框中写入的代码会保存在该OSS路径下。Python代码的文件名称默认为main.py。

      重要

      第一次单击保存前,请确认指定的代码保存路径下无同名文件,避免文件被覆盖。

    • Python代码编辑器:编辑框内默认提供示例代码,详情请参见使用示例。您可以直接在编辑器内编写代码。

    高级选项

    选中该参数后,支持配置以下两个参数:

    • 执行命令:在文本框中,输入您需要执行的命令,比如:python main.py

      说明

      系统会自动按照脚本名称和组件输入输出端口的连接情况来生成执行命令,无需手动配置。

    • 第三方依赖库:在文本框中,您可以通过添加脚本的方式安装第三方依赖库,格式与Python的requirement.txt相同,具体如下所示。节点执行前,会自动安装文本框中配置的第三方依赖库。

      cycler==0.10.0            # via matplotlib
      kiwisolver==1.2.0         # via matplotlib
      matplotlib==3.2.1
      numpy==1.18.5
      pandas==1.0.4
      pyparsing==2.4.7          # via matplotlib
      python-dateutil==2.8.1    # via matplotlib, pandas
      pytz==2020.1              # via pandas
      scipy==1.4.1              # via seaborn

    执行配置

    选择资源组

    支持选择DLC公共资源组或DLC专属资源组

    • 当选择公共资源组时,您需要配置选择机器实例类型参数,支持配置CPUGPU机器实例。默认为:ecs.c6.large。

    • 当选择专属资源组时,你需要配置任务所需的CPU(核数)内存(GB)共享内存(GB)GPU(卡数)

    默认为当前工作空间下的DLC云原生资源的默认资源组。

    专有网络配置

    支持选择已创建的专有网络进行挂载。

    安全组

    支持选择已创建的安全组进行挂载。

    高级选项

    选中该参数后,支持配置以下参数:

    • 选择机器实例个数:您可以按照实际需要配置机器实例个数,默认为1。

    • 选择作业镜像:默认为开源的xgboost1.6.0版本,如果您需要使用深度学习框架,则需要修改镜像。

    • 选择任务类型:仅当提交的代码按照分布式实现时,才需要修改该参数,支持以下取值:

      • XGBoost/LightGBM Job

      • TensorFlow Job

      • PyTorch Job

      • MPI Job

使用示例

默认示例代码解析

Python脚本V2组件默认提供的示例代码如下。

import os
import argparse
import json
"""
Python V2 组件示例代码
"""
# 当前工作空间下的默认MaxCompute执行环境,包含MaxCompute项目的名称以及Endpoint。
# 需要当前的工作空间下有MaxCompute项目时,作业的执行环境才会注入。
# 示例: {"endpoint": "http://service.cn.maxcompute.aliyun-inc.com/api", "odpsProject": "lq_test_mc_project"}。
ENV_JOB_MAX_COMPUTE_EXECUTION = "JOB_MAX_COMPUTE_EXECUTION"
def init_odps():    
    from odps import ODPS
    # 当前工作空间的默认MaxCompute项目信息。
    mc_execution = json.loads(os.environ[ENV_JOB_MAX_COMPUTE_EXECUTION])
    o = ODPS(
        access_id="<YourAccessKeyId>",
        secret_access_key="<YourAccessKeySecret>",
        # 请根据Project所在的Region选择,比如:http://service.cn-shanghai.maxcompute.aliyun-inc.com/api。
        endpoint=mc_execution["endpoint"],
        project=mc_execution["odpsProject"],
    )
    return o
def parse_odps_url(table_uri):    
    from urllib import parse
    parsed = parse.urlparse(table_uri)
    project_name = parsed.hostname
    r = parsed.path.split("/", 2)
    table_name = r[2]
    if len(r) > 3:
        partition = r[3]
    else:
        partition = None
        return project_name, table_name, partition
def parse_args():
    parser = argparse.ArgumentParser(description="PythonV2 component script example.")
    parser.add_argument("--input1", type=str, default=None, help="Component input port 1.")
    parser.add_argument("--input2", type=str, default=None, help="Component input port 2.")
    parser.add_argument("--input3", type=str, default=None, help="Component input port 3.")
    parser.add_argument("--input4", type=str, default=None, help="Component input port 4.")
    parser.add_argument("--output1", type=str, default=None, help="Output OSS port 1.")
    parser.add_argument("--output2", type=str, default=None, help="Output OSS port 2.")
    parser.add_argument("--output3", type=str, default=None, help="Output MaxComputeTable 1.")
    parser.add_argument("--output4", type=str, default=None, help="Output MaxComputeTable 2.")
    args, _ = parser.parse_known_args()
    return args
def write_table_example(args):    
    # 示例:通过执行SQL语句复制PAI提供的公共表数据,作为当前组件输出端口3指定的临时表。
    output_table_uri = args.output3
    o = init_odps()
    project_name, table_name, partition = parse_odps_url(output_table_uri)
    o.run_sql(f"create table {project_name}.{table_name} as select * from pai_online_project.heart_disease_prediction;")
def write_output1(args):
    # 示例:将数据结果写入挂载的OSS路径(输出端口1的子目录),对应的结果可以通过连线传递到下游组件。
    output_path = args.output1
    os.makedirs(output_path, exist_ok=True)
    p = os.path.join(output_path, "result.text")
    with open(p, "w") as f:
        f.write("TestAccuracy=0.88")
if __name__ == "__main__":
    args = parse_args()
    print("Input1={}".format(args.input1))
    print("Output1={}".format(args.output1))
    # write_table_example(args)
    # write_output1(args)

常用函数说明:

  • init_odps():初始化一个ODPS实例,用来读取MaxCompute表数据,需要填写您的AccessKeyIdAccessKeySecret,关于如何获取AccessKey,详情请参见获取AccessKey

    更多关于操作MaxCompute表的API,详情请参见PyODPS

  • parse_odps_url(table_uri):解析输入的MaxCompute表的URI,返回解析完成得到的项目名称、表名和分区。table_uri格式为:odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/,比如:odps://test/tables/iris/pa=1/pb=1,其中pa=1/pb=1为一个多级分区。

  • parse_args():解析传入脚本的arguments,输入输出数据会以arguments的方式传递给执行的脚本。

使用示例1:Python脚本V2组件与其他组件串联使用

参考并修改心脏病预测案例模板,来说明Python脚本V2组件如何与Designer其他组件串联使用。组合使用工作流配置说明:

  1. 创建心脏病预测案例模板并进入工作流,具体操作请参见心脏病预测

  2. 将Python脚本V2组件拖入画布并重命名为SMOTE,并配置以下代码。

    重要

    在我们使用的镜像中没有imblearn库,需要在该组件的脚本设置页签的第三方依赖库中配置imblearn。在节点执行前,会自动安装该库。

    import argparse
    import json
    import os
    import numpy as np
    from odps.df import DataFrame
    from imblearn.over_sampling import SMOTE
    from urllib import parse
    from odps import ODPS
    ENV_JOB_MAX_COMPUTE_EXECUTION = "JOB_MAX_COMPUTE_EXECUTION"
    def init_odps():
        from odps import ODPS
        # 当前工作空间的默认MaxCompute项目信息。
        mc_execution = json.loads(os.environ[ENV_JOB_MAX_COMPUTE_EXECUTION])
        o = ODPS(
            access_id="<替换成您自己的AccessKey>",
            secret_access_key="<替换成您自己的AccessKeySecret>",
            # 请根据Project所在的Region选择,比如:http://service.cn-shanghai.maxcompute.aliyun-inc.com/api。
            endpoint=mc_execution["endpoint"],
            project=mc_execution["odpsProject"],
        )
        return o
    def get_max_compute_table(table_uri, odps):
        parsed = parse.urlparse(table_uri)
        project_name = parsed.hostname
        table_name = parsed.path.split('/')[2]
        table = odps.get_table(project_name + "." + table_name)
        return table
    def run():
        parser = argparse.ArgumentParser(description='PythonV2 component script example.')
        parser.add_argument(
            '--input1', type=str, default=None, help='Component input port 1.'
        )
        parser.add_argument(
            '--output3', type=str, default=None, help='Component input port 1.'
        )
        args, _ = parser.parse_known_args()
        print('Input1={}'.format(args.input1))
        print('output3={}'.format(args.output3))
        o = init_odps()
        imbalanced_table = get_max_compute_table(args.input1, o)
        df = DataFrame(imbalanced_table).to_pandas()
        sm = SMOTE(random_state = 2)
        X_train_res, y_train_res = sm.fit_resample(df, df['ifhealth'].ravel())
        new_table = o.create_table(get_max_compute_table(args.output3, o).name, imbalanced_table.schema , if_not_exists=True)
        with new_table.open_writer() as writer:
            writer.write(X_train_res.values.tolist())
    if __name__ == '__main__':
        run()

    其中access_idsecret_access_key需要配置您自己的AccessKey和AccessKeySecret。关于如何获取AccessKey,详情请参见获取AccessKey

  3. SMOTE组件接入拆分组件的下游,使用经典的SMOTE算法对拆分完得到的训练数据做过采样,对训练集里样本数量较少的类别进行过采样,合成新的样本来缓解类不平衡。

  4. SMOTE组件得到的新数据接入逻辑回归二分类组件做训练。

  5. 将训练得到的模型与左侧分支中的模型一样,连接相同的预测数据和评估组件做横向对比。组件运行成功后,单击可视化进入可视化页面,查看最终评估结果。评估结果额外做过采样并未对模型效果有特别明显的提升,说明原样本分布及模型效果都比较好。

使用示例2:使用Designer做纯DLC任务的编排

您可以在Designer中连接多个自定义Python脚本V2组件,来实现一组DLC任务的Pipeline编排和定时调度。以下图为例,按照Directed Acyclic Graph(DAG)图顺序启动4个DLC任务。

说明

如果DLC的执行代码不需要读取上游节点数据,也不需要给下游节点传递数据,则节点之间的连线只表示调度执行的依赖关系和先后顺序。

DAG图后续您可以将使用Designer开发完成的整个工作流,一键部署到DataWorks做定时调度,具体操作请参见使用DataWorks离线调度Designer工作流

阿里云首页 机器学习 相关技术圈