推荐使用EAS提供的官方SDK进行服务调用,从而有效减少编写调用逻辑的时间并提高调用稳定性。本文介绍官方Golang SDK接口详情,并以常见类型的输入输出为例,提供了使用Golang SDK进行服务调用的完整程序示例。
背景信息
使用Golang SDK进行服务调用时,由于在编译代码时,Golang的包管理工具会自动从Github上将Golang SDK的代码下载到本地,因此您无需提前安装Golang SDK。如果您需要自定义部分调用逻辑,可以先下载Golang SDK代码,再对其进行修改。
接口列表
类 | 接口 | 描述 |
PredictClient |
|
|
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
| 对PredictClient对象进行初始化。在上述设置参数的接口执行完成后,需要调用 | |
|
| |
|
| |
|
| |
|
| |
TFRequest |
|
|
|
| |
|
| |
TFResponse |
|
|
|
| |
TorchRequest |
| TFRequest类的构建函数。 |
|
| |
|
| |
TorchResponse |
|
|
|
| |
QueueClient |
|
|
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
types.Watcher |
|
|
| 功能:关闭一个Watcher对象,用于关闭后端的数连接。 说明 一个客户端只能启动一个Watcher对象,使用完成后需要将该对象关闭才能启动新的Watcher对象。 |
程序示例
字符串输入输出示例
对于使用自定义Processor部署服务的用户而言,通常采用字符串进行服务调用(例如,PMML模型服务的调用),具体的Demo程序如下。
package main
import (
"fmt"
"github.com/pai-eas/eas-golang-sdk/eas"
)
func main() {
client := eas.NewPredictClient("182848887922****.cn-shanghai.pai-eas.aliyuncs.com", "scorecard_pmml_example")
client.SetToken("YWFlMDYyZDNmNTc3M2I3MzMwYmY0MmYwM2Y2MTYxMTY4NzBkNzdj****")
client.Init()
req := "[{\"fea1\": 1, \"fea2\": 2}]"
for i := 0; i < 100; i++ {
resp, err := client.StringPredict(req)
if err != nil {
fmt.Printf("failed to predict: %v\n", err.Error())
} else {
fmt.Printf("%v\n", resp)
}
}
}
TensorFlow输入输出示例
使用TensorFlow的用户,需要将TFRequest和TFResponse分别作为输入和输出数据格式,具体Demo示例如下。
package main
import (
"fmt"
"github.com/pai-eas/eas-golang-sdk/eas"
)
func main() {
client := eas.NewPredictClient("182848887922****.cn-shanghai.pai-eas.aliyuncs.com", "mnist_saved_model_example")
client.SetToken("YTg2ZjE0ZjM4ZmE3OTc0NzYxZDMyNmYzMTJjZTQ1YmU0N2FjMTAy****")
client.Init()
tfreq := eas.TFRequest{}
tfreq.SetSignatureName("predict_images")
tfreq.AddFeedFloat32("images", []int64{1, 784}, make([]float32, 784))
for i := 0; i < 100; i++ {
resp, err := client.TFPredict(tfreq)
if err != nil {
fmt.Printf("failed to predict: %v", err)
} else {
fmt.Printf("%v\n", resp)
}
}
}
PyTorch输入输出示例
使用PyTorch的用户,需要将TorchRequest和TorchResponse分别作为输入和输出数据格式,具体Demo示例如下。
package main
import (
"fmt"
"github.com/pai-eas/eas-golang-sdk/eas"
)
func main() {
client := eas.NewPredictClient("182848887922****.cn-shanghai.pai-eas.aliyuncs.com", "pytorch_resnet_example")
client.SetTimeout(500)
client.SetToken("ZjdjZDg1NWVlMWI2NTU5YzJiMmY5ZmE5OTBmYzZkMjI0YjlmYWVl****")
client.Init()
req := eas.TorchRequest{}
req.AddFeedFloat32(0, []int64{1, 3, 224, 224}, make([]float32, 150528))
req.AddFetch(0)
for i := 0; i < 10; i++ {
resp, err := client.TorchPredict(req)
if err != nil {
fmt.Printf("failed to predict: %v", err)
} else {
fmt.Println(resp.GetTensorShape(0), resp.GetFloatVal(0))
}
}
}
通过VPC网络直连方式调用服务的示例
通过网络直连方式,您只能访问部署在EAS专属资源组的服务,且需要为该资源组与用户指定的vSwitch连通网络后才能使用。关于如何购买EAS专属资源组和连通网络,请参见使用专属资源组和配置网络连通。该调用方式与普通调用方式相比,仅需增加一行代码client.SetEndpointType(eas.EndpointTypeDirect)
即可,特别适合大流量高并发的服务,具体示例如下。
package main
import (
"fmt"
"github.com/pai-eas/eas-golang-sdk/eas"
)
func main() {
client := eas.NewPredictClient("pai-eas-vpc.cn-shanghai.aliyuncs.com", "scorecard_pmml_example")
client.SetToken("YWFlMDYyZDNmNTc3M2I3MzMwYmY0MmYwM2Y2MTYxMTY4NzBkNzdj****")
client.SetEndpointType(eas.EndpointTypeDirect)
client.Init()
req := "[{\"fea1\": 1, \"fea2\": 2}]"
for i := 0; i < 100; i++ {
resp, err := client.StringPredict(req)
if err != nil {
fmt.Printf("failed to predict: %v\n", err.Error())
} else {
fmt.Printf("%v\n", resp)
}
}
}
客户端连接参数设置的示例
您可以通过http.Transport
属性设置请求客户端的连接参数,示例代码如下。
package main
import (
"fmt"
"github.com/pai-eas/eas-golang-sdk/eas"
)
func main() {
client := eas.NewPredictClient("pai-eas-vpc.cn-shanghai.aliyuncs.com", "network_test")
client.SetToken("MDAwZDQ3NjE3OThhOTI4ODFmMjJiYzE0MDk1NWRkOGI1MmVhMGI0****")
client.SetEndpointType(eas.EndpointTypeDirect)
client.SetHttpTransport(&http.Transport{
MaxConnsPerHost: 300,
TLSHandshakeTimeout: 100 * time.Millisecond,
ResponseHeaderTimeout: 200 * time.Millisecond,
ExpectContinueTimeout: 200 * time.Millisecond,
})
}
队列服务发送、订阅数据示例
通过QueueClient可向队列服务中发送数据、查询数据、查询队列服务的状态以及订阅队列服务中的数据推送。以下方Demo为例,介绍一个线程向队列服务中推送数据,另一个线程通过Watcher订阅队列服务中推送过来的数据。
在EAS部署异步推理服务,会自动生成输入队列和输出队列,通常地址格式如下:
输入队列:<domain>/api/predict/<service_name>
输出队列:<domain>/api/predict/<service_name>/sink
请根据您实际需要采用<service_name>或者<service_name>/sink构建QueueClient。
const (
QueueEndpoint = "182848887922****.cn-shanghai.pai-eas.aliyuncs.com"
// eg:EAS服务名为test_qservice,则输入队列名为test_qservice,输出队列名为test_qservice/sink
QueueName = "test_qservice"
QueueToken = "YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MTUx****"
)
queue, err := NewQueueClient(QueueEndpoint, QueueName, QueueToken)
// truncate all messages in the queue
attrs, err := queue.Attributes()
if index, ok := attrs["stream.lastEntry"]; ok {
idx, _ := strconv.ParseUint(index, 10, 64)
queue.Truncate(context.Background(), idx+1)
}
ctx, cancel := context.WithCancel(context.Background())
// create a goroutine to send messages to the queue
go func() {
i := 0
for {
select {
case <-time.NewTicker(time.Microsecond * 1).C:
_, _, err := queue.Put(context.Background(), []byte(strconv.Itoa(i)), types.Tags{})
if err != nil {
fmt.Printf("Error occured, retry to handle it: %v\n", err)
}
i += 1
case <-ctx.Done():
break
}
}
}()
// create a watcher to watch the messages from the queue
watcher, err := queue.Watch(context.Background(), 0, 5, false, false)
if err != nil {
fmt.Printf("Failed to create a watcher to watch the queue: %v\n", err)
return
}
// read messages from the queue and commit manually
for i := 0; i < 100; i++ {
df := <-watcher.FrameChan()
err := queue.Commit(context.Background(), df.Index.Uint64())
if err != nil {
fmt.Printf("Failed to commit index: %v(%v)\n", df.Index, err)
}
}
// everything is done, close the watcher
watcher.Close()
cancel()