通过Java SDK使用召回引擎(RecallEngine)
本文介绍如何使用Java SDK调用召回引擎(RecallEngine)进行数据召回和数据写入操作。
前提条件
-
已创建召回引擎实例,并获取到实例的 Endpoint、Instance ID、用户名和密码。
-
已在召回引擎中创建好目标表和召回服务(Service)。
安装Java SDK
在 Maven 项目的 pom.xml 中添加以下依赖:
<dependency>
<groupId>com.aliyun.openservices.aiservice</groupId>
<artifactId>pairec-sdk</artifactId>
<version>1.0.9</version>
</dependency>
初始化RecallEngine客户端
接口
RecallEngineClient client = new RecallEngineClient(endpoint, username, password);
参数说明
|
参数 |
示例 |
说明 |
|
endpoint |
http://ep-xxxxxx.aliyuncs.com |
召回引擎服务的访问地址。 |
|
username |
System.getenv("RECALL_ENGINE_SERVICE_USERNAME") |
通过环境变量获取认证用户名。 |
|
password |
System.getenv("RECALL_ENGINE_SERVICE_PASSWORD") |
通过环境变量获取认证密码。 |
召回引擎服务的访问地址可以在PAI-Rec管控台 召回管理基本信息页面查看。
在该页面的VPC网络高速连通配置区域,服务域名列即为召回引擎服务的访问地址(endpoint)。
内网调用-可选配置方法
|
方法 |
说明 |
|
withRetryTimes(int retryTimes) |
设置请求失败时的重试次数,默认为0(不重试)。 |
|
withHttpClient(OkHttpClient httpClient) |
设置自定义的 OkHttpClient 实例,用于自定义连接池、超时等参数。 |
说明
-
客户端默认连接超时为200ms,读写超时为500ms,连接池最大连接数为1000。
-
所有可选配置方法支持链式调用。
示例
import com.aliyun.openservices.pairec.recallengine.RecallEngineClient;
// 通过环境变量获取认证信息
String endpoint = System.getenv("RECALL_ENGINE_SERVICE_ENDPOINT");
String username = System.getenv("RECALL_ENGINE_SERVICE_USERNAME");
String password = System.getenv("RECALL_ENGINE_SERVICE_PASSWORD");
// 创建客户端
RecallEngineClient client = new RecallEngineClient(endpoint, username, password);
// 设置重试次数
client.withRetryTimes(2);
公网调用-可选配置方法
|
方法 |
说明 |
|
withRetryTimes(int retryTimes) |
设置请求失败时的重试次数,默认为0(不重试)。 |
|
withHttpClient(OkHttpClient httpClient) |
设置自定义的 OkHttpClient 实例,用于自定义连接池、超时等参数。 |
|
withRegion(String region) |
设置实例的 region (必填) |
|
withInstanceId(String instanceId) |
设置实例ID (必填) |
|
withAccessKeyId(String accessKeyId) |
设置AK ID (必填) |
|
withAccessKeySecret(String accessKeySecret) |
设置AK Secret (必填) |
|
withPublicEndpoint(boolean usePublicEndpoint) |
设置使用公网 填 true (必填,上面4项必填项都设置,这项才能生效) |
说明
-
需要先设置ak
-
当使用公网地址时,client的endpoint可以传空值
-
将withPublicEndpoint(boolean usePublicEndpoint) 放在最后一个设置
示例
import com.aliyun.openservices.pairec.recallengine.RecallEngineClient;
// 通过环境变量获取认证信息
String akId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
String akSecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
String region = System.getenv("PAIRC_REGION");
String username = System.getenv("RECALL_ENGINE_SERVICE_USERNAME");
String password = System.getenv("RECALL_ENGINE_SERVICE_PASSWORD");
// 创建客户端
RecallEngineClient client = new RecallEngineClient("", username, password);
// 设置公网调用参数
client.withAccessKeyId(akId)
.withAccessKeySecret(akSecret)
.withRegion(region)
.withInstanceId(instanceId)
.withPublicEndpoint(true)
.withRetryTimes(2);
召回数据
接口
public RecallResponse recall(RecallRequest request) throws RecallEngineException;
RecallRequest参数说明
|
参数 |
JSON属性名 |
类型 |
说明 |
|
instanceId |
instance_id |
String |
召回引擎实例 ID。 |
|
service |
service |
String |
召回服务名称。 |
|
version |
version |
String |
召回服务版本。 |
|
uid |
uid |
String |
用户 ID。 |
|
recalls |
recalls |
Map<String, RecallConf> |
召回配置集合。key 为召回名称,value 为召回配置。 |
|
requestId |
request_id |
String |
请求 ID(可选)。 |
|
exposureList |
exposure_list |
String |
曝光列表(可选),用于过滤已曝光的内容。 |
|
contextParams |
context_params |
Map<String, Object> |
上下文参数(可选),传递自定义的上下文信息。 |
|
debug |
debug |
boolean |
是否开启调试模式(可选),默认为 false。 |
|
retainFields |
retain_fields |
List<String> |
保留召回的字段(可选) |
RecallConf参数说明
|
参数 |
类型 |
说明 |
|
trigger |
String |
触发项,即召回的 trigger 值。 |
|
count |
int |
期望返回的召回结果数量。 |
RecallResponse说明
|
方法 |
返回类型 |
说明 |
|
getResult() |
Record |
获取召回结果记录集。 |
示例
import com.aliyun.openservices.pairec.recallengine.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
RecallEngineClient client = new RecallEngineClient(endpoint, username, password);
// 构造召回请求
RecallRequest request = new RecallRequest();
request.setInstanceId("your-instance-id");
request.setService("recall_test");
request.setVersion("V1");
request.setUid("123");
request.setRetainFields(Arrays.asList("click"))
// 设置召回配置
Map<String, RecallConf> recalls = new HashMap<>();
recalls.put("u2i_recall", new RecallConf("123", 100));
request.setRecalls(recalls);
// 执行召回
RecallResponse resp = client.recall(request);
Record result = resp.getResult();
// 获取结果信息
System.out.println("Total records: " + result.size());
System.out.println("Field names: " + result.fieldNames());
写入数据
接口
public WriteResponse write(String instanceId, String table, WriteRequest request) throws RecallEngineException;
参数说明
|
参数 |
类型 |
说明 |
|
instanceId |
String |
召回引擎实例 ID。 |
|
table |
String |
目标表名称。 |
|
request |
WriteRequest |
写入请求对象。 |
WriteRequest参数说明
|
参数 |
JSON属性名 |
类型 |
说明 |
|
requestId |
request_id |
String |
请求 ID(可选)。 |
|
content |
content |
List<Map<String, Object>> |
写入的数据内容。每个 Map 代表一行数据,key 为字段名,value 为字段值。 |
|
versionId |
versionId |
String |
版本 ID(可选)。 |
|
insertMode |
insert_mode |
String |
写入类型,只能填“insert”或者“upsert”,“insert”是全量更新,“upsert”是部分字段更新 |
WriteResponse说明
WriteResponse 继承自 Response,包含以下字段:
|
方法 |
返回类型 |
说明 |
|
getRequestId() |
String |
请求 ID。 |
|
getCode() |
String |
状态码,成功时返回 "OK"。 |
|
getMessage() |
String |
响应消息。 |
|
getData() |
Map<String, Object> |
响应数据。 |
示例
import com.aliyun.openservices.pairec.recallengine.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
//1. 插入包含所有字段的初始记录
WriteRequest insertRequest = new WriteRequest();
insertRequest.setRequestId("partial-update-init");
insertRequest.setInsertMode(InsertMode.INSERT);
Map<String, Object> initialData = new HashMap<>();
initialData.put("item_id", "item_partial_test_002");
initialData.put("bool_field", true);
initialData.put("category", "electronics");
initialData.put("list", java.util.Arrays.asList("tag1", "tag2", "tag3"));
java.util.List<Map<String, Object>> content = new java.util.ArrayList<>();
content.add(initialData);
insertRequest.setContent(content);
WriteResponse initResp = client.write(instanceId, tableName, insertRequest);
assertEquals("OK", initResp.getCode());
// 刷新以确保数据已写入
client.writeFlush();
System.out.println("Initial insert completed");
//2. 部分更新 — 仅更新 category(分类)字段
WriteRequest updateRequest = new WriteRequest();
updateRequest.setRequestId("partial-update-category");
updateRequest.setInsertMode(InsertMode.UPSERT); // 支持部分字段更新的 UPSERT 操作
Map<String, Object> partialData = new HashMap<>();
// 必须包含主键以标识记录
partialData.put("item_id", "item_partial_test_002");
// 仅更新 category 字段,其他字段(bool_field、list)保持不变
partialData.put("category", "home_appliances");
java.util.List<Map<String, Object>> updateContent = new java.util.ArrayList<>();
updateContent.add(partialData);
updateRequest.setContent(updateContent);
WriteResponse updateResp = client.write(instanceId, tableName, updateRequest);
assertEquals("OK", updateResp.getCode());
// 刷新以确保数据已写入
client.writeFlush();
System.out.println("Partial update completed: category updated to 'home_appliances'");
删除数据
接口
public DeleteResponse delete(String instanceId, String table, DeleteRequest request) throws RecallEngineException
参数说明
|
参数 |
类型 |
说明 |
|
instanceId |
String |
召回引擎实例 ID。 |
|
table |
String |
目标表名称。 |
|
request |
DeleteRequest |
删除请求对象。 |
DeleteRequest参数说明
|
参数 |
JSON属性名 |
类型 |
说明 |
|
requestId |
request_id |
String |
请求 ID(可选)。 |
|
keys |
keys |
List<String> |
删除的数据内容主键。 |
|
versionId |
version_id |
String |
版本 ID(可选)。 |
|
schema |
schema |
String |
数据表的模式/命名空间,如果不设置默认为“default” |
DeleteResponse
DeleteResponse 继承自 Response,包含以下字段:
|
方法 |
返回类型 |
说明 |
|
getRequestId() |
String |
请求 ID。 |
|
getCode() |
String |
状态码,成功时返回 "OK"。 |
|
getMessage() |
String |
响应消息。 |
|
getData() |
Map<String, Object> |
响应数据。 |
示例
import com.aliyun.openservices.pairec.recallengine.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
String instanceId = "learn-pairec-cn-3ic4mhhvr01";
String tableName = "test_exposure1";
DeleteRequest deleteRequest = new DeleteRequest();
deleteRequest.setRequestId("delete-version-test-001");
deleteRequest.setKeys(Arrays.asList("12345", "67890"));
deleteRequest.setSchema("default");
deleteRequest.setVersionId("20260327102603");
DeleteResponse deleteResp = client.delete(instanceId, tableName, deleteRequest);
System.out.println("Delete with version response: " + deleteResp.getCode() + ", " + deleteResp.getMessage());
操作召回结果(Record)
RecallResponse.getResult() 返回的 Record 对象提供了一系列链式操作方法,用于对召回结果进行排序、过滤、截断等处理。
接口
public Record sort(String name, boolean desc);
public Record retain(int count);
public Record filter(String name);
public Record filterByColumnValue(String name, Predicate<Object> predicate);
public Record filterByValues(Predicate<Map<String, Object>> predicate);
public Record random();
public List<Object> columnValues(String columnName);
public List<String> columnValuesString(String columnName);
public List<String> fieldNames();
public int size();
方法说明
|
方法 |
说明 |
|
sort(name, desc) |
按指定列排序。desc 为 true 时降序,false 时升序。 |
|
retain(count) |
保留前 count 条记录。 |
|
filter(name) |
按指定列进行去重。 |
|
filterByColumnValue(name, predicate) |
按指定列的值进行条件过滤,保留满足 predicate 的记录。 |
|
filterByValues(predicate) |
按全行数据进行条件过滤,predicate 接收每行的 Map<String, Object>。 |
|
random() |
将记录随机洗牌。 |
|
columnValues(columnName) |
获取指定列的所有值。 |
|
columnValuesString(columnName) |
获取指定列的所有非空字符串值。 |
|
fieldNames() |
获取所有字段名。 |
|
size() |
获取当前记录数量。 |
说明
-
sort、retain、filter、filterByColumnValue、filterByValues、random方法均返回this,支持链式调用。 -
Record不是线程安全的,请勿在多线程环境下并发操作同一个 Record 实例。
示例
RecallResponse resp = client.recall(request);
Record result = resp.getResult();
// 按 score 列降序排序,去重后取前10条
Record processed = result
.sort("score", true)
.filter("item_id")
.retain(10);
System.out.println("Processed records: " + processed.size());
// 获取 item_id 列的所有值
List<String> itemIds = processed.columnValuesString("item_id");
System.out.println("Item IDs: " + itemIds);
// 按条件过滤:仅保留 score > 0.5 的记录
Record filtered = result.filterByColumnValue("score", value -> {
if (value instanceof Number) {
return ((Number) value).doubleValue() > 0.5;
}
return false;
});
// 按全行条件过滤
Record filtered2 = result.filterByValues(row -> {
Object category = row.get("category");
return category != null && "video".equals(category.toString());
});
完整示例
以下是一个完整的召回和写入操作示例:
import com.aliyun.openservices.pairec.recallengine.*;
import java.util.*;
public class RecallEngineDemo {
public static void main(String[ ] args) throws RecallEngineException {
// 1. 初始化客户端
String endpoint = System.getenv("RECALL_ENGINE_SERVICE_ENDPOINT");
String username = System.getenv("RECALL_ENGINE_SERVICE_USERNAME");
String password = System.getenv("RECALL_ENGINE_SERVICE_PASSWORD");
RecallEngineClient client = new RecallEngineClient(endpoint, username, password);
client.withRetryTimes(2)
String instanceId = System.getenv("INSTANCE_ID");
// 2. 写入数据
WriteRequest writeRequest = new WriteRequest();
writeRequest.setRequestId("write-req-001");
List<Map<String, Object>> content = new ArrayList<>();
Map<String, Object> item = new HashMap<>();
item.put("user_id", "123");
item.put("item_id", "item_456");
item.put("score", 0.95);
content.add(item);
writeRequest.setContent(content);
WriteResponse writeResp = client.write(instanceId, "u2i_table", writeRequest);
System.out.println("Write result: " + writeResp.getCode());
// 3. 召回数据
RecallRequest recallRequest = new RecallRequest();
recallRequest.setInstanceId(instanceId);
recallRequest.setService("recall_test");
recallRequest.setVersion("V1");
recallRequest.setUid("123");
Map<String, RecallConf> recalls = new HashMap<>();
recalls.put("u2i_recall", new RecallConf("123", 100));
recallRequest.setRecalls(recalls);
RecallResponse recallResp = client.recall(recallRequest);
Record result = recallResp.getResult();
// 4. 处理召回结果
System.out.println("Total records: " + result.size());
System.out.println("Field names: " + result.fieldNames());
// 按 score 降序排序,取前10条
Record top10 = result.sort("score", true).retain(10);
System.out.println("Top 10 records: " + top10.toString());
// 获取 item_id 列表
List<String> itemIds = top10.columnValuesString("item_id");
System.out.println("Top 10 item IDs: " + itemIds);
// 5. 删除数据
DeleteRequest deleteRequest = new DeleteRequest();
deleteRequest.setRequestId("delete-version-test-001");
deleteRequest.setKeys(Arrays.asList("123"));
deleteRequest.setSchema("default");
DeleteResponse deleteResp = client.delete(instanceId, tableName, deleteRequest);
System.out.println("Delete result: " + deleteResp.getCode());
}
}
异常处理
所有召回和写入操作都可能抛出 RecallEngineException,建议在调用时进行异常捕获处理:
try {
RecallResponse resp = client.recall(request);
// 处理结果
} catch (RecallEngineException e) {
System.err.println("Recall failed: " + e.getMessage());
}