对于推理耗时相对较长的使用场景,同步等待结果会存在HTTP长连接断开、客户端超时、负载均衡及实例异常等问题。 针对以上问题,PAI提供了队列服务和异步推理功能,可以通过请求分发、订阅推送或定期查询结果来实现推理。本文为您介绍如何使用队列服务及异步推理。
背景信息
功能介绍
- 异步推理
对于实时性要求比较高的在线推理场景,通常使用同步推理,即客户端发送一个请求,同步等待结果返回。
对于推理耗时比较长的特殊场景,同步等待结果会带来HTTP长连接断开、客户端超时等诸多问题,通常使用异步推理。即请求发送至服务端,客户端不再同步等待结果,而是选择定期去查询结果,或通过订阅的方式在请求计算完成后等待服务端的结果推送。
- 队列服务
对于准实时推理场景,比如短视频、视频流或语音流的处理、计算复杂度很高的图像处理等场景,不需要实时返回结果,但需要在指定时间内获取推理结果,该场景存在以下几类问题:
- 负载均衡算法不能选择round robin算法,需要根据各个实例的实际负载情况进行请求的分发。
- 实例异常,该实例上未计算完成的任务需要重新分配给其他实例进行计算。
实现原理

- 每个队列服务会默认创建两个队列,即输入队列(默认)和输出队列(sink队列)。当队列服务和推理服务搭配使用时,推理服务中内置的服务引擎,作为同分组队列服务的客户端,会自动监听该队列服务中输入队列的数据并进行订阅处理,并将结果自动写入到输出队列中。因此,您只需要在服务部署时指定服务分组,并在该分组中创建一个队列服务,则该分组服务即可同时通过同步和异步两种方式进行访问。
- 创建一个高可用的队列服务,用于接收客户端发送的请求。客户端实例根据自己所能承受的并发度来订阅指定个数的请求,队列服务会保证每个实例上处理的请求不会超过客户端实例订阅的窗口大小,通过该方式来保证不会存在实例过载,最终将订阅或查询的数据返回给客户端。
说明 比如每个实例只能处理5路语音流,则从队列中订阅消息时,将window size配置为5。当实例处理完一路语音流后将结果commit,队列服务会为实例重新推送一路新的语音流,保证实例上处理的语音流最多不超过5路。
- 队列服务通过检测客户端的连接状态,对客户端进行健康检查,如果因客户端异常导致连接断开,队列服务会将该客户端实例标记为异常,已经分发给该实例进行处理的请求会重新推送给其他正常实例进行处理,以此来保证在异常情况下请求数据不会丢失。
创建队列服务和推理服务
通过控制台方式创建服务
- 创建推理服务和服务分组。
- 创建队列服务。
通过客户端方式创建服务
发送同步或异步推理请求
队列服务通过HTTP、Websocket接口来发送请求、接收响应数据。队列服务创建完成后,会自动生成输入队列和输出队列(sink队列)两类地址,以HTTP接口为例,说明如下:
您可以在控制台页面,参照下图操作指引,查看队列服务的输入地址和输出地址。
地址类型 | 地址格式 | 示例 |
---|---|---|
输入队列地址 | {domain}/api/predict/{group_name}.{queue_service_name} |
xxx.cn-shanghai.pai-eas.aliyuncs.com/api/predict/test_group.qservice |
输出队列地址 | {domain}/api/predict/{group_name}.{queue_service_name}/sink |
xxx.cn-shanghai.pai-eas.aliyuncs.com/api/predict/test_group.qservice/sink |

使用
curl
命令向输入队列服务发送一条同步请求或异步推理请求,具体代码示例如下。$ curl -v http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/test_group.qservice -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==' -d '[{}]'
系统输出如下结果。> POST /api/predict/test_group.qservice HTTP/1.1
> Host: 182848887922****.cn-shanghai.pai-eas.aliyuncs.com
> Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==
>
< HTTP/1.1 200 OK
< Content-Length: 19
< X-Eas-Queueservice-Request-Id: 4033eb55-e783-4922-9777-68d6a1383c76
<
6944957106013339648
其中:- Response Header中返回的X-Eas-Queueservice-Request-Id,为该请求对应的Request ID:4033eb55-e783-4922-9777-68d6a1383c76,您可以通过该Request ID来查询数据。
- Response Body中返回的是当前请求在队列中的Index:6944957106013339648,您可以通过Index在当前队列中查询数据。
查看队列服务详情
如果您在向队列服务发送请求时,增加
_attrs_=true
参数,返回结果中会显示当前队列的详情信息。具体代码示例如下。$ curl -v -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==' http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/test_group.qservice?_attrs_=true
系统输出如下结果。> GET /api/predict/test_group.qservice?_attrs_=true HTTP/1.1
> Host: 182848887922****.cn-shanghai.pai-eas.aliyuncs.com
> Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==
>
< HTTP/1.1 200 OK
< Content-Length: 320
<
{"meta.backend":"redis","meta.header.groupIdentifyHeader":"X-EAS-QueueService-Redis-Gid","meta.header.userIdentifyHeader":"X-EAS-QueueService-Redis-Uid","meta.maxPayloadBytes":"57600","stream.approxMaxLength":"8192","stream.firstEntry":"6946744471463657472","stream.lastEntry":"6946744570403094528","stream.length":"2"}
上述结果中返回JSON格式的详情信息,其中关键字段说明如下:字段名 | 描述 |
---|---|
meta.maxPayloadBytes | 队列中允许的每个数据项的大小上限,单位为Byte。 |
stream.approxMaxLength | 队列中能存储的数据项的数量上限。 |
stream.firstEntry | 队列中第一个数据项的index。 |
stream.lastEntry | 队列中最后一个数据项的index。 |
stream.length | 队列中当前存储的数据项的数量。 |
您也可以在控制台上,参照下图操作指引,查询队列服务详情,包括队列中当前存储的数据项数量、允许的每个数据项的大小上限和能够存储的数据项的数量上限等。

查询同步或异步推理结果
- 查询同步结果
当只使用一个队列服务时,您可以通过Index或Request ID从输入队列中查询数据,具体代码示例如下。
系统输出如下结果。# 通过index查询数据。 $ curl -v -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==' http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/test_group.qservice?_index_=6946744471463657472 # 通过request id查询数据。 $ curl -v -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==' http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/test_group.qservice?requestId=87633037-39a4-40bf-8405-14f8e0c31896
您可以配置以下参数来查询推理结果,具体参数说明如下:> GET /api/predict/test_group.qservice?_index_=694674447146365****&_auto_delete_=false HTTP/1.1 > Host: 182848887922****.cn-shanghai.pai-eas.aliyuncs.com > Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****== > < HTTP/1.1 200 OK < Content-Length: 4 < Content-Type: text/plain; charset=utf-8 < [{}]
参数 类型 核心参数说明 _index_ INT 要查询数据的起始index。默认为0,表示从队列的初始数据项开始查询。 _length_ INT 要查询的数据项的条数。默认为1,表示仅查询一条数据项。 _auto_delete_ BOOL 是否从队列中删除已查询的数据。默认为TRUE,表示查询完成后,将查询出的数据项自动从队列中删除。 _timeout_ STRING 超时时间。默认为0,表示查询时队列中无符合要求的数据则立即返回,否则等待指定时间,在超时时间内如果队列中出现符合要求的数据,则将数据返回。 requestId STRING requestId为特殊的tag,表示通过该tag来查询数据。 说明 当使用异步服务功能时,请求从输入队列返回,由EAS服务框架读取输出数据进行处理后将结果自动写入到输出队列中,服务框架会通过requestId这个tag将输入数据与输出数据进行关联,通过输入数据的requestId即可在输出队列服务中查询结果数据。 - 查询异步推理结果
当队列服务有与之搭配的推理服务时,推理服务会自动从输入队列中读取请求数据,进行推理计算后将推理结果写出到输出队列(sink)中。使用以下代码从输出队列服务中根据Request ID(0337f7a1-a6f6-49a6-8ad7-ff2fd12b****)查询数据。
系统输出如下结果。$ curl -v -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==' http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/test_group.qservice/sink?requestId=0337f7a1-a6f6-49a6-8ad7-ff2fd12bbe2d
> GET /api/predict/test_group.qservice/sink?requestId=0337f7a1-a6f6-49a6-8ad7-ff2fd12b**** HTTP/1.1 > Host: 182848887922****.cn-shanghai.pai-eas.aliyuncs.com > Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****== > < HTTP/1.1 200 OK < Content-Length: 53 < Content-Type: text/plain; charset=utf-8 [{"p_0":0.5224580736905329,"p_1":0.4775419263094671}]
队列服务订阅推送
在异步推理场景中,您可以通过订阅的方式来获取推理结果。推理服务中内置的服务引擎以订阅的方式来获取输入队列中数据的推送,并将数据写入到输出队列,客户端通过订阅的方式来获取推理结果。队列服务根据当前服务实例配置的并发数(worker_threads)来控制订阅的窗口(Window)大小,当队列中被写入新数据时,队列服务会自动将数据推送给正在订阅的客户端。
该功能在SDK中基于WebSocket协议封装了客户端实现QueueClient,通过长连接的方式建立推送链路。下面以一个典型的视频、语言流处理场景为例,介绍如何通过Python
SDK中的QueueClient来订阅队列中的数据。
说明 推理服务不是必须的,您也可以通过SDK在自定义的服务中订阅队列服务的输入队列,输出结果也可以选择写入到第三方的消息队列中或其它目标存储中(比如输出图片到OSS)。
- 安装EAS Python SDK。
pip install eas_prediction --user
- 通过QueueClient的
put()
方法向输入队列中发送数据,并使用watch()
方法从结果队列中订阅数据,实际使用场景中发送数据和订阅数据可以由不同的线程处理,本示例中发送数据和订阅数据在同一线程中完成,先put数据,后watch结果。# 创建输入队列对象,用于写入输入数据。 input_queue = QueueClient('182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'test_group.qservice') input_queue.set_token('YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==') input_queue.init() # 创建输出队列对象,用于订阅读取输出结果数据。 sink_queue = QueueClient('182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'test_group.qservice/sink') sink_queue.set_token('YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==') sink_queue.init() # 各输入队列中推送10个数据项。 for x in range(10): index, request_id = input_queue.put('[{}]') print(index, request_id) # 查看输入队列的详情。 attrs = input_queue.attributes() print(attrs) # 从输出队列中watch数据,窗口为5。 i = 0 watcher = sink_queue.watch(0, 5, auto_commit=False) for x in watcher.run(): print(x.data.decode('utf-8')) # 每次收到一个请求数据后处理完成后手动commit。 sink_queue.commit(x.index) i += 1 if i == 10: break # 关闭已经打开的watcher对象,每个客户端实例只允许存在一个watcher对象,若watcher对象不关闭,再次运行时会报错。 watcher.close()