文档

使用PAI Python SDK训练和部署PyTorch模型

更新时间:

PAI Python SDK是PAI提供的Python SDK,提供了更易用的HighLevel API,支持用户在PAI完成模型的训练和部署。本文档介绍如何使用PAI Python SDK在PAI完成一个PyTorch模型的训练和部署。

背景信息

PyTorch是一个非常流行的深度学习框架,提供了极高的灵活性和优越的性能,能够与Python丰富的生态无缝结合,被广泛应用于图像分类、语音识别、自然语言处理、推荐、AIGC等领域。本示例中,我们将使用PAI Python SDK,在PAI完成一个PyTorch模型的训练,然后使用训练获得的模型部署推理服务。主要流程包括:

  1. 安装和配置SDK

安装PAI Python SDK,并配置访问密钥AccessKey,使用的工作空间,以及OSS Bucket。

  1. 准备训练数据

我们下载一个MNIST数据集,上传到OSS上供训练作业使用。

  1. 准备训练脚本

我们使用PyTorch示例仓库中的MNIST训练脚本作为模板,在简单修改之后作为训练脚本。

  1. 提交训练作业

使用PAI Python SDK提供的Estimator API,创建一个训练作业,提交到云上执行。

  1. 部署推理服务

将以上训练作业输出的模型,分别使用Processor和镜像部署的方式部署到EAS,创建在线推理服务。

前提条件

安装和配置SDK

需要首先安装PAI Python SDK以运行本示例。

python -m pip install "alipai>=0.4.0"

在PAI SDK安装之后,通过在命令行终端中执行以下命令进行配置,详细的安装和配置介绍见文档:安装和配置

python -m pai.toolkit.config

准备训练数据

当前示例中,将使用MNIST数据集训练一个图片分类模型。当用户使用云上的训练作业时,需要准备数据,上传到OSS Bucket上。

  • 下载MNIST数据集

使用以下的Shell脚本,将MNIST数据集下载到本地目录data

#!/bin/sh
set -e

url_prefix="https://ossci-datasets.s3.amazonaws.com/mnist/"
# 如果以上的地址下载速度较慢,可以使用以下地址
# url_prefix="http://yann.lecun.com/exdb/mnist/"

mkdir -p data/MNIST/raw/

wget -nv ${url_prefix}train-images-idx3-ubyte.gz -P data/MNIST/raw/
wget -nv ${url_prefix}train-labels-idx1-ubyte.gz -P data/MNIST/raw/
wget -nv ${url_prefix}t10k-images-idx3-ubyte.gz -P data/MNIST/raw/
wget -nv ${url_prefix}t10k-labels-idx1-ubyte.gz -P data/MNIST/raw/
  • 上传数据集到OSS

用户可以使用OSS提供的命令行工具ossutil上传相应的文件(ossutil的安装和使用请见文档:ossutil概述),或是PAI Python SDK里提供的便利方法,将本地训练数据上传到OSS Bucket的/mnist/data/路径下。

  • 通过ossutil上传:

ossutil cp -rf ./data oss://<YourOssBucket>/mnist/data/
  • 使用PAI Python SDK上传文件:

from pai.common.oss_utils import upload
from pai.session import get_default_session

sess = get_default_session()
data_uri = upload("./data/", oss_path="mnist/data/", bucket=sess.oss_bucket)
print(data_uri)

准备训练脚本

在提交训练作业之前,需要通过PyTorch编写训练脚本。这里我们以PyTorch官方提供的MNIST示例为基础,在修改了数据加载和模型保存的逻辑之后,作为训练脚本。

  • 使用环境变量获得输入数据路径

当我们通过estimator.fit(inputs={"train_data":data_uri})传递以上的OSS数据URI,相应的数据会被挂载到训练容器中,训练脚本可以通过读取本地文件的方式,读取到挂载的数据。

对于训练作业,estimator.fit方法的inputs是字典,对应的每一个输入数据都是一个Channel,Key是Channel名,Value是数据存储路径,训练作业脚本可以通过PAI_INPUT_{ChannelNameUpperCase}环境变量获取到输入数据挂载到工作容器内的数据路径。

数据加载部分的代码修改如下:

- dataset1 = datasets.MNIST("../data", train=True, download=True, transform=transform)
- dataset2 = datasets.MNIST("../data", train=False, transform=transform)

+ # 通过环境变量获得输入数据路径
+ data_path = os.environ.get("PAI_INPUT_TRAIN_DATA", "../data")
+ dataset1 = datasets.MNIST(data_path, train=True, download=True, transform=transform)
+ dataset2 = datasets.MNIST(data_path, train=False, transform=transform)

  • 使用环境变量获取模型的保存路径:

用户需要保存模型到训练环境中的指定路径,对应路径下的数据和模型会被保存到用户的OSS Bucket。默认要求用户将模型保存到环境变量PAI_OUTPUT_MODEL指定的路径下(默认为/ml/output/model)。

模型保存部分的修改代码如下:

- if args.save_model:
-     torch.save(model.state_dict(), "mnist_cnn.pt")
+ # 保存模型
+ save_model(model)
+ 
+ def save_model(model):
+     """将模型转为TorchScript,保存到指定路径."""
+     output_model_path = os.environ.get("PAI_OUTPUT_MODEL")
+     os.makedirs(output_model_path, exist_ok=True)
+     
+     m = torch.jit.script(model)
+     m.save(os.path.join(output_model_path, "mnist_cnn.pt"))

PAI提供的预置PyTorch Processor在创建服务时,要求输入的模型是TorchScript格式。在当前示例中,我们将模型导出为TorchScript格式。

完整的作业脚本如下:

# source: https://github.com/pytorch/examples/blob/main/mnist/main.py
from __future__ import print_function

import argparse
import os

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.optim.lr_scheduler import StepLR
from torchvision import datasets, transforms


class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.dropout1 = nn.Dropout(0.25)
        self.dropout2 = nn.Dropout(0.5)
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        x = self.conv1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = F.relu(x)
        x = F.max_pool2d(x, 2)
        x = self.dropout1(x)
        x = torch.flatten(x, 1)
        x = self.fc1(x)
        x = F.relu(x)
        x = self.dropout2(x)
        x = self.fc2(x)
        output = F.log_softmax(x, dim=1)
        return output


def train(args, model, device, train_loader, optimizer, epoch):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % args.log_interval == 0:
            print(
                "Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}".format(
                    epoch,
                    batch_idx * len(data),
                    len(train_loader.dataset),
                    100.0 * batch_idx / len(train_loader),
                    loss.item(),
                )
            )
            if args.dry_run:
                break


def test(model, device, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += F.nll_loss(
                output, target, reduction="sum"
            ).item()  # sum up batch loss
            pred = output.argmax(
                dim=1, keepdim=True
            )  # get the index of the max log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(test_loader.dataset)

    print(
        "\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n".format(
            test_loss,
            correct,
            len(test_loader.dataset),
            100.0 * correct / len(test_loader.dataset),
        )
    )


def main():
    # Training settings
    parser = argparse.ArgumentParser(description="PyTorch MNIST Example")
    parser.add_argument(
        "--batch-size",
        type=int,
        default=64,
        metavar="N",
        help="input batch size for training (default: 64)",
    )
    parser.add_argument(
        "--test-batch-size",
        type=int,
        default=1000,
        metavar="N",
        help="input batch size for testing (default: 1000)",
    )
    parser.add_argument(
        "--epochs",
        type=int,
        default=14,
        metavar="N",
        help="number of epochs to train (default: 14)",
    )
    parser.add_argument(
        "--lr",
        type=float,
        default=1.0,
        metavar="LR",
        help="learning rate (default: 1.0)",
    )
    parser.add_argument(
        "--gamma",
        type=float,
        default=0.7,
        metavar="M",
        help="Learning rate step gamma (default: 0.7)",
    )
    parser.add_argument(
        "--no-cuda", action="store_true", default=False, help="disables CUDA training"
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        default=False,
        help="quickly check a single pass",
    )
    parser.add_argument(
        "--seed", type=int, default=1, metavar="S", help="random seed (default: 1)"
    )
    parser.add_argument(
        "--log-interval",
        type=int,
        default=10,
        metavar="N",
        help="how many batches to wait before logging training status",
    )
    parser.add_argument(
        "--save-model",
        action="store_true",
        default=False,
        help="For Saving the current Model",
    )
    args = parser.parse_args()
    use_cuda = not args.no_cuda and torch.cuda.is_available()

    torch.manual_seed(args.seed)

    device = torch.device("cuda" if use_cuda else "cpu")

    train_kwargs = {"batch_size": args.batch_size}
    test_kwargs = {"batch_size": args.test_batch_size}
    if use_cuda:
        cuda_kwargs = {"num_workers": 1, "pin_memory": True, "shuffle": True}
        train_kwargs.update(cuda_kwargs)
        test_kwargs.update(cuda_kwargs)

    transform = transforms.Compose(
        [transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]
    )

    data_path = os.environ.get("PAI_INPUT_DATA")
    dataset1 = datasets.MNIST(data_path, train=True, download=True, transform=transform)
    dataset2 = datasets.MNIST(data_path, train=False, transform=transform)
    train_loader = torch.utils.data.DataLoader(dataset1, **train_kwargs)
    test_loader = torch.utils.data.DataLoader(dataset2, **test_kwargs)

    model = Net().to(device)
    optimizer = optim.Adadelta(model.parameters(), lr=args.lr)

    scheduler = StepLR(optimizer, step_size=1, gamma=args.gamma)
    for epoch in range(1, args.epochs + 1):
        train(args, model, device, train_loader, optimizer, epoch)
        test(model, device, test_loader)
        scheduler.step()

    # 保存模型
    save_model(model)


def save_model(model):
    """将模型转为TorchScript,保存到指定路径."""
    output_model_path = os.environ.get("PAI_OUTPUT_MODEL")
    os.makedirs(output_model_path, exist_ok=True)

    m = torch.jit.script(model)
    m.save(os.path.join(output_model_path, "mnist_cnn.pt"))


if __name__ == "__main__":
    main()

我们需要将以上的训练代码保存到一个本地目录下,后续使用Estimator提交到PAI上执行。当前示例中,我们将新建一个train_src目录,将训练脚本保存到 train_src/train.py

|-- train_src                       # 待上传的训练脚本目录
    |-- requirements.txt            # 可选:训练作业的第三方包依赖
    `-- train.py                    # 保存的训练作业脚本

提交训练作业

Estimator支持用户使用本地的训练脚本,以指定的镜像在云上执行训练作业。

  • 训练作业脚本和命令

用户训练作业脚本所在目录(参数source_dir)会被上传到OSS,在作业启动之前准备到作业容器中,默认为/ml/usercode目录。用户指定的启动命令(command参数)的工作目录同样是/ml/usercode

  • 训练作业镜像

当前示例中,我们使用PAI提供的PyTorch镜像运行训练作业。

  • 训练作业超参

用户可以通过读取${PAI_CONFIG_DIR}/hyperparameters.json文件获取到训练作业的超参 ,也可以通过环境变量获取到训练作业超参,详细可见文档:训练作业预置环境变量

在当前示例中,执行的命令是python train.py $PAI_USER_ARGS,其中PAI_USER_ARGS环境变量是作业超参以命令行参数的方式拼接获得的字符串。训练作业最终的启动命令是python train.py --epochs 5 --batch-size 256 --lr 0.5

  • 通过metric_definitions指定需要采集的Metrics

PAI的训练服务支持从训练作业输出日志中(训练脚本打印的标准输出和标准错误输出),以正则表达式匹配的方式捕获训练作业Metrics信息。通过SDK打印的作业的详情页链接,用户查看作业的详情配置、输出日志以及训练作业的Metrics。

  • 通过instance_type指定作业使用的机器实例类型

PAI的训练作业支持的机器实例类型,请见文档:附录:公共资源组定价详情

构建Estimator的示例代码:

from pai.estimator import Estimator
from pai.image import retrieve

# 使用PAI提供的1.18PAI版本的PyTorch GPU镜像运行训练脚本
image_uri = retrieve(
    "PyTorch", framework_version="1.8PAI", accelerator_type="GPU"
).image_uri
print(image_uri)

est = Estimator(
    # 训练作业启动命令,默认工作目录为/ml/usercode/
    command="python train.py $PAI_USER_ARGS",
    # 需要上传的训练代码目录的相对路径或是绝对路径
  	# 默认会准备到训练作业环境的/ml/usercode 目录下
    source_dir="./train_src/",
    # 训练作业镜像
    image_uri=image_uri,
    # 机器配置
    instance_type="ecs.gn6i-c4g1.xlarge",  # 4vCPU 15GB 1*NVIDIA T4
    # 训练作业超参
    hyperparameters={
        "epochs": 5,
        "batch-size": 64 * 4,
        "lr": 0.5,
    },
    # 训练作业的Metric捕获配置
    metric_definitions=[
        {
            "Name": "loss",
            "Regex": r".*loss=([-+]?[0-9]*.?[0-9]+(?:[eE][-+]?[0-9]+)?).*",
        },
    ],
    base_job_name="pytorch_mnist",
)

est.fit方法将用户的训练作业提交到PAI上执行。任务提交之后,SDK会打印作业详情页链接,并持续打印训练作业的日志,直到作业执行结束。

当用户需要直接使用OSS上数据,可以通过estimator.fit方法的inputs参数传递。通过inputs传递数据存储路径会被挂载到目录下,用户的训练脚本可以通过读取本地文件的方式加载数据。

本示例中,我们将上传到OSS的训练数据作为训练输入数据。

# 如果使用ossutil上传训练数据,我们需要显式赋值输入数据的OSS URI路径
# data_uri = "oss://<YourOssBucket>/mnist/data/"

# 提交训练作业
est.fit(
    inputs={
        "train_data": data_uri,
    }
)

# 训练作业产出的模型路径
print("TrainingJob output model data:")
print(est.model_data())

对于提交训练作业的详细介绍,请查看PAI Python SDK提交训练作业

部署推理服务

在训练作业结束之后,我们可以使用estimator.model_data()方法拿到训练作业产出模型的OSS路径。下面的流程中,我们将训练产出的模型部署到PAI创建在线推理服务。

部署推理服务的主要流程包括:

  • 通过InferenceSpec描述如何使用模型构建推理服务。

用户可以选择使用Processor或是自定义镜像的模式进行模型部署。以下示例中将分别使用两种方式部署获得的PyTorch模型。

  • 通过Model.deploy方法,配置服务的使用资源、服务名称等信息,创建推理服务。

对于部署推理服务的详细介绍,请参见:部署推理服务

Processor 模式部署

Processor是PAI对于推理服务程序包的抽象描述,负责加载模型并启动模型推理服务。模型推理服务会暴露API支持用户进行调用。PAI提供了预置PyTorch Processor,支持用户方便地将TorchScript格式的模型部署到PAI,创建推理服务。对于PyTorch Processor的详细介绍,请参见:PyTorch Processor

以下示例中,我们通过PyTorch Processor将训练产出的模型部署为一个推理服务。

from pai.model import Model, InferenceSpec
from pai.predictor import Predictor
from pai.common.utils import random_str


m = Model(
 model_data=est.model_data(),
 # 使用PAI提供的PyTorch Processor
 inference_spec=InferenceSpec(processor="pytorch_cpu_1.10"),
)

p: Predictor = m.deploy(
 service_name="tutorial_pt_mnist_proc_{}".format(random_str(6)),
 instance_type="ecs.c6.xlarge",
)

print(p.service_name)
print(p.service_status)

Model.deploy返回的Predictor对象指向创建的推理服务,可以通过Predictor.predict方法发送预测请求给到服务,拿到预测结果。

我们使用NumPy构建了一个测试样本数据,发送给推理服务。

import numpy as np

# 以上保存TorchScritp模型要求输入为 Float32, 数据格式的形状为 (BatchSize, Channel, Weight, Height)
dummy_input = np.random.rand(2, 1, 28, 28).astype(np.float32)

# np.random.rand(1, 1, 28, 28).dtype
res = p.predict(dummy_input)
print(res)

print(np.argmax(res, 1))

在测试完成之后,可以通过Predictor.delete_service删除推理服务。

p.delete_service()

镜像部署

Processor模式启动的推理服务性能优越,适合于对于性能较为敏感的场景。对于一些需要灵活自定义的场景,例如模型使用了一些第三方的依赖,或是推理服务需要有前处理和后处理,用户可以通过镜像部署的方式实现。 SDK提供了pai.model.container_serving_spec()方法,支持用户使用本地的推理服务代码配合PAI提供的基础镜像的方式创建推理服务。

在使用镜像部署之前,我们需要准备模型服务的代码,负责加载模型、拉起HTTP Server、处理用户的推理请求。我们将使用Flask编写一个模型服务的代码,示例如下:

import json
from flask import Flask, request
from PIL import Image
import os
import torch
import torchvision.transforms as transforms
import numpy as np
import io

app = Flask(__name__)
# 用户指定模型,默认会被加载到当前路径下。 
MODEL_PATH = "/eas/workspace/model/"

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = torch.jit.load(os.path.join(MODEL_PATH, "mnist_cnn.pt"), map_location=device).to(device)
transform = transforms.Compose(
    [transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]
)


@app.route("/", methods=["POST"])
def predict():
    # 预处理图片数据
    im = Image.open(io.BytesIO(request.data))
    input_tensor = transform(im).to(device)
    input_tensor.unsqueeze_(0)
    # 使用模型进行推理
    output_tensor = model(input_tensor)
    pred_res =output_tensor.detach().cpu().numpy()[0] 

    return json.dumps(pred_res.tolist())


if __name__ == '__main__':
    app.run(host="0.0.0.0", port=int(os.environ.get("LISTENING_PORT", 8000)))

我们需要将以上的代码保存到本地,供后续上传。在本示例中,我们需要在本地新建目录infer_src,将以上的推理服务代码保存到infer_src/run.py,目录结构如下:

|-- infer_src                  # 待上传的推理服务代码目录
    |-- requirements.txt       # 可选:推理服务的第三方包依赖
    `-- run.py                 # 保存的推理服务脚本

通过pai.model.container_serving_spec,我们基于本地脚本和PAI提供的PyTorch镜像创建了一个InferenceSpec对象。

  • 模型服务的代码和启动命令:

用户指定的本地脚本目录source_dir参数会被上传到OSS,然后挂载到服务容器(默认到 /ml/usercode目录)。

  • 推理服务镜像:

PAI 提供了基础的推理镜像支持用户使用,用户可以通过pai.image.retrieve方法,指定参数image_scope=ImageScope.INFERENCE获取PAI提供的推理镜像。

  • 模型服务的第三方依赖包:

模型服务代码或是模型的依赖,可以通过requirements参数指定,相应的依赖会在服务程序启动前被安装到环境中。

使用训练作业输出的模型和上述的InferenceSpec,我们将通过Model.deployAPI部署一个在线推理服务。

from pai.model import InferenceSpec, container_serving_spec, Model
from pai.image import retrieve, ImageScope
from pai.common.utils import random_str
import numpy as np

torch_image_uri = retrieve(
    framework_name="pytorch", framework_version="1.12", accelerator_type="CPU"
).image_uri

inf_spec = container_serving_spec(
    command="python run.py",
    source_dir="./infer_src/",
    image_uri=torch_image_uri,
    requirements=["flask==2.0.0"],
)
print(inf_spec.to_dict())

m = Model(
    model_data=est.model_data(),
    inference_spec=inf_spec,
)

predictor = m.deploy(
    service_name="torch_container_{}".format(random_str(6)),
    instance_type="ecs.c6.xlarge",
)

我们准备一张MNIST测试图片,用于发送给推理服务。

import base64
from PIL import Image
from IPython import display
import io


# raw_data是一张MNIST图片,对应数字9
raw_data = base64.b64decode(b"/9j/4AAQSkZJRgABAQAAAQABAAD/2wBDAAgGBgcGBQgHBwcJCQgKDBQNDAsLDBkSEw8UHRofHh0aHBwgJC4nICIsIxwcKDcpLDAxNDQ0Hyc5PTgyPC4zNDL/wAALCAAcABwBAREA/8QAHwAAAQUBAQEBAQEAAAAAAAAAAAECAwQFBgcICQoL/8QAtRAAAgEDAwIEAwUFBAQAAAF9AQIDAAQRBRIhMUEGE1FhByJxFDKBkaEII0KxwRVS0fAkM2JyggkKFhcYGRolJicoKSo0NTY3ODk6Q0RFRkdISUpTVFVWV1hZWmNkZWZnaGlqc3R1dnd4eXqDhIWGh4iJipKTlJWWl5iZmqKjpKWmp6ipqrKztLW2t7i5usLDxMXGx8jJytLT1NXW19jZ2uHi4+Tl5ufo6erx8vP09fb3+Pn6/9oACAEBAAA/APn+rVhpmoarP5GnWNzeTYz5dvE0jfkoJovNMv8ATmK3tjc2zByhE8TIQw6jkdR6VVq9oumPrWuWGlxyLG95cRwK7dFLMFyfzr3aXwp4ltAfB3gWwudI01JNuoa7eZhku5AMHafvFOw2Dn6ZJ4z4yeLk1HUbXwrZSSy2Oh5heeaQu88wG1mLHk4wR9c+1eXUqsVYMpIIOQR2r1D4QazqOs/FnSG1fVLi9ZI5vL+2TNKc+U2ApYnB7/hXml5LLNfXEsxLSvIzOSMEsTk1DRVnT7+60vULe/spmhureQSRSL1Vh0NWNd1mXX9ZuNUuLe2gmuCGkS2QohbABbBJwTjJ9yelZ1f/2Q==")

im = Image.open(io.BytesIO(raw_data))
display.display(im)

推理服务使用HTTP请求体内的数据作为输入的图片,SDK的raw_predict方法接受bytes数据类型的请求,通过POST方法,在请求体(HTTP Request Body)带上用户推理数据,发送给到推理服务。

from pai.predictor import RawResponse
import numpy as np

resp: RawResponse = predictor.raw_predict(data=raw_data)
print(resp.json())

print(np.argmax(resp.json()))

测试完成之后可以删除服务。

predictor.delete_service()

附件

本示例的Jupyter Notebook:使用PAI Python SDK训练和部署PyTorch模型