访问队列服务

本文为您介绍如何使用HTTP API、SDKeascmd访问队列服务。

通过API访问队列服务

异步推理服务部署完成后,会自动生成输入队列和输出队列(sink队列)两类地址,以HTTP接口为例,说明如下:

地址类型

地址格式

示例

输入队列地址

{domain}/api/predict/{service_name}

xxx.cn-shanghai.pai-eas.aliyuncs.com/api/predict/{service_name}

输出队列地址

{domain}/api/predict/{service_name}/sink

xxx.cn-shanghai.pai-eas.aliyuncs.com/api/predict/{service_name}/sink

您可以在PAI-EAS模型在线服务页面,单击异步推理服务的服务方式列下的调用信息,查看输入队列地址、输出队列地址和Token。

image

image

向队列服务发送数据

使用curl命令向输入队列发送一条同步请求或异步推理请求,具体代码示例如下。

$ curl -v http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==' -d '[{}]'

示例结果如下:

> POST /api/predict/qservice HTTP/1.1
> Host: 182848887922****.cn-shanghai.pai-eas.aliyuncs.com
> Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==
>
< HTTP/1.1 200 OK
< Content-Length: 19
< X-Eas-Queueservice-Request-Id: 4e034bnvb-e783-4272-9333-68x6a1v8dc6x
<
1033

其中:

  • Response Header中返回的X-Eas-Queueservice-Request-Id,为该请求对应的Request ID:4e034bnvb-e783-4272-9333-68x6a1v8dc6x,您可以通过该Request ID来查询数据。

  • Response Body中返回的是当前请求在队列中的Index:1033,您可以通过Index在当前队列中查询数据。

发送优先数据

在队列服务中,普通数据按照FIFO顺序进行推送,但是在很多场景中,部分数据需要被优先推送和处理。队列服务支持数据优先推送。您可以通过增加query参数_priority_=1,向队列服务推送优先数据。

$ curl -v http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice?_priority_=1 -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==' -d '[{}]'

示例结果如下:

> POST /api/predict/qservice?_priority_=1 HTTP/1.1
> Host: 182848887922****.cn-shanghai.pai-eas.aliyuncs.com
> Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==
>
< HTTP/1.1 200 OK
< Content-Length: 19
< X-Eas-Queueservice-Request-Id: 4033eb55-e783-4922-9777-68d6a1383c76
<
1034

优先数据一旦被写入队列,将被优先推送给订阅者,从而进行优先处理。

查看队列服务详情

如果您在向队列服务发送请求时,增加_attrs_=true参数,返回结果中会显示当前队列的详情信息。具体代码示例如下。

$ curl -v -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==' http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice?_attrs_=true

示例结果如下:

> GET /api/predict/qservice?_attrs_=true HTTP/1.1
> Host: 182848887922****.cn-shanghai.pai-eas.aliyuncs.com
> Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==
>
< HTTP/1.1 200 OK
< Content-Length: 320
<
{"consumers.stats.total":"0","consumers.status.total":"0","meta.header.group":"X-EAS-QueueService-Gid","meta.header.priority":"X-EAS-QueueService-Priority","meta.header.user":"X-EAS-QueueService-Uid","stream.maxPayloadBytes":"524288","meta.name":"pmml_test","meta.state":"Normal","stream.approxMaxLength":"4095","stream.firstEntry":"0","stream.lastEntry":"0","stream.length":"1"}

上述结果中返回JSON格式的详情信息,其中关键字段说明如下:

字段名

描述

stream.maxPayloadBytes

队列中允许的每个数据项的大小上限,单位为Byte。

stream.approxMaxLength

队列中能存储的数据项的数量上限。

stream.firstEntry

队列中第一个数据项的index。

stream.lastEntry

队列中最后一个数据项的index。

stream.length

队列中当前存储的数据项的数量。

meta.state

当前队列的状态。

您也可以在PAI-EAS模型在线服务页面,单击异步推理服务的名称进入服务详情页面。在该页面中,查询队列信息,包括队列中当前存储数据项数量、数据项大小上限、存储数据项数量上限和订阅实例数等。image

查询数据

  • 根据条件查询结果

    当只使用一个队列服务时,您可以通过IndexRequest ID从输入队列中查询数据,具体代码示例如下。

    # 通过index查询数据。
    $ curl -v -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==' http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice?_index_=1022
    # 通过request id查询数据。
    $ curl -v -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==' http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice?requestId=87633037-39a4-40bf-8405-14f8e0c31896

    示例结果如下:

    > GET /api/predict/qservice?_index_=1022&_auto_delete_=false HTTP/1.1
    > Host: 182848887922****.cn-shanghai.pai-eas.aliyuncs.com
    > Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==
    >
    < HTTP/1.1 200 OK
    < Content-Length: 4
    < Content-Type: text/plain; charset=utf-8
    <
    [{}]

    您可以配置以下参数来查询推理结果,具体参数说明如下:

    参数

    类型

    核心参数说明

    _index_

    INT

    要查询数据的起始index。默认为0,表示从队列的初始数据项开始查询,该index越接近被查询数据,查询的效率越高。

    _length_

    INT

    要查询的数据项的条数。默认为1,表示仅查询一条数据项。

    _auto_delete_

    BOOL

    是否从队列中删除已查询的数据。默认为TRUE,表示查询完成后,将查询出的数据项自动从队列中删除。

    _timeout_

    STRING

    超时时间。默认为0,表示查询时队列中无符合要求的数据则立即返回204状态码,否则等待指定时间,在超时时间内如果队列中出现符合要求的数据,则将数据返回。示例值:1s(1秒), 1m(1分钟)。

    requestId

    STRING

    requestId为内建的tag,表示通过该tag来查询数据。

    说明

    当使用异步推理服务功能时,请求从输入队列返回,由EAS服务框架读取输出数据进行处理后将结果自动写入到输出队列中,服务框架会通过requestId这个tag将输入数据与输出数据进行关联,通过输入数据的requestId即可在输出队列中查询结果数据。

  • 查询异步推理结果

    当队列服务有与之搭配的推理服务时,推理服务会自动从输入队列中读取请求数据,进行推理计算后将推理结果写出到输出队列(sink)中。使用以下代码根据Request ID(0337f7a1-a6f6-49a6-8ad7-ff2fd12b****)从输出队列中查询数据。

    $ curl -v -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==' http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice/sink?requestId=0337f7a1-a6f6-49a6-8ad7-ff2fd12bbe2d

    示例结果如下:

    > GET /api/predict/qservice/sink?requestId=0337f7a1-a6f6-49a6-8ad7-ff2fd12b**** HTTP/1.1
    > Host: 182848887922****.cn-shanghai.pai-eas.aliyuncs.com
    > Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==
    >
    < HTTP/1.1 200 OK
    < Content-Length: 53
    < Content-Type: text/plain; charset=utf-8
    <
    [{"p_0":0.5224580736905329,"p_1":0.4775419263094671}]

清理数据

当您的队列中不再需要某些数据时,可以通过API对数据进行清理。数据清理的方式主要有两种,分别是单条数据删除(delete)和数据截止删除(truncate)。

  • 删除单条数据

    # 通过index删除数据。
    $ curl -XDELETE -v -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==' http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice?_index_=1022

    示例结果如下:

    > GET /api/predict/qservice?_index_=1022 HTTP/1.1
    > Host: 182848887922****.cn-shanghai.pai-eas.aliyuncs.com
    > Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==
    >
    < HTTP/1.1 200 OK
    < Content-Length: 4
    < Content-Type: text/plain; charset=utf-8
    <
    OK

    您可以配置以下参数来查询推理结果,具体参数说明如下:

    参数

    类型

    核心参数说明

    _index_

    INT

    要删除的数据index。

  • 批量数据删除

    # 通过index删除数据。
    $ curl -XDELETE -v -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==' http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice?_index_=1023&_trunc_=true

    示例结果如下:

    > GET /api/predict/qservice?_index_=1023&_trunc_=true HTTP/1.1
    > Host: 182848887922****.cn-shanghai.pai-eas.aliyuncs.com
    > Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==
    >
    < HTTP/1.1 200 OK
    < Content-Length: 4
    < Content-Type: text/plain; charset=utf-8
    <
    OK

    您可以配置以下参数来查询推理结果,具体参数说明如下:

    参数

    类型

    核心参数说明

    _index_

    INT

    要删除的数据截止index,低于(不包含)这个index的数据将被删除。

    _trunc_

    BOOL

    在批量删除时必须为true,否则将转换为单条删除。

队列服务订阅推送

在异步推理场景中,除了上述的阻塞查询,您还可以通过订阅的方式来获取推理结果。队列服务提供了订阅(watch)接口,客户端可以通过该接口来获取推理结果。队列服务根据当前推理服务实例配置的并发数(worker_threads)来控制订阅的窗口(Window)大小,当队列中被写入新数据时,队列服务会自动将数据推送给正在订阅的客户端。

该功能在SDK中基于WebSocket协议封装了客户端实现QueueClient,通过长连接的方式建立推送链路。下面以一个典型的视频、语音流处理场景为例,介绍如何通过Python SDK中的QueueClient来订阅队列中的数据。

说明

推理服务不是必须的,您也可以通过SDK在自定义的服务中订阅队列服务的输入队列,输出结果也可以选择写入到第三方的消息队列中或其它目标存储中(比如输出图片到OSS)。

  1. 安装EAS Python SDK。

    pip install eas_prediction --user
  2. 通过QueueClientput()方法向输入队列中发送数据,并使用watch()方法从输出队列中订阅数据。在实际使用场景中,发送数据和订阅数据可以由不同的线程处理,本示例中发送数据和订阅数据在同一线程中完成,先put数据,后watch结果。

    #!/usr/bin/env python
    from eas_prediction import QueueClient
    # 创建输入队列对象,用于写入输入数据。
    input_queue = QueueClient('182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'qservice')
    # 如果需要自定义usergroup,可以分别通过uidgid进行指定,示例如下:
    # input_queue = QueueClient('182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'qservice', uid='your_user_id', gid='your_group_id')
    input_queue.set_token('YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==')
    input_queue.init()
    
    # 创建输出队列对象,用于订阅读取输出结果数据。
    sink_queue = QueueClient('182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'qservice/sink')
    sink_queue.set_token('YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==')
    sink_queue.init()
    
    # 各输入队列中推送10个数据项。
    for x in range(10):
        index, request_id = input_queue.put('[{}]')
        print(index, request_id)
    
        # 查看输入队列的详情。
        attrs = input_queue.attributes()
        print(attrs)
    
    # 从输出队列中watch数据,窗口为5。
    i = 0
    watcher = sink_queue.watch(0, 5, auto_commit=False)
    for x in watcher.run():
        print(x.data.decode('utf-8'))
    
        # 每次收到一个请求数据后处理完成后手动commit。
        sink_queue.commit(x.index)
        i += 1
        if i == 10:
            break
    # 关闭已经打开的watcher对象,每个客户端实例只允许存在一个watcher对象,若watcher对象不关闭,再次运行时会报错。
    watcher.close()
    

通过eascmd访问队列服务

eascmd已经封装好了完整的队列服务API,您可以使用eascmd stream子命令快速操作、调试队列服务。

下载eascmd

请确保eascmd版本大于2.6.0,您可以参考文档下载并认证客户端来下载、更新、配置eascmd命令行客户端。

配置eascmd访问队列服务

通过easmd stream config命令配置需要访问的队列服务,示例如下:

eascmd stream config --url=http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice --token=YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==

上述配置完成后,eascmd默认使用的group_iduser_id分别是default_groupdefault_user,关于groupuser的概念,请您参考文档队列服务订阅推送。如果您需要使用其他的group_id或者user_id,您可以通过--group参数和--user参数进行指定。stream config中的所有参数,都可以在其他读写命令的执行中进行覆盖。

查询队列详情

使用info命令查看队列信息。示例如下:

eascmd stream info

示例结果如下:

[OK] Attributes: 
consumers.list.[0] : Id: imageasync.imageasync-35d72370-5f576f7c8d-2mdb4, Index: 0, Pending: 0, Status: Running, Idle: 19.997s, Window: 5, Slots: 5, AutoCommit: false
consumers.stats.total : 1
consumers.status.total : 1
groups.list.[0] : Id: imageasync, Index: 0, Pending: 0, Delivered: 1, Consumers: 1
meta.header.group : X-EAS-QueueService-Gid
meta.header.priority : X-EAS-QueueService-Priority
meta.header.user : X-EAS-QueueService-Uid
meta.maxPayloadBytes : 8192
meta.name : imageasync-queue-38895e88
meta.state : Normal
stream.approxMaxLength : 230399
stream.firstEntry : 0
stream.lastEntry : 0
stream.length : 0

返回中的参数的具体说明请您参见通过API访问队列服务。info命令不仅可以让您观察队列的属性,也可以让您测试与队列服务的连通性。

向队列中发送数据

使用put命令向队列中发送数据,示例如下:

eascmd stream put -d "10s"

示例结果如下:

[OK] 1
[INFO] Put data done.
Total time cost: 401.892141ms
Total size: 3.00 B
Total: 1, success: 1, failed: 0

您也可以通过-f参数选择将文件中的数据全部发送到队列,如下所示:

eascmdm stream put -f test.data

示例结果如下:

[INFO] Opening data file: test.data
[OK] 2
[OK] 3
[OK] 4
[OK] 5
[OK] 6
[OK] 7
[OK] 8
[OK] 9
[OK] 10
[OK] 11
[OK] 12
[OK] 13
....

此时您可以通过info命令观察队列状态。

查询队列中的数据

使用get命令从队列中查询数据,如下所示:

eascmd stream get -l10 --timeout=3s

示例结果如下:

[OK] [0 - 1] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=e47b76e2-2648-40fe-9197-a268015cbd1f ts@source=1685802680575] data1
[OK] [1 - 2] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=51d13952-6ba3-4d52-b548-e58837675c7a ts@source=1685807531686] data2
[OK] [2 - 3] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=ef6940e3-159c-45f3-a96d-bc0acd71275f ts@source=1685807531701] OK
[OK] [3 - 4] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=2a5645b6-9ee5-4026-bdee-fab31e435934 ts@source=1685807531715] data4
[OK] [4 - 5] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=64ba6aaf-49b0-45c7-8d79-6cf6dc1065d0 ts@source=1685807531730] data5
...

以收到的第一条数据介绍整体的输出格式:

  • 第一列 [0 - 1] 表示收到的0号数据index1。

  • 第二列tags[Header:Content-Type=text/plain; charset= ...]表示该数据带有的标签(tag)。其中:

    • Header开头的是您输入数据时使用的HTTP请求头。

    • requestId为内建的自动生成的请求ID。

    • ts@source表示输入队列在收到您请求时的unix时间戳,与之对应的还有ts@sink为输出队列在收到数据时的时间戳。

  • 最后一列为您输入的数据。

重要

如果您有与之搭配的推理服务实例,在向队列输入数据后可能会被推理服务实例消费掉。此时,您需要在输出队列中查询数据,具体做法是在命令中增加-k参数。

您也可以通过--tags参数增加查询条件,比如当需要通过requestId来进行查询时,可以使用以下命令:

eascmd stream get --tags requestId=ef6940e3-159c-45f3-a96d-bc0acd71275f

示例结果如下:

[OK] [0 - 3] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=ef6940e3-159c-45f3-a96d-bc0acd71275f ts@source=1685807531701] OK

删除队列中的数据

通过deletetrunc命令进行单条数据删除和批量数据删除,参考下述命令:

单条删除:

 eascmd stream delete 3

在确认之后,示例结果如下:

Deleting index(es):
3 [y/N]y
[OK] deleted

批量删除:

eascmd stream trunc 4

在确认之后,示例结果如下:

trunc stream from index: 4 [y/N]y
[OK] truncated

订阅队列

通过watch命令订阅队列服务,参考下述命令:

 eascmd stream watch

示例结果如下:

[INFO] Start to watch: index: 0, indexOnly: false, autoCommit: false, window: 10
I0604 09:20:45.211243   66197 queue.go:532] watch via websocket
[OK] [0 - 4] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=2a5645b6-9ee5-4026-bdee-fab31e435934 ts@sink=1685807531718 ts@source=1685807531715] data4
commit: 4 ? [Y/n]

当您输入Y之后就可以将该数据进行commit,会得到新的数据:

[OK] [1 - 5] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=64ba6aaf-49b0-45c7-8d79-6cf6dc1065d0 ts@sink=1685807531733 ts@source=1685807531730] data5
commit: 5 ? [Y/n]

当您输入n之后可以确认是否进行negative commit:

[OK] [1 - 5] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=64ba6aaf-49b0-45c7-8d79-6cf6dc1065d0 ts@sink=1685807531733 ts@source=1685807531730] data5
commit: 5 ? [Y/n]n
negative: 5 ? [Y/n]y

关于commitnegative commit的说明,请参见CommitNegative

如果您使用--auto-commit选项,将在server端自动commit数据:

 eascmd stream  watch --auto-commit

示例结果如下:

[INFO] Start to watch: index: 0, indexOnly: false, autoCommit: true, window: 10
I0604 09:30:08.554542   66408 queue.go:532] watch via websocket
[OK] [0 - 5] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=64ba6aaf-49b0-45c7-8d79-6cf6dc1065d0 ts@sink=1685807531733 ts@source=1685807531730] data5
[OK] [1 - 6] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=5825dd3e-a5e2-4754-a946-96e068d643c8 ts@sink=1685807531771 ts@source=1685807531768] data6
[OK] [2 - 7] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=e7edf9b8-de78-41a0-8d9c-0a4aaf7dcaaf ts@sink=1685807531786 ts@source=1685807531783] data7
[OK] [3 - 8] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=3ddc3481-934a-4408-8d08-11c2c2248ef6 ts@sink=1685807531801 ts@source=1685807531798] data8
[OK] [4 - 9] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=561da95d-b99a-4710-bb82-9402baa21f36 ts@sink=1685807531816 ts@source=1685807531812] data9
....

其它选项与命令

上文介绍了eascmd stream的主要命令与选项,eascmd stream还有其它扩展功能,您可以通过eascmd stream help命令获取更多详细帮助。