使用Python开发自定义Processor

您可以使用Python开发自定义Processor文件,结合模型文件在本地调试后分别上传至OSS,在部署服务时挂载使用。本文为您介绍使用Python开发自定义Processor的详细步骤。

背景信息

    说明
    • 建议将模型文件和Processor的实现分开,这样在将来需要微调模型并重新部署服务时,可以重用之前的Processor包。您可以在编写预测逻辑时,使用get_model_path()方法获取模型的存储路径,以便在预测逻辑中加载和使用该模型。

    • 当自定义Processor的依赖较多、Processor包较大时,不建议使用Processor方式来部署服务,建议您使用镜像方式来部署服务,两种部署方式的对比介绍请参见部署原理

使用Python开发自定义Processor的流程如下:

  1. 步骤一:构建开发环境

    EAS提供的Python SDK支持多种Python机器学习框架,并能够集成Pandas等数据分析处理框架。通过本文给出的方式,您可以在本地构建Python开发环境,以便后续开发自定义Processor,并打包上传环境。

  2. 步骤二:编写预测逻辑

    EASPython SDK提供了高性能的RPC框架和与EAS集群交互所需的内部接口。您只需要编写预测逻辑实现简单的接口,就可以将模型服务部署至EAS

  3. 步骤三:本地测试服务

    编写预测逻辑后,您需要进行本地测试服务。本地测试可以帮助您验证预测逻辑的正确性,以确保后续部署服务后,其功能符合要求。

  4. 步骤四:打包代码文件与Python环境

    按照指定要求打包代码,以便于后续部署模型服务。

  5. 步骤五:上传Processor代码包文件和代码文件

    将完整的打包文件和模型文件分别上传至OSS。

  6. 步骤六:部署和测试服务

    将以已构建的自定义的Processor部署为模型服务。

前提条件

已准备好模型文件。

说明

建议将模型文件和自定义Processor分开开发,以便于模型和Processor文件的维护和替换。开发完成后,分别将其上传到OSS,在后续部署服务时进行挂载调用。

步骤一:构建开发环境

您可以使用PyenvPython包管理工具构建开发环境。EAS提供的客户端工具EASCMDPython SDK的初始化过程进行了封装,下载该工具后,您只需要运行一条命令即可完成Python SDK环境初始化,并生成相关的文件模板,仅适用于Linux操作系统。示例命令如下:

# 安装并初始化EASCMD,该示例为安装Linux环境的EASCMD工具。
$ wget https://eas-data.oss-cn-shanghai.aliyuncs.com/tools/eascmd/v2/eascmd64
# 下载完成后,可以修改访问权限,配置阿里云上AccessKey信息。
$ chmod +x eascmd64
$ ./eascmd64 config -i <access_id> -k <access_key>

# 初始化环境。
$ ./eascmd64 pysdk init ./pysdk_demo

在返回结果中输入Python版本(默认为3.6版本)后,系统会自动在您配置的./pysdk_demo目录中创建Python环境ENV目录、预测服务代码模板app.py及服务部署模板app.json

步骤二:编写预测逻辑

ENV同级目录下,创建预测服务主文件app.py,文件内容示例如下:

说明
  • 使用EASCMD构建Python初始化环境时,系统会自动生成app.py文件,无需手动创建。您可以根据需要修改该文件内容。

  • EAS预置的开发镜像中已预先创建了app.py文件,无需手动创建,您可以根据需要修改该文件内容。

# -*- coding: utf-8 -*-
import allspark


class MyProcessor(allspark.BaseProcessor):
    """ MyProcessor is a example
        you can send mesage like this to predict
        curl -v http://127.0.0.1:8080/api/predict/service_name -d '2 105'
    """
    def initialize(self):
        """ load module, executed once at the start of the service
             do service intialization and load models in this function.
        """
        self.module = {'w0': 100, 'w1': 2}
        # model_dir = self.get_model_path().decode()
        # load_model函数需要您自行实现,若您需要加载model.pt模型文件,则可以实现为torch.load(model_dir + "/model.pt")。
        # self.model = load_model(model_dir)

    def pre_process(self, data):
        """ data format pre process
        """
        x, y = data.split(b' ')
        return int(x), int(y)

    def post_process(self, data):
        """ process after process
        """
        return bytes(data, encoding='utf8')

    def process(self, data):
        """ process the request data
        """
        x, y = self.pre_process(data)
        w0 = self.module['w0']
        w1 = self.module['w1']
        y1 = w1 * x + w0
        if y1 >= y:
            return self.post_process("True"), 200
        else:
            return self.post_process("False"), 400


if __name__ == '__main__':
    # allspark.default_properties().put('rpc.keepalive', '10000')
    # 设置服务计算超时时间为10s, 默认为5秒
    # parameter worker_threads indicates concurrency of processing
    runner = MyProcessor(worker_threads=10)
    runner.run()

上述代码即为Python SDK的简单示例,代码中继承了EAS提供的基类BaseProcessor,实现了initialize()process()函数。

函数

功能描述

使用说明

initialize()

Processor初始化函数。服务启动时,进行模型加载等初始化工作。

initialize()函数中添加以下代码,将模型文件的获取与Processor的实现进行分离。

model_dir = self.get_model_path().decode()
self.model = load_model(model_dir)
  • 通过get_model_path()方法获取bytes类型的模型存储目录,即上传的模型文件在服务实例中的实际存储目录。

  • 通过自定义load_model()函数,来加载和使用模型文件以实现服务的部署。若您需要加载model.pt模型文件,则可以实现为torch.load(model_dir + "/model.pt")

get_model_path()

获取bytes类型的模型存储目录。

当您在部署服务的JSON配置文件中通过指定model_path参数来上传模型文件时,可调用get_model_path()方法获取模型文件在服务实例中的实际存储目录,以用于模型加载。

process(data)

请求处理函数。每个请求会将Request Body作为参数传递给process()进行处理,并将函数返回值返回至客户端。

dataRequest Body,类型为BYTES。返回值为BYTESINT类型,输出参数分别为response_datastatus_code,正常请求status_code可以返回0200

_init_(worker_threads=5, worker_processes=1,endpoint=None)

Processor构造函数。

  • worker_threads:Worker线程数,默认值为5。

  • worker_processes:进程数,默认值为1。如果worker_processes1,则表示单进程多线程模式。如果worker_processes大于1,则worker_threads只负责读取数据,请求由多进程并发处理,每个进程均会执行initialize()函数。

  • endpoint:服务监听的Endpoint,通过该参数可以指定服务监听的地址和端口,例如endpoint='0.0.0.0:8079'

    说明

    此处配置的端口需避开EAS的系统监听端口80809090。

run()

启动服务。

无参数。

步骤三:本地测试服务

  1. 在终端app.py文件所在目录执行以下命令,启动Python应用程序。

    ./ENV/bin/python app.py

    系统返回如下结果,表明应用程序启动成功。

    [INFO] waiting for service initialization to complete...
    [INFO] service initialization complete
    [INFO] create service
    [INFO] rpc binds to predefined port 8080
    [INFO] install builtin handler call to /api/builtin/call
    [INFO] install builtin handler eastool to /api/builtin/eastool
    [INFO] install builtin handler monitor to /api/builtin/monitor
    [INFO] install builtin handler ping to /api/builtin/ping
    [INFO] install builtin handler prop to /api/builtin/prop
    [INFO] install builtin handler realtime_metrics to /api/builtin/realtime_metrics
    [INFO] install builtin handler tell to /api/builtin/tell
    [INFO] install builtin handler term to /api/builtin/term
    [INFO] Service start successfully
  2. 重新打开一个终端,执行以下命令测试应用程序的响应情况。

    按照步骤二给出的示例代码,执行以下命令向应用程序发送两个请求数据,返回结果与代码逻辑对应。

    curl http://127.0.0.1:8080/test  -d '10 20'

步骤四:打包代码文件与Python环境

EASCMD为您提供了打包命令,您可以一键完成打包。如果您没有使用EASCMD进行自定义Processor开发,您也可以手动打包完整环境。两种打包方式的介绍如下。

  • 使用EASCMD封装的打包命令(仅Linux系统)。

    $ ./eascmd64 pysdk pack ./pysdk_demo

    命令执行成功后,输出如下结果:

    [PYSDK] Creating package: /home/xi****.lwp/code/test/pysdk_demo.tar.gz
  • 手动打包

    打包要求

    详情

    代码包格式要求

    您需要打包为一个.zip.tar.gz格式的压缩包。

    代码包内容要求

步骤五:上传Processor代码包文件和代码文件

打包完成后,您需要将完整的打包文件(.zip.tar.gz格式)和模型文件分别上传至OSS,便于后续部署服务时挂载使用。上传至OSS的操作请参见命令行工具ossutil命令参考

步骤六:部署和测试服务

您可以通过控制台或EASCMD客户端来部署模型服务。

  1. 部署服务。

    通过控制台方式部署服务

    1. 登录PAI控制台,在页面上方选择目标地域,并在右侧选择目标工作空间,然后单击进入EAS

    2. 单击部署服务,然后在自定义模型部署区域,单击自定义部署

    3. 自定义部署页面配置以下关键参数,更多参数配置详情,请参见服务部署:控制台

      参数

      描述

      部署方式

      选择processor部署

      模型配置

      配置类型选择对象存储(OSS),并选择模型文件所在的OSS路径。

      Processor种类

      选择自定义processor

      Processor语言

      选择python

      Processor

      配置类型选择对象存储(OSS),并选择上述步骤已打包的文件所在的OSS路径。

      Processor主文件

      配置为./app.py

    4. (可选)在服务配置区域添加data_image参数,参数内容配置为上述步骤中打包时配置的镜像路径。

      说明

      如果您在步骤四中通过镜像上传环境,则需要配置data_image参数,否则忽略该操作。

    5. 单击部署

    通过EASCMD方式部署服务

    Linux操作系统为例,说明部署服务的操作步骤:

    1. 下载EASCMD客户端并进行身份认证。具体操作,请参见下载并认证客户端

    2. 在客户端文件所在目录新建JSON格式的文件,命名为app.json。通过EASCMD或手动打包完整环境的文件内容示例如下:

      {
        "name": "pysdk_demo",
        "processor_entry": "./app.py",
        "processor_type": "python",
        "processor_path": "oss://examplebucket/exampledirectory/pysdk_demo.tar.gz",
        "model_path": "oss://examplebucket/exampledirectory/model",
        "cloud": {
              "computing": {
                  "instance_type": "ecs.c7.large"
              }
        },
        "metadata": {
          "instance": 1,
          }
      }
    3. 打开终端工具,在JSON文件所在目录,使用以下命令部署服务。

      $ ./eascmd64 create app.json

      系统返回如下结果,表示服务部署成功。

      [RequestId]: 1202D427-8187-4BCB-8D32-D7096E95B5CA
      +-------------------+-------------------------------------------------------------------+
      | Intranet Endpoint | http://182848887922****.vpc.cn-beijing.pai-eas.aliyuncs.com/api/predict/pysdk_demo |
      |             Token | ZTBhZTY3ZjgwMmMyMTQ5OTgyMTQ5YmM0NjdiMmNiNmJkY2M5ODI0****          |
      +-------------------+-------------------------------------------------------------------+
      [OK] Waiting task server to be ready
      [OK] Fetching processor from [oss://eas-model-beijing/195557026392****/pysdk_demo.tar.gz]
      [OK] Building image [registry-vpc.cn-beijing.aliyuncs.com/eas/pysdk_demo_cn-beijing:v0.0.1-20190806082810]
      [OK] Pushing image [registry-vpc.cn-beijing.aliyuncs.com/eas/pysdk_demo_cn-beijing:v0.0.1-20190806082810]
      [OK] Waiting [Total: 1, Pending: 1, Running: 0]
      [OK] Service is running
  2. 测试服务。

    1. 登录PAI控制台,在页面上方选择目标地域,并在右侧选择目标工作空间,然后单击进入EAS

    2. 单击目标服务的服务方式列下的调用信息,获取调用的公网地址和Token

    3. 在终端中,根据步骤b查询的信息进行调用。

      $ curl <service_url> -H 'Authorization: <token>' -d '10 20'

      其中:

      • <service_url>:替换为步骤b中获取的公网地址。例如:http://182848887922****.vpc.cn-beijing.pai-eas.aliyuncs.com/api/predict/pysdk_demo

      • <token>:替换为步骤b中获取的Token。例如:ZTBhZTY3ZjgwMmMyMTQ5OTgyMTQ5YmM0NjdiMmNiNmJkY2M5ODI0****

      • -d后的内容配置为服务的调用输入参数。

相关文档