使用PAI Python SDK训练和部署PyTorch模型

PAI Python SDK提供了更易用的HighLevel API,支持您在PAI完成模型的训练和部署。本文介绍如何使用PAI Python SDK训练和部署PyTorch模型。

背景信息

PyTorch是一个非常流行的深度学习框架,提供了较高的灵活性和优越的性能,能够与Python丰富的生态无缝结合,被广泛应用于图像分类、语音识别、自然语言处理、推荐和AIGC等领域。本示例为您介绍如何使用PAI Python SDKPAI完成一个PyTorch模型的训练,然后使用训练获得的模型部署推理服务。主要流程包括:

  1. 安装和配置SDK

    安装PAI Python SDK,并配置访问密钥AccessKey、使用的工作空间和OSS Bucket。

  2. 准备训练数据

    下载MNIST数据集,并上传到OSS上供训练作业使用。

  3. 准备训练脚本

    采用PyTorch官方示例库中的MNIST训练脚本作为基础,并进行适当修改后作为训练脚本。

  4. 提交训练作业

    使用PAI Python SDK提供的Estimator API,创建一个训练作业,提交到PAI上执行。

  5. 部署推理服务

    将以上训练作业输出的模型,分别使用Processor和镜像部署的方式部署到EAS,创建为在线推理服务。

前提条件

安装和配置SDK

首先需要在命令行终端中安装PAI Python SDK以运行本示例。

python -m pip install "alipai>=0.4.0"
说明

如果在回显信息中出现ModuleNotFoundError类型的错误,请尝试执行pip install --upgrade pip命令来解决该问题。

SDK安装完成后,通过在命令行终端中执行以下命令进行配置。

python -m pai.toolkit.config

详细的安装和配置过程,请参见安装和配置

准备训练数据

在本示例中,将使用MNIST数据集训练一个图片分类模型。当您提交训练作业至PAI平台时,需要准备数据集,并上传到OSS Bucket中。

  1. 下载MNIST数据集。

    使用以下Shell脚本,将MNIST数据集下载到本地目录data

    #!/bin/sh
    set -e
    
    url_prefix="https://ossci-datasets.s3.amazonaws.com/mnist/"
    # 如果上述地址下载速度较慢,可以使用以下地址。
    # url_prefix="http://yann.lecun.com/exdb/mnist/"
    
    mkdir -p data/MNIST/raw/
    
    wget -nv ${url_prefix}train-images-idx3-ubyte.gz -P data/MNIST/raw/
    wget -nv ${url_prefix}train-labels-idx1-ubyte.gz -P data/MNIST/raw/
    wget -nv ${url_prefix}t10k-images-idx3-ubyte.gz -P data/MNIST/raw/
    wget -nv ${url_prefix}t10k-labels-idx1-ubyte.gz -P data/MNIST/raw/
    
  2. 将数据集上传到OSS Bucket中。

    您可以使用OSS提供的命令行工具ossutil上传相应的文件,关于ossutil的安装和使用,请参见命令行工具ossutil 1.0。您也可以使用PAI Python SDK提供的便利方法,将本地训练数据上传到OSS Bucket/mnist/data/路径下。

    • 通过ossutil上传文件:

      ossutil cp -rf ./data oss://<YourOssBucket>/mnist/data/
    • 使用PAI Python SDK上传文件:

      from pai.common.oss_utils import upload
      from pai.session import get_default_session
      
      sess = get_default_session()
      data_uri = upload("./data/", oss_path="mnist/data/", bucket=sess.oss_bucket)
      print(data_uri)
      

准备训练脚本

在提交训练作业之前,需要通过PyTorch编写训练脚本。在本示例中,以PyTorch官方提供的MNIST示例为基础,并适当修改了数据加载和模型保存的逻辑后,作为训练脚本。

  • 使用环境变量获得输入数据路径

    通过estimator.fit(inputs={"train_data":data_uri})传递上述的OSS数据URI,相应的数据会被挂载到训练容器中。训练脚本可以通过读取本地文件的方式,读取到挂载的数据。

    对于训练作业,estimator.fit方法的inputs是字典,对应的每一个输入数据都是一个 Channel,KeyChannel名,Value是数据存储路径。训练作业脚本可以通过PAI_INPUT_{ChannelNameUpperCase}环境变量获取到输入数据挂载到工作容器内的数据路径。

    对数据加载部分的代码进行了如下修改:

    - dataset1 = datasets.MNIST("../data", train=True, download=True, transform=transform)
    - dataset2 = datasets.MNIST("../data", train=False, transform=transform)
    
    + # 通过环境变量获得输入数据路径
    + data_path = os.environ.get("PAI_INPUT_TRAIN_DATA", "../data")
    + dataset1 = datasets.MNIST(data_path, train=True, download=True, transform=transform)
    + dataset2 = datasets.MNIST(data_path, train=False, transform=transform)
    
  • 使用环境变量获取模型的保存路径:

    您需要将模型保存在训练环境的指定路径中,此路径由环境变量PAI_OUTPUT_MODEL指定(默认路径为/ml/output/model)。位于该路径下的数据和模型将自动保存到您的OSS Bucket中。

    对模型保存部分的代码进行了如下修改:

    - if args.save_model:
    -     torch.save(model.state_dict(), "mnist_cnn.pt")
    + # 保存模型
    + save_model(model)
    + 
    + def save_model(model):
    +     """将模型转为TorchScript,保存到指定路径。"""
    +     output_model_path = os.environ.get("PAI_OUTPUT_MODEL")
    +     os.makedirs(output_model_path, exist_ok=True)
    +     
    +     m = torch.jit.script(model)
    +     m.save(os.path.join(output_model_path, "mnist_cnn.pt"))
    

PAI提供的预置PyTorch Processor在创建服务时,要求输入的模型是TorchScript格式。本示例将模型导出为TorchScript格式。完整的训练作业脚本如下:

# source: https://github.com/pytorch/examples/blob/main/mnist/main.py
from __future__ import print_function

import argparse
import os

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.optim.lr_scheduler import StepLR
from torchvision import datasets, transforms


class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.dropout1 = nn.Dropout(0.25)
        self.dropout2 = nn.Dropout(0.5)
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        x = self.conv1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = F.relu(x)
        x = F.max_pool2d(x, 2)
        x = self.dropout1(x)
        x = torch.flatten(x, 1)
        x = self.fc1(x)
        x = F.relu(x)
        x = self.dropout2(x)
        x = self.fc2(x)
        output = F.log_softmax(x, dim=1)
        return output


def train(args, model, device, train_loader, optimizer, epoch):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % args.log_interval == 0:
            print(
                "Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}".format(
                    epoch,
                    batch_idx * len(data),
                    len(train_loader.dataset),
                    100.0 * batch_idx / len(train_loader),
                    loss.item(),
                )
            )
            if args.dry_run:
                break


def test(model, device, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += F.nll_loss(
                output, target, reduction="sum"
            ).item()  # sum up batch loss
            pred = output.argmax(
                dim=1, keepdim=True
            )  # get the index of the max log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(test_loader.dataset)

    print(
        "\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n".format(
            test_loss,
            correct,
            len(test_loader.dataset),
            100.0 * correct / len(test_loader.dataset),
        )
    )


def main():
    # Training settings
    parser = argparse.ArgumentParser(description="PyTorch MNIST Example")
    parser.add_argument(
        "--batch-size",
        type=int,
        default=64,
        metavar="N",
        help="input batch size for training (default: 64)",
    )
    parser.add_argument(
        "--test-batch-size",
        type=int,
        default=1000,
        metavar="N",
        help="input batch size for testing (default: 1000)",
    )
    parser.add_argument(
        "--epochs",
        type=int,
        default=14,
        metavar="N",
        help="number of epochs to train (default: 14)",
    )
    parser.add_argument(
        "--lr",
        type=float,
        default=1.0,
        metavar="LR",
        help="learning rate (default: 1.0)",
    )
    parser.add_argument(
        "--gamma",
        type=float,
        default=0.7,
        metavar="M",
        help="Learning rate step gamma (default: 0.7)",
    )
    parser.add_argument(
        "--no-cuda", action="store_true", default=False, help="disables CUDA training"
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        default=False,
        help="quickly check a single pass",
    )
    parser.add_argument(
        "--seed", type=int, default=1, metavar="S", help="random seed (default: 1)"
    )
    parser.add_argument(
        "--log-interval",
        type=int,
        default=10,
        metavar="N",
        help="how many batches to wait before logging training status",
    )
    parser.add_argument(
        "--save-model",
        action="store_true",
        default=False,
        help="For Saving the current Model",
    )
    args = parser.parse_args()
    use_cuda = not args.no_cuda and torch.cuda.is_available()

    torch.manual_seed(args.seed)

    device = torch.device("cuda" if use_cuda else "cpu")

    train_kwargs = {"batch_size": args.batch_size}
    test_kwargs = {"batch_size": args.test_batch_size}
    if use_cuda:
        cuda_kwargs = {"num_workers": 1, "pin_memory": True, "shuffle": True}
        train_kwargs.update(cuda_kwargs)
        test_kwargs.update(cuda_kwargs)

    transform = transforms.Compose(
        [transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]
    )

    data_path = os.environ.get("PAI_INPUT_DATA")
    dataset1 = datasets.MNIST(data_path, train=True, download=True, transform=transform)
    dataset2 = datasets.MNIST(data_path, train=False, transform=transform)
    train_loader = torch.utils.data.DataLoader(dataset1, **train_kwargs)
    test_loader = torch.utils.data.DataLoader(dataset2, **test_kwargs)

    model = Net().to(device)
    optimizer = optim.Adadelta(model.parameters(), lr=args.lr)

    scheduler = StepLR(optimizer, step_size=1, gamma=args.gamma)
    for epoch in range(1, args.epochs + 1):
        train(args, model, device, train_loader, optimizer, epoch)
        test(model, device, test_loader)
        scheduler.step()

    # 保存模型
    save_model(model)


def save_model(model):
    """将模型转为TorchScript,保存到指定路径."""
    output_model_path = os.environ.get("PAI_OUTPUT_MODEL")
    os.makedirs(output_model_path, exist_ok=True)

    m = torch.jit.script(model)
    m.save(os.path.join(output_model_path, "mnist_cnn.pt"))


if __name__ == "__main__":
    main()

您需要将上述训练代码保存到本地目录中,后续使用Estimator提交到PAI上执行。本示例将新建一个train_src目录,将训练脚本保存为train_src/train.py

|-- train_src                       # 待上传的训练脚本目录
    |-- requirements.txt            # 可选:训练作业的第三方包依赖
    `-- train.py                    # 保存的训练作业脚本

提交训练作业

Estimator支持使用本地的训练脚本、指定的镜像在PAI上执行训练作业。

  • 训练作业脚本和命令:

    您的训练脚本所在目录(参数source_dir)会被上传到OSS,在作业启动之前准备到作业容器中,默认为/ml/usercode目录。您指定的启动命令(command参数)的工作目录同样是/ml/usercode

  • 训练作业镜像:

    本示例使用PAI提供的PyTorch镜像运行训练作业。

  • 训练作业超参:

    您可以通过读取${PAI_CONFIG_DIR}/hyperparameters.json文件获取训练作业的超参 ,也可以通过环境变量获取训练作业超参,详情请参见训练作业预置环境变量

    本示例执行的命令是python train.py $PAI_USER_ARGS,其中PAI_USER_ARGS环境变量是作业超参以命令行参数的方式拼接获得的字符串。训练作业最终的启动命令是python train.py --epochs 5 --batch-size 256 --lr 0.5

  • 通过metric_definitions指定需要采集的Metrics:

    PAI的训练服务支持从训练作业输出日志中(训练脚本打印的标准输出和标准错误输出),以正则表达式匹配的方式捕获训练作业Metrics信息。通过SDK打印的作业的详情页链接,您可以查看作业的详情配置、输出日志以及训练作业的Metrics。

  • 通过instance_type指定作业使用的机器实例类型

    PAI的训练作业支持的机器实例类型,请参见附录:公共资源规格列表

在本示例中,Estimator的示例代码如下:

from pai.estimator import Estimator
from pai.image import retrieve

# 使用PAI提供的1.18PAI版本的PyTorch GPU镜像运行训练脚本。
image_uri = retrieve(
    "PyTorch", framework_version="1.8PAI", accelerator_type="GPU"
).image_uri
print(image_uri)

est = Estimator(
    # 训练作业启动命令,默认工作目录为/ml/usercode/。
    command="python train.py $PAI_USER_ARGS",
    # 需要上传的训练代码目录的相对路径或绝对路径。
  	# 默认会准备到训练作业环境的/ml/usercode目录下。
    source_dir="./train_src/",
    # 训练作业镜像。
    image_uri=image_uri,
    # 机器配置。
    instance_type="ecs.gn6i-c4g1.xlarge",  # 4vCPU 15GB 1*NVIDIA T4
    # 训练作业超参。
    hyperparameters={
        "epochs": 5,
        "batch-size": 64 * 4,
        "lr": 0.5,
    },
    # 训练作业的Metric捕获配置。
    metric_definitions=[
        {
            "Name": "loss",
            "Regex": r".*loss=([-+]?[0-9]*.?[0-9]+(?:[eE][-+]?[0-9]+)?).*",
        },
    ],
    base_job_name="pytorch_mnist",
)

将上传到OSS的训练数据作为训练输入数据,并执行训练作业。

# 如果使用ossutil上传训练数据,您需要显式赋值输入数据的OSS URI路径。
# data_uri = "oss://<YourOssBucket>/mnist/data/"

# 提交训练作业
est.fit(
    inputs={
        "train_data": data_uri,
    }
)

# 训练作业产出的模型路径
print("TrainingJob output model data:")
print(est.model_data())

est.fit方法将您的训练作业提交到PAI上执行。任务提交之后,SDK会打印作业详情页链接,并持续打印训练作业的日志,直到作业执行结束。

当您需要直接使用OSS上的数据,可以通过estimator.fit方法的inputs参数传递。通过inputs传递数据存储路径会被挂载到工作目录中,您的训练脚本可以通过读取本地文件的方式加载数据。

对于提交训练作业的详细介绍,请参见提交训练作业

部署推理服务

在训练作业执行成功后,您可以使用estimator.model_data()方法获取训练作业产出模型的OSS路径。以下内容为您介绍如何将训练产出的模型部署到PAI创建为在线推理服务。

  • 通过InferenceSpec定义模型推理的配置。

    您可以选择使用Processor或自定义镜像的模式进行模型部署。在以下示例中,将分别使用两种方式部署获得的PyTorch模型。

  • 通过Model.deploy方法,配置服务的使用资源、服务名称等信息,创建推理服务。

对于部署推理服务的详细介绍,请参见部署推理服务

Processor模式部署

ProcessorPAI对于推理服务程序包的抽象描述,负责加载模型并启动模型推理服务。模型推理服务支持用户使用API方式进行调用。PAI提供了预置PyTorch Processor,支持用户方便地将TorchScript格式的模型部署到PAI,创建推理服务。对于PyTorch Processor的详细介绍,请参见:PyTorch Processor

  1. 部署服务

    本示例通过PyTorch Processor将训练产出的模型部署为一个推理服务,示例代码如下:

    from pai.model import Model, InferenceSpec
    from pai.predictor import Predictor
    from pai.common.utils import random_str
    
    
    m = Model(
     model_data=est.model_data(),
     # 使用PAI提供的PyTorch Processor
     inference_spec=InferenceSpec(processor="pytorch_cpu_1.10"),
    )
    
    p: Predictor = m.deploy(
     service_name="tutorial_pt_mnist_proc_{}".format(random_str(6)),
     instance_type="ecs.c6.xlarge",
    )
    
    print(p.service_name)
    print(p.service_status)
    

    Model.deploy方法返回Predictor对象并链接到新创建的推理服务。您可以通过Predictor.predict方法向该服务发送预测请求并获取预测结果。

  2. 推理服务

    使用NumPy构建一个测试样本数据,发送给推理服务。

    import numpy as np
    
    # 以上保存TorchScritp模型要求输入为 Float32, 数据格式的形状为 (BatchSize, Channel, Weight, Height)
    dummy_input = np.random.rand(2, 1, 28, 28).astype(np.float32)
    
    # np.random.rand(1, 1, 28, 28).dtype
    res = p.predict(dummy_input)
    print(res)
    
    print(np.argmax(res, 1))
    
  3. 删除推理服务

    在测试完成之后,可以通过Predictor.delete_service删除推理服务。

    p.delete_service()

镜像部署

Processor模式启动的推理服务性能优越,适用于性能较为敏感的场景。对于一些需要灵活自定义的场景,例如模型使用了一些第三方的依赖,或是推理服务需要有前处理和后处理,则可以通过镜像部署的方式实现。SDK提供了pai.model.container_serving_spec()方法,支持您使用本地的推理服务代码配合PAI提供的基础镜像的方式创建推理服务。

  1. 准备推理服务的代码文件。

    在使用镜像部署之前,您需要准备推理服务的代码,该代码负责加载模型、拉起HTTP Server、处理用户的推理请求。使用Flask编写一个模型服务的代码,示例如下:

    import json
    from flask import Flask, request
    from PIL import Image
    import os
    import torch
    import torchvision.transforms as transforms
    import numpy as np
    import io
    
    app = Flask(__name__)
    # 用户指定模型,默认会被加载到当前路径下。 
    MODEL_PATH = "/eas/workspace/model/"
    
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    model = torch.jit.load(os.path.join(MODEL_PATH, "mnist_cnn.pt"), map_location=device).to(device)
    transform = transforms.Compose(
        [transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]
    )
    
    
    @app.route("/", methods=["POST"])
    def predict():
        # 预处理图片数据
        im = Image.open(io.BytesIO(request.data))
        input_tensor = transform(im).to(device)
        input_tensor.unsqueeze_(0)
        # 使用模型进行推理
        output_tensor = model(input_tensor)
        pred_res =output_tensor.detach().cpu().numpy()[0]
    
        return json.dumps(pred_res.tolist())
    
    
    if __name__ == '__main__':
        app.run(host="0.0.0.0", port=int(os.environ.get("LISTENING_PORT", 8000)))
    

    您需要将以上的代码保存到本地,供后续上传使用。在本示例中,您需要在本地新建目录infer_src,将上述推理服务代码保存为infer_src/run.py,目录结构如下:

    |-- infer_src                  # 待上传的推理服务代码目录
        |-- requirements.txt       # 可选:推理服务的第三方包依赖
        `-- run.py                 # 保存的推理服务脚本

  2. 通过pai.model.container_serving_spec,基于本地脚本和PAI提供的PyTorch镜像创建一个InferenceSpec对象。

    from pai.model import InferenceSpec, container_serving_spec
    from pai.image import retrieve, ImageScope
    
    torch_image_uri = retrieve("PyTorch", framework_version="latest", image_scope=ImageScope.INFERENCE).image_uri
    
    inf_spec = container_serving_spec(
        command="python run.py",
        source_dir="./infer_src/",
        image_uri=torch_image_uri,
        requirements=["flask==2.0.0", "Werkzeug==2.2.2", "pillow", "torchvision"],
    )
    print(inf_spec.to_dict())
    
    • 模型服务的代码和启动命令:

      用户指定的本地脚本目录source_dir参数会被上传到OSS,然后挂载到服务容器(默认到 /ml/usercode目录)。

    • 推理服务镜像:

      PAI提供了基础的推理镜像支持用户使用,您可以通过pai.image.retrieve方法,指定参数image_scope=ImageScope.INFERENCE获取PAI提供的推理镜像。

    • 模型服务的第三方依赖包:

      模型服务代码或是模型的依赖,可以通过requirements参数指定,相应的依赖会在服务程序启动前被安装到环境中。

  3. 使用训练作业输出的模型和上述的InferenceSpec,通过Model.deployAPI部署一个在线推理服务。

    from pai.model import Model
    from pai.common.utils import random_str
    import numpy as np
    
    
    m = Model(
        model_data=est.model_data(),
        inference_spec=inf_spec,
    )
    
    predictor = m.deploy(
        service_name="torch_mnist_script_container_{}".format(random_str(6)),
        instance_type="ecs.c6.xlarge",
    )
    
  4. 推理服务。

    1. 准备一张MNIST测试图片。

      import base64
      from PIL import Image
      from IPython import display
      import io
      
      !pip install -q pillow
      # raw_data是一张MNIST图片,对应数字9。
      raw_data = base64.b64decode(b"/9j/4AAQSkZJRgABAQAAAQABAAD/2wBDAAgGBgcGBQgHBwcJCQgKDBQNDAsLDBkSEw8UHRofHh0aHBwgJC4nICIsIxwcKDcpLDAxNDQ0Hyc5PTgyPC4zNDL/wAALCAAcABwBAREA/8QAHwAAAQUBAQEBAQEAAAAAAAAAAAECAwQFBgcICQoL/8QAtRAAAgEDAwIEAwUFBAQAAAF9AQIDAAQRBRIhMUEGE1FhByJxFDKBkaEII0KxwRVS0fAkM2JyggkKFhcYGRolJicoKSo0NTY3ODk6Q0RFRkdISUpTVFVWV1hZWmNkZWZnaGlqc3R1dnd4eXqDhIWGh4iJipKTlJWWl5iZmqKjpKWmp6ipqrKztLW2t7i5usLDxMXGx8jJytLT1NXW19jZ2uHi4+Tl5ufo6erx8vP09fb3+Pn6/9oACAEBAAA/APn+rVhpmoarP5GnWNzeTYz5dvE0jfkoJovNMv8ATmK3tjc2zByhE8TIQw6jkdR6VVq9oumPrWuWGlxyLG95cRwK7dFLMFyfzr3aXwp4ltAfB3gWwudI01JNuoa7eZhku5AMHafvFOw2Dn6ZJ4z4yeLk1HUbXwrZSSy2Oh5heeaQu88wG1mLHk4wR9c+1eXUqsVYMpIIOQR2r1D4QazqOs/FnSG1fVLi9ZI5vL+2TNKc+U2ApYnB7/hXml5LLNfXEsxLSvIzOSMEsTk1DRVnT7+60vULe/spmhureQSRSL1Vh0NWNd1mXX9ZuNUuLe2gmuCGkS2QohbABbBJwTjJ9yelZ1f/2Q==")
      
      im = Image.open(io.BytesIO(raw_data))
      display.display(im)
      
    2. 将请求发送给推理服务。

      推理服务使用HTTP请求体内的数据作为输入的图片,SDKraw_predict方法接受bytes数据类型的请求,通过POST方法,在请求体(HTTP Request Body)带上用户推理数据,发送给推理服务。

      from pai.predictor import RawResponse
      import numpy as np
      
      resp: RawResponse = predictor.raw_predict(data=raw_data)
      print(resp.json())
      
      print(np.argmax(resp.json()))
      
  5. 测试完成之后可以删除服务。

    predictor.delete_service()

附件

本示例的Jupyter Notebook:使用PAI Python SDK训练和部署PyTorch模型