推荐使用PAI-EAS提供的官方SDK进行服务调用,从而有效减少编写调用逻辑的时间并提高调用稳定性。本文介绍官方Python SDK接口详情,并以常见类型的输入输出为例,提供了使用Python SDK进行服务调用的完整程序示例。

安装方法

pip install -U eas-prediction --user

接口列表

接口 描述
PredictClient PredictClient(endpoint, service_name)
  • 功能:PredictClient类的构造方法。
  • 参数:
    • endpoint:服务端的Endpoint地址。

      如果是普通服务,则设置为默认网关Endpoint。例如182848887922***.cn-shanghai.pai-eas.aliyuncs.com

      如果是VPC直连请求,则设置为当前地域的通用Endpoint。例如,华东2(上海)设置为pai-eas-vpc.cn-shanghai.aliyuncs.com

    • service_name:服务名字。
set_endpoint(endpoint)
  • 功能:设置服务的Endpoint地址。
  • 参数:endpoint表示服务端的Endpoint地址。

    如果是普通服务,则设置为默认网关Endpoint。例如182848887922***.cn-shanghai.pai-eas.aliyuncs.com

    如果是VPC直连请求,则设置为当前地域的通用Endpoint。例如,华东2(上海)设置为pai-eas-vpc.cn-shanghai.aliyuncs.com

set_service_name(service_name)
  • 功能:设置请求的服务名字。
  • 参数:service_name请求的服务名字。
set_endpoint_type(endpoint_type)
  • 功能:设置服务端的网关类型。
  • 参数:endpoint_type待设置的网关类型,支持以下网关类型:
    • ENDPOINT_TYPE_GATEWAY:默认网关。
    • ENDPOINT_TYPE_DIRECT:表示直连请求。如果没有手动设置该参数,则默认通过网关访问服务。
set_token(token)
  • 功能:设置服务访问的Token。
  • 参数:token表示服务访问的Token。
set_retry_count(max_retry_count)
  • 功能:设置请求失败的重试次数。
  • 参数:max_retry_count表示请求失败的重试次数,默认为5。
    注意 对于服务端进程异常、服务器异常或网关长连接断开等情况导致的个别请求失败,均需要客户端重新发送请求。因此,请勿将该参数设置为0。
set_max_connection_count(max_connection_count)
  • 功能:设置客户端连接池中长链接数量的最大值。出于性能考虑,客户端会与服务端建立长连接,并将连接放入连接池中。每次请求时,从连接池中获取一个空闲连接访问服务。
  • 参数:max_connection_count表示连接池中最大的长连接数量,默认值为100。
set_timeout(timeout)
  • 功能:设置请求的超时时间。
  • 参数:timeout表示请求的超时时间。单位为ms,默认值为5000。
init() 对PredictClient对象进行初始化。在上述设置参数的接口执行完成后,需要调用init()接口才能生效。
predict(request)
  • 功能:向在线预测服务提交一个预测请求。
  • 参数:request是一个抽象类,可以输入不同类型的request,例如StringRequest或TFRequest。
  • 返回值:返回请求对应的Response。
StringRequest StringRequest(request_data)
  • 功能:StringRequest类的构造方法。
  • 参数:request_data表示待发送的请求字符串。
StringResponse to_string()
  • 功能:将StringResponse类转换为字符串。
  • 返回值:返回请求的Response Body。
TFRequest TFRequest(signature_name)
  • 功能:TFRequest类构造方法。
  • 参数:signature_name表示待请求模型中的Signature Name。
add_feed(self, input_name, shape, data_type, content)
  • 功能:请求TensorFlow在线预测服务模型时,设置需要输入的input数据。
  • 参数:
    • input_name:输入Tensor的别名。
    • shape:输入Tensor的TensorShape。
    • data_type:输入Tensor的DataType,支持以下类型:
      • TFRequest.DT_FLOAT
      • TFRequest.DT_DOUBLE
      • TFRequest.DT_INT8
      • TFRequest.DT_INT16
      • TFRequest.DT_INT32
      • TFRequest.DT_INT64
      • TFRequest.DT_STRING
      • TFRequest.TF_BOOL
    • content:输入Tensor的内容,通过一维数组展开表示。
add_fetch(self, output_name)
  • 功能: 请求TensorFlow在线预测服务模型时,设置需要输出的Tensor别名。
  • 参数:output_name表示待输出Tensor的别名。

    对于SavedModel模型,该参数可选。如果没有设置该参数,则输出所有的outputs

    对于Frozen Model,该参数必选。

to_string()
  • 功能:将TFRequest构建的用于请求传输的ProtoBuf对象序列化成字符串。
  • 返回值:TFRequest序列化后的字符串。
TFResponse get_tensor_shape(output_name)
  • 功能:获得指定别名的输出Tensor的TensorShape。
  • 参数:output_name表示待获取Shape的Tensor别名。
  • 返回值:输出的TensorShape。
get_values(output_name)
  • 功能:获取输出Tensor的数据向量。
  • 参数:output_name表示待获取结果数据的Tensor别名。
  • 返回值:输出结果以一维数组的形式保存。您可以搭配get_tensor_shape()接口,获取对应Tensor的Shape,将其还原成所需的多维Tensor。接口会根据output的类型,返回不同类型的结果数组。
TorchRequest TorchRequest() TorchRequest类的构造方法。
add_feed(self, index, shape, data_type, content)
  • 功能:请求PyTorch在线预测服务模型时,设置需要输入的Tensor。
  • 参数:
    • index:待输入Tensor的下标。
    • shape:输入Tensor的TensorShape。
    • data_type表示输入Tensor的DataType,支持以下类型:
      • TFRequest.DT_FLOAT
      • TFRequest.DT_DOUBLE
      • TFRequest.DT_INT8
      • TFRequest.DT_INT16
      • TFRequest.DT_INT32
      • TFRequest.DT_INT64
      • TFRequest.DT_STRING
      • TFRequest.TF_BOOL
    • content:输入Tensor的内容,通过一维数组展开表示。
add_fetch(self, output_index)
  • 功能:请求PyTorch在线预测服务模型时,设置需要输出Tensor的Index。该接口为可选,如果您没有调用该接口设置输出Tensor的Index,则输出所有的outputs
  • 参数:output_index表示输出Tensor的Index。
to_string()
  • 功能:将TorchRequest构建的用于请求传输的ProtoBuf对象序列化成字符串。
  • 返回值:TorchRequest序列化后的字符串。
TorchResponse get_tensor_shape(output_index)
  • 功能:获得指定下标的输出Tensor的TensorShape。
  • 参数:待获取Shape的输出Tensor的Index。
  • 返回值:下标Index对应的输出Tensor的Shape。
get_values(output_index)
  • 功能:获取输出Tensor的数据向量,输出结果以一维数组的形式保存。您可以搭配使用get_tensor_shape()接口,获取对应Tensor的Shape,将其还原成所需的多维Tensor。接口会根据output的类型,返回不同类型的结果数组。
  • 参数:output_index表示待获取的输出 Tensor对应的下标。
  • 返回值:返回的结果Tensor的数据数组。
QueueClient QueueClient(endpoint, queue_name)
  • 功能:创建一个QueueClient对象。
  • 参数:
    • endpoint:表示服务端的Endpoint地址。
    • queueName:表示队列服务名称。
  • 返回值:创建的QueueClient对象。
set_token(token)
  • 功能:为QueueClient对象设置的用于访问队列服务鉴权的Token。
  • 参数:token表示队列服务的Token。
init(uid=None,gid='eas')
  • 功能:初始化一个QueueClient对象。
  • 参数:
    • uid:表示向服务端注册的客户端的User ID,每个客户端实例的uid不能重复,同一个uid只能允许注册一次,服务端推送数据时会在不同的uid之间均匀的分发。
    • gid:表示向服务端注册的客户端的group id,默认都属于同一个group中,若存在不同的group,同一条数据会向所有的group中均推送一份。
set_logger(logger=None)
  • 功能:为QueueClient设置一个logger对象,默认会将运行中的Warning信息打印至标准输出中,若要关闭该信息可将logger对象设置为None。
  • 参数:logger:表示要设置的logging对象。
truncate(index)
  • 功能:从指定index向前截断队列中的数据,只保留指定index之后的数据。
  • 参数:index:表示要截断的队列中数据的index。
put(data,tags:dict={})
  • 功能:向队列中写入一条数据。
  • 参数:
    • data:表示要向队列中写入的数据内容。
    • tags(可选):表示要向队列中写入的数据的tags。
  • 返回值:
    • index:表示当前写入的数据在队列中的index值,可用于从队列中查询数据。
    • requestId:表示当前写入数据在队列中自动生成的requestId,requestId是一个特殊的tag,也可用于在队列中查询数据。
get(request_id=None, index=0, length=1, timeout='5s', auto_delete=True, tags={})
  • 功能:根据指定条件从队列中查询数据。
  • 参数:
    • request_id:表示要查询的数据的request id。如果指定该参数,则数据查询时从index开始查询length个数据,如果查询到存在指定request id的数据则返回,否则返回空。
    • index:表示要查询的数据的起始index。默认为0,表示从队列中的第一条数据开始查询。
    • length:表示要查询的数据的条数,返回从index开始计算(包含index)的最大length条数据。
    • timeout:表示查询的等待时间。在等待时间内,如果队列中有length条数据则直接返回,否则等到最大timeout等待时间则停止。
    • auto_delete:表示是否自动从队列中删除已经查询的数据。如果配置为False,则数据可被重复查询,您可以通过调用Del()方法手动删除数据。
    • tags:表示查询包含指定tags的数据,类型为DICT。从指定index开始遍历length条数据,返回包含指定tags的数据。
  • 返回值:表示队列中查询出的以DataFrame封装的数据结果。
attributes()
  • 功能:获取队列的属性信息,包含队列总长度,当前的数据长度等信息。
  • 返回值:attrs:队列的属性信息,类型为DICT。
delete(indexes)
  • 功能:从队列中删除指定index的数据。
  • 参数:indexes:表示要从队列中删除的数据的index值列表,支持单个String类型的index,也支持List类型的多个index列表。
watch(index, window, index_only=False, auto_commit=False)
  • 功能:订阅队列中的数据,队列服务会根据条件向客户端推送数据。
  • 参数:
    • index:表示订阅的起始数据index。
    • window:表示订阅的窗口大小,队列服务一次最多向单个客户端实例推送的数据量。
      说明 如果推送的数据没有被commit,则服务端不会再推送新数据;如果commit N条数据,则服务队列会向客户端推送N条数据,确保客户端在同一时刻处理的数据不会超过设置的窗口大小,来实现客户端限制并发的功能。
    • index_only:表示是否只推送index值。
    • auto_commit:表示是否在推送完一条数据后,自动commit数据。建议配置为False。在收到推送数据并计算完成后手动Commit,在未完成计算的情况下实例发生异常,则实例上未commit的数据会由队列服务分发给其他实例继续处理。
  • 返回值:返回一个watcher对象,可通过该对象读取推送的数据。
commit(index)
  • 功能:commit指定index的数据。
    说明 commit表示服务队列推送的数据已经处理完成,可以将该数据从队列中清除,且不需要再推送给其他实例。
  • 参数:index: 表示要向队列中commit的数据的index值列表,支持单个String类型的index,也支持List类型的多个index的列表。
Watcher run()
  • 功能:运行一个Watcher,与服务端建立websocket连接接收数据推送,并将结果实时返回给调用端。
  • 返回值:表示从队列服务中实时推送到客户端的DataFrame对象。
close() 功能:关闭一个Watcher对象,用于关闭后端的数据连接。
说明 一个客户端只能启动一个Watcher对象,使用完成后需要将该对象关闭才能启动新的Watcher对象。

程序示例

  • 字符串输入输出示例
    对于使用自定义Processor部署服务的用户而言,通常采用字符串进行服务调用(例如,PMML模型服务的调用),具体的Demo程序如下。
    #!/usr/bin/env python
    
    from eas_prediction import PredictClient
    from eas_prediction import StringRequest
    
    if __name__ == '__main__':
        client = PredictClient('http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'scorecard_pmml_example')
        client.set_token('YWFlMDYyZDNmNTc3M2I3MzMwYmY0MmYwM2Y2MTYxMTY4NzBkNzdj****')
        client.init()
    
        request = StringRequest('[{"fea1": 1, "fea2": 2}]')
        for x in range(0, 1000000):
            resp = client.predict(request)
            print(resp)
  • TensorFlow输入输出示例
    使用TensorFlow的用户,需要将TFRequest和TFResponse分别作为输入和输出数据格式,具体Demo示例如下。
    #!/usr/bin/env python
    
    from eas_prediction import PredictClient
    from eas_prediction import StringRequest
    from eas_prediction import TFRequest
    
    if __name__ == '__main__':
        client = PredictClient('http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'mnist_saved_model_example')
        client.set_token('YTg2ZjE0ZjM4ZmE3OTc0NzYxZDMyNmYzMTJjZTQ1YmU0N2FjMTAy****')
        client.init()
    
        #request = StringRequest('[{}]')
        req = TFRequest('predict_images')
        req.add_feed('images', [1, 784], TFRequest.DT_FLOAT, [1] * 784)
        for x in range(0, 1000000):
            resp = client.predict(req)
            print(resp)
  • 通过VPC网络直连方式调用服务的示例
    通过网络直连方式,您只能访问部署在PAI-EAS专属资源组的服务,且需要为该资源组与用户指定的vSwitch连通网络后才能使用。关于如何购买PAI-EAS专属资源组和连通网络,请参见专属资源组VPC高速直连。该调用方式与普通调用方式相比,仅需增加一行代码client.set_endpoint_type(ENDPOINT_TYPE_DIRECT)即可,特别适合大流量高并发的服务,具体示例如下。
    #!/usr/bin/env python
    
    from eas_prediction import PredictClient
    from eas_prediction import StringRequest
    from eas_prediction import TFRequest
    from eas_prediction import ENDPOINT_TYPE_DIRECT
    
    if __name__ == '__main__':
        client = PredictClient('http://pai-eas-vpc.cn-hangzhou.aliyuncs.com', 'mnist_saved_model_example')
        client.set_token('M2FhNjJlZDBmMzBmMzE4NjFiNzZhMmUxY2IxZjkyMDczNzAzYjFi****')
        client.set_endpoint_type(ENDPOINT_TYPE_DIRECT)
        client.init()
    
        request = TFRequest('predict_images')
        request.add_feed('images', [1, 784], TFRequest.DT_FLOAT, [1] * 784)
        for x in range(0, 1000000):
            resp = client.predict(request)
            print(resp)
  • PyTorch输入输出示例
    使用PyTorch的用户,需要将TorchRequest和TorchResponse分别作为输入和输出数据格式,具体Demo示例如下。
    #!/usr/bin/env python
    
    from eas_prediction import PredictClient
    from eas_prediction import TorchRequest
    
    if __name__ == '__main__':
        client = PredictClient('http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'pytorch_gpu_wl')
        client.init()
    
        req = TorchRequest()
        req.add_feed(0, [1, 3, 224, 224], TorchRequest.DT_FLOAT, [1] * 150528)
        # req.add_fetch(0)
        import time
        st = time.time()
        timer = 0
        for x in range(0, 10):
            resp = client.predict(req)
            timer += (time.time() - st)
            st = time.time()
            print(resp.get_tensor_shape(0))
            # print(resp)
        print("average response time: %s s" % (timer / 10) )
  • BladeProcessor输入输出示例
    使用BladeProcessor的用户,需要将BladeRequest和BladeResponse分别作为输入和输出数据格式,具体Demo示例如下。
    #!/usr/bin/env python
    
    from eas_prediction import PredictClient
    from eas_prediction import BladeRequest 
    
    if __name__ == '__main__':
        client = PredictClient('http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'nlp_model_example')
        client.init()
    
        req = BladeRequest()
    
        req.add_feed('input_data', 1, [1, 360, 128], BladeRequest.DT_FLOAT, [0.8] * 85680)
        req.add_feed('input_length', 1, [1], BladeRequest.DT_INT32, [187])
        req.add_feed('start_token', 1, [1], BladeRequest.DT_INT32, [104])
        req.add_fetch('output', BladeRequest.DT_FLOAT)
        import time
        st = time.time()
        timer = 0
        for x in range(0, 10):
            resp = client.predict(req)
            timer += (time.time() - st)
            st = time.time()
            # print(resp)
            # print(resp.get_values('output'))
            print(resp.get_tensor_shape('output'))
        print("average response time: %s s" % (timer / 10) )
  • 兼容PAI-EAS默认TensorFlow接口的BladeProcessor输入输出示例
    BladeProcessor用户可以使用兼容PAI-EAS默认TensorFlow接口的TFRequest与TFResponse作为数据的输入输出格式,具体Demo示例如下。
    #!/usr/bin/env python
    
    from eas_prediction import PredictClient
    from eas_prediction.blade_tf_request import TFRequest # Need Importing blade TFRequest 
    
    if __name__ == '__main__':
        client = PredictClient('http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'nlp_model_example')
        client.init()
    
        req = TFRequest(signature_name='predict_words')
    
        req.add_feed('input_data', [1, 360, 128], TFRequest.DT_FLOAT, [0.8] * 85680)
        req.add_feed('input_length', [1], TFRequest.DT_INT32, [187])
        req.add_feed('start_token', [1], TFRequest.DT_INT32, [104])
        req.add_fetch('output')
        import time
        st = time.time()
        timer = 0
        for x in range(0, 10):
            resp = client.predict(req)
            timer += (time.time() - st)
            st = time.time()
            # print(resp)
            # print(resp.get_values('output'))
            print(resp.get_tensor_shape('output'))
        print("average response time: %s s" % (timer / 10) )
  • 队列服务发送、订阅数据示例
    通过QueueClient可向队列服务中发送数据、查询数据、查询队列服务的状态以及订阅队列服务中的数据推送。以下方的Demo为例,介绍一个线程向队列服务中推送数据,另外一个线程通过Watcher订阅队列服务中推送过来的数据。
    #!/usr/bin/env python
    
    from eas_prediction import QueueClient
    import threading
    
    if __name__ == '__main__':
        endpoint = '1828488879222746.cn-shanghai.pai-eas.aliyuncs.com'
        queue_name = 'test_group.qservice/sink'
        token = 'YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MTUxNg=='
    
        queue = QueueClient(endpoint, queue_name)
        queue.set_token(token)
        queue.init()
        queue.set_timeout(30000)
    
        # truncate all messages in the queue
        attributes = queue.attributes()
        if 'stream.lastEntry' in attributes:
            queue.truncate(int(attributes['stream.lastEntry']) + 1)
    
        count = 100
        # create a thread to send messages to the queue
        def send_thread():
            for i in range(count):
                index, request_id = queue.put('[{}]')
                print('send: ', i, index, request_id)
    
        # create a thread to watch messages from the queue
        def watch_thread():
            watcher = queue.watch(0, 5, auto_commit=True)
            i = 0
            for x in watcher.run():
                print('recv: ', i, x.index, x.tags['requestId'])
                i += 1
                if i == count:
                    break
            watcher.close()
    
        thread1 = threading.Thread(target=watch_thread)
        thread2 = threading.Thread(target=send_thread)
    
        thread1.start()
        thread2.start()
    
        thread1.join()
        thread2.join()