文档

通过Python SDK调用Designer的PyAlink脚本

更新时间:

您可以在Designer页面执行PyAlink脚本来构建和训练模型,也可以通过Python SDK调用Designer的PyAlink脚本来构建和训练模型。本文为您介绍如何使用Python SDK调用PyAlink脚本。

准备环境:安装Python SDK

如果您通过本地的Python开发环境使用SDK,则需要安装SDK,安装SDK的命令如下。

说明

如果您通过DSW环境使用SDK,该环境中已经预装了SDK,您可以跳过该步骤。

pip install https://pai-sdk.oss-cn-shanghai.aliyuncs.com/alipai/dist/alipai-0.3.3.dev2-py2.py3-none-any.whl

其中https://pai-sdk.oss-cn-shanghai.aliyuncs.com/alipai/dist/alipai-0.3.3.dev2-py2.py3-none-any.whl表示SDK的下载地址。

重点代码块和对应的参数说明

通过Python SDK调用Designer的PyAlink脚本,使用的重点代码块和对应的参数说明如下所示。

  • 初始化默认的SDK Session和工作空间。

    setup_default_session(
            access_key_id="**",
            access_key_secret="**",
            region_id="cn-hangzhou",
            workspace_id='41452')

    参数配置详情如下所示。

    参数

    描述

    access_key_id

    阿里云账号的AccessKey ID。

    access_key_secret

    阿里云账号的AccessKey Secret。

    region_id

    地域ID。

    workspace_id

    工作空间ID。

  • 通过以下代码获取PyAlink脚本组件。

    pyalink_op = SavedOperator.get_by_identifier(
            identifier="alink_pyalink", provider=ProviderAlibabaPAI, version="v1")
  • 准备PyAlink脚本。

    pyalink_script = """
    import itertools
    from pyalink.alink import *
    
    def main(sources, sinks, parameter):
        raw_data = sources[0]
        # split the raw dataset
        spliter = SplitBatchOp().setFraction(0.8).linkFrom(raw_data)
        train_data = spliter
        valid_data = spliter.getSideOutput(0)
    
        train_data.link(sinks[0])
        valid_data.link(sinks[1])
    
        BatchOperator.execute()
    """

    其中:

    • sources:List[BatchOperator]类型,对应输入的BatchOp列表;sources[0] 对应列表中第一个元素,sources[1] 对应列表中第二个元素,依次类推。

    • sinks:List[BatchOperator]类型,对应输出的BatchOp列表;sinks[0] 对应列表中第一个元素,sinks[1]对应列表中第二个元素,依次类推。

    • parameter:str类型,对应自定义参数输入框中的字符串。

  • PyAlink脚本启动的Alink算法作业支持以下三种运行模式。

    • MaxCompute分布式运行。

      pyalink_op.run(job_name='pyalink-demo', arguments={
          'computeTarget': 'MaxCompute',
          'execution': {
              'resourceType': 'MaxCompute',
              'endpoint': 'http://service.cn-hangzhou.maxcompute.aliyun.com/api',
              'odpsProject': 'abk_test_1',
          },
          'execution_maxcompute': {
              'resourceType': 'MaxCompute',
              'endpoint': 'http://service.cn-hangzhou.maxcompute.aliyun.com/api',
              'odpsProject': 'abk_test_1',
          },
          'literalPythonScript': pyalink_script,
          'input-0': 'odps://pai_online_project/tables/heart_disease_prediction'
      })
                                          
    • DLC单机多并发运行。

      pyalink_op.run(job_name='pyalink-demo', arguments={
          'computeTarget': 'DLC',
          'execution': {
              'resourceType': 'DLC',
          },
          'execution_maxcompute': {
              'resourceType': 'MaxCompute',
              'endpoint': 'http://service.cn-hangzhou.maxcompute.aliyun.com/api',
              'odpsProject': 'abk_test_1',
          },
          'literalPythonScript': pyalink_script,
          'input-0': 'odps://pai_online_project/tables/heart_disease_prediction'
      })
                                          
    • Flink全托管分布式运行。

      pyalink_op.run(job_name='pyalink-demo', arguments={
          'computeTarget': 'Flink',
          'execution': {
              'resourceType': 'Flink',
              "resourceId": "Flink全托管的工作空间ID",
              "namespace": "Flink全托管的项目空间名字",
          },
          'execution_maxcompute': {
              'resourceType': 'MaxCompute',
              'endpoint': 'http://service.cn-hangzhou.maxcompute.aliyun.com/api',
              'odpsProject': 'abk_test_1',
          },
          'literalPythonScript': pyalink_script,
          'input-0': 'odps://pai_online_project/tables/heart_disease_prediction'
      })
                                          

    参数说明如下:

    • job_name:启动的作业名称。

    • arguments:启动的作业使用的参数,PyAlink脚本支持使用以下参数:

      参数

      描述

      computeTarget

      运行资源的类型,包括DLC、MaxCompute、Flink。

      execution

      Dict类型,启动作业的运行资源。

      execution_maxcompute

      Dict类型,MaxCompute资源,用于读写MaxCompute表。

      literalPythonScript

      PyAlink脚本的内容。

      customParameter

      str类型,对应PyAlink脚本中main函数的第三个参数parameter

      customFlinkConfig:

      str类型,自定义Flink的配置项。

      lifecycle

      str类型,输出的MaxCompute表的生命周期,默认为28天。

      input-n

      第n个输入数据,最多支持4个输入,支持以下类型:

      • MaxCompute表:格式为odps://[MaxCompute项目名]/tables/[MaxCompute表名]

      • OSS中的CSV文件:格式为oss://[oss-bucket].[endpoint]/[oss-file-path]

      output-n

      第n个输出数据,最多支持4个输出,支持MaxCompute表和OSS文件。当不指定该参数时,会自动在当前MaxCompute项目创建一张数据表。

调用代码示例

通过Python SDK调用Designer的PyAlink脚本的代码示例如下所示。

from __future__ import print_function

from pai.core.session import setup_default_session
from pai.operator import SavedOperator
from pai.common import ProviderAlibabaPAI


def main():
    setup_default_session(
        # 您的AccessKey ID。
        access_key_id="**",
        # 您的AccessKey Seret。
        access_key_secret="**",
        region_id="cn-hangzhou",
        workspace_id='41452')

    op = SavedOperator.get_by_identifier(
        identifier="alink_pyalink", provider=ProviderAlibabaPAI, version="v1")

    compute_resource = {
        'endpoint': 'http://service.cn-hangzhou.maxcompute.aliyun.com/api',
        'odpsProject': 'abk_test_1',
    }
    pyalink_script = """
import itertools
from pyalink.alink import *

def main(sources, sinks, parameter):
    raw_data = sources[0]
    # split the raw dataset
    spliter = SplitBatchOp().setFraction(0.8).linkFrom(raw_data)
    train_data = spliter
    valid_data = spliter.getSideOutput(0)

    feature_cols = 'age,trestbps,chol,thalach,oldpeak,ca,sex,cp,fbs,restecg,exang,slop,thal'.split(',')
    label_col = 'if_health'

    type_convert_sql = ','.join([f'cast({x} as double) as {x}' for x in
                                 itertools.chain(feature_cols, [label_col])])

    model = Pipeline(
        Select(clause='''
        age,
        (case sex when 'male' then 1.0 else 0.0 end) as sex,
        (case cp when 'angina' then 0.0 when 'notang' then 1.0 else 2.0 end) as cp,
        trestbps,
        chol,
        (case fbs when 'true' then 1.0 else 0.0 end) as fbs,
        (case restecg when 'norm' then 0.0 when 'abn' then 1.0 else 2.0 end) as restecg,
        thalach,
        (case exang when 'true' then 1.0 else 0.0 end) as exang,
        oldpeak,
        (case slop when 'up' then 0.0  when 'flat' then 1.0 else 2.0 end) as slop,
        ca,
        (case thal when 'norm' then 0.0  when 'fix' then 1.0 else 2.0 end) as thal,
        (case status  when 'sick' then 1 else 0 end) as if_health
        '''),
        Select(clause=type_convert_sql),
        MinMaxScaler().setSelectedCols(feature_cols),
        LogisticRegression().setFeatureCols(feature_cols)
        .setLabelCol(label_col)
        .setPredictionCol('predict_result')
        .setPredictionDetailCol('prediction_detail')
    ).fit(train_data)

    predict_result = model.transform(valid_data)
    # evaluatation
    evaluation = EvalBinaryClassBatchOp().setLabelCol(label_col) \
                                         .setPredictionDetailCol('prediction_detail') \
                                         .linkFrom(predict_result)

    model.save().link(sinks[0])
    predict_result.link(sinks[1])
    evaluation.link(sinks[2])

    # export model to OSS
    model.save().link(AkSinkBatchOp() \
            .setFilePath('oss://bucketname/alink_model.ak') \
            .setOverwriteSink(True))

    BatchOperator.execute()
"""

    op.run(
        job_name='pyalink-demo',
        arguments={
            'computeTarget': 'MaxCompute',
            'execution': compute_resource,
            'execution_maxcompute': compute_resource,
            'input-0': 'odps://pai_online_project/tables/heart_disease_prediction',
            'literalPythonScript': pyalink_script
        })
    pass


if __name__ == '__main__':
    main()
  • 本页导读 (1)
文档反馈