PAI不仅支持通过可视化建模(PAI-Studio 2.0)拖拉拽的形式构建数据处理、数据验证及模型训练的机器学习工作流,而且支持使用Python SDK构建相同的工作流,并在PAI Pipeline Service中运行。本文介绍如何使用Python SDK构建一个机器学习工作流,并使其运行在PAI Pipeline Service中。

背景信息

为了获得合适的机器学习模型,将其应用于在线推理或离线推理,通常机器学习工程师需要进行数据提取、数据校验、特征工程、模型训练及模型评估全流程,且以上流程并非只进行一次。为了获得更符合业务需求的模型,机器学习工程师需要尝试不同的特征和参数,或使用增量的数据训练模型,以更好地拟合最新的输入特征。为了复用这些流程,通常的解决方案是先将这些流程以工作流(Pipeline)的形式组织起来,再提交至一个自动化工作流服务中运行。

PAI Pipeline Service是PAI提供的机器学习工作流服务,您可以通过可视化建模(PAI-Studio 2.0)拖拉拽的方式或Python SDK的方式构建一个机器学习工作流,并提交至PAI Pipeline Service运行。

准备工作

  1. 安装SDK。
    如果您通过本地的Python开发环境使用SDK,则需要安装SDK。如果您通过PAI-DSW环境使用SDK,该环境中已经预装了SDK,您可以跳过该步骤。通过pip命令安装SDK的命令如下。
    pip install https://pai-sdk.oss-cn-shanghai.aliyuncs.com/alipai/dist/alipai-0.3.0-py2.py3-none-any.whl
    其中https://pai-sdk.oss-cn-shanghai.aliyuncs.com/alipai/dist/alipai-0.3.0-py2.py3-none-any.whl表示SDK的下载地址,您无需修改。
    说明 虽然SDK支持Python 2和Python 3版本,但是Python社区已经停止维护Python 2版本,因此推荐您使用Python 3环境运行SDK。
  2. 初始化默认的SDK Session和AI工作空间。
    PAI Pipeline Service SDK依赖于阿里云机器学习PAI提供的服务,SDK的Session负责与PAI的后端服务和依赖的其他阿里云服务进行交互。Session封装了鉴权凭证AccessKey、使用PAI服务的地域及当前使用的AI工作空间。

    您可以通过pai.core.session.setup_default_session方法初始化一个全局默认的Session对象。当调用的API(例如SaveOperator.listWorkspace.list)需要与PAI后端服务进行通讯时,默认使用该Session进行通讯。

    初始化默认的SDK Session和AI工作空间时,您可以通过以下两种方式指定AI工作空间:
    • 方式一:首先在PAI控制台上查看AI工作空间的名称或ID,然后初始化默认Session的同时指定AI工作空间。代码示例如下。
      from pai.core.session import setup_default_session
      from pai.core.workspace import Workspace
      
      setup_default_session(access_key_id="<your_access_key_id>", 
                            access_key_secret="<your_access_key_secret>", 
                            region_id="<your_region>",
                            # workspace_id="<your_workspace_id>",         # AI工作空间名称和ID二选一。
                            workspace_name="<your_workspace_name>")
      您需要将下文表 1中的参数值替换为实际值。
    • 方式二:首先设置默认的Session,然后获取阿里云账号下可访问的AI工作空间列表,再指定使用的AI工作空间。代码示例如下。
      from pai.core.session import setup_default_session
      from pai.core.workspace import Workspace
      session = setup_default_session(access_key_id="<your_access_key_id>", access_key_secret="<your_access_key_secret>",
                                      region_id="<your_region>")
      for ws in Workspace.list():
          print(ws.name, ws.id)
      
      session.set_workspace(workspace=Workspace.get_by_name("<your_workspace_name>"))
      您需要将下文表 1中的参数值替换为实际值。
    表 1. 初始化的相关参数
    参数 描述
    <your_access_key_id> 阿里云账号的AccessKey ID。
    <your_access_key_secret> 阿里云账号的AccessKey Secret。
    <your_region> PAI Pipeline Service的地域,后续提交的任务将运行在该地域。该参数支持以下取值:
    • cn-shanghai:华东2(上海)
    • cn-hangzhou:华东1(杭州)
    • cn-beijing:华北2(北京)
    • cn-shenzhen:华南1(深圳)
    <your_workspace_name> AI工作空间名称。对于上面两种指定AI工作空间的方式,分别根据以下方法获取该参数值:
    • 如果使用方式一,则该参数与<your_workspace_id>二选一。您可以登录PAI控制台,在AI工作空间列表页面查看该参数值。您也可以创建新的AI工作空间,详情请参见AI工作空间
      说明 如果指定AI工作空间时,同时指定了AI工作空间的名称和ID,则接口报错。
    • 如果使用方式二,则将该参数指定为Workspace.list()接口返回的一个ws.name
    <your_workspace_id> AI工作空间ID。如果初始化默认Session的同时指定AI工作空间,则该参数与<your_workspace_name>二选一。您可以登录PAI控制台,在AI工作空间列表页面查看该参数值。您也可以创建新的AI工作空间,详情请参见AI工作空间
    说明 如果指定AI工作空间时,同时指定了AI工作空间的名称和ID,则接口报错。

获取算法组件

Operator是PAI Pipeline Service中定义的算法组件,包含了组件的输入输出信息、运行参数及具体执行的实现方式(执行一个DAG或执行一个单独的镜像)。您可以通过Python SDK从PAI Pipeliner Service获取PAI内置的公共算法组件(SavedOperator),也可以将本地构造的工作流对象保存为一个Operator进行复用(参见下文的构建工作流)。

通过Python SDK从PAI Pipeliner Service获取PAI内置的公共算法组件(SavedOperator)的详细步骤如下。

  1. 通过SavedOperator.list方法获取组件列表。
    SavedOperator.list方法中,PAI提供了一些公共的算法组件,通过指定providerProviderAlibabaPAI,即可获取PAI提供的算法组件列表。此外,您可以通过inputsoutputs属性,查看对应组件的输入输出信息,代码示例如下所示。
    from pai.operator import SavedOperator
    from pai.common import ProviderAlibabaPAI
    
    for op in SavedOperator.list(provider=ProviderAlibabaPAI):
        print(op.pipeline_id, op.identifier, op.provider, op.version)
    
    op = next(SavedOperator.list(provider=ProviderAlibabaPAI))
    # 查看组件的输入输出信息。
    print(op.inputs)
    print(op.outputs)
  2. 获取指定的算法组件。
    通过上一步的SavedOperator.list方法,可以获取算法组件的identifier-provider-versionpipeline_id信息,您可以使用二者中的任意一个从PAI Pipeline Service中获取一个唯一的算法组件。这两种方式的区别在于identifier-provider-version是由组件开发者在保存组件时指定的,而pipeline_id是由PAI Pipeline Service生成的组件唯一标识ID。以PAI提供的split组件为例,两种方式的代码示例分别如下:
    • 通过identifier-provider-version获取对应的组件
      from pai.common.utils import gen_temp_table
      op = SavedOperator.get_by_identifier(identifier="split", provider=ProviderAlibabaPAI, version="v1")
      print(op.inputs)

      目前PAI提供的Operator主要基于MaxCompute算法,底层依赖通过PAI命令实现。这些组件的identifier默认为 PAI命令的nameversionv1。您可以在PAI的常规机器学习组件帮助文档中查看组件的输入参数定义,详情请参见常规机器学习组件

    • 通过pipeline_id获取对应的组件
      from pai.common.utils import gen_temp_table
      op_by_id = SavedOperator.get(op.pipeline_id)
      print(op_by_id.identifier, op_by_id.provider, op_by_id.version)
      print(op_by_id.inputs)
  3. 提交运行任务。
    通过指定组件的必须输入参数,您可以提交一个单独的组件运行任务,以下以PAI提供的split算法组件为例,介绍如何使用SDK将输入数据表odps://pai_online_project/tables/wumai_data(该输入表为公开表,您可以直接使用)按照给定比例拆分到两张新表中。
    from pai.common.utils import gen_temp_table
    # split运行在MaxCompute中,需要指定运行的MaxCompute项目和执行环境。
    # maxc_execution作为算法组件的一个输入,标识算法组件的执行MaxCompute Project项目信息。
    maxc_execution = {
        "endpoint": "",
        "odpsProject": "<yourMaxComputeProject>",
    }
    #提交运行任务。
    pipeline_run = op.run(
        job_name="example-split-job",
        arguments={
            "execution":maxc_execution,
            "inputTable": "odps://pai_online_project/tables/wumai_data",
            "fraction": 0.7,  #split算法组件的输入参数,可以通过op.inputs获取输入信息。
            "output1TableName": gen_temp_table(), #gen_temp_table()接口获取随机表名。
            "output2TableName": gen_temp_table(),
        }
    )
    
    print(pipeline_run.get_outputs())
    上述代码中通过run接口提交任务后,SDK会在Console中输出任务在PAI控制台中的URL,您也可以直接在PAI控制台的任务管理页面,通过返回的运行任务ID或任务名称查找对应的任务实例。找到该任务后,进入任务详情页面,您可以查看任务执行的DAG、日志及输出,并且可以将模型直接部署到PAI-EAS。

    组件可以单独运行,也可以将多个组件拼接为一个工作流运行,详情请参见下文的构建工作流

构建工作流

PAI Pipeline Service支持将多个算法组件拼接为一个新的工作流,通过提供输入参数可以提交并运行该工作流,或将其保存为一个复合工作流组件。保存的工作流组件可以作为一个普通组件直接运行(参见上文的提交任务)或作为新创建的工作流的一个节点。

创建一个新的工作流主要包括以下流程:
  1. 定义工作流的输入信息。

    输入信息包括用户输入参数PipelineParameter或数据输入PipelineArtifact。对于数据输入,目前PAI Pipeline Service支持OSS数据、MaxCompute表数据及MaxCompute OfflineModel。

  2. 创建工作流中的Step及其输入。

    Step的输入可能来自于其他Step,也可能来源于当前创建的工作流的输入。

  3. 指定工作流的输出信息。

    工作流可以使用引用的方式将Step节点的输出作为新的工作流的输出。

下面的示例代码中,构建了一个简单的工作流,它先使用类型转换组件将输入表的部分字段转换为浮点类型,再通过数据拆分组件将表拆分为两张MaxCompute表。
from pai.pipeline.types import PipelineParameter, PipelineArtifact, MetadataBuilder
from pai.pipeline import PipelineStep, Pipeline
from pai.operator import SavedOperator
from pai.common.utils import gen_temp_table

def create_composite_pipeline():
    # 定义当前工作流的输入信息。
    
    # 由于组件底层的处理是运行在MaxCompute上,execution参数传递执行的MaxCompute Project信息。
    execution_input = PipelineParameter(name="execution", typ=dict)
    cols_to_double_input = PipelineParameter(name="cols_to_double")
    table_input = PipelineArtifact(name="input_table", metadata=MetadataBuilder.maxc_table())

    # 构建类型转换Step。
    # 指定identifier-provider-version, 使用一个已经保存的组件,作为工作流的一个Step。
    type_transform_step = PipelineStep(
        identifier="type_transform", provider=ProviderAlibabaPAI,
        version="v1", name="typeTransform", inputs={
            "inputTable": table_input, "execution": execution_input,
            "outputTable": gen_temp_table(), "cols_to_double": cols_to_double_input,
        }
    )
    
    # 构建拆分表Step。
    # SavedOperator也可以作为一个Step构建工作流。
    split_operator = SavedOperator.get_by_identifier(identifier="split",
     provider=ProviderAlibabaPAI, version="v1")
    split_step = split_operator.as_step(inputs={"inputTable": type_transform_step.outputs[0],
            "execution": execution_input, "output1TableName": gen_temp_table(),
            "fraction": 0.5, "output2TableName": gen_temp_table(),
        })

    # Pipeline构造函数中的steps和inputs信息并不要求完整输入,Pipeline graph时,是通过Pipeline的outputs和steps推导他们的依赖,从而构造对应的执行DAG。
    p = Pipeline(
        steps=[split_step],
        outputs=split_step.outputs[:2],
    )
    return p

p = create_composite_pipeline()
# 输入工作流运行所需参数(arguments)后,提交到PAI Pipeline Service运行。
pipeline_run = p.run(job_name="demo-composite-pipeline-run", arguments={
            "execution": maxc_execution,
            "cols_to_double": "time,hour,pm2,pm10,so2,co,no2",
            "input_table": "odps://pai_online_project/tables/wumai_data",
        }, wait=True)


pipeline_run.get_outputs()
对于上述构建好的工作流,您可以为其指定组件名称和版本,将该工作流保存到服务端成为一个可复用组件。保存的组件默认共享给阿里云账号下的所有RAM用户,保存组件的示例代码如下。
# 指定identifier和版本,保存工作流。保存的组件的provider默认为对应的阿里云账号的UID。
p = p.save(identifier="demo-composite-pipeline", version="v1")
print(p.pipeline_id, p.identifier, p.version, p.provider)

心脏病预测案例

以下示例代码通过Python SDK构建了一个心脏病预测工作流,并提交任务使其运行在PAI Pipeline Service。有关心脏病预测案例的详细信息,请参见心脏病预测

您可以在本地的Python环境或PAI-DSW中,完成Session初始化(参见准备工作)后,运行以下的示例代码训练心脏病预测模型。为了保存输出的PMML模型文件,工作流在运行时,需要提供您的OSS信息。
from pai.pipeline import Pipeline, PipelineStep, PipelineParameter
from pai.common.utils import gen_run_node_scoped_placeholder
from pai.pipeline.types import PipelineArtifact, MetadataBuilder

def create_pipeline():
    feature_cols = [
                       "sex",
                       "cp",
                       "fbs",
                       "restecg",
                       "exang",
                       "slop",
                       "thal",
                       "age",
                       "trestbps",
                       "chol",
                       "thalach",
                       "oldpeak",
                       "ca",
        ]
    label_col = "ifhealth"
    full_cols = ",".join(feature_cols + [label_col])

    pmml_oss_bucket = PipelineParameter("pmml_oss_bucket")
    pmml_oss_rolearn = PipelineParameter("pmml_oss_rolearn")
    pmml_oss_path = PipelineParameter("pmml_oss_path")
    pmml_oss_endpoint = PipelineParameter("pmml_oss_endpoint")
    execution = PipelineParameter("execution", typ=dict)
    dataset_input = PipelineArtifact(
        "inputTable",
        metadata=MetadataBuilder.maxc_table(),
        required=True,
    )

    sql = (
        "select age, (case sex when 'male' then 1 else 0 end) as sex,(case cp when"
        " 'angina' then 0  when 'notang' then 1 else 2 end) as cp, trestbps, chol,"
        " (case fbs when 'true' then 1 else 0 end) as fbs, (case restecg when 'norm'"
        " then 0  when 'abn' then 1 else 2 end) as restecg, thalach, (case exang when"
        " 'true' then 1 else 0 end) as exang, oldpeak, (case slop when 'up' then 0  "
        "when 'flat' then 1 else 2 end) as slop, ca, (case thal when 'norm' then 0 "
        " when 'fix' then 1 else 2 end) as thal, (case status when 'sick' then 1 else "
        "0 end) as ifHealth from ${t1};"
    )
    sql_step = PipelineStep(
        "sql",
        name="sql-1",
        provider=ProviderAlibabaPAI,
        version="v1",
        inputs={
            "inputTable1": dataset_input,
            "execution": execution,
            "sql": sql,
            "outputTableName": gen_run_node_scoped_placeholder(
                suffix="outputTable"
            ),
        },
    )

    type_transform_step = PipelineStep(
        "type_transform",
        name="type-transform-1",
        provider=ProviderAlibabaPAI,
        version="v1",
        inputs={
            "execution": execution,
            "inputTable": sql_step.outputs["outputTable"],
            "cols_to_double": full_cols,
            "outputTable": gen_run_node_scoped_placeholder(
                suffix="outputTable"
            ),
        },
    )

    normalize_step = PipelineStep(
        "normalize_1",
        name="normalize-1",
        provider=ProviderAlibabaPAI,
        version="v1",
        inputs={
            "execution": execution,
            "inputTable": type_transform_step.outputs["outputTable"],
            "selectedColNames": full_cols,
            "lifecycle": 1,
            "outputTableName": gen_run_node_scoped_placeholder(
                suffix="outputTable"
            ),
            "outputParaTableName": gen_run_node_scoped_placeholder(
                suffix="outputParaTable"
            ),
        },
    )

    split_step = PipelineStep(
        identifier="split",
        name="split-1",
        provider=ProviderAlibabaPAI,
        version="v1",
        inputs={
            "inputTable": normalize_step.outputs["outputTable"],
            "execution": execution,
            "fraction": 0.8,
            "output1TableName": gen_run_node_scoped_placeholder(
                suffix="output1Table"
            ),
            "output2TableName": gen_run_node_scoped_placeholder(
                suffix="output2Table"
            ),
        },
    )

    model_name = "test_health_prediction_by_pipeline_%s" % (
        random.randint(0, 999999)
    )

    lr_step = PipelineStep(
        identifier="logisticregression_binary",
        name="logisticregression-1",
        provider=ProviderAlibabaPAI,
        version="v1",
        inputs={
            "inputTable": split_step.outputs["output1Table"],
            "execution": execution,
            "generatePmml": True,
            "pmmlOssEndpoint": pmml_oss_endpoint,
            "pmmlOssBucket": pmml_oss_bucket,
            "pmmlOssPath": pmml_oss_path,
            "pmmlOverwrite": True,
            "roleArn": pmml_oss_rolearn,
            "regularizedLevel": 1.0,
            "regularizedType": "l2",
            "modelName": model_name,
            "goodValue": 1,
            "featureColNames": ",".join(feature_cols),
            "labelColName": label_col,
        },
    )

    offline_model_pred_step = PipelineStep(
        identifier="Prediction_1",
        name="offlinemodel-pred",
        provider=ProviderAlibabaPAI,
        version="v1",
        inputs={
            "model": lr_step.outputs["model"],
            "inputTable": split_step.outputs["output2Table"],
            "execution": execution,
            "outputTableName": gen_run_node_scoped_placeholder(
                suffix="outputTable"
            ),
            "featureColNames": ",".join(feature_cols),
            "appendColNames": label_col,
        },
    )

    evaluate_step = PipelineStep(
        identifier="evaluate_1",
        name="evaluate-1",
        provider=ProviderAlibabaPAI,
        version="v1",
        inputs={
            "execution": execution,
            "inputTable": offline_model_pred_step.outputs["outputTable"],
            "outputDetailTableName": gen_run_node_scoped_placeholder(
                suffix="outputDetail"
            ),
            "outputELDetailTableName": gen_run_node_scoped_placeholder(
                suffix="outputELDetail"
            ),
            "outputMetricTableName": gen_run_node_scoped_placeholder(
                suffix="outputMetricDetail"
            ),
            "scoreColName": "prediction_score",
            "labelColName": label_col,
        },
    )

    p = Pipeline(
        steps=[evaluate_step, offline_model_pred_step],
        outputs={
            "pmmlModel": lr_step.outputs["PMMLOutput"],
            "evaluateResult": evaluate_step.outputs["outputMetricTable"],
        },
    )
    return p


p = create_pipeline()

# 输出Pipeline的图片。
p.dot().render()

pmml_oss_endpoint = "{{YourOssBucketEndpoint}}"
pmml_oss_path = "{{PathForThePmmlFile}}"
pmml_oss_bucket = "{{YourOssBucketName}}"

# 如何获取Role Arn,请参见文档https://help.aliyun.com/document_detail/106225.html。
pmml_oss_rolearn = "{{RoleArnForPaiToVisitOssBucket}}"
maxc_execution = {
    "endpoint": "{{MaxComputeProjectEndpoint}}",
    "odpsProject": "{{MaxComputeProjectName}}",
}

run_instance = p.run(
    job_name="test_heart_disease_pred",
    arguments={
        "execution": maxc_execution,
        "pmml_oss_rolearn": pmml_oss_rolearn,
        "pmml_oss_path": pmml_oss_path,
        "pmml_oss_bucket": pmml_oss_bucket,
        "pmml_oss_endpoint": pmml_oss_endpoint,
        "inputTable": "odps://pai_online_project/tables/heart_disease_prediction"
    },
    wait=True,
)
print(run_instance.get_outputs())
上述代码构建的工作流,通过pipeline.dot().render()渲染后,得到的Pipeline DAG如下图所示。心脏病Pipeline DAG