文档

部署推理服务

更新时间:

PAI Python SDK提供了易用的API(即HighLevel API),支持用户将模型部署到PAI创建推理服务。本文档介绍了如何使用SDK在PAI部署推理服务。

概要介绍

SDK提供了HighLevel API,即pai.model.Modelpai.predictor.Predictor,支持用户将模型部署到EAS,以及进行调用测试。

通过SDK创建推理服务的基本流程包括:

  • 通过pai.model.InferenceSpec对象描述模型的推理服务配置,包括使用的 Processor 或是镜像的信息。

  • 使用InferenceSpec对象和待部署的模型文件创建一个pai.model.Model对象。

  • 通过pai.model.Model.deploy()方法,指定服务使用的资源、服务名称等信息,在PAI创建一个推理服务。

  • 通过deploy方法返回pai.predictor.Predictor对象,提供了predict方法向推理服务发送预测请求。

示例代码如下:

from pai.model import InferenceSpec, Model, container_serving_spec
from pai.image import retrieve, ImageScope

# 1. 使用PAI提供的PyTorch推理镜像。
torch_image = retrieve("PyTorch", framework_version="latest",
    image_scope=ImageScope.INFERENCE)


# 2. 使用InferenceSpec描述模型的推理配置信息。
inference_spec = container_serving_spec(
    # 推理服务的启动命令。
    command="python app.py",
    source_dir="./src/"
    # 使用的推理镜像。
    image_uri=torch_image.image_uri,
)


# 3. 构建Model对象,用于模型部署。
model = Model(
    # 使用OSS Bucket上的模型文件。
    model_data="oss://<YourBucket>/path-to-model-data",
    inference_spec=inference_spec,
)

# 4. 部署模型到PAI-EAS,创建在线推理服务,返回Predictor对象。
predictor = model.deploy(
    service_name="example_torch_service",
    instance_type="ecs.c6.xlarge",
)

# 5. 测试推理服务。
res = predictor.predict(data=data)

配置模型的InferenceSpec

您可以通过Processor或镜像的方式部署推理服务,pai.model.InferenceSpec对象用于描述模型的推理的配置,例如使用Processor或是镜像部署、运行服务的存储配置、模型服务预热配置、服务的RPC Batch功能配置等,构建的InferenceSpec对象最终会用于推理服务的创建。

使用预置 Processor

Processor是PAI对于推理服务程序包的抽象描述,它能够基于用户提供的模型,直接构建一个推理服务。PAI提供了预置的Processor,支持一系列常见的机器学习模型格式,包括Tensorflow SavedModelPyTorch TorchScriptXGBoostLightGBMPMML等,完整的介绍请参考文档:预置Processor使用说明

对于使用Processor方式部署模型,可以参考以下示例配置InferenceSpec

# 使用预置的TensorFlow Processor。
tf_infer_spec = InferenceSpec(processor="tensorflow_cpu_2.3")


# 使用预置的PyTorch Processor。
tf_infer_spec = InferenceSpec(processor="pytorch_cpu_1.10")

# 使用预置的XGBoost Processor。
xgb_infer_spec = InferenceSpec(processor="xgboost")

您可以在InferenceSpec实例上配置推理服务的更多功能,例如配置服务预热文件、服务的RPC配置等,完整的服务参数信息可以见文档:服务模型所有相关参数说明

# 直接配置InferenceSpec的属性。
tf_infer_spec.warm_up_data_path = "oss://<YourOssBucket>/path/to/warmup-data" # 配置服务预热文件路径。
tf_infer_spec.metadata.rpc.keepalive = 1000 # 配置请求链接的keepalive时长。

print(tf_infer_spec.warm_up_data_path)
print(tf_infer_spec.metadata.rpc.keepalive)

使用镜像部署

使用Processor部署模型提供了易用性,但是无法支持用户灵活自定义的诉求,例如模型或是推理服务程序有较为复杂的依赖。对于类似的场景,PAI提供了镜像部署的方式,支持用户以更加灵活自定义的方式部署模型。

您可以将模型服务的代码和相关的依赖打包构建成一个Docker镜像,然后推送到阿里云ACR镜像仓库服务,然后基于以上的Docker镜像构建InferenceSpec ,用于模型的部署。

from pai.model import InferenceSpec, container_serving_spec

# 通过“container_serving_spec”方法,用户可以构建一个使用镜像服务模型的InferenceSpec。
container_infer_spec = container_serving_spec(
    # 推理服务运行使用的镜像。
    image_uri="<CustomImageUri>",
    # 运行在容器内的推理服务需要监听的端口, 用户发送的预测请求会被PAI转发到服务容器的该端口。
    port=8000,
    environment_variables=environment_variables,
    # 推理服务的启动命令。
    command=command,
    # 推理服务依赖的Python包。
    requirements=[
        "scikit-learn",
        "fastapi==0.87.0",
    ],
)


print(container_infer_spec.to_dict())

m = Model(
    model_data="oss://<YourOssBucket>/path-to-tensorflow-saved-model",
    inference_spec=custom_container_infer_spec,
)
p = m.deploy(
    instance_type="ecs.c6.xlarge"
)

当通过自定义镜像部署的方式部署模型时,需要将推理服务运行所需的代码准备到运行容器、构建镜像并推送到镜像仓库。SDK提供便利方法,支持您将本地的代码以及基础镜像的方式构建推理服务,而无需构建镜像。pai.model.container_serving_spec()支持通过source_dir指定一个本地代码文件目录(参数source_dir),SDK会将代码目录打包上传到OSS Bucket,然后将OSS Bucket的路径挂载到运行容器中。用户指定的启动命令可以拉起推理服务。

from pai.model import InferenceSpec

inference_spec = container_serving_spec(
    # 用户推理程序所在的本地目录路径,会被上传到OSS Bucket,然后挂载到运行容器,默认为/ml/usercode/。
    source_dir="./src",
    # 服务启动命令。当用户指定了source_dir,则默认使用/ml/usercode作为工作目录执行command。
    command="python run.py",
    image_uri="<ServingImageUri>",
    requirements=[
        "fastapi",
        "uvicorn",
    ]
)
print(inference_spec.to_dict())

当用户还有更多的数据、代码或是模型准备到推理服务的容器内时,可以使用pai.model.InferenceSpec.mount()方法,将一个本地目录数据或是OSS上的数据路径挂载到在线服务容器中。

# 将本地的数据上传到OSS,然后挂载到容器的 `/ml/tokenizers` 目录下。
inference_spec.mount("./bert_tokenizers/", "/ml/tokenizers/")

# 直接挂载用户存储在 OSS 上的数据到容器的 `/ml/data` 目录下。
inference_spec.mount("oss://<YourOssBucket>/path/to/data/", "/ml/data/")

获取PAI提供的公共镜像

PAI提供了一些常见的框架推理镜像,包括TensorFlowPyTorchXGBoost等,支持用户快速创建推理服务。用户可以通过pai.image.list_imagespai.image.retrieve方法传递image_scope=ImageScope.INFERENCE信息从而获取到相应的推理镜像,然后使用镜像部署的方式部署模型。

from pai.image import retrieve, ImageScope, list_images

# 获取PAI提供的所有PyTorch推理镜像。
for image_info in list_images(framework_name="PyTorch", image_scope=ImageScope.INFERENCE):
  	print(image_info)


# 获取PAI提供的PyTorch 1.12版本的CPU推理镜像。
retrieve(framework_name="PyTorch", framework_version="1.12", image_scope=ImageScope.INFERENCE)

# 获取PAI提供的PyTorch 1.12版本的GPU推理镜像。
retrieve(framework_name="PyTorch", framework_version="1.12", accelerator_type="GPU", image_scope=ImageScope.INFERENCE)

# 获取PAI提供的PyTorch最新版本的GPU推理镜像。
retrieve(framework_name="PyTorch", framework_version="latest", accelerator_type="GPU", image_scope=ImageScope.INFERENCE)

部署在线推理服务

用户使用pai.model.InferenceSpec和模型数据地址model_data构建一个模型对象pai.model.Model,然后通过调用.deploy方法部署模型。model_data可以是一个OSS URI,也可以是本地路径,对于本地路径的模型,相应的模型文件会被上传到OSS Bucket上,然后准备到推理服务中,供对应的服务程序加载使用。

当调用.deploy方法部署模型时,用户需要指定服务所需的资源配置、服务实例个数、服务名称等服务相关参数。更多高阶参数说明,请参见服务模型所有相关参数说明

from pai.model import Model

model = Model(
    # model_data模型所在的路径,可以是OSS URI,或是本地路径。对于本地路径的模型,默认会被上传到OSS Bucket上。
    model_data="oss://<YourBucket>/path-to-model-data",
    inference_spec=inference_spec,
)

# 部署到EAS。
predictor = m.deploy(
    # 推理服务的名称。
    service_name="example_xgb_service",
    # 服务使用的机器类型。
    instance_type="ecs.c6.xlarge",
    # 机器实例/服务的个数。
    instance_count=2,
    # 用户的专有资源组,可选。默认使用公共资源组。
    # resource_id="<YOUR_EAS_RESOURCE_GROUP_ID>",
    options={
        "metadata.rpc.batching": True,
        "metadata.rpc.keepalive": 50000,
        "metadata.rpc.max_batch_size": 16,
        "warm_up_data_path": "oss://<YourOssBucketName>/path-to-warmup-data",
    },
)

当用户需要根据服务使用的资源数量(例如CPU、Memory)配置服务时,可以通过resource_config参数配置每个服务实例申请的资源,示例如下:

from pai.model import ResourceConfig

predictor = m.deploy(
    service_name="dedicated_rg_service",
    # 指定单个服务实例使用的CPU和Memory资源。
    # 当前示例中,每一个服务使用2个核的CPU,以及4000 MB的内存。
    resource_config=ResourceConfig(
        cpu=2,
        memory=4000,
    ),
)

调用推理服务

pai.model.Model.deploy方法通过调用EAS的API创建一个新的推理服务,并返回一个pai.predictor.Predictor对象,指向新创建的推理服务。Predictor对象提供了predictraw_predict方法,支持向推理服务发送预测请求。

说明

pai.predictor.Predictor.raw_predict的输入和输出不会使用Serializer进行处理.

from pai.predictor import Predictor, EndpointType

# 创建一个新的推理服务。
predictor = model.deploy(
    instance_type="ecs.c6.xlarge",
    service_name="example_xgb_service",
)

# 使用已有的推理服务。
predictor = Predictor(
    service_name="example_xgb_service",
    # 默认使用INTERNET公网网络访问,用户可以配置使用VPC的网络(需要客户端代码运行在VPC环境下)。
    # endpoint_type=EndpointType.INTRANET
)

# .predict向对应服务发送数据请求,拿到相应结果。输入数据和响应结果会经过serializer处理。
res = predictor.predict(data_in_nested_list)


# .raw_predict支持更为灵活的方式发送请求给到推理服务。
response: RawResponse = predictor.raw_predict(
  	# 如果输入数据是bytes,或是file-like object,请求数据直接在HTTP请求体内传递。
  	# 否则,则会经过一次JSON序列化,然后放在HTTP请求体内传递。
  	data=data_in_nested_list
  	# path="predict"            # 自定义HTTP请求路径,默认将请求发送到"/"路径。
  	# headers=dict(),						# 自定义请求Header。
  	# method="POST"							# 自定义请求的HTTP Method。
  	# timeout=30,								# 自定义请求的timeout。
)

# 获取返回的body, headers。
print(response.content, response.headers)
# 将返回结果JSON反序列化为Python对象。
print(response.json())

    
# 停止推理服务。
predictor.stop_service()
# 开始推理服务。
predictor.start_service()
# 删除推理服务。
predictor.delete_service()

使用Serializer处理推理服务的输入和输出

当使用SDK的Predictor对象请求推理服务,需要将输入的Python的数据结构序列化,将其转换为服务能够支持的数据格式进行传输。服务响应返回的数据也同样需要做一次反序列化转为可读,或是可以操作的Python对象。Predictor使用serializer参数的处理预测数据序列化,以及预测响应结果的反序列化:

  • 当调用predict(data=<PredictionData>)方法时,data参数会通过serializer.serialize方法序列化请求数据,获得的bytes类型的结果,然后通过HTTP请求体传递给预测服务。

  • 当推理服务返回HTTP响应结果之后,Predictor对象通过serializer.deserialize方法反序列化HTTP响应的结果,作为predict方法的返回。

SDK提供了一些预置的Serializer,支持常见数据的序列化处理,以及PAI内置的深度学习Processor的输入输出数据处理。

  • JsonSerializer

JsonSerializer对象支持JSON数据的序列化和反序列化。用户通过predict方法传递的data,可以是numpy.ndarray,或是一个ListJsonSerializer.serialize负责将对应的数组序列化为JSON字符串,JsonSerializer.deserialize则负责将返回的JSON字符串反序列化为一个Python对象。

PAI提供的预置的XGBoost Processor、PMML Processor等使用JSON格式接收数据和响应结果,Predictor默认使用JsonSerializer处理这些processor创建的服务的输入输出数据。

from pai.serializers import JsonSerializer

# 在“.deploy”方法指定返回的predictor使用的serializer。
p = Model(
    inference_spec=InferenceSpec(processor="xgboost"),
    model_data="oss://<YourOssBucket>/path-to-xgboost-model"
).deploy(
    instance_type="ecs.c6.xlarge",
    # 可选:使用XGBoost processor的service默认使用JsonSerializer。
    serializer=JsonSerializer()
)

# 或是直接创建Predictor时指定对应的serializer。
p = Predictor(
    service_name="example_xgb_service"
    serializer=JsonSerializer(),
)

# 预测的返回结果也是一个list。
res = p.predict([[2,3,4], [4,5,6]])

  • TensorFlowSerializer

PAI提供了预置的Tensorflow Processor,支持用户将TensorFlow的SavedModel直接部署到PAI创建推理服务。对应的服务的输入输出消息格式是Protocol Buffers,具体文件格式请参见tf_predict.proto

SDK提供了预置的 TensorFlowSerializer,支持用户通过传递numpy.ndarray数据类型的参数发送预测请求,Serializer负责使用对应的numpy.ndarray生成对应的Protocol Buffers消息,并将接收的Protocol Buffers消息反序列化为numpy.ndarray数据类型。

# 创建一个TensorFlow processor服务。
tf_predictor = Model(
    inference_spec=InferenceSpec(processor="tensorflow_cpu_2.7"),
    model_data="oss://<YourOssBucket>/path-to-tensorflow-saved-model"
).deploy(
    instance_type="ecs.c6.xlarge",
    # 可选:使用TensorFlow processor的service默认使用TensorFlowSerializer。
    # serializer=TensorFlowSerializer(),
)

# 使用TensorFlow processor启动的服务,支持用户通过API获取模型的服务签名。
print(tf_predictor.inspect_signature_def())

# TensorFlow processor的输入要求一个Dict,Key是模型输入签名的名称,Value是具体的输入数据。
tf_result = tf_predictor.predict(data={
    "flatten_input": numpy.zeros(28*28*2).reshape((-1, 28, 28))
})

assert result["dense_1"].shape == (2, 10)

  • PyTorchSerializer

PAI提供了预置的PyTorch Processor,支持用户使用TorchScript 格式的模型部署推理服务。使用PyTorch Processor启动的推理服务的输入输出使用Protocol Buffers格式传递数据,具体文件格式请参见pytorch_predict_proto

SDK提供了预置的PyTorchSerializer,支持用户使用numpy.ndarray发送请求,并将预测结果转换为numpy.ndarray,由PyTorchSerializer负责Protocol Buffers消息和numpy.ndarray的转换。

# 创建一个使用PyTorch processor服务。
torch_predictor = Model(
    inference_spec=InferenceSpec(processor="pytorch_cpu_1.10"),
    model_data="oss://<YourOssBucket>/path-to-torch_script-model"
).deploy(
    instance_type="ecs.c6.xlarge",
    # 可选:使用PyTorch processor的service默认使用PyTorchSerializer。
    # serializer=PyTorchSerializer(),
)

# 1. 用户需要注意将对应的输入数据 reshape 成模型支持的形状。
# 2. 如果有多个输入数据,则需要使用List/Tuple传递,列表中的每一项是numpy.ndarray。
torch_result = torch_predictor.predict(data=numpy.zeros(28 * 28 * 2).reshape((-1, 28, 28)))
assert torch_result.shape == (2, 10)

  • 自定义Serializer

用户可以根据推理服务支持的数据格式自定义Serializer类:自定义Serializer类需继承pai.serializers.SerializerBase,实现serializedeserialize方法。

以下示例是一个自定义的NumpySerializer,当predict被调用时,整体的链路如下:

  1. 客户端: 用户传递numpy.ndarray, 或是pandas.DataFrame,作为predict的输入,调用 NumpySerializer.serializer序列化为npy format,发送给到服务端。

  2. 服务端: 推理服务接收的npy格式数据,反序列化数据,获得推理结果,然后将输出的结果,序列化为npy格式返回。

  3. 客户端: 接收到npy格式返回,通过NumpySerializer.deserialize反序列化为numpy.ndarray

import pandas as pd
import numpy as np
import io
from typing import Union

class NumpySerializer(SerializerBase):

    def serialize(self, data: Union[np.ndarray, pd.DataFrame, bytes]) -> bytes:
        """Serialize input python object to npy format"""
        if isinstance(data, bytes):
            return data
        elif isinstance(data, str):
            return data.encode()
        elif isinstance(data, pd.DataFrame):
            data = data.to_numpy()

        res = io.BytesIO()
        np.save(res, data)
        res.seek(0)
        return res.read()

    def deserialize(self, data: bytes) -> np.ndarray:
        """Deserialize prediction response to numpy.ndarray"""
        f = io.BytesIO(data)
        return np.load(f)


# 创建一个使用输入输出使用npy格式的推理服务。
predictor = Model(
    inference_spec=infer_spec,
    model_data="oss://<YourOssBucket>/path-to-model"
).deploy(
    instance_type="ecs.c6.xlarge",

    # 使用自定义的serializer。
    serializer=NumpySerializer(),
)

res: predictor.predict(data=input_data)

assert isinstance(input_data, numpy.ndarray)
assert isinstance(res, numpy.ndarray)

本地部署和测试推理服务

对于自定义镜像部署,SDK提供了本地执行模式(当前不支持使用 Processor 部署的服务),通过在model.deploy中,传递instance_type="local"参数,指定在本地运行推理服务。 SDK通过docker在本地拉起一个模型服务,依赖的模型会从OSS下载到本地,然后挂载到本地运行的容器镜像中。

from pai.predictor import LocalPredictor

p: LocalPredictor = model.deploy(
    # 指定运行在本地。
    instance_type="local",
    serializer=JsonSerializer()
)

p.predict(data)

# 删除对应的docker容器。
p.delete_service()

  • 本页导读 (1)
文档反馈