PAI支持通过Designer拖拉拽的形式构建数据处理、数据验证及模型训练的机器学习工作流,也支持使用Python SDK构建相同的工作流,提交到PAI的工作流服务执行。本文介绍如何使用Python SDK在PAI创建和执行一个机器学习工作流。
背景信息
为了获得合适的机器学习模型,将其应用于在线推理或离线推理,通常机器学习工程师需要进行数据提取、数据校验、特征工程、模型训练及模型评估全流程,且以上流程并非只进行一次。为了获得更符合业务需求的模型,机器学习工程师需要尝试不同的特征和参数,或使用增量的数据训练模型,以更好地拟合最新的输入特征。为了复用这些流程,通常的解决方案是先将这些流程以工作流的形式组织起来,再提交至一个自动化工作流服务中运行。
PAI提供机器学习工作流服务,您可以通过Designer拖拉拽的方式或Python SDK的方式构建一个机器学习工作流,提交到PAI的工作流服务运行。
Designer是通过拖拉拽的方式构建的图形化工作流,和通过Python SDK构建的工作流不同,因此不能直接使用本地Python SDK调用Designer上的算法组件。
前提条件
已获取阿里云账号的鉴权AccessKey ID和AccessKey Secret,详情请参见:获取AccessKey。
已创建工作空间,详情请参见:创建工作空间。
已创建OSS Bucket,详情请参见:控制台创建存储空间。
准备工作
首先需要安装PAI Python SDK以运行本示例。
pip install "alipai>=0.4.0"
SDK需要配置访问阿里云服务需要的AccessKey,以及当前使用的工作空间和OSS Bucket。在PAI SDK安装之后,通过在命令行终端中执行以下命令进行配置,详细的安装和配置介绍见文档:安装和配置。
python -m pai.toolkit.config
获取算法组件
Component是PAI工作流中定义的算法组件,包含了组件的输入输出信息、运行参数及具体执行的实现方式(执行一个DAG或执行一个单独的镜像)。您可以通过Python SDK从PAI获取PAI内置的公共算法组件(RegisteredComponent),也可以将本地构造的工作流对象保存为一个Component进行复用(参见下文的构建工作流)。
通过Python SDK从PAI工作流服务获取PAI内置的公共算法组件(RegisteredComponent
)的详细步骤如下。
通过
RegisteredComponent.list
方法获取组件列表。在
RegisteredComponent.list
方法中,PAI提供了一些公共的算法组件,通过指定provider为ProviderAlibabaPAI,即可获取PAI提供的算法组件列表。此外,您可以通过inputs和outputs属性,查看对应组件的输入输出信息,代码示例如下所示。from pai.pipeline.component import RegisteredComponent from pai.common import ProviderAlibabaPAI for item in RegisteredComponent.list(provider=ProviderAlibabaPAI, page_size=50, page_number=1): print(item)
获取指定的算法组件。
通过上一步的
RegisteredComponent.list
方法,可以获取算法组件的identifier-provider-version和pipeline_id信息,您可以使用二者中的任意一个从PAI获取一个唯一的算法组件。这两种方式的区别在于identifier-provider-version是由组件开发者在保存组件时指定的,而pipeline_id是由PAI生成的组件唯一标识ID。以PAI提供的split组件为例,两种方式的代码示例分别如下:通过identifier-provider-version获取对应的组件
component = RegisteredComponent.get_by_identifier(identifier="split", provider=ProviderAlibabaPAI, version="v1") print(component.inputs)
目前PAI提供的组件主要基于MaxCompute算法,底层依赖通过PAI命令实现。这些组件的identifier默认为 PAI命令的name,version为v1。您可以在PAI的常规机器学习组件帮助文档中查看组件的输入参数定义,详情请参见常规机器学习组件。
通过pipeline_id获取对应的组件
component = RegisteredComponent.get(component.id) print(component.identifier, component.provider, component.version) print(component.inputs)
提交运行任务。
通过指定组件的必须输入参数,您可以提交一个单独的组件运行任务,以下以PAI提供的split算法组件为例,介绍如何使用SDK将输入数据表odps://pai_online_project/tables/wumai_data(该输入表为公开表,您可以直接使用)按照给定比例拆分到两张新表中。
# split运行在MaxCompute中,需要指定运行的MaxCompute项目和执行环境。 # maxc_execution作为算法组件的一个输入,标识算法组件的执行MaxCompute Project项目信息。 maxc_execution = { "endpoint": "<YourMaxComputeProjectEndpoint>", "odpsProject": "<yourMaxComputeProjectName>", } #提交运行任务。 pipeline_run = component.run( job_name="example-split-job", arguments={ "execution":maxc_execution, "inputTable": "odps://pai_online_project/tables/wumai_data", "fraction": "0.7", #split算法组件的输入参数,可以通过op.inputs获取输入信息。 } ) print(pipeline_run.get_outputs())
需要根据实际情况,替换以下参数值:
<YourMaxComputeProjectEndpoint>:项目所在地域的Endpoint,详情请参见Endpoint。
<YourMaxComputeProjectName>:MaxCompute项目名称。
上述代码中通过
run
接口提交任务后,SDK会在Console中输出任务在PAI控制台中的URL,您也可以直接在PAI控制台的任务管理页面,通过返回的运行任务ID或任务名称查找对应的任务实例。找到该任务后,进入任务详情页面,您可以查看任务执行的DAG、日志及输出,并且可以将模型直接部署到EAS。组件可以单独运行,也可以将多个组件拼接为一个工作流运行,详情请参见下文的构建工作流。
构建工作流
PAI工作流服务支持将多个算法组件拼接为一个新的工作流,通过提供输入参数可以提交并运行该工作流,或将其保存为一个复合工作流组件。保存的工作流组件可以作为一个普通组件直接运行(参见上文的提交任务)或作为新创建的工作流的一个节点。
创建一个新的工作流主要包括以下流程:
定义工作流的输入信息。
输入信息包括用户输入参数PipelineParameter或数据输入PipelineArtifact。对于数据输入,目前PAI的工作流服务支持OSS数据、MaxCompute表数据及MaxCompute OfflineModel。
创建工作流中的Step及其输入。
Step的输入可能来自于其他Step,也可能来源于当前创建的工作流的输入。
指定工作流的输出信息。
工作流可以使用引用的方式将Step节点的输出作为新的工作流的输出。
下面的示例代码中,构建了一个简单的工作流,它先使用类型转换组件将输入表的部分字段转换为浮点类型,再通过数据拆分组件将表拆分为两张MaxCompute表。
from pai.pipeline.types import PipelineParameter, PipelineArtifact, ArtifactMetadataUtils
from pai.pipeline import PipelineStep, Pipeline, RegisteredComponent
def create_composite_pipeline():
# 定义当前工作流的输入信息。
# 由于组件底层的处理是运行在MaxCompute上,execution参数传递执行的MaxCompute Project信息。
execution_input = PipelineParameter(name="execution", typ=dict)
cols_to_double_input = PipelineParameter(name="cols_to_double")
table_input = PipelineArtifact(name="input_table", metadata=ArtifactMetadataUtils.maxc_table())
# 构建类型转换Step。
# 指定identifier-provider-version, 使用一个已经保存的组件,作为工作流的一个Step。
type_transform_step = PipelineStep.from_registered_component(
identifier="type_transform", provider=ProviderAlibabaPAI,
version="v1", name="typeTransform", inputs={
"inputTable": table_input,
"execution": execution_input,
# "outputTable": gen_temp_table(),
"cols_to_double": cols_to_double_input,
}
)
# 构建拆分表Step。
# SavedOperator也可以作为一个Step构建工作流。
split_operator = RegisteredComponent.get_by_identifier(identifier="split",
provider=ProviderAlibabaPAI, version="v1")
split_step = split_operator.as_step(inputs={"inputTable": type_transform_step.outputs[0],
"execution": execution_input,
# "output1TableName": gen_temp_table(),
"fraction": 0.5,
# "output2TableName": gen_temp_table(),
})
# Pipeline构造函数中的steps和inputs信息并不要求完整输入,Pipeline graph时,是通过Pipeline的outputs和steps推导他们的依赖,从而构造对应的执行DAG。
p = Pipeline(
steps=[split_step],
outputs=split_step.outputs[:2],
)
return p
p = create_composite_pipeline()
# 输入工作流运行所需参数(arguments)后,提交到PAI Pipeline Service运行。
pipeline_run = p.run(job_name="demo-composite-pipeline-run", arguments={
"execution": maxc_execution,
"cols_to_double": "time,hour,pm2,pm10,so2,co,no2",
"input_table": "odps://pai_online_project/tables/wumai_data",
}, wait=True)
pipeline_run.get_outputs()
对于上述构建好的工作流,您可以为其指定组件名称和版本,将该工作流保存到服务端成为一个可复用组件。保存的组件默认共享给阿里云账号下的所有RAM用户,保存组件的示例代码如下。
# 指定identifier和版本,保存工作流。保存的组件的provider默认为对应的阿里云账号的UID。
p = p.save(identifier="demo-composite-pipeline", version="v1")
print(p.pipeline_id, p.identifier, p.version, p.provider)
心脏病预测案例
以下示例代码通过Python SDK构建了一个心脏病预测工作流,并提交任务使其运行在PAI工作流服务。有关心脏病预测案例的详细信息,请参见心脏病预测。
您可以在本地的Python环境或DSW中,完成SDK初始化(参见安装和配置)后,运行以下的示例代码训练心脏病预测模型。为了保存输出的PMML模型文件,工作流在运行时,需要提供您的OSS信息。
import random
from pai.pipeline.types import (
ArtifactMetadataUtils,
ParameterType,
PipelineArtifact,
PipelineParameter,
)
from pai.pipeline import Pipeline, PipelineStep
feature_cols = "sex,cp,fbs,restecg,exang,slop,thal,age,trestbps,chol,thalach,oldpeak,ca,ifhealth"
label_col = "ifhealth"
def create_pipeline():
pmml_oss_bucket = PipelineParameter("pmml_oss_bucket")
pmml_oss_rolearn = PipelineParameter("pmml_oss_rolearn")
pmml_oss_path = PipelineParameter("pmml_oss_path")
pmml_oss_endpoint = PipelineParameter("pmml_oss_endpoint")
execution = PipelineParameter("execution", ParameterType.Map)
dataset_input = PipelineArtifact(
"dataset-table",
metadata=ArtifactMetadataUtils.maxc_table(),
required=True,
)
sql = (
"select age, (case sex when 'male' then 1 else 0 end) as sex,(case cp when"
" 'angina' then 0 when 'notang' then 1 else 2 end) as cp, trestbps, chol,"
" (case fbs when 'true' then 1 else 0 end) as fbs, (case restecg when 'norm'"
" then 0 when 'abn' then 1 else 2 end) as restecg, thalach, (case exang when"
" 'true' then 1 else 0 end) as exang, oldpeak, (case slop when 'up' then 0 "
"when 'flat' then 1 else 2 end) as slop, ca, (case thal when 'norm' then 0 "
" when 'fix' then 1 else 2 end) as thal, (case status when 'sick' then 1 else "
"0 end) as ifHealth from ${t1};"
)
sql_step = PipelineStep.from_registered_component(
"sql",
name="sql-1",
provider=ProviderAlibabaPAI,
version="v1",
inputs={
"inputTable1": dataset_input,
"execution": execution,
"sql": sql,
},
)
type_transform_step = PipelineStep.from_registered_component(
"type_transform",
name="type-transform-1",
provider=ProviderAlibabaPAI,
version="v1",
inputs={
"execution": execution,
"inputTable": sql_step.outputs["outputTable"],
"cols_to_double": feature_cols,
# "outputTable": gen_run_node_scoped_placeholder(suffix="outputTable"),
},
)
normalize_step = PipelineStep.from_registered_component(
"normalize_1",
name="normalize-1",
provider=ProviderAlibabaPAI,
version="v1",
inputs={
"execution": execution,
"inputTable": type_transform_step.outputs["outputTable"],
"selectedColNames": feature_cols,
"lifecycle": 1,
# "outputTableName": gen_run_node_scoped_placeholder(suffix="outputTable"),
# "outputParaTableName": gen_run_node_scoped_placeholder(
# suffix="outputParaTable"
# ),
},
)
split_step = PipelineStep.from_registered_component(
identifier="split",
name="split-1",
provider=ProviderAlibabaPAI,
version="v1",
inputs={
"inputTable": normalize_step.outputs["outputTable"],
"execution": execution,
"fraction": 0.8,
},
)
model_name = "test_health_prediction_by_pipeline_%s" % (random.randint(0, 999999))
lr_step = PipelineStep.from_registered_component(
identifier="logisticregression_binary",
name="logisticregression-1",
provider=ProviderAlibabaPAI,
version="v1",
inputs={
"inputTable": split_step.outputs["output1Table"],
"execution": execution,
"generatePmml": True,
"pmmlOssEndpoint": pmml_oss_endpoint,
"pmmlOssBucket": pmml_oss_bucket,
"pmmlOssPath": pmml_oss_path,
"pmmlOverwrite": True,
"roleArn": pmml_oss_rolearn,
"regularizedLevel": 1.0,
"regularizedType": "l2",
"modelName": model_name,
"goodValue": 1,
"featureColNames": ",".join(feature_cols),
"labelColName": label_col,
},
)
offline_model_pred_step = PipelineStep.from_registered_component(
identifier="Prediction_1",
name="offlinemodel-pred",
provider=ProviderAlibabaPAI,
version="v1",
inputs={
"model": lr_step.outputs["model"],
"inputTable": split_step.outputs["output2Table"],
"execution": execution,
"featureColNames": feature_cols,
"appendColNames": label_col,
},
)
evaluate_step = PipelineStep.from_registered_component(
identifier="evaluate_1",
name="evaluate-1",
provider=ProviderAlibabaPAI,
version="v1",
inputs={
"execution": execution,
"inputTable": offline_model_pred_step.outputs["outputTable"],
"scoreColName": "prediction_score",
"labelColName": label_col,
},
)
p = Pipeline(
steps=[evaluate_step, offline_model_pred_step],
outputs={
"pmmlModel": lr_step.outputs["PMMLOutput"],
"evaluateResult": evaluate_step.outputs["outputMetricTable"],
},
)
return p
p = create_pipeline()
# 输出Pipeline的图片。
p.dot().render()
pmml_oss_endpoint = "<YourOssBucketEndpoint>"
pmml_oss_path = "<PathForThePmmlFile>"
pmml_oss_bucket = "<YourOssBucketName>"
pmml_oss_rolearn = "<RoleArnForPaiToVisitOssBucket>"
maxc_execution = {
"endpoint": "<your_Project_Endpoint>",
"odpsProject": "<your_MaxCompute_project>",
}
run_instance = p.run(
job_name="test_heart_disease_pred",
arguments={
"execution": maxc_execution,
"pmml_oss_rolearn": pmml_oss_rolearn,
"pmml_oss_path": pmml_oss_path,
"pmml_oss_bucket": pmml_oss_bucket,
"pmml_oss_endpoint": pmml_oss_endpoint,
"inputTable": "odps://pai_online_project/tables/heart_disease_prediction"
},
wait=True,
)
print(run_instance.get_outputs())
需要根据实际情况,替换以下参数值。
参数 | 描述 |
<YourOssBucketEndpoint> | OSS Bucket所在地域的Endpoint,详情请参见访问域名和数据中心。 |
<PathForThePmmlFile> | 模型文件保存的OSS Bucket路径。 |
<YourOssBucketName> | OSS Bucket名称。 |
<RoleArnForPaiToVisitOssBucket> | OSS的roleArn。您可以登录PAI控制台,在 页面的Designer区域,单击操作列下的查看授权信息,获取role_Arn,具体操作请参见PAI访问云产品授权:OSS。 |
<your_Project_Endpoint> | 项目所在地域的Endpoint,详情请参见Endpoint。 |
<your_MaxCompute_project> | MaxCompute项目名称。 |
上述代码构建的工作流,通过pipeline.dot().render()
渲染后,得到的Pipeline DAG如下图所示。
- 本页导读 (1)