Java SDK 参考

更新时间:
复制为 MD 格式

通过Java SDK使用召回引擎(RecallEngine)

本文介绍如何使用Java SDK调用召回引擎(RecallEngine)进行数据召回和数据写入操作。

前提条件

  • 已创建召回引擎实例,并获取到实例的 Endpoint、Instance ID、用户名和密码。

  • 已在召回引擎中创建好目标表和召回服务(Service)。

安装Java SDK

在 Maven 项目的 pom.xml 中添加以下依赖:

<dependency>
  <groupId>com.aliyun.openservices.aiservice</groupId>
  <artifactId>pairec-sdk</artifactId>
  <version>1.0.9</version>
</dependency>

初始化RecallEngine客户端

接口

RecallEngineClient client = new RecallEngineClient(endpoint, username, password);

参数说明

参数

示例

说明

endpoint

http://ep-xxxxxx.aliyuncs.com

召回引擎服务的访问地址。

username

System.getenv("RECALL_ENGINE_SERVICE_USERNAME")

通过环境变量获取认证用户名。

password

System.getenv("RECALL_ENGINE_SERVICE_PASSWORD")

通过环境变量获取认证密码。

召回引擎服务的访问地址可以在PAI-Rec管控台 召回管理基本信息页面查看。

在该页面的VPC网络高速连通配置区域,服务域名列即为召回引擎服务的访问地址(endpoint)。

内网调用-可选配置方法

方法

说明

withRetryTimes(int retryTimes)

设置请求失败时的重试次数,默认为0(不重试)。

withHttpClient(OkHttpClient httpClient)

设置自定义的 OkHttpClient 实例,用于自定义连接池、超时等参数。

说明

  • 客户端默认连接超时为200ms,读写超时为500ms,连接池最大连接数为1000。

  • 所有可选配置方法支持链式调用。

示例

import com.aliyun.openservices.pairec.recallengine.RecallEngineClient;
// 通过环境变量获取认证信息
String endpoint = System.getenv("RECALL_ENGINE_SERVICE_ENDPOINT");
String username = System.getenv("RECALL_ENGINE_SERVICE_USERNAME");
String password = System.getenv("RECALL_ENGINE_SERVICE_PASSWORD");
// 创建客户端
RecallEngineClient client = new RecallEngineClient(endpoint, username, password);
// 设置重试次数
client.withRetryTimes(2);

公网调用-可选配置方法

方法

说明

withRetryTimes(int retryTimes)

设置请求失败时的重试次数,默认为0(不重试)。

withHttpClient(OkHttpClient httpClient)

设置自定义的 OkHttpClient 实例,用于自定义连接池、超时等参数。

withRegion(String region)

设置实例的 region (必填)

withInstanceId(String instanceId)

设置实例ID (必填)

withAccessKeyId(String accessKeyId)

设置AK ID (必填)

withAccessKeySecret(String accessKeySecret)

设置AK Secret (必填)

withPublicEndpoint(boolean usePublicEndpoint)

设置使用公网 填 true (必填,上面4项必填项都设置,这项才能生效)

说明

  • 需要先设置ak

  • 当使用公网地址时,clientendpoint可以传空值

  • withPublicEndpoint(boolean usePublicEndpoint) 放在最后一个设置

示例

import com.aliyun.openservices.pairec.recallengine.RecallEngineClient;
// 通过环境变量获取认证信息
String akId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
String akSecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
String region = System.getenv("PAIRC_REGION");
String username = System.getenv("RECALL_ENGINE_SERVICE_USERNAME");
String password = System.getenv("RECALL_ENGINE_SERVICE_PASSWORD");
// 创建客户端
RecallEngineClient client = new RecallEngineClient("", username, password);
// 设置公网调用参数
client.withAccessKeyId(akId)
  .withAccessKeySecret(akSecret)
  .withRegion(region)
  .withInstanceId(instanceId)
  .withPublicEndpoint(true)
  .withRetryTimes(2);

召回数据

接口

public RecallResponse recall(RecallRequest request) throws RecallEngineException;

RecallRequest参数说明

参数

JSON属性名

类型

说明

instanceId

instance_id

String

召回引擎实例 ID。

service

service

String

召回服务名称。

version

version

String

召回服务版本。

uid

uid

String

用户 ID。

recalls

recalls

Map<String, RecallConf>

召回配置集合。key 为召回名称,value 为召回配置。

requestId

request_id

String

请求 ID(可选)。

exposureList

exposure_list

String

曝光列表(可选),用于过滤已曝光的内容。

contextParams

context_params

Map<String, Object>

上下文参数(可选),传递自定义的上下文信息。

debug

debug

boolean

是否开启调试模式(可选),默认为 false。

retainFields

retain_fields

List<String>

保留召回的字段(可选)

RecallConf参数说明

参数

类型

说明

trigger

String

触发项,即召回的 trigger 值。

count

int

期望返回的召回结果数量。

RecallResponse说明

方法

返回类型

说明

getResult()

Record

获取召回结果记录集。

示例

import com.aliyun.openservices.pairec.recallengine.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
RecallEngineClient client = new RecallEngineClient(endpoint, username, password);
// 构造召回请求
RecallRequest request = new RecallRequest();
request.setInstanceId("your-instance-id");
request.setService("recall_test");
request.setVersion("V1");
request.setUid("123");
request.setRetainFields(Arrays.asList("click"))
// 设置召回配置
Map<String, RecallConf> recalls = new HashMap<>();
recalls.put("u2i_recall", new RecallConf("123", 100));
request.setRecalls(recalls);
// 执行召回
RecallResponse resp = client.recall(request);
Record result = resp.getResult();
// 获取结果信息
System.out.println("Total records: " + result.size());
System.out.println("Field names: " + result.fieldNames());

写入数据

接口

public WriteResponse write(String instanceId, String table, WriteRequest request) throws RecallEngineException;

参数说明

参数

类型

说明

instanceId

String

召回引擎实例 ID。

table

String

目标表名称。

request

WriteRequest

写入请求对象。

WriteRequest参数说明

参数

JSON属性名

类型

说明

requestId

request_id

String

请求 ID(可选)。

content

content

List<Map<String, Object>>

写入的数据内容。每个 Map 代表一行数据,key 为字段名,value 为字段值。

versionId

versionId

String

版本 ID(可选)。

insertMode

insert_mode

String

写入类型,只能填“insert”或者“upsert”,“insert”是全量更新,“upsert”是部分字段更新

WriteResponse说明

WriteResponse 继承自 Response,包含以下字段:

方法

返回类型

说明

getRequestId()

String

请求 ID。

getCode()

String

状态码,成功时返回 "OK"。

getMessage()

String

响应消息。

getData()

Map<String, Object>

响应数据。

示例

import com.aliyun.openservices.pairec.recallengine.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
//1. 插入包含所有字段的初始记录
WriteRequest insertRequest = new WriteRequest();
insertRequest.setRequestId("partial-update-init");
insertRequest.setInsertMode(InsertMode.INSERT);
Map<String, Object> initialData = new HashMap<>();
initialData.put("item_id", "item_partial_test_002");
initialData.put("bool_field", true);
initialData.put("category", "electronics");
initialData.put("list", java.util.Arrays.asList("tag1", "tag2", "tag3"));
java.util.List<Map<String, Object>> content = new java.util.ArrayList<>();
content.add(initialData);
insertRequest.setContent(content);
WriteResponse initResp = client.write(instanceId, tableName, insertRequest);
assertEquals("OK", initResp.getCode());
// 刷新以确保数据已写入
client.writeFlush();
System.out.println("Initial insert completed");
//2. 部分更新 — 仅更新 category(分类)字段
WriteRequest updateRequest = new WriteRequest();
updateRequest.setRequestId("partial-update-category");
updateRequest.setInsertMode(InsertMode.UPSERT);  // 支持部分字段更新的 UPSERT 操作
Map<String, Object> partialData = new HashMap<>();
// 必须包含主键以标识记录
partialData.put("item_id", "item_partial_test_002");
// 仅更新 category 字段,其他字段(bool_field、list)保持不变
partialData.put("category", "home_appliances");
java.util.List<Map<String, Object>> updateContent = new java.util.ArrayList<>();
updateContent.add(partialData);
updateRequest.setContent(updateContent);
WriteResponse updateResp = client.write(instanceId, tableName, updateRequest);
assertEquals("OK", updateResp.getCode());
// 刷新以确保数据已写入
client.writeFlush();
System.out.println("Partial update completed: category updated to 'home_appliances'");

删除数据

接口

public  DeleteResponse delete(String instanceId, String table, DeleteRequest request) throws RecallEngineException

参数说明

参数

类型

说明

instanceId

String

召回引擎实例 ID。

table

String

目标表名称。

request

DeleteRequest

删除请求对象。

DeleteRequest参数说明

参数

JSON属性名

类型

说明

requestId

request_id

String

请求 ID(可选)。

keys

keys

List<String>

删除的数据内容主键。

versionId

version_id

String

版本 ID(可选)。

schema

schema

String

数据表的模式/命名空间,如果不设置默认为“default”

DeleteResponse

DeleteResponse 继承自 Response,包含以下字段:

方法

返回类型

说明

getRequestId()

String

请求 ID。

getCode()

String

状态码,成功时返回 "OK"。

getMessage()

String

响应消息。

getData()

Map<String, Object>

响应数据。

示例

import com.aliyun.openservices.pairec.recallengine.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
String instanceId = "learn-pairec-cn-3ic4mhhvr01";
String tableName = "test_exposure1";
DeleteRequest deleteRequest = new DeleteRequest();
deleteRequest.setRequestId("delete-version-test-001");
deleteRequest.setKeys(Arrays.asList("12345", "67890"));
deleteRequest.setSchema("default");
deleteRequest.setVersionId("20260327102603");
DeleteResponse deleteResp = client.delete(instanceId, tableName, deleteRequest);
System.out.println("Delete with version response: " + deleteResp.getCode() + ", " + deleteResp.getMessage());
    

操作召回结果(Record)

RecallResponse.getResult() 返回的 Record 对象提供了一系列链式操作方法,用于对召回结果进行排序、过滤、截断等处理。

接口

public Record sort(String name, boolean desc);
public Record retain(int count);
public Record filter(String name);
public Record filterByColumnValue(String name, Predicate<Object> predicate);
public Record filterByValues(Predicate<Map<String, Object>> predicate);
public Record random();
public List<Object> columnValues(String columnName);
public List<String> columnValuesString(String columnName);
public List<String> fieldNames();
public int size();

方法说明

方法

说明

sort(name, desc)

按指定列排序。desc 为 true 时降序,false 时升序。

retain(count)

保留前 count 条记录。

filter(name)

按指定列进行去重。

filterByColumnValue(name, predicate)

按指定列的值进行条件过滤,保留满足 predicate 的记录。

filterByValues(predicate)

按全行数据进行条件过滤,predicate 接收每行的 Map<String, Object>。

random()

将记录随机洗牌。

columnValues(columnName)

获取指定列的所有值。

columnValuesString(columnName)

获取指定列的所有非空字符串值。

fieldNames()

获取所有字段名。

size()

获取当前记录数量。

说明

  • sortretainfilterfilterByColumnValuefilterByValuesrandom 方法均返回 this,支持链式调用。

  • Record 不是线程安全的,请勿在多线程环境下并发操作同一个 Record 实例。

示例

RecallResponse resp = client.recall(request);
Record result = resp.getResult();
// 按 score 列降序排序,去重后取前10条
Record processed = result
        .sort("score", true)
        .filter("item_id")
        .retain(10);
System.out.println("Processed records: " + processed.size());
// 获取 item_id 列的所有值
List<String> itemIds = processed.columnValuesString("item_id");
System.out.println("Item IDs: " + itemIds);
// 按条件过滤:仅保留 score > 0.5 的记录
Record filtered = result.filterByColumnValue("score", value -> {
    if (value instanceof Number) {
        return ((Number) value).doubleValue() > 0.5;
    }
    return false;
});
// 按全行条件过滤
Record filtered2 = result.filterByValues(row -> {
    Object category = row.get("category");
    return category != null && "video".equals(category.toString());
});

完整示例

以下是一个完整的召回和写入操作示例:

import com.aliyun.openservices.pairec.recallengine.*;
import java.util.*;
public class RecallEngineDemo {
    public static void main(String[ ] args) throws RecallEngineException {
        // 1. 初始化客户端
        String endpoint = System.getenv("RECALL_ENGINE_SERVICE_ENDPOINT");
        String username = System.getenv("RECALL_ENGINE_SERVICE_USERNAME");
        String password = System.getenv("RECALL_ENGINE_SERVICE_PASSWORD");
        RecallEngineClient client = new RecallEngineClient(endpoint, username, password);
        client.withRetryTimes(2)
        String instanceId = System.getenv("INSTANCE_ID");
        // 2. 写入数据
        WriteRequest writeRequest = new WriteRequest();
        writeRequest.setRequestId("write-req-001");
        List<Map<String, Object>> content = new ArrayList<>();
        Map<String, Object> item = new HashMap<>();
        item.put("user_id", "123");
        item.put("item_id", "item_456");
        item.put("score", 0.95);
        content.add(item);
        writeRequest.setContent(content);
        WriteResponse writeResp = client.write(instanceId, "u2i_table", writeRequest);
        System.out.println("Write result: " + writeResp.getCode());
        // 3. 召回数据
        RecallRequest recallRequest = new RecallRequest();
        recallRequest.setInstanceId(instanceId);
        recallRequest.setService("recall_test");
        recallRequest.setVersion("V1");
        recallRequest.setUid("123");
        Map<String, RecallConf> recalls = new HashMap<>();
        recalls.put("u2i_recall", new RecallConf("123", 100));
        recallRequest.setRecalls(recalls);
        RecallResponse recallResp = client.recall(recallRequest);
        Record result = recallResp.getResult();
        // 4. 处理召回结果
        System.out.println("Total records: " + result.size());
        System.out.println("Field names: " + result.fieldNames());
        // 按 score 降序排序,取前10条
        Record top10 = result.sort("score", true).retain(10);
        System.out.println("Top 10 records: " + top10.toString());
        // 获取 item_id 列表
        List<String> itemIds = top10.columnValuesString("item_id");
        System.out.println("Top 10 item IDs: " + itemIds);
        // 5. 删除数据
        DeleteRequest deleteRequest = new DeleteRequest();
        deleteRequest.setRequestId("delete-version-test-001");
        deleteRequest.setKeys(Arrays.asList("123"));
        deleteRequest.setSchema("default");
        DeleteResponse deleteResp = client.delete(instanceId, tableName, deleteRequest);
        System.out.println("Delete result: " + deleteResp.getCode());
    }
}

异常处理

所有召回和写入操作都可能抛出 RecallEngineException,建议在调用时进行异常捕获处理:

try {
    RecallResponse resp = client.recall(request);
    // 处理结果
} catch (RecallEngineException e) {
    System.err.println("Recall failed: " + e.getMessage());
}