在复杂的模型推理场景中,例如AIGC、视频处理等场景。推理耗时较长,存在长连接超时导致请求失败或实例负载不均衡等问题,不适用于同步推理的场景。针对以上问题,PAI提供了异步推理服务,支持通过订阅或轮询的方式来获取推理结果。本文为您介绍如何使用异步推理服务。

实现原理

异步推理服务会在服务内部集成一个服务维度的消息队列,消息队列共有两个队列,即输入(input)队列和输出(sink)队列。服务请求会先发送到输入队列中,异步推理服务实例中的EAS服务框架会自动订阅队列服务中的请求数据,调用异步推理服务中的HTTP接口对收到的请求数据进行推理,并将响应结果写入到输出队列中。

当输出队列满时,即无法向输出队列中写入数据时,服务框架也会停止从输入队列中接收数据,避免无法将推理结果输出到输出队列。

如果您不需要输出队列,例如将推理结果直接输出到OSS或者您自己的消息中间件中,则可以在同步的HTTP推理接口中返回空,此时输出队列会被忽略。

创建异步推理服务

EAS的异步推理服务可以实现同步推理逻辑到异步推理的转换,创建异步推理服务的操作步骤如下。

  1. 准备服务的配置文件service.json。
    • 部署方式为模型部署。
      {
        "name": "pmmlasync",
        "processor": "pmml",
        "model_path": "http://example.oss-cn-shanghai.aliyuncs.com/models/lr.pmml",
        "metadata": {
          "type": "Async",
          "cpu": 4,
          "instance": 1
        }
      }
      其中关键参数说明如下。

      type:配置该参数为Async,即可创建异步推理服务。

    • 部署方式为镜像部署。
      {
        "name": "imageasync",
        "containers": [
          {
            "image": "registry-vpc.cn-shanghai.aliyuncs.com/eas/eas-container-deploy-test:202010091755",
            "command": "/data/eas/ENV/bin/python /data/eas/app.py"
            "port": 8000,
          }
        ],
        "metadata": {
          "type": "Async",
          "rpc.proxy_path": "/data/model",
          "rpc.worker_threads": 4,
          "instance": 1,
        }
      }
      其中关键参数说明如下。
      • type:配置该参数为Async,即可创建异步推理服务。
      • rpc.proxy_path:对于使用自定义镜像部署的服务,通常会实现多个HTTP path。而在异步推理服务中,目前只能调用其中一个HTTP path,需要您通过该参数来指定该path的值,
      • rpc.worker_threads:为异步推理服务中EAS服务框架的线程数,该参数与订阅队列数据的窗口大小一致。线程数设置为4,即一次最多只能从队列中订阅4条数据,在这4条数据处理完成前,队列不会给异步推理服务推送新数据。

        例如:某视频流处理服务,单实例一次只能处理2条视频流,则该参数可以设置为2,队列最多将2个视频流的地址推送给异步推理服务,在异步推理服务返回结果前,不会在推送新的视频流地址。如果异步推理服务完成了其中一个视频流的处理并返回结果,则队列服务会继续再推送一个新的视频流地址给该服务实例,保证一个服务实例最多同时处理不超过2路视频流。

    配置文件中的其他参数解释,详情请参见 服务模型所有相关参数说明

    创建异步推理服务时,队列服务会随异步推理服务自动创建,并集成到异步推理服务内。队列服务实例数默认为1,且会随推理服务实例数量动态伸缩,最大为2。单个实例占用资源默认为1核4 GB。如果队列服务默认配置不能满足你的需求,您可以参考队列服务相关配置,进行配置。

  2. 创建服务。
    您可以登录eascmd客户端后使用create命令创建异步推理服务,如何登录eascmd客户端,请参见 下载并认证客户端,使用示例如下。
    eascmd create service.json

发送请求并查询推理结果

异步推理服务会自动从输入队列中读取请求数据,进行推理计算后将推理结果写出到输出队列(sink)中。以下内容以前述章节创建的PMML推理模型(pmmlasync)为例,为您演示如何发送服务请求进行异步推理,并异步查询推理结果。

  1. 向输入队列服务中发送请求。
    $ curl -v http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/pmmlasync -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZm****' -d '[{}]'
    
    > POST /api/predict/pmmlasync HTTP/1.1
    > Host: 182848887922****.cn-shanghai.pai-eas.aliyuncs.com
    > Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZm****
    >
    < HTTP/1.1 200 OK
    < Content-Length: 19
    < Content-Type: text/plain; charset=utf-8
    < X-Eas-Queueservice-Request-Id: 0337f7a1-a6f6-49a6-8ad7-ff2fd12b****
    <
    * Connection #0 to host 182848887922****.cn-shanghai.pai-eas.aliyuncs.com left intact
    6946752088097423360
  2. 从输出队列中,查询推理结果。
    您可以通过以下两种方式查询推理结果。
    • 通过Request Id主动查询推理结果。
      $ curl -v -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZm****' http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/pmmlasync/sink?requestId=0337f7a1-a6f6-49a6-8ad7-ff2fd12b****
      
      > GET /api/predict/test_group.qservice/sink?requestId=0337f7a1-a6f6-49a6-8ad7-ff2fd12b**** HTTP/1.1
      > Host: 182848887922****.cn-shanghai.pai-eas.aliyuncs.com
      > Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZm****
      >
      < HTTP/1.1 200 OK
      < Content-Length: 53
      < Content-Type: text/plain; charset=utf-8
      
      [{"p_0":0.5224580736905329,"p_1":0.4775419263094671}]
      其中:requestId为步骤1中查询到的Request-Id。
    • 通过订阅的方式来获取输出队列中的结果数据。
      通过订阅的方式与输出队列建立websocket长连接。当输出队列中有数据写入时,输出队列会通过websocket长连接,将结果推送给客户端。您可以参考以下Python SDK示例,完成输出数据的订阅。
      from eas_prediction import QueueClient
      
      if __name__ == '__main__':
          # 创建输出队列对象,用于订阅读取输出结果数据。
          sink_queue = QueueClient('182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'pmmlasync/sink')
          sink_queue.set_token('YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZm****')
          sink_queue.init()
      
          # 从输出队列中watch数据,窗口为5。
          i = 0
          watcher = sink_queue.watch(0, 5, auto_commit=False)
          for x in watcher.run():
              print(x.data.decode('utf-8'))
      
              # 每次收到一个请求数据后处理完成后手动commit。
              sink_queue.commit(x.index)
              i += 1
              if i == 10:
                  break
      
          # 关闭已经打开的watcher对象,每个客户端实例只允许存在一个watcher对象,若watcher对象不关闭,再运行时会报错。
          watcher.close()

水平自动扩缩容

您可以登录eascmd客户端后,使用以下命令开启异步推理服务的弹性伸缩功能。如何登录eascmd客户端,详情请参见 下载并认证客户端
  • 命令格式
    eascmd autoscale <service_name> -Dmin=[attr_value] -Dmax=[attr_value] -Dstrategies.queue[avgbacklog]=[attr_value]
    其中:
    • queue[avgbacklog]:队列服务自动伸缩独立的指标,表示队列中堆积的请求长度。
    • <service_name>:异步推理服务名称。
  • 使用示例
    eascmd autoscale pmmlasync -Dmin=0 -Dmax=10 -Dstrategies.queue[avgbacklog]=10
    其中:
    • queue[avgbacklog]=10:表示异步推理服务每个实例最多能处理10个请求。
    • max=10:表示异步推理服务实例个数最多能扩容到10个。
    • min=0:表示异步推理服务实例个数最多能缩容到0个。
    如果异步推理服务实例个数为3,当队列中堆积的请求数超过30(即30/3大于10)时,开始扩容,服务实例最多扩容到10个。当队列中堆积的请求数小于且等于30个时,开始缩容。当队列中的请求长度为空时,支持将服务实例数完全缩容到0,此时队列会持续运行。当请求数据重新被发送到队列中时,则会触发扩容。
您可以使用以下命令,配置扩容和缩容的等待时间。
  • 命令格式
    eascmd autoscale <service_name> -Dbehavior.<attr_name>.stabilizationWindowSeconds=<attr_value>
    其中:
    • <service_name>:异步推理服务名称。
    • <attr_name>:取值为scaleDown(表示缩容)或scaleUp(表示扩容)。
    • <attr_value>:参数值,表示配置的等待时间,INT类型,单位为秒。

      缩容时,该参数值默认为300秒。该值不易配置过小,避免因指标波动较大导致频繁触发缩容。

      扩容时,该参数值默认为0秒。通常在指标超过阈值时,尽可能快的进行扩容,避免因资源不足影响业务,可根据实际情况进行配置。

  • 使用示例
    • 配置缩容等待时间
      eascmd autoscale pmmlasync -Dbehavior.scaleDown.stabilizationWindowSeconds=100
      表示在指标值达到缩容阈值时,等待100秒进行缩容。
    • 配置扩容等待时间
      eascmd autoscale pmmlasync -Dbehavior.scaleUp.stabilizationWindowSeconds=100
      表示在指标值达到扩容阈值时,等待100秒进行扩容。
您也可以通过配置文件,同时配置扩容和缩容的等待时间。
  • 命令格式
    eascmd autoscale <service_name> -s <scale.json>
    其中:
    • <service_name>:异步推理服务名称。
    • <scale.json>:配置文件。文件内容示例如下。
      {
        "behavior": {
          "scaleUp": {
            "stabilizationWindowSeconds": 20
          },
          "scaleDown": {
            "stabilizationWindowSeconds": 150
          }
        }
      }
  • 使用示例
    eascmd autoscale pmmlasync -s scale.json
    表示在指标值达到扩容阈值时,等待20秒进行扩容;在指标达到缩容阈值时,等待150秒进行缩容。

队列服务相关配置

队列服务会随异步推理服务自动创建,您可以在配置文件service.json中设置队列服务相关参数。

参数说明

以下参数为非必选参数,如果未设置,则取默认值。
参数 描述
queue.min_replica 队列服务的最小实例数量,默认为1。最大支持设置为2,表示固定2个实例,用于高可用主备模式。
queue.memory 队列服务每个实例的内存大小,单位为MB,默认为4000 MB。
queue.cpu 队列服务每个实例的CPU用量,默认为1核。
queue.source.auto_evict 输入或输出队列是否开启自动驱逐,取值如下:
  • false(默认值):当队列满(即达到最大队列长度)时,如果继续向队列发送数据,会发送失败,并返回状态码429(StatusTooManyRequests)。
  • true:当队列满(即达到最大队列长度)时,继续向队列发送数据,会将队列中最老的数据删除,不会返回429。
queue.sink.auto_evict
queue.source.max_length 配置输入或输出队列的最大长度,单位为KB。
重要 该参数不允许与 max_payload_size同时配置。
  • 配置固定长度
    队列服务的单条数据最大长度会根据配置的内存大小进行调整,计算公式为:
    单条数据最大长度=(输入或输出)队列内存×0.9/队列最大长度
  • 不配置

    系统根据内存自动计算。

queue.sink.max_length
queue.source.max_payload_size 配置输入或输出队列的单条数据最大长度,单位为KB。
重要 该参数不允许与 max_length同时配置。
  • 配置固定长度
    队列服务的最大长度会根据内存大小进行调整,计算公式为:
    队列最大长度=(输入或输出)队列内存×0.9/单条数据最大长度
  • 不配置

    默认为8KB。

queue.sink.max_payload_size
queue.sink.memory_ratio 配置输出队列内存占比,DOUBLE类型,取值范围为[0,1]。
如果配置输出队列内存占比为r,则输入队列的内存占比为(1-r)。计算公式如下:
输出队列内存=服务实例内存×r
输入队列内存=服务实例内存×(1-r)
                                                        
当r=1时,输入队列会被关闭(不可读写,内存占用为0);当r=0时,输出队列会被关闭(不可读写,内存占用为0)。

配置示例

{
  "name": "pmmlasync",
  "queue.min_replica": 2,
  "queue.source.auto_evict": true,
  "queue.source.max_length": 1024,
  "queue.sink.memory_ratio": 0,
  "processor": "pmml",
  "model_path": "http://eas-data.oss-cn-shanghai.aliyuncs.com/models/lr.pmml",
  "metadata": {
    "type": "Async",
    "cpu": 4,
    "instance": 1
  }
}