Python SDK使用说明

推荐使用EAS提供的官方SDK进行服务调用,从而有效减少编写调用逻辑的时间并提高调用稳定性。本文介绍官方Python SDK接口详情,并以常见类型的输入输出为例,提供了使用Python SDK进行服务调用的完整程序示例。

安装方法

pip install -U eas-prediction --user

接口列表

接口

描述

PredictClient

PredictClient(endpoint, service_name, custom_url)

  • 功能:PredictClient类的构造方法。

  • 参数:

    • endpoint:服务端的Endpoint地址。

      如果是普通服务,则设置为默认网关Endpoint。例如182848887922***.cn-shanghai.pai-eas.aliyuncs.com

      如果是VPC直连请求,则设置为当前地域的通用Endpoint。例如,华东2(上海)设置为pai-eas-vpc.cn-shanghai.aliyuncs.com

    • service_name:服务名称。

    • custom_url:服务URL。非必需,仅对于Endpoint是非<uid>.<region>.pai-eas.aliyuncs.com格式的服务(例如WebUI服务),可以通过设置该参数来创建客户端,例如client = PredictClient(custom_url='<url>')

set_endpoint(endpoint)

  • 功能:设置服务的Endpoint地址。

  • 参数:endpoint表示服务端的Endpoint地址。

    如果是普通服务,则设置为默认网关Endpoint。例如182848887922***.cn-shanghai.pai-eas.aliyuncs.com

    如果是VPC直连请求,则设置为当前地域的通用Endpoint。例如,华东2(上海)设置为pai-eas-vpc.cn-shanghai.aliyuncs.com

set_service_name(service_name)

  • 功能:设置请求的服务名字。

  • 参数:service_name请求的服务名字。

set_endpoint_type(endpoint_type)

  • 功能:设置服务端的网关类型。

  • 参数:endpoint_type待设置的网关类型,支持以下网关类型:

    • ENDPOINT_TYPE_GATEWAY:默认网关。

    • ENDPOINT_TYPE_DIRECT:表示直连请求。如果没有手动设置该参数,则默认通过网关访问服务。

set_token(token)

  • 功能:设置服务访问的Token。

  • 参数:token表示服务访问的Token。

set_retry_count(max_retry_count)

  • 功能:设置请求失败的重试次数。

  • 参数:max_retry_count表示请求失败的重试次数,默认为5。

    重要

    对于服务端进程异常、服务器异常或网关长连接断开等情况导致的个别请求失败,均需要客户端重新发送请求。因此,请勿将该参数设置为0。

set_max_connection_count(max_connection_count)

  • 功能:设置客户端连接池中长连接数量的最大值。出于性能考虑,客户端会与服务端建立长连接,并将连接放入连接池中。每次请求时,从连接池中获取一个空闲连接访问服务。

  • 参数:max_connection_count表示连接池中最大的长连接数量,默认值为100。

set_timeout(timeout)

  • 功能:设置请求的超时时间。

  • 参数:timeout表示请求的超时时间。单位为毫秒,默认值为5000。

init()

PredictClient对象进行初始化。在上述设置参数的接口执行完成后,需要调用init()接口才能生效。

predict(request)

  • 功能:向在线预测服务提交一个预测请求。

  • 参数:request是一个抽象类,可以输入不同类型的request,例如StringRequestTFRequest。

  • 返回值:返回请求对应的Response。

StringRequest

StringRequest(request_data)

  • 功能:StringRequest类的构造方法。

  • 参数:request_data表示待发送的请求字符串。

StringResponse

to_string()

  • 功能:将StringResponse类转换为字符串。

  • 返回值:返回请求的Response Body。

TFRequest

TFRequest(signature_name)

  • 功能:TFRequest类构造方法。

  • 参数:signature_name表示待请求模型中的Signature Name。

add_feed(self, input_name, shape, data_type, content)

  • 功能:请求TensorFlow在线预测服务模型时,设置需要输入的input数据。

  • 参数:

    • input_name:输入Tensor的别名。

    • shape:输入TensorTensorShape。

    • data_type:输入TensorDataType,支持以下类型:

      • TFRequest.DT_FLOAT

      • TFRequest.DT_DOUBLE

      • TFRequest.DT_INT8

      • TFRequest.DT_INT16

      • TFRequest.DT_INT32

      • TFRequest.DT_INT64

      • TFRequest.DT_STRING

      • TFRequest.TF_BOOL

    • content:输入Tensor的内容,通过一维数组展开表示。

add_fetch(self, output_name)

  • 功能: 请求TensorFlow在线预测服务模型时,设置需要输出的Tensor别名。

  • 参数:output_name表示待输出Tensor的别名。

    对于SavedModel模型,该参数是可选的。如果没有设置该参数,则输出所有的outputs

    对于Frozen Model,该参数必选。

to_string()

  • 功能:将TFRequest构建的用于请求传输的ProtoBuf对象序列化成字符串。

  • 返回值:TFRequest序列化后的字符串。

TFResponse

get_tensor_shape(output_name)

  • 功能:获得指定别名的输出TensorTensorShape。

  • 参数:output_name表示待获取ShapeTensor别名。

  • 返回值:输出的TensorShape。

get_values(output_name)

  • 功能:获取输出Tensor的数据向量。

  • 参数:output_name表示待获取结果数据的Tensor别名。

  • 返回值:输出结果以一维数组的形式保存。您可以搭配get_tensor_shape()接口,获取对应TensorShape,将其还原成所需的多维Tensor。接口会根据output的类型,返回不同类型的结果数组。

TorchRequest

TorchRequest()

TorchRequest类的构造方法。

add_feed(self, index, shape, data_type, content)

  • 功能:请求PyTorch在线预测服务模型时,设置需要输入的Tensor。

  • 参数:

    • index:待输入Tensor的下标。

    • shape:输入TensorTensorShape。

    • data_type表示输入TensorDataType,支持以下类型:

      • TFRequest.DT_FLOAT

      • TFRequest.DT_DOUBLE

      • TFRequest.DT_INT8

      • TFRequest.DT_INT16

      • TFRequest.DT_INT32

      • TFRequest.DT_INT64

      • TFRequest.DT_STRING

      • TFRequest.TF_BOOL

    • content:输入Tensor的内容,通过一维数组展开表示。

add_fetch(self, output_index)

  • 功能:请求PyTorch在线预测服务模型时,设置需要输出TensorIndex。该接口为可选,如果您没有调用该接口设置输出TensorIndex,则输出所有的outputs

  • 参数:output_index表示输出TensorIndex。

to_string()

  • 功能:将TorchRequest构建的用于请求传输的ProtoBuf对象序列化成字符串。

  • 返回值:TorchRequest序列化后的字符串。

TorchResponse

get_tensor_shape(output_index)

  • 功能:获得指定下标的输出TensorTensorShape。

  • 参数:待获取Shape的输出TensorIndex。

  • 返回值:下标Index对应的输出TensorShape。

get_values(output_index)

  • 功能:获取输出Tensor的数据向量,输出结果以一维数组的形式保存。您可以搭配使用get_tensor_shape()接口,获取对应TensorShape,将其还原成所需的多维Tensor。接口会根据output的类型,返回不同类型的结果数组。

  • 参数:output_index表示待获取的输出 Tensor对应的下标。

  • 返回值:返回的结果Tensor的数据数组。

QueueClient

QueueClient(endpoint, queue_name)

  • 功能:创建一个QueueClient对象。

  • 参数:

    • endpoint:表示服务端的Endpoint地址。

    • queueName:表示队列服务名称。

  • 返回值:创建的QueueClient对象。

set_token(token)

  • 功能:为QueueClient对象设置的用于访问队列服务鉴权的Token。

  • 参数:token表示队列服务的Token。

init(uid=None,gid='eas')

  • 功能:初始化一个QueueClient对象。

  • 参数:

    • uid:表示向服务端注册的客户端的User ID,每个客户端实例的uid不能重复,同一个uid只能允许注册一次,服务端推送数据时会在不同的uid之间均匀地分发。

    • gid:表示向服务端注册的客户端的group id,默认都属于同一个group中,若存在不同的group,同一条数据会向所有的group中均推送一份。

set_logger(logger=None)

  • 功能:为QueueClient设置一个logger对象,默认会将运行中的Warning信息打印至标准输出中,若要关闭该信息可将logger对象设置为None。

  • 参数:logger:表示要设置的logging对象。

truncate(index)

  • 功能:从指定index向前截断队列中的数据,只保留指定index之后的数据。

  • 参数:index:表示要截断的队列中数据的index。

put(data,tags:dict={})

  • 功能:向队列中写入一条数据。

  • 参数:

    • data:表示要向队列中写入的数据内容。

    • tags(可选):表示要向队列中写入的数据的tags。

  • 返回值:

    • index:表示当前写入的数据在队列中的index值,可用于从队列中查询数据。

    • requestId:表示当前写入数据在队列中自动生成的requestId,requestId是一个特殊的tag,也可用于在队列中查询数据。

get(request_id=None, index=0, length=1, timeout='5s', auto_delete=True, tags={})

  • 功能:根据指定条件从队列中查询数据。

  • 参数:

    • request_id:表示要查询的数据的request id。如果指定该参数,则数据查询时从index开始查询length个数据,如果查询到存在指定request id的数据则返回,否则返回空。

    • index:表示要查询的数据的起始index。默认为0,表示从队列中的第一条数据开始查询。

    • length:表示要查询的数据的条数,返回从index开始计算(包含index)的最大length条数据。

    • timeout:表示查询的等待时间。在等待时间内,如果队列中有length条数据则直接返回,否则等到最大timeout等待时间则停止。

    • auto_delete:表示是否自动从队列中删除已经查询的数据。如果配置为False,则数据可被重复查询,您可以通过调用Del()方法手动删除数据。

    • tags:表示查询包含指定tags的数据,类型为DICT。从指定index开始遍历length条数据,返回包含指定tags的数据。

  • 返回值:表示队列中查询出的以DataFrame封装的数据结果。

attributes()

  • 功能:获取队列的属性信息,包含队列总长度、当前的数据长度等信息。

  • 返回值:attrs:队列的属性信息,类型为DICT。

delete(indexes)

  • 功能:从队列中删除指定index的数据。

  • 参数:indexes:表示要从队列中删除的数据的index值列表,支持单个String类型的index,也支持List类型的多个index列表。

search(index)

  • 功能:查询数据的排队信息。

  • 参数:index表示查询数据的index。

  • 返回值:为JSONObject类型的数据排队信息,包含如下字段:

    • ConsumerId:表示处理该数据的实例ID。

    • IsPending:表示数据是否正在被处理。

      • True表示正在被处理。

      • False表示正在排队。

    • WaitCount:表示前面还需排队等待的数据个数,仅IsPendingFalse时该值才有效,IsPendingTrue时该值为0。

    返回内容示例:

    • 返回{'ConsumerId': 'eas.****', 'IsPending': False, 'WaitCount':2},表示请求正在排队。

    • 回显日志search error:Code 404, Message: b'no data in stream',返回{}。表示未在队列中找到该数据,该情况可能是因为数据已被服务端成功处理并返回结果,或是index参数配置有误,请检查确认。

watch(index, window, index_only=False, auto_commit=False)

  • 功能:订阅队列中的数据,队列服务会根据条件向客户端推送数据。

  • 参数:

    • index:表示订阅的起始数据index。

    • window:表示订阅的窗口大小,队列服务一次最多向单个客户端实例推送的数据量。

      说明

      如果推送的数据没有被commit,则服务端不会再推送新数据;如果commit N条数据,则服务队列会向客户端推送N条数据,确保客户端在同一时刻处理的数据不会超过设置的窗口大小,来实现客户端限制并发的功能。

    • index_only:表示是否只推送index值。

    • auto_commit:表示是否在推送完一条数据后,自动commit数据。建议配置为False。在收到推送数据并计算完成后手动Commit,在未完成计算的情况下实例发生异常,则实例上未commit的数据会由队列服务分发给其他实例继续处理。

  • 返回值:返回一个watcher对象,可通过该对象读取推送的数据。

commit(index)

  • 功能:commit指定index的数据。

    说明

    commit表示服务队列推送的数据已经处理完成,可以将该数据从队列中清除,且不需要再推送给其他实例。

  • 参数:index: 表示要向队列中commit的数据的index值列表,支持单个String类型的index,也支持List类型的多个index的列表。

Watcher

run()

  • 功能:运行一个Watcher,与服务端建立WebSocket连接接收数据推送,并将结果实时返回给调用端。

  • 返回值:表示从队列服务中实时推送到客户端的DataFrame对象。

close()

功能:关闭一个Watcher对象,用于关闭后端的数据连接。

说明

一个客户端只能启动一个Watcher对象,使用完成后需要将该对象关闭才能启动新的Watcher对象。

程序示例

  • 字符串输入输出示例

    对于使用自定义Processor部署服务的用户而言,通常采用字符串进行服务调用(例如,PMML模型服务的调用),具体的Demo程序如下。

    #!/usr/bin/env python
    
    from eas_prediction import PredictClient
    from eas_prediction import StringRequest
    
    if __name__ == '__main__':
        client = PredictClient('http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'scorecard_pmml_example')
        client.set_token('YWFlMDYyZDNmNTc3M2I3MzMwYmY0MmYwM2Y2MTYxMTY4NzBkNzdj****')
        client.init()
    
        request = StringRequest('[{"fea1": 1, "fea2": 2}]')
        for x in range(0, 1000000):
            resp = client.predict(request)
            print(resp)
  • TensorFlow输入输出示例

    使用TensorFlow的用户,需要将TFRequestTFResponse分别作为输入和输出数据格式,具体Demo示例如下。

    #!/usr/bin/env python
    
    from eas_prediction import PredictClient
    from eas_prediction import StringRequest
    from eas_prediction import TFRequest
    
    if __name__ == '__main__':
        client = PredictClient('http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'mnist_saved_model_example')
        client.set_token('YTg2ZjE0ZjM4ZmE3OTc0NzYxZDMyNmYzMTJjZTQ1YmU0N2FjMTAy****')
        client.init()
    
        #request = StringRequest('[{}]')
        req = TFRequest('predict_images')
        req.add_feed('images', [1, 784], TFRequest.DT_FLOAT, [1] * 784)
        for x in range(0, 1000000):
            resp = client.predict(req)
            print(resp)
  • 通过VPC网络直连方式调用服务的示例

    通过网络直连方式,您只能访问部署在EAS专属资源组的服务,且需要为该资源组与用户指定的vSwitch连通网络后才能使用。关于如何购买EAS专属资源组和连通网络,请参见使用专属资源组配置网络连通。该调用方式与普通调用方式相比,仅需增加一行代码client.set_endpoint_type(ENDPOINT_TYPE_DIRECT)即可,特别适合大流量高并发的服务,具体示例如下。

    #!/usr/bin/env python
    
    from eas_prediction import PredictClient
    from eas_prediction import StringRequest
    from eas_prediction import TFRequest
    from eas_prediction import ENDPOINT_TYPE_DIRECT
    
    if __name__ == '__main__':
        client = PredictClient('http://pai-eas-vpc.cn-hangzhou.aliyuncs.com', 'mnist_saved_model_example')
        client.set_token('M2FhNjJlZDBmMzBmMzE4NjFiNzZhMmUxY2IxZjkyMDczNzAzYjFi****')
        client.set_endpoint_type(ENDPOINT_TYPE_DIRECT)
        client.init()
    
        request = TFRequest('predict_images')
        request.add_feed('images', [1, 784], TFRequest.DT_FLOAT, [1] * 784)
        for x in range(0, 1000000):
            resp = client.predict(request)
            print(resp)
  • PyTorch输入输出示例

    使用PyTorch的用户,需要将TorchRequestTorchResponse分别作为输入和输出数据格式,具体Demo示例如下。

    #!/usr/bin/env python
    
    from eas_prediction import PredictClient
    from eas_prediction import TorchRequest
    
    if __name__ == '__main__':
        client = PredictClient('http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'pytorch_gpu_wl')
        client.init()
    
        req = TorchRequest()
        req.add_feed(0, [1, 3, 224, 224], TorchRequest.DT_FLOAT, [1] * 150528)
        # req.add_fetch(0)
        import time
        st = time.time()
        timer = 0
        for x in range(0, 10):
            resp = client.predict(req)
            timer += (time.time() - st)
            st = time.time()
            print(resp.get_tensor_shape(0))
            # print(resp)
        print("average response time: %s s" % (timer / 10) )
  • BladeProcessor输入输出示例

    使用BladeProcessor的用户,需要将BladeRequestBladeResponse分别作为输入和输出数据格式,具体Demo示例如下。

    #!/usr/bin/env python
    
    from eas_prediction import PredictClient
    from eas_prediction import BladeRequest 
    
    if __name__ == '__main__':
        client = PredictClient('http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'nlp_model_example')
        client.init()
    
        req = BladeRequest()
    
        req.add_feed('input_data', 1, [1, 360, 128], BladeRequest.DT_FLOAT, [0.8] * 85680)
        req.add_feed('input_length', 1, [1], BladeRequest.DT_INT32, [187])
        req.add_feed('start_token', 1, [1], BladeRequest.DT_INT32, [104])
        req.add_fetch('output', BladeRequest.DT_FLOAT)
        import time
        st = time.time()
        timer = 0
        for x in range(0, 10):
            resp = client.predict(req)
            timer += (time.time() - st)
            st = time.time()
            # print(resp)
            # print(resp.get_values('output'))
            print(resp.get_tensor_shape('output'))
        print("average response time: %s s" % (timer / 10) )
  • 兼容EAS默认TensorFlow接口的BladeProcessor输入输出示例

    BladeProcessor用户可以使用兼容EAS默认TensorFlow接口的TFRequestTFResponse作为数据的输入输出格式,具体Demo示例如下。

    #!/usr/bin/env python
    
    from eas_prediction import PredictClient
    from eas_prediction.blade_tf_request import TFRequest # Need Importing blade TFRequest 
    
    if __name__ == '__main__':
        client = PredictClient('http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'nlp_model_example')
        client.init()
    
        req = TFRequest(signature_name='predict_words')
    
        req.add_feed('input_data', [1, 360, 128], TFRequest.DT_FLOAT, [0.8] * 85680)
        req.add_feed('input_length', [1], TFRequest.DT_INT32, [187])
        req.add_feed('start_token', [1], TFRequest.DT_INT32, [104])
        req.add_fetch('output')
        import time
        st = time.time()
        timer = 0
        for x in range(0, 10):
            resp = client.predict(req)
            timer += (time.time() - st)
            st = time.time()
            # print(resp)
            # print(resp.get_values('output'))
            print(resp.get_tensor_shape('output'))
        print("average response time: %s s" % (timer / 10) )
  • 队列服务发送、订阅数据示例

    通过QueueClient可向队列服务中发送数据、查询数据、查询队列服务的状态以及订阅队列服务中的数据推送。以下方的Demo为例,介绍一个线程向队列服务中推送数据,另外一个线程通过Watcher订阅队列服务中推送过来的数据。

    #!/usr/bin/env python
    
    from eas_prediction import QueueClient
    import threading
    
    if __name__ == '__main__':
        endpoint = '182848887922****.cn-shanghai.pai-eas.aliyuncs.com'
        queue_name = 'test_group.qservice/sink'
        token = 'YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MTUx****'
    
        queue = QueueClient(endpoint, queue_name)
        queue.set_token(token)
        queue.init()
        queue.set_timeout(30000)
    
        # truncate all messages in the queue
        attributes = queue.attributes()
        if 'stream.lastEntry' in attributes:
            queue.truncate(int(attributes['stream.lastEntry']) + 1)
    
        count = 100
        # create a thread to send messages to the queue
        def send_thread():
            for i in range(count):
                index, request_id = queue.put('[{}]')
                print('send: ', i, index, request_id)
    
        # create a thread to watch messages from the queue
        def watch_thread():
            watcher = queue.watch(0, 5, auto_commit=True)
            i = 0
            for x in watcher.run():
                print('recv: ', i, x.index, x.tags['requestId'])
                i += 1
                if i == count:
                    break
            watcher.close()
    
        thread1 = threading.Thread(target=watch_thread)
        thread2 = threading.Thread(target=send_thread)
    
        thread1.start()
        thread2.start()
    
        thread1.join()
        thread2.join()