Java SDK使用说明

使用EAS提供的官方SDK进行服务调用,可以有效减少编写调用逻辑的时间并提高调用稳定性。本文介绍官方Java SDK接口详情。同时,以字符串输入输出、TensorFlow输入输出、QueueService客户端和请求数据压缩为例,提供了使用Java SDK进行服务调用的完整程序示例。

添加依赖项

使用Java编写客户端代码时,在Maven工程中使用EAS Java SDK,必须在pom.xml文件<dependencies>中添加eas-sdk的依赖,示例如下,最新版本以Maven仓库中显示的为准。

<dependency>
  <groupId>com.aliyun.openservices.eas</groupId>
  <artifactId>eas-sdk</artifactId>
  <version>2.0.20</version>
</dependency>

EAS2.0.5及以上版本增加了QueueService客户端功能,支持多优先级异步队列服务。如果需要使用该功能,为避免依赖版本冲突,您还需自行添加如下两个依赖,并修改这两个依赖至合适版本:

<dependency>
    <groupId>org.java-websocket</groupId>
    <artifactId>Java-WebSocket</artifactId>
    <version>1.5.1</version>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.1</version>
</dependency>

接口列表

接口

描述

PredictClient

PredictClient(HttpConfig httpConfig)

  • 功能:PredictClient类构造器。

  • 参数:httpConfig表示HttpConfig类的实例对象。

void setToken(String token)

  • 功能:设置HTTP请求的Token参数。

  • 参数:token表示访问服务时需要使用的鉴权Token。

void setModelName(String modelName)

  • 功能:设置请求的在线预测服务的模型名称。

  • 参数:modelName表示所设置的模型名称。

void setEndpoint(String endpoint)

  • 功能:设置请求服务的HostPort,格式为"host:port"

  • 参数:endpoint表示接收消息的终端地址。

void setDirectEndpoint(String endpoint)

  • 功能:设置VPC高速直连通道访问服务的Endpoint,例如,pai-eas-vpc.cn-shanghai.aliyuncs.com

  • 参数:endpoint表示设置的访问服务地址。

void setRequestPath(String requestPath)

  • 功能:设置服务端代码中定义的请求路径。

  • 参数:requestPath表示服务端请求路径,使用示例:client.setRequestPath("/custom_path")

void setRetryCount(boolean int retryCount)

  • 功能:设置失败重试次数。

  • 参数:retryCount表示失败的重试次数。

void setRetryConditions(EnumSet retryConditions)

  • 功能:设置失败重试条件,可与setRetryCount方法结合使用。默认所有请求错误都会重试,此方法可设置仅对特定的请求错误重试。

  • 参数:retryConditions表示EnumSet类型的一个或多个重试条件,目前支持:

    • RetryCondition.CONNECTION_FAILED:请求连接失败。

    • RetryCondition.CONNECTION_TIMEOUT:请求连接超时。

    • RetryCondition.READ_TIMEOUT:等待请求返回超时。

    • RetryCondition.RESPONSE_5XX:返回状态码为5xx。

    • RetryCondition.RESPONSE_4XX:返回状态码为4xx。

  • 调用示例:

    client.setRetryConditions(
        EnumSet.of(
            RetryCondition.READ_TIMEOUT,    // 在读取超时时重试
            RetryCondition.RESPONSE_5XX     // 在接收到5xx系列错误码时重试
        )
    );

    表示仅在请求超时、请求返回状态码为5xx报错时进行重试。

void setContentType(String contentType)

  • 功能:设置HTTP ClientContent类型,默认为"application/octet-stream"。

  • 参数:contentType表示发送数据流的类型。

void setUrl(String url)

功能:自定义请求 URL。

void setCompressor(Compressor compressor)

  • 功能:设置请求数据的压缩方式。

  • 参数:compressor表示压缩方式,目前支持Compressor.GzipCompressor.Zlib。

  • 示例:详情请参见请求数据压缩示例

void addExtraHeaders(Map<String, String> extraHeaders)

  • 功能:额外添加自定义的HTTP Header。

  • 参数:Map<String, String>类型的HTTP Header。

void createChildClient(String token, String endpoint, String modelName)

  • 功能:创建子Client对象,共用父Client对象的线程池。该接口用于多线程预测。

  • 参数:

    • token:服务的鉴权Token。

    • endpoint:服务的Endpoint。

    • modelName:服务的名称。

void predict(TFRequest runRequest)

  • 功能:向在线预测服务提交一个TensorFlow请求。

  • 参数:runRequest表示TensorFlow请求的实例对象。

void predict(String requestContent)

  • 功能:向在线预测服务提交一个字符串请求。

  • 参数:requestContent表示字符串格式的请求内容。

void predict(byte[] requestContent)

  • 功能:向在线预测服务提交一个Byte数组请求。

  • 参数:requestContent表示Byte类型的请求内容。

HttpConfig

void setIoThreadNum(int ioThreadNum)

  • 功能:设置HTTP请求的IO线程数,默认值为2。

  • 参数:ioThreadNum表示发送HTTP请求的IO线程数。

void setReadTimeout(int readTimeout)

  • 功能:设置发送请求后等待返回的超时时间,默认值为5000,表示5s。

  • 参数:readTimeout表示请求的读取超时时间。

void setConnectTimeout(int connectTimeout)

  • 功能:设置连接超时时间,默认值为5000,表示5s。

  • 参数:connectTimeout表示请求的连接超时时间。

void setMaxConnectionCount(int maxConnectionCount)

  • 功能:设置最大连接数,默认值为1000。

  • 参数:maxConnectionCount表示客户端连接池的最大连接数。

void setMaxConnectionPerRoute(int maxConnectionPerRoute)

  • 功能:设置每个路由的最大默认连接数,默认值为1000。

  • 参数:maxConnectionPerRoute表示每个路由上的默认最大连接数。

void setKeepAlive(boolean keepAlive)

  • 功能:设置HTTP服务的keep-alive

  • 参数:keepAlive表示是否开启连接的keep-alive机制,默认为true

int getErrorCode()

返回最近一次调用的状态码。

string getErrorMessage()

返回最近一次调用的状态信息。

TFRequest

void setSignatureName(String value)

  • 功能:请求的在线服务的模型为TensorFlowSavedModel格式时,设置请求模型的signatureDef的名称。

  • 参数:请求模型的signatureDef的名称。

void addFetch(String value)

  • 功能:请求TensorFlow的在线服务模型时,设置模型输出Tensor的别名。

  • 参数:value表示TensorFlow服务输出Tensor的别名。

void addFeed(String inputName, TFDataType dataType, long[]shape, ?[]content)

  • 功能:请求TensorFlow的在线预测服务模型时,设置需要输入的Tensor。

  • 参数:

    • inputName:表示输入Tensor的别名。

    • dataType:表示输入TensorDataType。

    • shape:表示输入TensorTensorShape。

    • content:表示输入Tensor的内容,采用一维数组表示。

      如果输入TensorDataTypeDT_FLOAT、DT_COMPLEX64、DT_BFLOAT16DT_HALF,则content中的元素类型为FLOAT。其中DataTypeDT_COMPLEX64时,content中相邻两个FLOAT元素依次表示复数的实部和虚部。

      如果输入TensorDataTypeDT_DOUBLEDT_COMPLEX128,则content中的元素类型为DOUBLE。其中DataTypeDT_COMPLEX128时,content中相邻两个DOUBLE元素依次表示复数的实部和虚部。

      如果输入TensorDataTypeDT_INT32、DT_UINT8、DT_INT16、DT_INT8、DT_QINT8、DT_QUINT8、DT_QINT32、DT_QINT16、DT_QUINT16DT_UINT16,content中的元素类型为INT。

      如果输入TensorDataTypeDT_INT64,则content中的元素类型为LONG。

      如果输入TensorDataTypeDT_STRING,则content中的元素类型为STRING。

      如果输入TensorDataTypeDT_BOOL,则content中的元素类型为BOOLEAN。

TFResponse

List<Long> getTensorShape(String outputName)

  • 功能:获得指定别名的输出TensorTensorShape。

  • 参数:outputName表示待获取TensorShape的模型输出的名称。

  • 返回值:表示TensorShape的一维数组。

List<Float> getFloatVals(String outputName)

  • 功能:如果输出TensorDataTypeDT_FLOAT、DT_COMPLEX64、DT_BFLOAT16DT_HALF,则可以调用该接口获取指定输出Tensordata

  • 参数:outputName表示待获取FLOAT类型返回数据的模型输出的名称。

  • 返回值:模型输出的TensorData展开成的一维数组。

List<Double> getDoubleVals(String outputName)

  • 功能:如果输出TensorDataTypeDT_DOUBLEDT_COMPLEX128,则调用该接口获取指定输出Tensordata

  • 参数:outputname表示待获取DOUBLE类型返回数据的模型输出的名称。

  • 返回值:模型输出的TensorData展开成的一维数组。

List<Integer> getIntVals(String outputName)

  • 功能:如果输出TensorDataTypeDT_INT32、DT_UINT8、DT_INT16、DT_INT8、DT_QINT8、DT_QUINT8、DT_QINT32、DT_QINT16、DT_QUINT16DT_UINT16,则调用该接口获取指定输出Tensordata

  • 参数:outputname表示待获取INT类型返回数据的模型输出的名称。

  • 返回值:模型输出的TensorData展开成的一维数组。

List<String> getStringVals(String outputName)

  • 功能:如果输出TensorDataTypeDT_STRING,则调用该接口获取指定输出Tensordata

  • 参数:outputName表示待获取STRING类型返回数据的模型输出的名称。

  • 返回值:模型输出的TensorData展开成的一维数组。

List<Long> getInt64Vals(String outputName)

  • 功能:如果输出TensorDataTypeDT_INT64,则调用该接口获取指定输出Tensordata

  • 参数:outputName表示待获取的INT64类型返回数据的模型输出的名称。

  • 返回值:模型输出的TensorData展开成的一维数组。

List<Boolean> getBoolVals(String outputName)

  • 功能:如果输出TensorDataTypeDT_BOOL,则调用该接口获取指定输出Tensordata

  • 参数:outputName表示待获取BOOL类型的返回数据的模型输出的名称。

  • 返回值:模型输出的TensorData展开成的一维数组。

QueueClient

QueueClient(String endpoint, String queueName, String token, HttpConfig httpConfig, QueueUser user)

  • 功能:QueueClient类的构造函数。

  • 参数:

    • endpoint:服务端的Endpoint地址。

    • queueName:服务名称。

    • token:服务访问的Token。

    • httpConfig:服务请求的配置。

    • user:配置UserId(默认为随机数)与 GroupName(默认为eas)。

JSONObject attributes()

  • 功能:获取队列服务的详细信息。

  • 返回值:JSONObject类型的队列服务信息,主要包含如下字段:

    • meta.maxPayloadBytes:队列中允许的每个数据项的Size上限。

    • meta.name:队列名。

    • stream.approxMaxLength:队列中能够存储的数据项的数量上限。

    • stream.firstEntry:队列中第一个数据项的Index。

    • stream.lastEntry:队列中最后一个数据项的Index。

    • stream.length:队列中当前存储的数据项的数量。

Pair<Long, String> put(byte[] data, long priority, Map<String, String> tags)

  • 功能:将数据写入队列服务。

  • 参数:

    • data:Byte[]类型数据。

    • priority:表示数据优先级。默认值为0,表示非优先数据。将该参数配置为1时,表示高优先级数据。

    • tags:自定义参数。

  • 返回值:Pair<Long, String> 类型的 <数据index, 请求Id>。表示一个由两个元素组成的有序对,第一个元素是一个Long类型的数据 Index,第二个元素是一个String类型的请求ID。

DataFrame[] get(long index, long length, long timeout, boolean autoDelete, Map<String, String> tags)

  • 功能:获取队列服务中的数据。

  • 参数:

    • index:指定获取数据的起始Index,如果为-1则读取最新的数据。

    • length:获取的数据个数。

    • timeout:超时时间,以秒为单位。

    • autoDelete:获取数据后是否自动从队列中删除。

    • tags:自定义参数。例如指定RequestID。

  • 返回值:DataFrame数据类。

void truncate(Long index)

  • 功能:删除队列服务中所有Index小于指定Index的数据。

String delete(Long index)

  • 功能:删除队列服务中指定Index的数据。

  • 参数:index表示待删除数据的index。

  • 返回值:删除成功则返回OK。

JSONObject search(long index)

  • 功能:查询数据的排队信息。

  • 参数:[index]表示查询数据的index。

  • 返回值:为JSONObject类型的数据排队信息,包含如下字段:

    • ConsumerId:表示处理该数据的实例ID。

    • IsPending:表示数据是否正在被处理。

      • True表示正在被处理。

      • False表示正在排队。

    • WaitCount:表示前面还需排队等待的数据个数,仅当IsPendingFalse时该值才有效,IsPendingTrue时该值为0。

    返回内容示例如下:

    • 返回{'ConsumerId': 'eas.****', 'IsPending': False, 'WaitCount':2},表示请求正在排队。

    • 回显日志no data in stream,返回{}。表示未在队列中找到该数据,该情况可能是因为数据已被服务端成功处理并返回结果,或是index参数配置有误,请检查确认。

WebSocketWatcher watch(long index, long window, boolean indexOnly, boolean autoCommit, Map<String, String> tags)

  • 功能:订阅队列服务。

  • 参数:

    • index:指定获取数据的起始Index。如果为-1,则忽略所有Pending的数据而读取最新数据。

    • window:指定发送窗口的大小,即最大的未commit数据长度。当QS发出了windowDataFrame数据但是客户端并没有commit时,发送会停止。

    • indexOnly:返回的DataFrame中只包含Indextags,而不返回具体数据,从而节约带宽。

    • autoCommit:指定发出数据后,直接进行Commit,从而避免Commit调用。当autoCommit设置为truewindow指定的参数将被忽略。

    • tags:自定义订阅请求参数。

  • 返回值:WebSocketWatcher类型,用于获取订阅数据。使用方法可参考QueueService客户端示例

String commit(Long index) String commit(Long[] index)

  • 功能:确认数据已被消费并在队列服务中删除该数据。

  • 返回值:Commit成功则返回OK。

void end(boolean force)

功能:关闭队列服务。

DataFrame

byte[] getData()

  • 功能:获取数据值。

  • 返回值:Byte[]类型的数据值。

long getIndex()

  • 功能:获取数据Index。

  • 返回值:Long类型的数据Index。

Map<String, String> getTags()

  • 功能:获取数据tags。

  • 返回值:Map<String,String>类型的数据Tags,可用于获取RequestID。例如df.getTags().get("requestId")

程序示例

字符串输入输出示例

对于使用自定义Processor部署服务的用户而言,通常采用字符串进行服务调用(例如,PMML模型服务的调用),具体的Demo程序如下。

import com.aliyun.openservices.eas.predict.http.PredictClient;
import com.aliyun.openservices.eas.predict.http.HttpConfig;

public class TestString {
    public static void main(String[] args) throws Exception {
        // 启动并初始化客户端, client对象需要共享,千万不可每个请求都创建一个client对象。
        PredictClient client = new PredictClient(new HttpConfig());
        client.setToken("YWFlMDYyZDNmNTc3M2I3MzMwYmY0MmYwM2Y2MTYxMTY4NzBkNzdj****");
        // 如果要使用网络直连功能,需使用setDirectEndpoint方法
        // 如 client.setDirectEndpoint("pai-eas-vpc.cn-shanghai.aliyuncs.com");
        // 网络直连需打通在EAS控制台开通,提供用于访问EAS服务的源vswitch,打通后可绕过网关以软负载的方式直接访问服务的实例,以实现更好的稳定性和性能。
        // 注:普通网关访问时请使用以用户uid为开头的endpoint,在eas控制台服务的调用信息中可查到。直连访问时请使用如上的pai-eas-vpc.{region_id}.aliyuncs.com的域名进行访问。
        client.setEndpoint("182848887922****.vpc.cn-shanghai.pai-eas.aliyuncs.com");
        client.setModelName("scorecard_pmml_example");

        // 输入字符串定义
        String request = "[{\"money_credit\": 3000000}, {\"money_credit\": 10000}]";
        System.out.println(request);

        // 通过eas返回字符串
        try {
            String response = client.predict(request);
            System.out.println(response);
        } catch (Exception e) {
            e.printStackTrace();
        }

        // 关闭客户端
        client.shutdown();
        return;
    }
}

如上述程序所示,使用Java SDK调用服务的流程如下:

  1. 通过PredictClient接口创建客户端服务对象。如果在程序中需要使用多个服务,则创建多个Client对象。

  2. PredictClient对象配置Token、EndpointModelName。

  3. 构造STRING类型的request作为输入,通过client.predict发送HTTP请求,系统返回response

TensorFlow输入输出示例

使用TensorFlow的用户,需要将TFRequestTFResponse分别作为输入和输出数据格式,具体Demo示例如下。

import java.util.List;

import com.aliyun.openservices.eas.predict.http.PredictClient;
import com.aliyun.openservices.eas.predict.http.HttpConfig;
import com.aliyun.openservices.eas.predict.request.TFDataType;
import com.aliyun.openservices.eas.predict.request.TFRequest;
import com.aliyun.openservices.eas.predict.response.TFResponse;

public class TestTF {
    public static TFRequest buildPredictRequest() {
        TFRequest request = new TFRequest();
        request.setSignatureName("predict_images");
        float[] content = new float[784];
        for (int i = 0; i < content.length; i++) {
            content[i] = (float) 0.0;
        }
        request.addFeed("images", TFDataType.DT_FLOAT, new long[]{1, 784}, content);
        request.addFetch("scores");
        return request;
    }

    public static void main(String[] args) throws Exception {
        PredictClient client = new PredictClient(new HttpConfig());

        // 如果要使用网络直连功能,需使用setDirectEndpoint方法。
        // 如 client.setDirectEndpoint("pai-eas-vpc.cn-shanghai.aliyuncs.com");
        // 网络直连需打通在EAS控制台开通,提供用于访问EAS服务的源vswitch,打通后可绕过网关以软负载的方式直接访问服务的实例,以实现更好的稳定性和性能。
        // 注:普通网关访问时请使用以用户uid为开头的endpoint,在eas控制台服务的调用信息中可查到。直连访问时请使用如上的pai-eas-vpc.{region_id}.aliyuncs.com的域名进行访问。
        client.setEndpoint("182848887922****.vpc.cn-shanghai.pai-eas.aliyuncs.com");
        client.setModelName("mnist_saved_model_example");
        client.setToken("YTg2ZjE0ZjM4ZmE3OTc0NzYxZDMyNmYzMTJjZTQ1YmU0N2FjMTAy****");
        long startTime = System.currentTimeMillis();
        int count = 1000;
        for (int i = 0; i < count; i++) {
            try {
                TFResponse response = client.predict(buildPredictRequest());
                List<Float> result = response.getFloatVals("scores");
                System.out.print("Predict Result: [");
                for (int j = 0; j < result.size(); j++) {
                    System.out.print(result.get(j).floatValue());
                    if (j != result.size() - 1) {
                        System.out.print(", ");
                    }
                }
                System.out.print("]\n");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        long endTime = System.currentTimeMillis();
        System.out.println("Spend Time: " + (endTime - startTime) + "ms");
        client.shutdown();
    }
}

如上述程序所示,使用Java SDK调用TensorFlow服务的流程如下:

  1. 通过PredictClient接口创建客户端服务对象。如果在程序中需要使用多个服务,则创建多个Client对象。

  2. PredictClient对象配置Token、EndpointModelName。

  3. 使用TFRequest类封装输入数据,使用TFResponse类封装输出数据。

QueueService客户端示例

支持通过QueueClient接口使用队列服务功能,具体demo示例如下:

import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.eas.predict.http.HttpConfig;
import com.aliyun.openservices.eas.predict.http.QueueClient;
import com.aliyun.openservices.eas.predict.queue_client.QueueUser;
import com.aliyun.openservices.eas.predict.queue_client.WebSocketWatcher;

public class DemoWatch {
    public static void main(String[] args) throws Exception {
        /** 创建队列服务客户端 */
        String queueEndpoint = "18*******.cn-hangzhou.pai-eas.aliyuncs.com";
        String inputQueueName = "test_queue_service";
        String sinkQueueName = "test_queue_service/sink";
        String queueToken = "test-token";

        /** 输入队列,往输入队列添加数据,推理服务会自动从输入队列中读取请求数据 */
        QueueClient inputQueue =
            new QueueClient(queueEndpoint, inputQueueName, queueToken, new HttpConfig(), new QueueUser());
        /** 输出队列,推理服务处理输入数据后会将结果写入输出队列 */
        QueueClient sinkQueue =
            new QueueClient(queueEndpoint, sinkQueueName, queueToken, new HttpConfig(), new QueueUser());
        /** 清除队列数据!!!请谨慎使用 */
        inputQueue.clear();
        sinkQueue.clear();

        /** 往输入队列添加数据 */
        int count = 10;
        for (int i = 0; i < count; ++i) {
            String data = Integer.toString(i);
            inputQueue.put(data.getBytes(), null);
            /** 队列服务支持多优先级队列,可通过put函数设置数据优先级,默认优先级为0 */
            //  inputQueue.put(data.getBytes(), 0L, null);
        }

        /** 通过watch函数订阅输出队列的数据,窗口大小为5 */
        WebSocketWatcher watcher = sinkQueue.watch(0L, 5L, false, true, null);
        /** WatchConfig参数可自定义重试次数、重试间隔(单位为秒)、是否无限重试;未配置WatchConfig则默认重试次数:3,重试间隔:5 */
        //  WebSocketWatcher watcher = sink_queue.watch(0L, 5L, false, true, null, new WatchConfig(3, 1));
        //  WebSocketWatcher watcher = sink_queue.watch(0L, 5L, false, true, null, new WatchConfig(true, 10));

        /** 获取输出数据 */
        for (int i = 0; i < count; ++i) {
            try {
                /** getDataFrame 函数用于获取DataFrame数据类,没有数据时会被阻塞 */
                byte[] data = watcher.getDataFrame().getData();
                System.out.println("[watch] data = " + new String(data));
            } catch (RuntimeException ex) {
                System.out.println("[watch] error = " + ex.getMessage());
                break;
            }
        }
        /** 关闭已经打开的watcher对象,每个客户端实例只允许存在一个watcher对象,若watcher对象不关闭,再运行时会报错 */
        watcher.close();

        Thread.sleep(2000);
        JSONObject attrs = sinkQueue.attributes();
        System.out.println(attrs.toString());

        /** 关闭客户端 */
        inputQueue.shutdown();
        sinkQueue.shutdown();
    }
}

如上述程序所示,使用Java SDK调用服务的流程如下:

  1. 通过QueueClient接口创建队列服务客户端对象。如果创建了推理服务,需同时创建输入队列和输出队列对象。

  2. 使用put()函数向输入队列中发送数据;使用watch()函数从输出队列中订阅数据。

    说明

    现实场景中,发送数据和订阅数据可以由不同的线程处理,本示例中为了演示方便,在同一线程中完成,先Put数据,后Watch结果。

请求数据压缩示例

对于请求数据量较大的情况,EAS支持将数据压缩之后再发送至服务端,目前支持ZlibGzip两种压缩格式。该功能需要在服务配置中指定相应的rpc.decompressor才能生效。

服务配置如下所示:

"metadata": {
  "rpc": {
    "decompressor": "zlib"
  }
}

SDK代码调用示例如下:

package com.aliyun.openservices.eas.predict;
import com.aliyun.openservices.eas.predict.http.Compressor;
import com.aliyun.openservices.eas.predict.http.PredictClient;
import com.aliyun.openservices.eas.predict.http.HttpConfig;
public class TestString {
    public static void main(String[] args) throws Exception{
    	  // 启动并初始化客户端。
        PredictClient client = new PredictClient(new HttpConfig());
        client.setEndpoint("18*******.cn-hangzhou.pai-eas.aliyuncs.com");
        client.setModelName("echo_compress");
        client.setToken("YzZjZjQwN2E4NGRkMDMxNDk5NzhhZDcwZDBjOTZjOGYwZDYxZGM2****");
        // 或者使用Compressor.Gzip。
        client.setCompressor(Compressor.Zlib);  
        // 输入字符串定义。
        String request = "[{\"money_credit\": 3000000}, {\"money_credit\": 10000}]";
        System.out.println(request);
        // 通过eas返回字符串。
        String response = client.predict(request);
        System.out.println(response);
        // 关闭客户端。
        client.shutdown();
        return;
    }
}

相关文档