Java SDK 参考

更新时间:
复制为 MD 格式

通过Java SDK使用召回引擎(RecallEngine)

本文介绍如何使用Java SDK调用召回引擎(RecallEngine)进行数据召回和数据写入操作。

前提条件

  • 已创建召回引擎实例,并获取到实例的 Endpoint、Instance ID、用户名和密码。

  • 已在召回引擎中创建好目标表和召回服务(Service)。

安装Java SDK

在 Maven 项目的 pom.xml 中添加以下依赖:

<dependency>
  <groupId>com.aliyun.openservices.aiservice</groupId>
  <artifactId>pairec-sdk</artifactId>
  <version>1.0.7</version>
</dependency>

初始化RecallEngine客户端

接口

RecallEngineClient client = new RecallEngineClient(endpoint, username, password);

参数说明

参数

示例

说明

endpoint

http://ep-xxxxxx.aliyuncs.com

召回引擎服务的访问地址。

username

System.getenv("RECALL_ENGINE_SERVICE_USERNAME")

通过环境变量获取认证用户名。

password

System.getenv("RECALL_ENGINE_SERVICE_PASSWORD")

通过环境变量获取认证密码。

召回引擎服务的访问地址可以在PAI-Rec管控台 召回管理基本信息页面查看。

{77ABF890-68EE-49E5-93F5-02CF998BB9C4}

可选配置方法

方法

说明

withRetryTimes(int retryTimes)

设置请求失败时的重试次数,默认为0(不重试)。

withHttpClient(OkHttpClient httpClient)

设置自定义的 OkHttpClient 实例,用于自定义连接池、超时等参数。

说明

  • 客户端默认连接超时为200ms,读写超时为500ms,连接池最大连接数为1000。

  • 所有可选配置方法支持链式调用。

示例

import com.aliyun.openservices.pairec.recallengine.RecallEngineClient;

// 通过环境变量获取认证信息
String endpoint = System.getenv("RECALL_ENGINE_SERVICE_ENDPOINT");
String username = System.getenv("RECALL_ENGINE_SERVICE_USERNAME");
String password = System.getenv("RECALL_ENGINE_SERVICE_PASSWORD");

// 创建客户端
RecallEngineClient client = new RecallEngineClient(endpoint, username, password);

// 设置重试次数
client.withRetryTimes(2);

召回数据

接口

public RecallResponse recall(RecallRequest request) throws RecallEngineException;

RecallRequest参数说明

参数

JSON属性名

类型

说明

instanceId

instance_id

String

召回引擎实例 ID。

service

service

String

召回服务名称。

version

version

String

召回服务版本。

uid

uid

String

用户 ID。

recalls

recalls

Map<String, RecallConf>

召回配置集合。key 为召回名称,value 为召回配置。

requestId

request_id

String

请求 ID(可选)。

exposureList

exposure_list

String

曝光列表(可选),用于过滤已曝光的内容。

contextParams

context_params

Map<String, Object>

上下文参数(可选),传递自定义的上下文信息。

debug

debug

boolean

是否开启调试模式(可选),默认为 false。

RecallConf参数说明

参数

类型

说明

trigger

String

触发项,即召回的 trigger 值。

count

int

期望返回的召回结果数量。

RecallResponse说明

方法

返回类型

说明

getResult()

Record

获取召回结果记录集。

示例

import com.aliyun.openservices.pairec.recallengine.*;

import java.util.HashMap;
import java.util.Map;

// 构造召回请求
RecallRequest request = new RecallRequest();
request.setInstanceId("your-instance-id");
request.setService("recall_test");
request.setVersion("V1");
request.setUid("123");

// 设置召回配置
Map<String, RecallConf> recalls = new HashMap<>();
recalls.put("u2i_recall", new RecallConf("123", 100));
request.setRecalls(recalls);

// 执行召回
RecallResponse resp = client.recall(request);
Record result = resp.getResult();

// 获取结果信息
System.out.println("Total records: " + result.size());
System.out.println("Field names: " + result.fieldNames());

写入数据

接口

public WriteResponse write(String instanceId, String table, WriteRequest request) throws RecallEngineException;

参数说明

参数

类型

说明

instanceId

String

召回引擎实例 ID。

table

String

目标表名称。

request

WriteRequest

写入请求对象。

WriteRequest参数说明

参数

JSON属性名

类型

说明

requestId

request_id

String

请求 ID(可选)。

content

content

List<Map<String, Object>>

写入的数据内容。每个 Map 代表一行数据,key 为字段名,value 为字段值。

versionId

versionId

String

版本 ID(可选)。

WriteResponse说明

WriteResponse 继承自 Response,包含以下字段:

方法

返回类型

说明

getRequestId()

String

请求 ID。

getCode()

String

状态码,成功时返回 "OK"。

getMessage()

String

响应消息。

getData()

Map<String, Object>

响应数据。

示例

import com.aliyun.openservices.pairec.recallengine.*;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// 构造写入请求
WriteRequest request = new WriteRequest();
request.setRequestId("write-req-123");

// 构造写入数据
Map<String, Object> item = new HashMap<>();
item.put("user_id", "123");
item.put("item_id", "item_123");
item.put("score", 0.95);

List<Map<String, Object>> content = new ArrayList<>();
content.add(item);
request.setContent(content);

// 执行写入
WriteResponse resp = client.write("your-instance-id", "u2i_table", request);

// 检查写入结果
System.out.println("Code: " + resp.getCode());       // 成功时返回 "OK"
System.out.println("Message: " + resp.getMessage());

操作召回结果(Record)

RecallResponse.getResult() 返回的 Record 对象提供了一系列链式操作方法,用于对召回结果进行排序、过滤、截断等处理。

接口

public Record sort(String name, boolean desc);
public Record retain(int count);
public Record filter(String name);
public Record filterByColumnValue(String name, Predicate<Object> predicate);
public Record filterByValues(Predicate<Map<String, Object>> predicate);
public Record random();
public List<Object> columnValues(String columnName);
public List<String> columnValuesString(String columnName);
public List<String> fieldNames();
public int size();

方法说明

方法

说明

sort(name, desc)

按指定列排序。desc 为 true 时降序,false 时升序。

retain(count)

保留前 count 条记录。

filter(name)

按指定列进行去重。

filterByColumnValue(name, predicate)

按指定列的值进行条件过滤,保留满足 predicate 的记录。

filterByValues(predicate)

按全行数据进行条件过滤,predicate 接收每行的 Map<String, Object>。

random()

将记录随机洗牌。

columnValues(columnName)

获取指定列的所有值。

columnValuesString(columnName)

获取指定列的所有非空字符串值。

fieldNames()

获取所有字段名。

size()

获取当前记录数量。

说明

  • sortretainfilterfilterByColumnValuefilterByValuesrandom 方法均返回 this,支持链式调用。

  • Record 不是线程安全的,请勿在多线程环境下并发操作同一个 Record 实例。

示例

RecallResponse resp = client.recall(request);
Record result = resp.getResult();

// 按 score 列降序排序,去重后取前10条
Record processed = result
        .sort("score", true)
        .filter("item_id")
        .retain(10);

System.out.println("Processed records: " + processed.size());

// 获取 item_id 列的所有值
List<String> itemIds = processed.columnValuesString("item_id");
System.out.println("Item IDs: " + itemIds);

// 按条件过滤:仅保留 score > 0.5 的记录
Record filtered = result.filterByColumnValue("score", value -> {
    if (value instanceof Number) {
        return ((Number) value).doubleValue() > 0.5;
    }
    return false;
});

// 按全行条件过滤
Record filtered2 = result.filterByValues(row -> {
    Object category = row.get("category");
    return category != null && "video".equals(category.toString());
});

完整示例

以下是一个完整的召回和写入操作示例:

import com.aliyun.openservices.pairec.recallengine.*;

import java.util.*;

public class RecallEngineDemo {

    public static void main(String[ ] args) throws RecallEngineException {

        // 1. 初始化客户端
        String endpoint = System.getenv("RECALL_ENGINE_SERVICE_ENDPOINT");
        String username = System.getenv("RECALL_ENGINE_SERVICE_USERNAME");
        String password = System.getenv("RECALL_ENGINE_SERVICE_PASSWORD");

        RecallEngineClient client = new RecallEngineClient(endpoint, username, password);
        client.withRetryTimes(2)

        String instanceId = System.getenv("INSTANCE_ID");

        // 2. 写入数据
        WriteRequest writeRequest = new WriteRequest();
        writeRequest.setRequestId("write-req-001");

        List<Map<String, Object>> content = new ArrayList<>();
        Map<String, Object> item = new HashMap<>();
        item.put("user_id", "123");
        item.put("item_id", "item_456");
        item.put("score", 0.95);
        content.add(item);
        writeRequest.setContent(content);

        WriteResponse writeResp = client.write(instanceId, "u2i_table", writeRequest);
        System.out.println("Write result: " + writeResp.getCode());

        // 3. 召回数据
        RecallRequest recallRequest = new RecallRequest();
        recallRequest.setInstanceId(instanceId);
        recallRequest.setService("recall_test");
        recallRequest.setVersion("V1");
        recallRequest.setUid("123");

        Map<String, RecallConf> recalls = new HashMap<>();
        recalls.put("u2i_recall", new RecallConf("123", 100));
        recallRequest.setRecalls(recalls);

        RecallResponse recallResp = client.recall(recallRequest);
        Record result = recallResp.getResult();

        // 4. 处理召回结果
        System.out.println("Total records: " + result.size());
        System.out.println("Field names: " + result.fieldNames());

        // 按 score 降序排序,取前10条
        Record top10 = result.sort("score", true).retain(10);
        System.out.println("Top 10 records: " + top10.toString());

        // 获取 item_id 列表
        List<String> itemIds = top10.columnValuesString("item_id");
        System.out.println("Top 10 item IDs: " + itemIds);
    }
}

异常处理

所有召回和写入操作都可能抛出 RecallEngineException,建议在调用时进行异常捕获处理:

try {
    RecallResponse resp = client.recall(request);
    // 处理结果
} catch (RecallEngineException e) {
    System.err.println("Recall failed: " + e.getMessage());
}