PAI Python SDK提供了更易用的HighLevel API,支持您在PAI完成模型的训练和部署。本文介绍如何使用PAI Python SDK训练和部署PyTorch模型。
背景信息
PyTorch是一个非常流行的深度学习框架,提供了较高的灵活性和优越的性能,能够与Python丰富的生态无缝结合,被广泛应用于图像分类、语音识别、自然语言处理、推荐和AIGC等领域。本示例为您介绍如何使用PAI Python SDK在PAI完成一个PyTorch模型的训练,然后使用训练获得的模型部署推理服务。主要流程包括:
安装PAI Python SDK,并配置访问密钥AccessKey、使用的工作空间和OSS Bucket。
下载MNIST数据集,并上传到OSS上供训练作业使用。
采用PyTorch官方示例库中的MNIST训练脚本作为基础,并进行适当修改后作为训练脚本。
使用PAI Python SDK提供的Estimator API,创建一个训练作业,提交到PAI上执行。
将以上训练作业输出的模型,分别使用Processor和镜像部署的方式部署到EAS,创建为在线推理服务。
前提条件
已获取阿里云账号的鉴权AccessKey ID和AccessKey Secret,详情请参见:获取AccessKey。
已创建工作空间,详情请参见:创建及管理工作空间。
已创建OSS Bucket,详情请参见:控制台创建存储空间。
已准备Python环境,要求3.7及其以上版本。
安装和配置SDK
首先需要在命令行终端中安装PAI Python SDK以运行本示例。
python -m pip install "alipai>=0.4.0"
如果在回显信息中出现ModuleNotFoundError
类型的错误,请尝试执行pip install --upgrade pip
命令来解决该问题。
在SDK安装完成后,通过在命令行终端中执行以下命令进行配置。
python -m pai.toolkit.config
详细的安装和配置过程,请参见安装和配置。
准备训练数据
在本示例中,将使用MNIST数据集训练一个图片分类模型。当您提交训练作业至PAI平台时,需要准备数据集,并上传到OSS Bucket中。
下载MNIST数据集。
使用以下Shell脚本,将MNIST数据集下载到本地目录
data
。#!/bin/sh set -e url_prefix="https://ossci-datasets.s3.amazonaws.com/mnist/" # 如果上述地址下载速度较慢,可以使用以下地址。 # url_prefix="http://yann.lecun.com/exdb/mnist/" mkdir -p data/MNIST/raw/ wget -nv ${url_prefix}train-images-idx3-ubyte.gz -P data/MNIST/raw/ wget -nv ${url_prefix}train-labels-idx1-ubyte.gz -P data/MNIST/raw/ wget -nv ${url_prefix}t10k-images-idx3-ubyte.gz -P data/MNIST/raw/ wget -nv ${url_prefix}t10k-labels-idx1-ubyte.gz -P data/MNIST/raw/
将数据集上传到OSS Bucket中。
您可以使用OSS提供的命令行工具
ossutil
上传相应的文件,关于ossutil的安装和使用,请参见命令行工具ossutil 1.0。您也可以使用PAI Python SDK提供的便利方法,将本地训练数据上传到OSS Bucket的/mnist/data/
路径下。通过
ossutil
上传文件:ossutil cp -rf ./data oss://<YourOssBucket>/mnist/data/
使用PAI Python SDK上传文件:
from pai.common.oss_utils import upload from pai.session import get_default_session sess = get_default_session() data_uri = upload("./data/", oss_path="mnist/data/", bucket=sess.oss_bucket) print(data_uri)
准备训练脚本
在提交训练作业之前,需要通过PyTorch编写训练脚本。在本示例中,以PyTorch官方提供的MNIST示例为基础,并适当修改了数据加载和模型保存的逻辑后,作为训练脚本。
使用环境变量获得输入数据路径
通过
estimator.fit(inputs={"train_data":data_uri})
传递上述的OSS数据URI,相应的数据会被挂载到训练容器中。训练脚本可以通过读取本地文件的方式,读取到挂载的数据。对于训练作业,
estimator.fit
方法的inputs
是字典,对应的每一个输入数据都是一个 Channel,Key是Channel名,Value是数据存储路径。训练作业脚本可以通过PAI_INPUT_{ChannelNameUpperCase}
环境变量获取到输入数据挂载到工作容器内的数据路径。对数据加载部分的代码进行了如下修改:
- dataset1 = datasets.MNIST("../data", train=True, download=True, transform=transform) - dataset2 = datasets.MNIST("../data", train=False, transform=transform) + # 通过环境变量获得输入数据路径 + data_path = os.environ.get("PAI_INPUT_TRAIN_DATA", "../data") + dataset1 = datasets.MNIST(data_path, train=True, download=True, transform=transform) + dataset2 = datasets.MNIST(data_path, train=False, transform=transform)
使用环境变量获取模型的保存路径:
您需要将模型保存在训练环境的指定路径中,此路径由环境变量PAI_OUTPUT_MODEL指定(默认路径为/ml/output/model)。位于该路径下的数据和模型将自动保存到您的OSS Bucket中。
对模型保存部分的代码进行了如下修改:
- if args.save_model: - torch.save(model.state_dict(), "mnist_cnn.pt") + # 保存模型 + save_model(model) + + def save_model(model): + """将模型转为TorchScript,保存到指定路径。""" + output_model_path = os.environ.get("PAI_OUTPUT_MODEL") + os.makedirs(output_model_path, exist_ok=True) + + m = torch.jit.script(model) + m.save(os.path.join(output_model_path, "mnist_cnn.pt"))
PAI提供的预置PyTorch Processor在创建服务时,要求输入的模型是TorchScript格式。本示例将模型导出为TorchScript格式。完整的训练作业脚本如下:
# source: https://github.com/pytorch/examples/blob/main/mnist/main.py
from __future__ import print_function
import argparse
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.optim.lr_scheduler import StepLR
from torchvision import datasets, transforms
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(1, 32, 3, 1)
self.conv2 = nn.Conv2d(32, 64, 3, 1)
self.dropout1 = nn.Dropout(0.25)
self.dropout2 = nn.Dropout(0.5)
self.fc1 = nn.Linear(9216, 128)
self.fc2 = nn.Linear(128, 10)
def forward(self, x):
x = self.conv1(x)
x = F.relu(x)
x = self.conv2(x)
x = F.relu(x)
x = F.max_pool2d(x, 2)
x = self.dropout1(x)
x = torch.flatten(x, 1)
x = self.fc1(x)
x = F.relu(x)
x = self.dropout2(x)
x = self.fc2(x)
output = F.log_softmax(x, dim=1)
return output
def train(args, model, device, train_loader, optimizer, epoch):
model.train()
for batch_idx, (data, target) in enumerate(train_loader):
data, target = data.to(device), target.to(device)
optimizer.zero_grad()
output = model(data)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()
if batch_idx % args.log_interval == 0:
print(
"Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}".format(
epoch,
batch_idx * len(data),
len(train_loader.dataset),
100.0 * batch_idx / len(train_loader),
loss.item(),
)
)
if args.dry_run:
break
def test(model, device, test_loader):
model.eval()
test_loss = 0
correct = 0
with torch.no_grad():
for data, target in test_loader:
data, target = data.to(device), target.to(device)
output = model(data)
test_loss += F.nll_loss(
output, target, reduction="sum"
).item() # sum up batch loss
pred = output.argmax(
dim=1, keepdim=True
) # get the index of the max log-probability
correct += pred.eq(target.view_as(pred)).sum().item()
test_loss /= len(test_loader.dataset)
print(
"\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n".format(
test_loss,
correct,
len(test_loader.dataset),
100.0 * correct / len(test_loader.dataset),
)
)
def main():
# Training settings
parser = argparse.ArgumentParser(description="PyTorch MNIST Example")
parser.add_argument(
"--batch-size",
type=int,
default=64,
metavar="N",
help="input batch size for training (default: 64)",
)
parser.add_argument(
"--test-batch-size",
type=int,
default=1000,
metavar="N",
help="input batch size for testing (default: 1000)",
)
parser.add_argument(
"--epochs",
type=int,
default=14,
metavar="N",
help="number of epochs to train (default: 14)",
)
parser.add_argument(
"--lr",
type=float,
default=1.0,
metavar="LR",
help="learning rate (default: 1.0)",
)
parser.add_argument(
"--gamma",
type=float,
default=0.7,
metavar="M",
help="Learning rate step gamma (default: 0.7)",
)
parser.add_argument(
"--no-cuda", action="store_true", default=False, help="disables CUDA training"
)
parser.add_argument(
"--dry-run",
action="store_true",
default=False,
help="quickly check a single pass",
)
parser.add_argument(
"--seed", type=int, default=1, metavar="S", help="random seed (default: 1)"
)
parser.add_argument(
"--log-interval",
type=int,
default=10,
metavar="N",
help="how many batches to wait before logging training status",
)
parser.add_argument(
"--save-model",
action="store_true",
default=False,
help="For Saving the current Model",
)
args = parser.parse_args()
use_cuda = not args.no_cuda and torch.cuda.is_available()
torch.manual_seed(args.seed)
device = torch.device("cuda" if use_cuda else "cpu")
train_kwargs = {"batch_size": args.batch_size}
test_kwargs = {"batch_size": args.test_batch_size}
if use_cuda:
cuda_kwargs = {"num_workers": 1, "pin_memory": True, "shuffle": True}
train_kwargs.update(cuda_kwargs)
test_kwargs.update(cuda_kwargs)
transform = transforms.Compose(
[transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]
)
data_path = os.environ.get("PAI_INPUT_DATA")
dataset1 = datasets.MNIST(data_path, train=True, download=True, transform=transform)
dataset2 = datasets.MNIST(data_path, train=False, transform=transform)
train_loader = torch.utils.data.DataLoader(dataset1, **train_kwargs)
test_loader = torch.utils.data.DataLoader(dataset2, **test_kwargs)
model = Net().to(device)
optimizer = optim.Adadelta(model.parameters(), lr=args.lr)
scheduler = StepLR(optimizer, step_size=1, gamma=args.gamma)
for epoch in range(1, args.epochs + 1):
train(args, model, device, train_loader, optimizer, epoch)
test(model, device, test_loader)
scheduler.step()
# 保存模型
save_model(model)
def save_model(model):
"""将模型转为TorchScript,保存到指定路径."""
output_model_path = os.environ.get("PAI_OUTPUT_MODEL")
os.makedirs(output_model_path, exist_ok=True)
m = torch.jit.script(model)
m.save(os.path.join(output_model_path, "mnist_cnn.pt"))
if __name__ == "__main__":
main()
您需要将上述训练代码保存到本地目录中,后续使用Estimator
提交到PAI上执行。本示例将新建一个train_src
目录,将训练脚本保存为train_src/train.py
。
|-- train_src # 待上传的训练脚本目录
|-- requirements.txt # 可选:训练作业的第三方包依赖
`-- train.py # 保存的训练作业脚本
提交训练作业
Estimator
支持使用本地的训练脚本、指定的镜像在PAI上执行训练作业。
训练作业脚本和命令:
您的训练脚本所在目录(参数source_dir)会被上传到OSS,在作业启动之前准备到作业容器中,默认为
/ml/usercode
目录。您指定的启动命令(command参数)的工作目录同样是/ml/usercode
。
训练作业镜像:
本示例使用PAI提供的PyTorch镜像运行训练作业。
训练作业超参:
您可以通过读取
${PAI_CONFIG_DIR}/hyperparameters.json
文件获取训练作业的超参 ,也可以通过环境变量获取训练作业超参,详情请参见训练作业预置环境变量。本示例执行的命令是
python train.py $PAI_USER_ARGS
,其中PAI_USER_ARGS
环境变量是作业超参以命令行参数的方式拼接获得的字符串。训练作业最终的启动命令是python train.py --epochs 5 --batch-size 256 --lr 0.5
。通过
metric_definitions
指定需要采集的Metrics:PAI的训练服务支持从训练作业输出日志中(训练脚本打印的标准输出和标准错误输出),以正则表达式匹配的方式捕获训练作业Metrics信息。通过SDK打印的作业的详情页链接,您可以查看作业的详情配置、输出日志以及训练作业的Metrics。
通过
instance_type
指定作业使用的机器实例类型:PAI的训练作业支持的机器实例类型,请参见附录:公共资源规格列表。
在本示例中,Estimator的示例代码如下:
from pai.estimator import Estimator
from pai.image import retrieve
# 使用PAI提供的1.18PAI版本的PyTorch GPU镜像运行训练脚本。
image_uri = retrieve(
"PyTorch", framework_version="1.8PAI", accelerator_type="GPU"
).image_uri
print(image_uri)
est = Estimator(
# 训练作业启动命令,默认工作目录为/ml/usercode/。
command="python train.py $PAI_USER_ARGS",
# 需要上传的训练代码目录的相对路径或绝对路径。
# 默认会准备到训练作业环境的/ml/usercode目录下。
source_dir="./train_src/",
# 训练作业镜像。
image_uri=image_uri,
# 机器配置。
instance_type="ecs.gn6i-c4g1.xlarge", # 4vCPU 15GB 1*NVIDIA T4
# 训练作业超参。
hyperparameters={
"epochs": 5,
"batch-size": 64 * 4,
"lr": 0.5,
},
# 训练作业的Metric捕获配置。
metric_definitions=[
{
"Name": "loss",
"Regex": r".*loss=([-+]?[0-9]*.?[0-9]+(?:[eE][-+]?[0-9]+)?).*",
},
],
base_job_name="pytorch_mnist",
)
将上传到OSS的训练数据作为训练输入数据,并执行训练作业。
# 如果使用ossutil上传训练数据,您需要显式赋值输入数据的OSS URI路径。
# data_uri = "oss://<YourOssBucket>/mnist/data/"
# 提交训练作业
est.fit(
inputs={
"train_data": data_uri,
}
)
# 训练作业产出的模型路径
print("TrainingJob output model data:")
print(est.model_data())
est.fit
方法将您的训练作业提交到PAI上执行。任务提交之后,SDK会打印作业详情页链接,并持续打印训练作业的日志,直到作业执行结束。
当您需要直接使用OSS上的数据,可以通过estimator.fit
方法的inputs
参数传递。通过inputs
传递数据存储路径会被挂载到工作目录中,您的训练脚本可以通过读取本地文件的方式加载数据。
对于提交训练作业的详细介绍,请参见提交训练作业。
部署推理服务
在训练作业执行成功后,您可以使用estimator.model_data()
方法获取训练作业产出模型的OSS路径。以下内容为您介绍如何将训练产出的模型部署到PAI创建为在线推理服务。
通过
InferenceSpec
定义模型推理的配置。您可以选择使用Processor或自定义镜像的模式进行模型部署。在以下示例中,将分别使用两种方式部署获得的PyTorch模型。
通过
Model.deploy
方法,配置服务的使用资源、服务名称等信息,创建推理服务。
对于部署推理服务的详细介绍,请参见部署推理服务。
Processor模式部署
Processor是PAI对于推理服务程序包的抽象描述,负责加载模型并启动模型推理服务。模型推理服务支持用户使用API方式进行调用。PAI提供了预置PyTorch Processor,支持用户方便地将TorchScript
格式的模型部署到PAI,创建推理服务。对于PyTorch Processor的详细介绍,请参见:PyTorch Processor。
部署服务
本示例通过PyTorch Processor将训练产出的模型部署为一个推理服务,示例代码如下:
from pai.model import Model, InferenceSpec from pai.predictor import Predictor from pai.common.utils import random_str m = Model( model_data=est.model_data(), # 使用PAI提供的PyTorch Processor inference_spec=InferenceSpec(processor="pytorch_cpu_1.10"), ) p: Predictor = m.deploy( service_name="tutorial_pt_mnist_proc_{}".format(random_str(6)), instance_type="ecs.c6.xlarge", ) print(p.service_name) print(p.service_status)
Model.deploy
方法返回Predictor
对象并链接到新创建的推理服务。您可以通过Predictor.predict
方法向该服务发送预测请求并获取预测结果。推理服务
使用NumPy构建一个测试样本数据,发送给推理服务。
import numpy as np # 以上保存TorchScritp模型要求输入为 Float32, 数据格式的形状为 (BatchSize, Channel, Weight, Height) dummy_input = np.random.rand(2, 1, 28, 28).astype(np.float32) # np.random.rand(1, 1, 28, 28).dtype res = p.predict(dummy_input) print(res) print(np.argmax(res, 1))
删除推理服务
在测试完成之后,可以通过
Predictor.delete_service
删除推理服务。p.delete_service()
镜像部署
Processor模式启动的推理服务性能优越,适用于性能较为敏感的场景。对于一些需要灵活自定义的场景,例如模型使用了一些第三方的依赖,或是推理服务需要有前处理和后处理,则可以通过镜像部署的方式实现。SDK提供了pai.model.container_serving_spec()
方法,支持您使用本地的推理服务代码配合PAI提供的基础镜像的方式创建推理服务。
准备推理服务的代码文件。
在使用镜像部署之前,您需要准备推理服务的代码,该代码负责加载模型、拉起HTTP Server、处理用户的推理请求。使用Flask编写一个模型服务的代码,示例如下:
import json from flask import Flask, request from PIL import Image import os import torch import torchvision.transforms as transforms import numpy as np import io app = Flask(__name__) # 用户指定模型,默认会被加载到当前路径下。 MODEL_PATH = "/eas/workspace/model/" device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") model = torch.jit.load(os.path.join(MODEL_PATH, "mnist_cnn.pt"), map_location=device).to(device) transform = transforms.Compose( [transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))] ) @app.route("/", methods=["POST"]) def predict(): # 预处理图片数据 im = Image.open(io.BytesIO(request.data)) input_tensor = transform(im).to(device) input_tensor.unsqueeze_(0) # 使用模型进行推理 output_tensor = model(input_tensor) pred_res =output_tensor.detach().cpu().numpy()[0] return json.dumps(pred_res.tolist()) if __name__ == '__main__': app.run(host="0.0.0.0", port=int(os.environ.get("LISTENING_PORT", 8000)))
您需要将以上的代码保存到本地,供后续上传使用。在本示例中,您需要在本地新建目录
infer_src
,将上述推理服务代码保存为infer_src/run.py
,目录结构如下:|-- infer_src # 待上传的推理服务代码目录 |-- requirements.txt # 可选:推理服务的第三方包依赖 `-- run.py # 保存的推理服务脚本
通过
pai.model.container_serving_spec
,基于本地脚本和PAI提供的PyTorch镜像创建一个InferenceSpec对象。from pai.model import InferenceSpec, container_serving_spec from pai.image import retrieve, ImageScope torch_image_uri = retrieve("PyTorch", framework_version="latest", image_scope=ImageScope.INFERENCE).image_uri inf_spec = container_serving_spec( command="python run.py", source_dir="./infer_src/", image_uri=torch_image_uri, requirements=["flask==2.0.0", "Werkzeug==2.2.2", "pillow", "torchvision"], ) print(inf_spec.to_dict())
模型服务的代码和启动命令:
用户指定的本地脚本目录
source_dir
参数会被上传到OSS,然后挂载到服务容器(默认到/ml/usercode
目录)。推理服务镜像:
PAI提供了基础的推理镜像支持用户使用,您可以通过
pai.image.retrieve
方法,指定参数image_scope=ImageScope.INFERENCE
获取PAI提供的推理镜像。模型服务的第三方依赖包:
模型服务代码或是模型的依赖,可以通过
requirements
参数指定,相应的依赖会在服务程序启动前被安装到环境中。
使用训练作业输出的模型和上述的InferenceSpec,通过
Model.deploy
API部署一个在线推理服务。from pai.model import Model from pai.common.utils import random_str import numpy as np m = Model( model_data=est.model_data(), inference_spec=inf_spec, ) predictor = m.deploy( service_name="torch_mnist_script_container_{}".format(random_str(6)), instance_type="ecs.c6.xlarge", )
推理服务。
准备一张MNIST测试图片。
import base64 from PIL import Image from IPython import display import io !pip install -q pillow # raw_data是一张MNIST图片,对应数字9。 raw_data = base64.b64decode(b"/9j/4AAQSkZJRgABAQAAAQABAAD/2wBDAAgGBgcGBQgHBwcJCQgKDBQNDAsLDBkSEw8UHRofHh0aHBwgJC4nICIsIxwcKDcpLDAxNDQ0Hyc5PTgyPC4zNDL/wAALCAAcABwBAREA/8QAHwAAAQUBAQEBAQEAAAAAAAAAAAECAwQFBgcICQoL/8QAtRAAAgEDAwIEAwUFBAQAAAF9AQIDAAQRBRIhMUEGE1FhByJxFDKBkaEII0KxwRVS0fAkM2JyggkKFhcYGRolJicoKSo0NTY3ODk6Q0RFRkdISUpTVFVWV1hZWmNkZWZnaGlqc3R1dnd4eXqDhIWGh4iJipKTlJWWl5iZmqKjpKWmp6ipqrKztLW2t7i5usLDxMXGx8jJytLT1NXW19jZ2uHi4+Tl5ufo6erx8vP09fb3+Pn6/9oACAEBAAA/APn+rVhpmoarP5GnWNzeTYz5dvE0jfkoJovNMv8ATmK3tjc2zByhE8TIQw6jkdR6VVq9oumPrWuWGlxyLG95cRwK7dFLMFyfzr3aXwp4ltAfB3gWwudI01JNuoa7eZhku5AMHafvFOw2Dn6ZJ4z4yeLk1HUbXwrZSSy2Oh5heeaQu88wG1mLHk4wR9c+1eXUqsVYMpIIOQR2r1D4QazqOs/FnSG1fVLi9ZI5vL+2TNKc+U2ApYnB7/hXml5LLNfXEsxLSvIzOSMEsTk1DRVnT7+60vULe/spmhureQSRSL1Vh0NWNd1mXX9ZuNUuLe2gmuCGkS2QohbABbBJwTjJ9yelZ1f/2Q==") im = Image.open(io.BytesIO(raw_data)) display.display(im)
将请求发送给推理服务。
推理服务使用HTTP请求体内的数据作为输入的图片,SDK的
raw_predict
方法接受bytes
数据类型的请求,通过POST
方法,在请求体(HTTP Request Body)带上用户推理数据,发送给推理服务。from pai.predictor import RawResponse import numpy as np resp: RawResponse = predictor.raw_predict(data=raw_data) print(resp.json()) print(np.argmax(resp.json()))
测试完成之后可以删除服务。
predictor.delete_service()
附件
本示例的Jupyter Notebook:使用PAI Python SDK训练和部署PyTorch模型。