您可以在Designer页面执行PyAlink脚本来构建和训练模型,也可以通过Python SDK调用Designer的PyAlink脚本来构建和训练模型。本文为您介绍如何使用Python SDK调用PyAlink脚本。
准备环境:安装Python SDK
如果您通过本地的Python开发环境使用SDK,则需要安装SDK,安装SDK的命令如下。
如果您通过DSW环境使用SDK,该环境中已经预装了SDK,您可以跳过该步骤。
pip install https://pai-sdk.oss-cn-shanghai.aliyuncs.com/alipai/dist/alipai-0.3.3.dev2-py2.py3-none-any.whl
其中https://pai-sdk.oss-cn-shanghai.aliyuncs.com/alipai/dist/alipai-0.3.3.dev2-py2.py3-none-any.whl表示SDK的下载地址。
重点代码块和对应的参数说明
通过Python SDK调用Designer的PyAlink脚本,使用的重点代码块和对应的参数说明如下所示。
初始化默认的SDK Session和工作空间。
setup_default_session( access_key_id="**", access_key_secret="**", region_id="cn-hangzhou", workspace_id='41452')
参数配置详情如下所示。
参数
描述
access_key_id
阿里云账号的AccessKey ID。
access_key_secret
阿里云账号的AccessKey Secret。
region_id
地域ID。
workspace_id
工作空间ID。
通过以下代码获取PyAlink脚本组件。
pyalink_op = SavedOperator.get_by_identifier( identifier="alink_pyalink", provider=ProviderAlibabaPAI, version="v1")
准备PyAlink脚本。
pyalink_script = """ import itertools from pyalink.alink import * def main(sources, sinks, parameter): raw_data = sources[0] # split the raw dataset spliter = SplitBatchOp().setFraction(0.8).linkFrom(raw_data) train_data = spliter valid_data = spliter.getSideOutput(0) train_data.link(sinks[0]) valid_data.link(sinks[1]) BatchOperator.execute() """
其中:
sources:List[BatchOperator]类型,对应输入的BatchOp列表;sources[0] 对应列表中第一个元素,sources[1] 对应列表中第二个元素,依次类推。
sinks:List[BatchOperator]类型,对应输出的BatchOp列表;sinks[0] 对应列表中第一个元素,sinks[1]对应列表中第二个元素,依次类推。
parameter:str类型,对应自定义参数输入框中的字符串。
PyAlink脚本启动的Alink算法作业支持以下三种运行模式。
MaxCompute分布式运行。
pyalink_op.run(job_name='pyalink-demo', arguments={ 'computeTarget': 'MaxCompute', 'execution': { 'resourceType': 'MaxCompute', 'endpoint': 'http://service.cn-hangzhou.maxcompute.aliyun.com/api', 'odpsProject': 'abk_test_1', }, 'execution_maxcompute': { 'resourceType': 'MaxCompute', 'endpoint': 'http://service.cn-hangzhou.maxcompute.aliyun.com/api', 'odpsProject': 'abk_test_1', }, 'literalPythonScript': pyalink_script, 'input-0': 'odps://pai_online_project/tables/heart_disease_prediction' })
DLC单机多并发运行。
pyalink_op.run(job_name='pyalink-demo', arguments={ 'computeTarget': 'DLC', 'execution': { 'resourceType': 'DLC', }, 'execution_maxcompute': { 'resourceType': 'MaxCompute', 'endpoint': 'http://service.cn-hangzhou.maxcompute.aliyun.com/api', 'odpsProject': 'abk_test_1', }, 'literalPythonScript': pyalink_script, 'input-0': 'odps://pai_online_project/tables/heart_disease_prediction' })
Flink全托管分布式运行。
pyalink_op.run(job_name='pyalink-demo', arguments={ 'computeTarget': 'Flink', 'execution': { 'resourceType': 'Flink', "resourceId": "Flink全托管的工作空间ID", "namespace": "Flink全托管的项目空间名字", }, 'execution_maxcompute': { 'resourceType': 'MaxCompute', 'endpoint': 'http://service.cn-hangzhou.maxcompute.aliyun.com/api', 'odpsProject': 'abk_test_1', }, 'literalPythonScript': pyalink_script, 'input-0': 'odps://pai_online_project/tables/heart_disease_prediction' })
参数说明如下:
job_name:启动的作业名称。
arguments:启动的作业使用的参数,PyAlink脚本支持使用以下参数:
参数
描述
computeTarget
运行资源的类型,包括DLC、MaxCompute、Flink。
execution
Dict类型,启动作业的运行资源。
execution_maxcompute
Dict类型,MaxCompute资源,用于读写MaxCompute表。
literalPythonScript
PyAlink脚本的内容。
customParameter
str类型,对应PyAlink脚本中main函数的第三个参数parameter。
customFlinkConfig:
str类型,自定义Flink的配置项。
lifecycle
str类型,输出的MaxCompute表的生命周期,默认为28天。
input-n
第n个输入数据,最多支持4个输入,支持以下类型:
MaxCompute表:格式为
odps://[MaxCompute项目名]/tables/[MaxCompute表名]
。OSS中的CSV文件:格式为
oss://[oss-bucket].[endpoint]/[oss-file-path]
。
output-n
第n个输出数据,最多支持4个输出,支持MaxCompute表和OSS文件。当不指定该参数时,会自动在当前MaxCompute项目创建一张数据表。
调用代码示例
通过Python SDK调用Designer的PyAlink脚本的代码示例如下所示。
from __future__ import print_function
from pai.core.session import setup_default_session
from pai.operator import SavedOperator
from pai.common import ProviderAlibabaPAI
def main():
setup_default_session(
# 您的AccessKey ID。
access_key_id="**",
# 您的AccessKey Seret。
access_key_secret="**",
region_id="cn-hangzhou",
workspace_id='41452')
op = SavedOperator.get_by_identifier(
identifier="alink_pyalink", provider=ProviderAlibabaPAI, version="v1")
compute_resource = {
'endpoint': 'http://service.cn-hangzhou.maxcompute.aliyun.com/api',
'odpsProject': 'abk_test_1',
}
pyalink_script = """
import itertools
from pyalink.alink import *
def main(sources, sinks, parameter):
raw_data = sources[0]
# split the raw dataset
spliter = SplitBatchOp().setFraction(0.8).linkFrom(raw_data)
train_data = spliter
valid_data = spliter.getSideOutput(0)
feature_cols = 'age,trestbps,chol,thalach,oldpeak,ca,sex,cp,fbs,restecg,exang,slop,thal'.split(',')
label_col = 'if_health'
type_convert_sql = ','.join([f'cast({x} as double) as {x}' for x in
itertools.chain(feature_cols, [label_col])])
model = Pipeline(
Select(clause='''
age,
(case sex when 'male' then 1.0 else 0.0 end) as sex,
(case cp when 'angina' then 0.0 when 'notang' then 1.0 else 2.0 end) as cp,
trestbps,
chol,
(case fbs when 'true' then 1.0 else 0.0 end) as fbs,
(case restecg when 'norm' then 0.0 when 'abn' then 1.0 else 2.0 end) as restecg,
thalach,
(case exang when 'true' then 1.0 else 0.0 end) as exang,
oldpeak,
(case slop when 'up' then 0.0 when 'flat' then 1.0 else 2.0 end) as slop,
ca,
(case thal when 'norm' then 0.0 when 'fix' then 1.0 else 2.0 end) as thal,
(case status when 'sick' then 1 else 0 end) as if_health
'''),
Select(clause=type_convert_sql),
MinMaxScaler().setSelectedCols(feature_cols),
LogisticRegression().setFeatureCols(feature_cols)
.setLabelCol(label_col)
.setPredictionCol('predict_result')
.setPredictionDetailCol('prediction_detail')
).fit(train_data)
predict_result = model.transform(valid_data)
# evaluatation
evaluation = EvalBinaryClassBatchOp().setLabelCol(label_col) \
.setPredictionDetailCol('prediction_detail') \
.linkFrom(predict_result)
model.save().link(sinks[0])
predict_result.link(sinks[1])
evaluation.link(sinks[2])
# export model to OSS
model.save().link(AkSinkBatchOp() \
.setFilePath('oss://bucketname/alink_model.ak') \
.setOverwriteSink(True))
BatchOperator.execute()
"""
op.run(
job_name='pyalink-demo',
arguments={
'computeTarget': 'MaxCompute',
'execution': compute_resource,
'execution_maxcompute': compute_resource,
'input-0': 'odps://pai_online_project/tables/heart_disease_prediction',
'literalPythonScript': pyalink_script
})
pass
if __name__ == '__main__':
main()
- 本页导读 (1)