Java SDK使用说明
推荐使用PAI-EAS提供的官方SDK进行服务调用,从而有效减少编写调用逻辑的时间并提高调用稳定性。本文介绍官方Java SDK接口详情。同时,以字符串输入输出、TensorFlow输入输出、QueueService客户端和请求数据压缩为例,提供了使用Java SDK进行服务调用的完整程序示例。
添加依赖包
使用Java编写客户端代码时,需要使用Maven管理项目。因此,您必须在pom.xml文件中添加客户端所需的依赖包eas-sdk。依赖包eas-sdk的最新版本为2.0.7,pom.xml文件中的具体代码如下。
<dependency>
<groupId>com.aliyun.openservices.eas</groupId>
<artifactId>eas-sdk</artifactId>
<version>2.0.7</version>
</dependency>
2.0.5及以上版本增加了QueueService客户端功能,支持多优先级异步队列服务。如果需要使用该功能,为避免依赖包版本冲突,您还需自行添加如下两个依赖包,并修改这两个依赖包至合适版本:
<dependency>
<groupId>org.java-websocket</groupId>
<artifactId>Java-WebSocket</artifactId>
<version>1.5.1</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.1</version>
</dependency>
接口列表
类 | 接口 | 描述 |
PredictClient |
|
|
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
HttpConfig |
|
|
|
| |
|
| |
|
| |
|
| |
|
| |
| 返回最近一次调用的状态码。 | |
| 返回最近一次调用的状态信息。 | |
TFRequest |
|
|
|
| |
|
| |
TFResponse |
|
|
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
QueueClient |
|
|
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
| 功能:关闭队列服务。 | |
DataFrame |
|
|
|
| |
|
|
程序示例
字符串输入输出示例
对于使用自定义Processor部署服务的用户而言,通常采用字符串进行服务调用(例如,PMML模型服务的调用),具体的Demo程序如下。
import com.aliyun.openservices.eas.predict.http.PredictClient;
import com.aliyun.openservices.eas.predict.http.HttpConfig;
public class Test_String {
public static void main(String[] args) throws Exception{
// 启动并初始化客户端。client对象需要共享,不能每个请求都创建一个client对象。
PredictClient client = new PredictClient(new HttpConfig());
client.setToken("YWFlMDYyZDNmNTc3M2I3MzMwYmY0MmYwM2Y2MTYxMTY4NzBkNzdj****");
// 如果需要使用网络直连功能,则使用setDirectEndpoint方法。
// 例如,client.setDirectEndpoint("pai-eas-vpc.cn-shanghai.aliyuncs.com");
// 网络直连功能需要在PAI-EAS控制台开通,提供用于访问PAI-EAS服务的源vswitch。开通后可以绕过网关以软负载的方式直接访问服务的实例,以实现更好的稳定性和性能。
// 注意:通过普通网关访问时,需要使用以用户uid开头的Endpoint,在PAI-EAS控制台服务的调用信息中可以查到该信息。通过直连访问时,需要使用如上的pai-eas-vpc.{region_id}.aliyuncs.com的域名。
client.setEndpoint("182848887922****.vpc.cn-shanghai.pai-eas.aliyuncs.com");
client.setModelName("scorecard_pmml_example");
//定义输入字符串。
String request = "[{\"money_credit\": 3000000}, {\"money_credit\": 10000}]";
System.out.println(request);
//通过PAI-EAS返回字符串。
try {
String response = client.predict(request);
System.out.println(response);
} catch(Exception e) {
e.printStackTrace();
}
return;
}
}
如上述程序所示,使用Java SDK调用服务的流程如下:
通过
PredictClient
接口创建客户端服务对象。如果在程序中需要使用多个服务,则创建多个Client对象。为PredictClient对象配置Token、Endpoint及ModelName。
构造STRING类型的request作为输入,通过
client.predict
发送HTTP请求,系统返回response。
TensorFlow输入输出示例
使用TensorFlow的用户,需要将TFRequest和TFResponse分别作为输入和输出数据格式,具体Demo示例如下。
import java.util.List;
import com.aliyun.openservices.eas.predict.http.PredictClient;
import com.aliyun.openservices.eas.predict.http.HttpConfig;
import com.aliyun.openservices.eas.predict.request.TFDataType;
import com.aliyun.openservices.eas.predict.request.TFRequest;
import com.aliyun.openservices.eas.predict.response.TFResponse;
public class Test_TF {
public static TFRequest buildPredictRequest() {
TFRequest request = new TFRequest();
request.setSignatureName("predict_images");
float[] content = new float[784];
for (int i = 0; i < content.length; i++)
content[i] = (float)0.0;
request.addFeed("images", TFDataType.DT_FLOAT, new long[]{1, 784}, content);
request.addFetch("scores");
return request;
}
public static void main(String[] args) throws Exception{
PredictClient client = new PredictClient(new HttpConfig());
// 如果使用网络直连功能,则调用setDirectEndpoint方法。
// 例如,client.setDirectEndpoint("pai-eas-vpc.cn-shanghai.aliyuncs.com");
// 网络直连功能需要在PAI-EAS控制台开通,提供用于访问PAI-EAS服务的源vswitch。开通后可以绕过网关以软负载的方式直接访问服务的实例,以实现更好的稳定性和性能。
// 注意:通过普通网关访问时,需要使用以用户uid开头的Endpoint,在PAI-EAS控制台服务的调用信息中可以查到该信息。通过直连访问时,需要使用如上的pai-eas-vpc.{region_id}.aliyuncs.com的域名。
client.setEndpoint("182848887922****.vpc.cn-shanghai.pai-eas.aliyuncs.com");
client.setModelName("mnist_saved_model_example");
client.setToken("YTg2ZjE0ZjM4ZmE3OTc0NzYxZDMyNmYzMTJjZTQ1YmU0N2FjMTAy****");
long startTime = System.currentTimeMillis();
for (int i = 0; i < 1000; i++) {
try {
TFResponse response = client.predict(buildPredictRequest());
List<Float> result = response.getFloatVals("scores");
System.out.print("Predict Result: [");
for (int j = 0; j < result.size(); j++) {
System.out.print(result.get(j).floatValue());
if (j != result.size() -1)
System.out.print(", ");
}
System.out.print("]\n");
} catch(Exception e) {
e.printStackTrace();
}
}
long endTime = System.currentTimeMillis();
System.out.println("Spend Time: " + (endTime - startTime) + "ms");
}
}
如上述程序所示,使用Java SDK调用TensorFlow服务的流程如下:
通过
PredictClient
接口创建客户端服务对象。如果在程序中需要使用多个服务,则创建多个Client对象。为PredictClient对象配置Token、Endpoint及ModelName。
使用TFRequest类封装输入数据,使用TFResponse类封装输出数据。
QueueService客户端示例
支持通过QueueClient接口使用队列服务功能,具体demo示例如下:
import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.eas.predict.http.HttpConfig;
import com.aliyun.openservices.eas.predict.http.QueueClient;
import com.aliyun.openservices.eas.predict.queue_client.QueueUser;
import com.aliyun.openservices.eas.predict.queue_client.WebSocketWatcher;
import org.apache.commons.lang3.tuple.Pair;
import java.util.HashMap;
import java.util.Map;
public class TestWatch {
public static void main(String[] args) throws Exception {
/** 创建队列服务客户端 */
String queueEndpoint = "18*******.cn-hangzhou.pai-eas.aliyuncs.com";
String inputQueueName = "test_group.test_qservice";
String sinkQueueName = "test_group.test_qservice/sink";
String queueToken = "test-token";
/** 输入队列,往输入队列添加数据,推理服务会自动从输入队列中读取请求数据 */
QueueClient input_queue =
new QueueClient(queueEndpoint, inputQueueName, queueToken, new HttpConfig(), new QueueUser());
input_queue.clear();
/** 输出队列,推理服务处理输入数据后会将结果写入输出队列*/
QueueClient sink_queue =
new QueueClient(queueEndpoint, sinkQueueName, queueToken, new HttpConfig(), new QueueUser());
sink_queue.clear();
/** 往输入队列添加数据*/
for (int i = 0; i < 10; ++i) {
String data = Integer.toString(i);
input_queue.put(data.getBytes(), null);
/** 队列服务支持多优先级队列,可通过 put函数设置数据优先级,默认优先级为 0 */
// inputQueue.put(data.getBytes(), 0L, null);
}
/** 通过 watch 函数订阅输出队列的数据,窗口大小为5 */
WebSocketWatcher watcher = sink_queue.watch(0L, 5L, false, true, null);
/** 获取输出数据 */
for (int i = 0; i < 10; ++i) {
/** getDataFrame 函数用于获取 DataFrame 数据类,没有数据时会被阻塞 */
byte[] data = watcher.getDataFrame().getData();
System.out.println("[watch] data = " + new String(data));
}
/** 关闭已经打开的watcher对象,每个客户端实例只允许存在一个watcher对象,若watcher对象不关闭,再运行时会报错 */
watcher.close();
Thread.sleep(2000);
JSONObject attrs = sink_queue.attributes();
/** 关闭客户端 */
input_queue.shutdown();
sink_queue.shutdown();
}
}
如上述程序所示,使用Java SDK调用服务的流程如下:
通过
QueueClient
接口创建队列服务客户端对象。如果创建了推理服务,需同时创建输入队列和输出队列对象。使用
put()
函数向输入队列中发送数据;使用watch()
函数从输出队列中订阅数据。说明现实场景中发送数据和订阅数据可以由不同的线程处理,本示例中为了演示方便在同一线程中完成,先Put数据,后Watch结果。
请求数据压缩示例
对于请求数据量较大的情况,PAI-EAS支持将数据压缩之后再发送至服务端,目前支持Zlib和Gzip两种压缩格式。该功能需要在服务配置中指定相应的rpc.decompressor
才能生效。
服务配置如下所示:
"metadata": {
"rpc": {
"decompressor": "zlib"
}
}
SDK代码调用示例如下:
package com.aliyun.openservices.eas.predict;
import com.aliyun.openservices.eas.predict.http.Compressor;
import com.aliyun.openservices.eas.predict.http.PredictClient;
import com.aliyun.openservices.eas.predict.http.HttpConfig;
public class Test_String {
public static void main(String[] args) throws Exception{
//启动并初始化客户端。
PredictClient client = new PredictClient(new HttpConfig());
client.setEndpoint("eas-shanghai.alibaba-inc.com");
client.setModelName("echo_compress");
client.setToken("YzZjZjQwN2E4NGRkMDMxNDk5NzhhZDcwZDBjOTZjOGYwZDYxZGM2****");
client.setCompressor(Compressor.Zlib); // 或者 Compressor.Gzip。
//输入字符串定义。
String request = "[{\"money_credit\": 3000000}, {\"money_credit\": 10000}]";
System.out.println(request);
//通过eas返回字符串。
String response = client.predict(request);
System.out.println(response);
//关闭客户端。
client.shutdown();
return;
}
}