Java SDK 参考
通过Java SDK使用召回引擎(RecallEngine)
本文介绍如何使用Java SDK调用召回引擎(RecallEngine)进行数据召回和数据写入操作。
前提条件
已创建召回引擎实例,并获取到实例的 Endpoint、Instance ID、用户名和密码。
已在召回引擎中创建好目标表和召回服务(Service)。
安装Java SDK
在 Maven 项目的 pom.xml 中添加以下依赖:
<dependency>
<groupId>com.aliyun.openservices.aiservice</groupId>
<artifactId>pairec-sdk</artifactId>
<version>1.0.7</version>
</dependency>初始化RecallEngine客户端
接口
RecallEngineClient client = new RecallEngineClient(endpoint, username, password);参数说明
参数 | 示例 | 说明 |
endpoint | http://ep-xxxxxx.aliyuncs.com | 召回引擎服务的访问地址。 |
username | System.getenv("RECALL_ENGINE_SERVICE_USERNAME") | 通过环境变量获取认证用户名。 |
password | System.getenv("RECALL_ENGINE_SERVICE_PASSWORD") | 通过环境变量获取认证密码。 |
召回引擎服务的访问地址可以在PAI-Rec管控台 召回管理基本信息页面查看。

可选配置方法
方法 | 说明 |
withRetryTimes(int retryTimes) | 设置请求失败时的重试次数,默认为0(不重试)。 |
withHttpClient(OkHttpClient httpClient) | 设置自定义的 OkHttpClient 实例,用于自定义连接池、超时等参数。 |
说明
客户端默认连接超时为200ms,读写超时为500ms,连接池最大连接数为1000。
所有可选配置方法支持链式调用。
示例
import com.aliyun.openservices.pairec.recallengine.RecallEngineClient;
// 通过环境变量获取认证信息
String endpoint = System.getenv("RECALL_ENGINE_SERVICE_ENDPOINT");
String username = System.getenv("RECALL_ENGINE_SERVICE_USERNAME");
String password = System.getenv("RECALL_ENGINE_SERVICE_PASSWORD");
// 创建客户端
RecallEngineClient client = new RecallEngineClient(endpoint, username, password);
// 设置重试次数
client.withRetryTimes(2);召回数据
接口
public RecallResponse recall(RecallRequest request) throws RecallEngineException;RecallRequest参数说明
参数 | JSON属性名 | 类型 | 说明 |
instanceId | instance_id | String | 召回引擎实例 ID。 |
service | service | String | 召回服务名称。 |
version | version | String | 召回服务版本。 |
uid | uid | String | 用户 ID。 |
recalls | recalls | Map<String, RecallConf> | 召回配置集合。key 为召回名称,value 为召回配置。 |
requestId | request_id | String | 请求 ID(可选)。 |
exposureList | exposure_list | String | 曝光列表(可选),用于过滤已曝光的内容。 |
contextParams | context_params | Map<String, Object> | 上下文参数(可选),传递自定义的上下文信息。 |
debug | debug | boolean | 是否开启调试模式(可选),默认为 false。 |
RecallConf参数说明
参数 | 类型 | 说明 |
trigger | String | 触发项,即召回的 trigger 值。 |
count | int | 期望返回的召回结果数量。 |
RecallResponse说明
方法 | 返回类型 | 说明 |
getResult() | Record | 获取召回结果记录集。 |
示例
import com.aliyun.openservices.pairec.recallengine.*;
import java.util.HashMap;
import java.util.Map;
// 构造召回请求
RecallRequest request = new RecallRequest();
request.setInstanceId("your-instance-id");
request.setService("recall_test");
request.setVersion("V1");
request.setUid("123");
// 设置召回配置
Map<String, RecallConf> recalls = new HashMap<>();
recalls.put("u2i_recall", new RecallConf("123", 100));
request.setRecalls(recalls);
// 执行召回
RecallResponse resp = client.recall(request);
Record result = resp.getResult();
// 获取结果信息
System.out.println("Total records: " + result.size());
System.out.println("Field names: " + result.fieldNames());写入数据
接口
public WriteResponse write(String instanceId, String table, WriteRequest request) throws RecallEngineException;参数说明
参数 | 类型 | 说明 |
instanceId | String | 召回引擎实例 ID。 |
table | String | 目标表名称。 |
request | WriteRequest | 写入请求对象。 |
WriteRequest参数说明
参数 | JSON属性名 | 类型 | 说明 |
requestId | request_id | String | 请求 ID(可选)。 |
content | content | List<Map<String, Object>> | 写入的数据内容。每个 Map 代表一行数据,key 为字段名,value 为字段值。 |
versionId | versionId | String | 版本 ID(可选)。 |
WriteResponse说明
WriteResponse 继承自 Response,包含以下字段:
方法 | 返回类型 | 说明 |
getRequestId() | String | 请求 ID。 |
getCode() | String | 状态码,成功时返回 "OK"。 |
getMessage() | String | 响应消息。 |
getData() | Map<String, Object> | 响应数据。 |
示例
import com.aliyun.openservices.pairec.recallengine.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
// 构造写入请求
WriteRequest request = new WriteRequest();
request.setRequestId("write-req-123");
// 构造写入数据
Map<String, Object> item = new HashMap<>();
item.put("user_id", "123");
item.put("item_id", "item_123");
item.put("score", 0.95);
List<Map<String, Object>> content = new ArrayList<>();
content.add(item);
request.setContent(content);
// 执行写入
WriteResponse resp = client.write("your-instance-id", "u2i_table", request);
// 检查写入结果
System.out.println("Code: " + resp.getCode()); // 成功时返回 "OK"
System.out.println("Message: " + resp.getMessage());操作召回结果(Record)
RecallResponse.getResult() 返回的 Record 对象提供了一系列链式操作方法,用于对召回结果进行排序、过滤、截断等处理。
接口
public Record sort(String name, boolean desc);
public Record retain(int count);
public Record filter(String name);
public Record filterByColumnValue(String name, Predicate<Object> predicate);
public Record filterByValues(Predicate<Map<String, Object>> predicate);
public Record random();
public List<Object> columnValues(String columnName);
public List<String> columnValuesString(String columnName);
public List<String> fieldNames();
public int size();方法说明
方法 | 说明 |
sort(name, desc) | 按指定列排序。desc 为 true 时降序,false 时升序。 |
retain(count) | 保留前 count 条记录。 |
filter(name) | 按指定列进行去重。 |
filterByColumnValue(name, predicate) | 按指定列的值进行条件过滤,保留满足 predicate 的记录。 |
filterByValues(predicate) | 按全行数据进行条件过滤,predicate 接收每行的 Map<String, Object>。 |
random() | 将记录随机洗牌。 |
columnValues(columnName) | 获取指定列的所有值。 |
columnValuesString(columnName) | 获取指定列的所有非空字符串值。 |
fieldNames() | 获取所有字段名。 |
size() | 获取当前记录数量。 |
说明
sort、retain、filter、filterByColumnValue、filterByValues、random方法均返回this,支持链式调用。Record不是线程安全的,请勿在多线程环境下并发操作同一个 Record 实例。
示例
RecallResponse resp = client.recall(request);
Record result = resp.getResult();
// 按 score 列降序排序,去重后取前10条
Record processed = result
.sort("score", true)
.filter("item_id")
.retain(10);
System.out.println("Processed records: " + processed.size());
// 获取 item_id 列的所有值
List<String> itemIds = processed.columnValuesString("item_id");
System.out.println("Item IDs: " + itemIds);
// 按条件过滤:仅保留 score > 0.5 的记录
Record filtered = result.filterByColumnValue("score", value -> {
if (value instanceof Number) {
return ((Number) value).doubleValue() > 0.5;
}
return false;
});
// 按全行条件过滤
Record filtered2 = result.filterByValues(row -> {
Object category = row.get("category");
return category != null && "video".equals(category.toString());
});完整示例
以下是一个完整的召回和写入操作示例:
import com.aliyun.openservices.pairec.recallengine.*;
import java.util.*;
public class RecallEngineDemo {
public static void main(String[ ] args) throws RecallEngineException {
// 1. 初始化客户端
String endpoint = System.getenv("RECALL_ENGINE_SERVICE_ENDPOINT");
String username = System.getenv("RECALL_ENGINE_SERVICE_USERNAME");
String password = System.getenv("RECALL_ENGINE_SERVICE_PASSWORD");
RecallEngineClient client = new RecallEngineClient(endpoint, username, password);
client.withRetryTimes(2)
String instanceId = System.getenv("INSTANCE_ID");
// 2. 写入数据
WriteRequest writeRequest = new WriteRequest();
writeRequest.setRequestId("write-req-001");
List<Map<String, Object>> content = new ArrayList<>();
Map<String, Object> item = new HashMap<>();
item.put("user_id", "123");
item.put("item_id", "item_456");
item.put("score", 0.95);
content.add(item);
writeRequest.setContent(content);
WriteResponse writeResp = client.write(instanceId, "u2i_table", writeRequest);
System.out.println("Write result: " + writeResp.getCode());
// 3. 召回数据
RecallRequest recallRequest = new RecallRequest();
recallRequest.setInstanceId(instanceId);
recallRequest.setService("recall_test");
recallRequest.setVersion("V1");
recallRequest.setUid("123");
Map<String, RecallConf> recalls = new HashMap<>();
recalls.put("u2i_recall", new RecallConf("123", 100));
recallRequest.setRecalls(recalls);
RecallResponse recallResp = client.recall(recallRequest);
Record result = recallResp.getResult();
// 4. 处理召回结果
System.out.println("Total records: " + result.size());
System.out.println("Field names: " + result.fieldNames());
// 按 score 降序排序,取前10条
Record top10 = result.sort("score", true).retain(10);
System.out.println("Top 10 records: " + top10.toString());
// 获取 item_id 列表
List<String> itemIds = top10.columnValuesString("item_id");
System.out.println("Top 10 item IDs: " + itemIds);
}
}异常处理
所有召回和写入操作都可能抛出 RecallEngineException,建议在调用时进行异常捕获处理:
try {
RecallResponse resp = client.recall(request);
// 处理结果
} catch (RecallEngineException e) {
System.err.println("Recall failed: " + e.getMessage());
}