PAI不仅支持通过可视化建模(PAI-Designer)拖拉拽的形式构建数据处理、数据验证及模型训练的机器学习工作流,而且支持使用Python SDK构建相同的工作流,并在PAI Pipeline Service中运行。本文介绍如何使用Python SDK构建一个机器学习工作流,并使其运行在PAI Pipeline Service中。
背景信息
为了获得合适的机器学习模型,将其应用于在线推理或离线推理,通常机器学习工程师需要进行数据提取、数据校验、特征工程、模型训练及模型评估全流程,且以上流程并非只进行一次。为了获得更符合业务需求的模型,机器学习工程师需要尝试不同的特征和参数,或使用增量的数据训练模型,以更好地拟合最新的输入特征。为了复用这些流程,通常的解决方案是先将这些流程以工作流(Pipeline)的形式组织起来,再提交至一个自动化工作流服务中运行。
准备工作
获取算法组件
Operator是PAI Pipeline Service中定义的算法组件,包含了组件的输入输出信息、运行参数及具体执行的实现方式(执行一个DAG或执行一个单独的镜像)。您可以通过Python SDK从PAI Pipeline Service获取PAI内置的公共算法组件(SavedOperator),也可以将本地构造的工作流对象保存为一个Operator进行复用(参见下文的构建工作流)。
通过Python SDK从PAI Pipeline Service获取PAI内置的公共算法组件(SavedOperator)的详细步骤如下。
构建工作流
PAI Pipeline Service支持将多个算法组件拼接为一个新的工作流,通过提供输入参数可以提交并运行该工作流,或将其保存为一个复合工作流组件。保存的工作流组件可以作为一个普通组件直接运行(参见上文的提交任务)或作为新创建的工作流的一个节点。
- 定义工作流的输入信息。
输入信息包括用户输入参数PipelineParameter或数据输入PipelineArtifact。对于数据输入,目前PAI Pipeline Service支持OSS数据、MaxCompute表数据及MaxCompute OfflineModel。
- 创建工作流中的Step及其输入。
Step的输入可能来自于其他Step,也可能来源于当前创建的工作流的输入。
- 指定工作流的输出信息。
工作流可以使用引用的方式将Step节点的输出作为新的工作流的输出。
from pai.common import ProviderAlibabaPAI
from pai.common.utils import gen_temp_table
from pai.operator import SavedOperator
from pai.pipeline import Pipeline, PipelineStep
from pai.pipeline.types import MetadataBuilder, PipelineArtifact, PipelineParameter
def create_composite_pipeline():
    """Build a composite pipeline chaining a type-transform step and a split step.

    Returns:
        Pipeline: an unsaved pipeline whose outputs are the two output tables
        of the split step.
    """
    # Pipeline-level inputs.
    # The underlying components run on MaxCompute; the ``execution`` parameter
    # carries the MaxCompute project information used to run them.
    execution_input = PipelineParameter(name="execution", typ=dict)
    cols_to_double_input = PipelineParameter(name="cols_to_double")
    table_input = PipelineArtifact(
        name="input_table", metadata=MetadataBuilder.maxc_table()
    )
    # Step 1: type transform. Referencing a saved component by
    # identifier-provider-version uses it as one step of this pipeline.
    # ``ProviderAlibabaPAI`` comes from ``pai.common`` (imported at file top).
    type_transform_step = PipelineStep(
        identifier="type_transform",
        provider=ProviderAlibabaPAI,
        version="v1",
        name="typeTransform",
        inputs={
            "inputTable": table_input,
            "execution": execution_input,
            "outputTable": gen_temp_table(),
            "cols_to_double": cols_to_double_input,
        },
    )
    # Step 2: split. A SavedOperator instance can also be turned into a step.
    split_operator = SavedOperator.get_by_identifier(
        identifier="split", provider=ProviderAlibabaPAI, version="v1"
    )
    split_step = split_operator.as_step(
        inputs={
            "inputTable": type_transform_step.outputs[0],
            "execution": execution_input,
            "output1TableName": gen_temp_table(),
            "fraction": 0.5,
            "output2TableName": gen_temp_table(),
        }
    )
    # ``steps`` and ``inputs`` need not be exhaustive: when the pipeline graph
    # is built, step dependencies are inferred from ``outputs`` and ``steps``
    # to construct the execution DAG.
    p = Pipeline(
        steps=[split_step],
        outputs=split_step.outputs[:2],
    )
    return p
# MaxCompute execution info consumed by the pipeline's "execution" input.
# Replace the placeholders with your own project endpoint and name.
# (Without this definition the snippet raised NameError on ``maxc_execution``.)
maxc_execution = {
    "endpoint": "<your_Project_Endpoint>",
    "odpsProject": "<your_MaxCompute_project>",
}
p = create_composite_pipeline()
# Provide the run arguments and submit the pipeline to PAI Pipeline Service.
pipeline_run = p.run(job_name="demo-composite-pipeline-run", arguments={
    "execution": maxc_execution,
    "cols_to_double": "time,hour,pm2,pm10,so2,co,no2",
    "input_table": "odps://pai_online_project/tables/wumai_data",
}, wait=True)
pipeline_run.get_outputs()
# Save the pipeline under an identifier and version. The provider of the
# saved component defaults to the UID of the current Alibaba Cloud account.
p = p.save(identifier="demo-composite-pipeline", version="v1")
print(p.pipeline_id, p.identifier, p.version, p.provider)
心脏病预测案例
以下示例代码通过Python SDK构建了一个心脏病预测工作流,并提交任务使其运行在PAI Pipeline Service。有关心脏病预测案例的详细信息,请参见心脏病预测。
from pai.pipeline import Pipeline, PipelineStep, PipelineParameter
from pai.common.utils import gen_run_node_scoped_placeholder
from pai.pipeline.types import PipelineArtifact, MetadataBuilder
from pai.common import ProviderAlibabaPAI
import random
def create_pipeline():
    """Build the heart-disease prediction pipeline.

    The pipeline: SQL preprocessing -> type transform -> normalize -> split
    -> logistic-regression training (PMML export to OSS) -> offline-model
    prediction -> evaluation.

    Returns:
        Pipeline: an unsaved pipeline exposing the trained PMML model and the
        evaluation metric table as outputs.
    """
    # Feature columns of the heart-disease dataset; the label column is
    # produced by the SQL step below ("ifHealth"; MaxCompute column names are
    # case-insensitive, so it matches "ifhealth" — TODO confirm).
    feature_cols = [
        "sex",
        "cp",
        "fbs",
        "restecg",
        "exang",
        "slop",
        "thal",
        "age",
        "trestbps",
        "chol",
        "thalach",
        "oldpeak",
        "ca",
    ]
    label_col = "ifhealth"
    full_cols = ",".join(feature_cols + [label_col])
    # Pipeline-level inputs: OSS destination for the exported PMML model,
    # MaxCompute execution info, and the input dataset table.
    pmml_oss_bucket = PipelineParameter("pmml_oss_bucket")
    pmml_oss_rolearn = PipelineParameter("pmml_oss_rolearn")
    pmml_oss_path = PipelineParameter("pmml_oss_path")
    pmml_oss_endpoint = PipelineParameter("pmml_oss_endpoint")
    execution = PipelineParameter("execution", typ=dict)
    dataset_input = PipelineArtifact(
        "inputTable",
        metadata=MetadataBuilder.maxc_table(),
        required=True,
    )
    # SQL that maps categorical string columns to numeric codes and derives
    # the binary label from the "status" column. ${t1} is substituted with
    # the step's input table.
    sql = (
        "select age, (case sex when 'male' then 1 else 0 end) as sex,(case cp when"
        " 'angina' then 0 when 'notang' then 1 else 2 end) as cp, trestbps, chol,"
        " (case fbs when 'true' then 1 else 0 end) as fbs, (case restecg when 'norm'"
        " then 0 when 'abn' then 1 else 2 end) as restecg, thalach, (case exang when"
        " 'true' then 1 else 0 end) as exang, oldpeak, (case slop when 'up' then 0 "
        "when 'flat' then 1 else 2 end) as slop, ca, (case thal when 'norm' then 0 "
        " when 'fix' then 1 else 2 end) as thal, (case status when 'sick' then 1 else "
        "0 end) as ifHealth from ${t1};"
    )
    # Step 1: run the preprocessing SQL. Output table names use
    # run-node-scoped placeholders so each run writes to a fresh table.
    sql_step = PipelineStep(
        "sql",
        name="sql-1",
        provider=ProviderAlibabaPAI,
        version="v1",
        inputs={
            "inputTable1": dataset_input,
            "execution": execution,
            "sql": sql,
            "outputTableName": gen_run_node_scoped_placeholder(
                suffix="outputTable"
            ),
        },
    )
    # Step 2: cast all feature/label columns to double.
    type_transform_step = PipelineStep(
        "type_transform",
        name="type-transform-1",
        provider=ProviderAlibabaPAI,
        version="v1",
        inputs={
            "execution": execution,
            "inputTable": sql_step.outputs["outputTable"],
            "cols_to_double": full_cols,
            "outputTable": gen_run_node_scoped_placeholder(
                suffix="outputTable"
            ),
        },
    )
    # Step 3: normalize the selected columns. lifecycle=1 keeps the
    # intermediate MaxCompute tables for a single day.
    normalize_step = PipelineStep(
        "normalize_1",
        name="normalize-1",
        provider=ProviderAlibabaPAI,
        version="v1",
        inputs={
            "execution": execution,
            "inputTable": type_transform_step.outputs["outputTable"],
            "selectedColNames": full_cols,
            "lifecycle": 1,
            "outputTableName": gen_run_node_scoped_placeholder(
                suffix="outputTable"
            ),
            "outputParaTableName": gen_run_node_scoped_placeholder(
                suffix="outputParaTable"
            ),
        },
    )
    # Step 4: 80/20 train/test split.
    split_step = PipelineStep(
        identifier="split",
        name="split-1",
        provider=ProviderAlibabaPAI,
        version="v1",
        inputs={
            "inputTable": normalize_step.outputs["outputTable"],
            "execution": execution,
            "fraction": 0.8,
            "output1TableName": gen_run_node_scoped_placeholder(
                suffix="output1Table"
            ),
            "output2TableName": gen_run_node_scoped_placeholder(
                suffix="output2Table"
            ),
        },
    )
    # Randomized model name to avoid collisions between runs.
    model_name = "test_health_prediction_by_pipeline_%s" % (
        random.randint(0, 999999)
    )
    # Step 5: train a binary logistic-regression model on the 80% split and
    # export it as PMML to the OSS location given by the pipeline inputs.
    lr_step = PipelineStep(
        identifier="logisticregression_binary",
        name="logisticregression-1",
        provider=ProviderAlibabaPAI,
        version="v1",
        inputs={
            "inputTable": split_step.outputs["output1Table"],
            "execution": execution,
            "generatePmml": True,
            "pmmlOssEndpoint": pmml_oss_endpoint,
            "pmmlOssBucket": pmml_oss_bucket,
            "pmmlOssPath": pmml_oss_path,
            "pmmlOverwrite": True,
            "roleArn": pmml_oss_rolearn,
            "regularizedLevel": 1.0,
            "regularizedType": "l2",
            "modelName": model_name,
            "goodValue": 1,
            "featureColNames": ",".join(feature_cols),
            "labelColName": label_col,
        },
    )
    # Step 6: score the 20% hold-out split with the trained offline model;
    # the label column is appended so the evaluate step can compare.
    offline_model_pred_step = PipelineStep(
        identifier="Prediction_1",
        name="offlinemodel-pred",
        provider=ProviderAlibabaPAI,
        version="v1",
        inputs={
            "model": lr_step.outputs["model"],
            "inputTable": split_step.outputs["output2Table"],
            "execution": execution,
            "outputTableName": gen_run_node_scoped_placeholder(
                suffix="outputTable"
            ),
            "featureColNames": ",".join(feature_cols),
            "appendColNames": label_col,
        },
    )
    # Step 7: evaluate predictions against the label column.
    evaluate_step = PipelineStep(
        identifier="evaluate_1",
        name="evaluate-1",
        provider=ProviderAlibabaPAI,
        version="v1",
        inputs={
            "execution": execution,
            "inputTable": offline_model_pred_step.outputs["outputTable"],
            "outputDetailTableName": gen_run_node_scoped_placeholder(
                suffix="outputDetail"
            ),
            "outputELDetailTableName": gen_run_node_scoped_placeholder(
                suffix="outputELDetail"
            ),
            "outputMetricTableName": gen_run_node_scoped_placeholder(
                suffix="outputMetricDetail"
            ),
            "scoreColName": "prediction_score",
            "labelColName": label_col,
        },
    )
    # Listing only the sink steps is enough: the full DAG (including lr_step
    # etc.) is inferred from the outputs/steps dependency chain.
    p = Pipeline(
        steps=[evaluate_step, offline_model_pred_step],
        outputs={
            "pmmlModel": lr_step.outputs["PMMLOutput"],
            "evaluateResult": evaluate_step.outputs["outputMetricTable"],
        },
    )
    return p
p = create_pipeline()
# Render an image of the pipeline DAG.
p.dot().render()

# Placeholders — replace with your own OSS and MaxCompute settings.
pmml_oss_endpoint = "<YourOssBucketEndpoint>"
pmml_oss_path = "<PathForThePmmlFile>"
pmml_oss_bucket = "<YourOssBucketName>"
pmml_oss_rolearn = "<RoleArnForPaiToVisitOssBucket>"
maxc_execution = {
    "endpoint": "<your_Project_Endpoint>",
    "odpsProject": "<your_MaxCompute_project>",
}

# Assemble the run arguments, submit the job, and wait for completion.
run_arguments = {
    "execution": maxc_execution,
    "pmml_oss_rolearn": pmml_oss_rolearn,
    "pmml_oss_path": pmml_oss_path,
    "pmml_oss_bucket": pmml_oss_bucket,
    "pmml_oss_endpoint": pmml_oss_endpoint,
    "inputTable": "odps://pai_online_project/tables/heart_disease_prediction",
}
run_instance = p.run(
    job_name="test_heart_disease_pred",
    arguments=run_arguments,
    wait=True,
)
print(run_instance.get_outputs())
需要根据实际情况,替换以下参数值。

| 参数 | 描述 |
|---|---|
<YourOssBucketEndpoint> | OSS Bucket所在地域的Endpoint,详情请参见访问域名和数据中心。 |
<PathForThePmmlFile> | 模型文件保存的OSS Bucket路径。 |
<YourOssBucketName> | OSS Bucket名称。 |
<RoleArnForPaiToVisitOssBucket> | OSS的roleArn。您可以登录PAI控制台,在 页面的Designer区域,单击操作列下的查看授权信息,获取role_Arn,具体操作请参见PAI访问云产品授权:OSS。 |
<your_Project_Endpoint> | 项目所在地域的Endpoint,详情请参见Endpoint。 |
<your_MaxCompute_project> | MaxCompute项目名称。 |
p.dot().render()
渲染后,得到的Pipeline DAG如下图所示。