Integrate Feature Generator (FG) into a Java service to run feature transformations.
Limitations
Supported platforms: linux-x86_64 and macosx-arm64.
Use the Java SDK
The following steps use a Maven project as an example:
1. Download the FG SDK jar to a local path, for example /path/to/feature_generator-${version}-${platform}.jar:
platform | FG SDK jar package
linux-x86_64 |
macosx-arm64 |
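Note: By default, FG uses std::hash for hash bucketization. The std::hash implementation differs between the C++ standard libraries of different platforms, so bucketization results are not consistent across platforms.
For bucketization results that are consistent across platforms, set the environment variable USE_FARM_HASH_TO_BUCKETIZE=true. For details, see the FG global configuration documentation.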
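Java has no standard API for changing its own process environment, so the variable is normally set in the launch environment (for example, exported in the startup script) before the JVM starts. A minimal fail-fast startup check could look like the sketch below. The class and method names are hypothetical, and the check assumes that the accepted value is exactly "true", as written in the note.

```java
import java.util.Map;

final class FarmHashCheck {
    // Environment variable name taken from the FG note above.
    static final String VAR = "USE_FARM_HASH_TO_BUCKETIZE";

    // Returns true only if the variable is set to exactly "true" (assumed accepted value).
    static boolean farmHashEnabled(Map<String, String> env) {
        return "true".equals(env.get(VAR));
    }

    // Hypothetical fail-fast check, called once at service startup before any FgHandler is created.
    static void requireFarmHash(Map<String, String> env) {
        if (!farmHashEnabled(env)) {
            throw new IllegalStateException(
                VAR + "=true must be set in the launch environment for cross-platform bucketization");
        }
    }

    public static void main(String[] args) {
        // System.getenv() reflects the environment the JVM was started with.
        System.out.println(VAR + " enabled: " + farmHashEnabled(System.getenv()));
    }
}
```

Checking once at startup surfaces a misconfigured deployment immediately instead of as silently different bucket ids.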
2. Install the jar into the local Maven repository with mvn install:install-file:
mvn install:install-file \
-Dfile=/path/to/feature_generator-${version}-${platform}.jar \
-DgroupId=com.aliyun.pai \
-DartifactId=feature_generator \
-Dversion=${version} \
-Dclassifier=${platform} \
-Dpackaging=jar
Replace ${version} with the actual version number and ${platform} with the actual platform name.
3. Add the dependency to pom.xml:
<dependencies>
<dependency>
<groupId>com.aliyun.pai</groupId>
<artifactId>feature_generator</artifactId>
<version>${version}</version>
<classifier>linux-x86_64</classifier>
</dependency>
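</dependencies>
Set <classifier> to the platform of the jar you installed (linux-x86_64 or macosx-arm64).
4. Create the fg.json file: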
Example:
{
"features": [
{
"feature_name": "query_word",
"feature_type": "id_feature",
"value_type": "String",
"expression": "user:query_word",
"default_value": "",
"combiner": "mean",
"need_prefix": false,
"is_multi": true
},
{
"feature_name": "query_match",
"feature_type": "lookup_feature",
"map": "user:query_token",
"key": "item:title",
"needDiscrete": false,
"needKey": false,
"default_value": "0",
"combiner": "sum",
"need_prefix": false,
"value_type": "double"
},
{
"feature_name": "goods_id",
"feature_type": "id_feature",
"value_type": "String",
"expression": "item:goods_id",
"default_value": "-1024",
"combiner": "mean",
"need_prefix": false,
"value_dimension": 1
},
{
"feature_name": "filter_type",
"feature_type": "id_feature",
"value_type": "String",
"expression": "item:filter_type",
"default_value": "-1024",
"combiner": "mean",
"need_prefix": false,
"value_dimension": 1
},
{
"feature_name": "day_h",
"feature_type": "id_feature",
"value_type": "int64",
"expression": "user:day_h",
"default_value": "0",
"combiner": "mean",
"need_prefix": false,
"value_dimension": 1
},
{
"feature_name": "week_day",
"feature_type": "id_feature",
"value_type": "int64",
"expression": "user:week_day",
"default_value": "0",
"combiner": "mean",
"need_prefix": false,
"value_dimension": 1
},
{
"feature_name": "city",
"feature_type": "id_feature",
"value_type": "String",
"expression": "user:city",
"default_value": "-1024",
"combiner": "mean",
"need_prefix": false,
"value_dimension": 1
},
{
"feature_name": "province",
"feature_type": "id_feature",
"value_type": "String",
"expression": "user:province",
"default_value": "-1024",
"combiner": "mean",
"need_prefix": false,
"value_dimension": 1
},
{
"feature_name": "country",
"feature_type": "id_feature",
"value_type": "String",
"expression": "user:country",
"default_value": "-1024",
"combiner": "mean",
"need_prefix": false,
"value_dimension": 1
},
{
"feature_name": "is_new_user",
"feature_type": "id_feature",
"value_type": "int64",
"expression": "user:is_new_user",
"default_value": "-1024",
"combiner": "mean",
"need_prefix": false,
"value_dimension": 1
},
{
"feature_name": "focus_author",
"feature_type": "id_feature",
"value_type": "String",
"expression": "user:focus_author",
"separator": ",",
"default_value": "-1024",
"combiner": "mean",
"need_prefix": false,
"is_multi": true
},
{
"feature_name": "title",
"feature_type": "id_feature",
"value_type": "String",
"expression": "item:title",
"default_value": "-1024",
"combiner": "mean",
"need_prefix": false,
"is_multi": true
}
],
"reserves": [
"request_id",
"user_id",
"is_click",
"is_pay"
]
}
5. Use the Java API as shown in the following code:
package org.example;
import com.aliyun.pai.fg.*;
import java.util.*;
public class Main {
public static void main(String[] args) {
String filePath = "/path/to/fg.json";
FgHandler handler = new FgHandler(filePath, 4, false);
List<String> outputs = new ArrayList<>();
outputs.add("goods_id");
outputs.add("is_new_user");
outputs.add("day_h");
outputs.add("query_match"); // set output feature_name
outputs.add("title");
outputs.add("filter_type");
outputs.add("city");
outputs.add("province");
outputs.add("country");
outputs.add("focus_author");
handler.setOutputs(outputs.toArray(new String[0]));
List<String> expectGoods = Arrays.asList("218687106", "1142068348", "1142068347");
VariantVectorMap inputs = new VariantVectorMap.Builder()
.putOptionalString("goods_id", expectGoods)
.putOptionalInt32("is_new_user", Arrays.asList(0, 1, 0))
.putOptionalInt32("day_h", Arrays.asList(6, 8, 11))
.putListString("title", Arrays.asList(
Arrays.asList("k2", "k3", "k5"),
Arrays.asList("k1", "k2", "k3"),
Arrays.asList("k2", "k4")))
.putMapStringFloat("query_token", Arrays.asList(
new HashMap<String, Float>() {{
put("k2", 0.8f);
put("k3", 0.5f);
}},
new HashMap<String, Float>() {{
put("k1", 0.9f);
put("k4", 0.2f);
}},
new HashMap<String, Float>() {{
put("k1", 0.7f);
put("k2", 0.3f);
put("k4", 0.6f);
}}))
.putOptionalString("filter_type", Arrays.asList(null, "f1", null))
.putOptionalString("city", Arrays.asList("hangzhou"))
.putOptionalString("province", Arrays.asList("zhejiang"))
.putOptionalString("country", Arrays.asList("china"))
.putOptionalString("focus_author", Arrays.asList("2255010511022,14164467", "10511022,24164467", "550105110,34164467"))
.build();
VariantVectorMap results = handler.Process(inputs);
if (results == null || results.isNull()) {
System.out.println("fg result is null");
inputs.close(); // release native memory
handler.close(); // release resources
return;
}
inputs.close(); // release native memory
System.out.println("result size=" + results.size());
List<String> features = results.getKeys();
System.out.println("result features=" + features);
List<String> goodsIds = results.getList("goods_id");
System.out.println("goods_ids=" + String.join(", ", goodsIds));
List<List<String>> titles = results.getList("title");
System.out.println("titles=" + titles);
List<Long> dayHours = results.getList("day_h");
System.out.println("day_h=" + dayHours);
List<String> filters = results.getList("filter_type");
System.out.println("filter_type=" + String.join(", ", filters));
List<List<String>> focus = results.getList("focus_author");
System.out.println("focus_author=" + focus);
List<String> citys = results.getList("city");
System.out.println("city=" + String.join(", ", citys));
List<String> provinces = results.getList("province");
System.out.println("provinces=" + String.join(", ", provinces));
List<String> countrys = results.getList("country");
System.out.println("country=" + String.join(", ", countrys));
List<Long> isNewUsers = results.getList("is_new_user");
System.out.println("is_new_user=" + isNewUsers);
List<Double> queryMatch = results.getList("query_match");
System.out.println("query_match=" + queryMatch);
System.out.println("===========================================================");
Set<String> itemInputs = handler.GetItemInputNames();
System.out.println("item side inputs=" + itemInputs);
Set<String> userInputs = handler.GetUserInputNames();
System.out.println("user side inputs=" + userInputs);
Set<String> ctxInputs = handler.GetContextInputNames();
System.out.println("context side inputs=" + ctxInputs);
Set<String> reserved = handler.GetReserveColumns();
System.out.println("reserved columns =" + reserved);
Set<String> qminputs = handler.GetFeatureInputs("query_match");
System.out.println("inputs of query_match =" + qminputs);
String defaultVal = handler.DefaultFeatureValue("query_match");
System.out.println("default feature value of query_match =" + defaultVal);
List<String> allFeatures = handler.GetAllFeatureNames();
System.out.println("all feature names:" + allFeatures);
Map<String, String> schema = handler.GetTableSchema();
System.out.println("table schema:" + schema);
results.close(); // release native memory
handler.close(); // release resources
}
}
API reference
FgHandler class
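FgHandler processes input features according to its configuration (config_json) and returns the results.
Main capabilities:
Initialize the feature processor from a configuration, optionally specifying the thread count and whether to perform bucketize only.
Process input features: Process(VariantVectorMap inputs) -> VariantVectorMap
Set the output feature set: setOutputs(...)
Query input, output, and feature metadata from the configuration (feature names, schema, default values, dimensions).
Explicitly release native resources: close() (must be called)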
Quick start
Initialization
Four constructors are available (each one allocates a native object):
// 1) Configuration only
FgHandler handler = new FgHandler(configJsonOrPath);
// 2) Configuration + thread count
FgHandler handler = new FgHandler(configJsonOrPath, threadNum);
// 3) Configuration + thread count + bucketize-only flag
FgHandler handler = new FgHandler(configJsonOrPath, threadNum, bucketizeOnly);
// 4) Configuration + thread count + bucketize-only flag + whether the config is a file path
FgHandler handler = new FgHandler(configJsonOrPath, threadNum, bucketizeOnly, isCfgPath);
Parameters:
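config_json (configJsonOrPath): the configuration content or the configuration file path, depending on is_cfg_path
thread_num (threadNum): number of native worker threads (size_t)
bucketize_only (bucketizeOnly): whether to perform bucketize only (query it with IsOnlyBucketize())
is_cfg_path (isCfgPath): true means config_json is a configuration file path; false means it is JSON text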
Set output features
There are two ways to set outputs:
handler.setOutputs("feature_a", "feature_b");
// Or pass a String[] array
List<String> outputs = new ArrayList<>();
outputs.add("goods_id");
outputs.add("is_new_user");
outputs.add("title");
outputs.add("city");
outputs.add("province");
outputs.add("country");
handler.setOutputs(outputs.toArray(new String[0]));
Or use the low-level interface:
StrVector outs = new StrVector();
outs.push_back("feature_a");
outs.push_back("feature_b");
handler.setOutputs(outs);
If no outputs are set, all configured features are output by default.
Run processing
VariantVectorMap inputs = new VariantVectorMap();
// TODO: fill inputs according to your VariantVectorMap conventions
VariantVectorMap outputs = handler.Process(inputs);
if (outputs == null || outputs.isNull()) {
System.out.println("fg output is null");
inputs.close();
return;
}
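// TODO: read results from outputs
inputs.close(); // release native memory
outputs.close();
FgHandler.Process(inputs) processes a batch of samples. Both its input and its output are VariantVectorMap objects: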
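VariantVectorMap: Map<String, VariantVector> (keys are field or feature names)
VariantVector: a columnar batch container (column vector) that supports several nested shapes:
Optional scalars: List<T> (length = batch size; may contain null)
List sequences: List<List<T>> (outer length = batch size)
Map features: List<Map<K,V>> (outer length = batch size)
Matrix features: List<List<List<T>>> (outer length = batch size; one 2D matrix per sample)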
Process may return null (meaning that the feature transformation failed), so always check the result before using it.
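Services usually receive samples row by row, while VariantVectorMap expects one list per field. The plain-Java sketch below transposes rows into columns with the shapes listed above (List<T> for Optional scalars, List<List<T>> for sequences). The Sample class and the column helper are hypothetical; the resulting lists are what Builder methods such as putOptionalString, putOptionalInt32, and putListString take.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

final class ColumnarBatch {
    // Hypothetical row type for one sample in a request.
    static final class Sample {
        final String goodsId;
        final Integer dayH;        // may be null (missing value)
        final List<String> title;  // sequence feature

        Sample(String goodsId, Integer dayH, List<String> title) {
            this.goodsId = goodsId;
            this.dayH = dayH;
            this.title = title;
        }
    }

    // Extracts one column: element i is the field value of sample i, so size == batch size.
    static <R, T> List<T> column(List<R> rows, Function<R, T> getter) {
        List<T> out = new ArrayList<>(rows.size());
        for (R row : rows) {
            out.add(getter.apply(row)); // nulls are kept as missing values
        }
        return out;
    }

    public static void main(String[] args) {
        List<Sample> rows = Arrays.asList(
            new Sample("218687106", 6, Arrays.asList("k2", "k3", "k5")),
            new Sample("1142068348", null, Arrays.asList("k1", "k2")));

        List<String> goodsIds = column(rows, s -> s.goodsId);   // Optional scalar: List<T>
        List<Integer> dayH = column(rows, s -> s.dayH);         // Optional scalar containing a null
        List<List<String>> titles = column(rows, s -> s.title); // Sequence: List<List<T>>

        System.out.println(goodsIds); // [218687106, 1142068348]
        System.out.println(dayH);     // [6, null]
        System.out.println(titles);   // [[k2, k3, k5], [k1, k2]]
    }
}
```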
Input and output data structures
VariantVectorMap (input/output container)
Basic API
Construction:
new VariantVectorMap()
VariantVectorMap.fromJavaMap(Map<String, VariantVector>)
new VariantVectorMap.Builder()...build()
Writing:
put(String key, VariantVector value)
putAll(Map<String, VariantVector>)
Reading:
VariantVector get(String key)
boolean contains(String key)
List<String> getKeys()
long size()
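Map<String, VariantVector> toJavaMap() (converts to a Java Map)
Release:
clear(): removes all entries
close(): releases native memory (must be called; otherwise memory leaks)
Building inputs with Builder (recommended)
VariantVectorMap.Builder provides a family of putXXX methods:
Optional scalars (each list covers one batch):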
VariantVectorMap inputs = new VariantVectorMap.Builder()
.putOptionalInt64("user_id", Arrays.asList(1001L, 1002L))
.putOptionalString("item_id", Arrays.asList("i1", "i2"))
.putOptionalDouble("score", Arrays.asList(0.1, null)) // null is allowed
.build();
Single value (batch = 1):
VariantVectorMap inputs = new VariantVectorMap.Builder()
.putSingleOptionalInt64("user_id", 1001L)
.putSingleOptionalString("item_id", "i1")
.build();
Sequences (List types, one list per sample in the batch):
VariantVectorMap inputs = new VariantVectorMap.Builder()
.putListInt64("clicked_item_ids",
Arrays.asList(
Arrays.asList(11L, 12L),
Arrays.asList(21L)
)
)
.build();
Map features (one map per sample in the batch; Map.of requires Java 9 or later):
VariantVectorMap inputs = new VariantVectorMap.Builder()
.putMapStringFloat("user_dense",
Arrays.asList(
Map.of("age", 18.0f, "lvl", 3.0f),
Map.of("age", 25.0f)
)
)
.build();
Matrix (one 2D matrix per sample):
VariantVectorMap inputs = new VariantVectorMap.Builder()
.putMatrixFloat("image_emb",
Arrays.asList(
Arrays.asList( // sample1 matrix
Arrays.asList(0.1f, 0.2f),
Arrays.asList(0.3f, 0.4f)
),
Arrays.asList( // sample2 matrix
Arrays.asList(1.0f, 2.0f)
)
)
)
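.build();
getList: type-dispatched reading (commonly used for outputs)
VariantVectorMap.getList(key) picks the registered TypeReference according to VariantVector.getType() and converts the column accordingly.
Example: read feature_x of type OPTIONAL_FLOAT: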
List<Float> xs = out.getList("feature_x"); // actually returns List<Float>
Read a LIST_INT64 feature (returns List<List<Long>>):
List<List<Long>> seq = out.getList("seq_feature");
Notes:
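getList is declared as <T> List<T>, so the element type cannot be enforced at compile time. On a type mismatch, it throws IllegalArgumentException("Type mismatch...") at runtime.
TYPE_REGISTRY pre-registers the mapping from every defined type constant to its TypeReference.
Recommendation: fetch the output table schema first, then choose the getList return type according to the schema.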
Map<String, String> schema = handler.GetTableSchema();
System.out.println("table schema:" + schema);
VariantVector (column vector and type system)
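A VariantVector holds a single column of data. Its type must be set, and its internal data is organized according to that type.
Supported types
Optional (null allowed; the outer level is the batch):
OPTIONAL_STRING / INT32 / INT64 / FLOAT / DOUBLE
Java representation: List<T> (length = batch size; elements may be null)
List (one sequence per sample):
LIST_STRING / INT32 / INT64 / FLOAT / DOUBLE
Java representation: List<List<T>> (outer length = batch size)
Map (one map per sample):
MAP_STRING_*: List<Map<String, V>>
MAP_INT32_*: List<Map<Integer, V>>
MAP_INT64_*: List<Map<Long, V>>
Matrix (one 2D matrix per sample):
MATRIX_FLOAT / MATRIX_INT64 / MATRIX_STRING
Java representation: List<List<List<T>>>
Construction: static fromXXX methods (recommended)
Instead of hand-writing the low-level Builder(type).withXXXData(), call these directly: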
VariantVector.fromOptionalInt64(List<Long> values)
VariantVector.fromListFloat(List<List<Float>> values)
VariantVector.fromMapStringInt32(List<Map<String,Integer>> maps)
VariantVector.fromMatrixFloat(List<List<List<Float>>> matrices), and so on.
Example (build a vector manually, then put it into a map):
VariantVector userIds = VariantVector.fromOptionalInt64(Arrays.asList(1L, 2L, null));
VariantVectorMap inputs = new VariantVectorMap();
inputs.put("user_id", userIds);
Objects built this way own their native memory and must be released with close() after use.
Objects in borrowed-memory mode:
VariantVectorMap results;
// build results
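VariantVector v = results.get("goods_id");
Such an object is essentially a view: its underlying memory is shared with the VariantVectorMap. After the VariantVectorMap is released, the object becomes dangling and must not be used.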
A borrowed VariantVector does not need close() after use. Do not store it separately for long: its lifetime depends on the VariantVectorMap that owns it.
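The owned/borrowed distinction can be modeled in plain Java to show why a borrowed view must not outlive its map. In this sketch, Owner stands in for an owned VariantVectorMap and View for a borrowed VariantVector; both classes are hypothetical, and a null field simulates freed native memory. Real native code gives no such friendly exception: a dangling access may read freed memory or crash the process.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class BorrowDemo {
    // Stands in for an owned container (like VariantVectorMap) that frees its data on close().
    static final class Owner implements AutoCloseable {
        private List<String> data;

        Owner(List<String> data) { this.data = new ArrayList<>(data); }

        // Returns a borrowed view: it shares storage with this owner and copies nothing.
        View get() { return new View(this); }

        // Returns an independent copy that survives close() (like decoding to plain Java objects).
        List<String> copy() { checkOpen(); return new ArrayList<>(data); }

        void checkOpen() {
            if (data == null) throw new IllegalStateException("owner closed: view is dangling");
        }

        @Override
        public void close() { data = null; } // simulates releasing native memory
    }

    // Stands in for a borrowed VariantVector: valid only while its owner is open.
    static final class View {
        private final Owner owner;

        View(Owner owner) { this.owner = owner; }

        String first() { owner.checkOpen(); return owner.data.get(0); }
    }

    public static void main(String[] args) {
        View view = null;
        List<String> decoded = null;
        try (Owner results = new Owner(Arrays.asList("218687106", "1142068348"))) {
            view = results.get();
            decoded = results.copy();
            System.out.println(view.first()); // works while the owner is open
        }
        System.out.println(decoded); // the copy is still usable
        try {
            view.first();
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // the view outlived its owner
        }
    }
}
```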
Reading: toXXXList
After obtaining a VariantVector vv, call the conversion method that matches its type:
VariantVector vv = out.get("feature_x");
if (vv.getType() == VariantVector.OPTIONAL_DOUBLE) {
List<Double> vals = vv.toOptionalDoubleList();
}
Alternatively, use VariantVectorMap.getList(key) for automatic dispatch (see the getList section above).
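Because getList returns an unchecked <T> List<T>, a wrong declared type may only surface later as a ClassCastException far from the call site. A small defensive helper can verify the element classes right after reading. The checkedList helper below is hypothetical plain Java, not part of the SDK; it checks only the outer elements and allows nulls, as Optional columns use null for missing values. In a service it would wrap a call such as results.getList("day_h").

```java
import java.util.Arrays;
import java.util.List;

final class Checked {
    // Verifies that every non-null element is an instance of elementType, then returns a typed view.
    static <T> List<T> checkedList(List<?> raw, Class<T> elementType) {
        for (int i = 0; i < raw.size(); i++) {
            Object e = raw.get(i);
            if (e != null && !elementType.isInstance(e)) {
                throw new IllegalArgumentException("element " + i + " is " + e.getClass().getName()
                    + ", expected " + elementType.getName());
            }
        }
        @SuppressWarnings("unchecked")
        List<T> typed = (List<T>) raw; // elements were checked above
        return typed;
    }

    public static void main(String[] args) {
        // Stand-in for results.getList("day_h"), which the main example reads as List<Long>.
        List<?> raw = Arrays.asList(6L, 8L, null);
        List<Long> dayH = checkedList(raw, Long.class);
        System.out.println(dayH); // [6, 8, null]
        try {
            checkedList(raw, Integer.class);
        } catch (IllegalArgumentException e) {
            System.out.println("mismatch: " + e.getMessage());
        }
    }
}
```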
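Helper methods: size / isEmpty / totalElementCount
vv.size():
Optional types: returns the batch size (length of nullFlags)
Other types: returns sizes[0] (number of outer elements, usually the batch size)
vv.isEmpty(): checks, according to the type, whether the underlying data is empty
vv.totalElementCount(): computes the total number of elements after flattening nested list/map/matrix data (for debugging and checks)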
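When debugging inputs before they are converted to VariantVector, a Java-side counterpart of totalElementCount can help spot empty or truncated batches. The sketch below is a hypothetical helper that flattens nested lists and counts each map entry as one element; it is an assumption that this matches how the SDK counts map data, so use it for rough checks only.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

final class ElementCount {
    // Recursively counts leaf elements: lists are flattened, each map entry counts as one element,
    // and any other object (including null) counts as one scalar value.
    static long totalElementCount(Object value) {
        if (value instanceof List<?>) {
            long n = 0;
            for (Object e : (List<?>) value) n += totalElementCount(e);
            return n;
        }
        if (value instanceof Map<?, ?>) {
            return ((Map<?, ?>) value).size();
        }
        return 1;
    }

    public static void main(String[] args) {
        // Same shape as the putMatrixFloat("image_emb", ...) example: two samples, 2x2 and 1x2.
        List<List<List<Float>>> matrices = Arrays.asList(
            Arrays.asList(Arrays.asList(0.1f, 0.2f), Arrays.asList(0.3f, 0.4f)),
            Arrays.asList(Arrays.asList(1.0f, 2.0f)));
        System.out.println(matrices.size());             // 2 (batch size)
        System.out.println(totalElementCount(matrices)); // 6
        List<List<Long>> seq = Arrays.asList(Arrays.asList(11L, 12L), Collections.emptyList());
        System.out.println(totalElementCount(seq));      // 2
    }
}
```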
Metadata query interfaces
Use these methods to inspect the configuration and feature dependencies.
Query input field sets
Set<String> itemInputs = handler.GetItemInputNames();
Set<String> userInputs = handler.GetUserInputNames();
Set<String> ctxInputs = handler.GetContextInputNames();
Query all feature names, reserved columns, and special sets
List<String> allFeatures = handler.GetAllFeatureNames();
Set<String> reserveCols = handler.GetReserveColumns();
Set<String> userSide = handler.GetUserSideFeatures();
Set<String> seqFeatures = handler.GetSequenceFeatures();
Query feature dependencies, default values, dimensions, and bucketizers
Set<String> deps = handler.GetFeatureInputs("feature_x");
String defaultVal = handler.DefaultFeatureValue("feature_x");
long defaultBucket = handler.DefaultBucketizedFeatureValue("feature_x");
long dim = handler.GetFeatureValueDim("feature_x");
boolean hasBucketizer = handler.HasBucketizer("feature_x");
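boolean onlyBucketize = handler.IsOnlyBucketize();
Typical uses:
Generating input-preparation logic (determining which raw inputs a feature needs)
Filling in or validating missing inputs (default values)
Providing a fixed dimension to downstream models (dim)
Deciding whether extra bucketize processing is needed (hasBucketizer/onlyBucketize)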
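For the validation use case, one approach is to take the union of the sets returned by GetItemInputNames, GetUserInputNames, and GetContextInputNames and compare it with the keys actually put into the inputs. The sketch below does this with plain Java sets. The field names are made up, and it assumes the returned names are spelled the same way as your input keys, so check the real output of the query methods for your configuration.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

final class InputCheck {
    // Returns the required input names missing from the provided keys, sorted for stable logs.
    @SafeVarargs
    static Set<String> missingInputs(Collection<String> provided, Set<String>... requiredSets) {
        Set<String> missing = new TreeSet<>();
        for (Set<String> required : requiredSets) {
            missing.addAll(required);
        }
        missing.removeAll(provided);
        return missing;
    }

    public static void main(String[] args) {
        // Stand-ins for handler.GetItemInputNames() / GetUserInputNames() / GetContextInputNames().
        Set<String> itemInputs = new HashSet<>(Arrays.asList("goods_id", "title"));
        Set<String> userInputs = new HashSet<>(Arrays.asList("day_h", "query_token"));
        Set<String> ctxInputs = new HashSet<>();

        // Stand-in for the keys put into the VariantVectorMap (for example, inputs.getKeys()).
        Set<String> provided = new HashSet<>(Arrays.asList("goods_id", "title", "day_h"));

        System.out.println(missingInputs(provided, itemInputs, userInputs, ctxInputs)); // [query_token]
    }
}
```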
Query the MaxCompute (MC) output table schema
Usage:
Map<String, String> schema = handler.GetTableSchema();
String type = schema.get("feature_x");
Resource release (must read)
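FgHandler holds a pointer to a native object. Release it when it is no longer needed; otherwise memory leaks.
It is a heavyweight object. Typically, you:
initialize it once
reuse it repeatedly
call close() when the service exits
Correct usage:
Do not create a new FgHandler() for every request
Use one long-lived, reused handler per configuration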
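One way to enforce "one long-lived handler per configuration" is a small registry that creates each handler lazily on first use and closes all of them at shutdown. The sketch below is generic over AutoCloseable so it runs without the SDK; in a service, the factory would be something like path -> new FgHandler(path, 4, false, true). HandlerRegistry is a hypothetical class, not part of the FG SDK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

final class HandlerRegistry<H extends AutoCloseable> implements AutoCloseable {
    private final Map<String, H> handlers = new ConcurrentHashMap<>();
    private final Function<String, H> factory;

    HandlerRegistry(Function<String, H> factory) { this.factory = factory; }

    // Creates the handler for a config at most once; later calls reuse the same instance.
    H get(String configPath) {
        return handlers.computeIfAbsent(configPath, factory);
    }

    // Closes every handler, e.g. from the service shutdown function; keeps going if one close fails.
    @Override
    public void close() throws Exception {
        Exception first = null;
        List<H> all = new ArrayList<>(handlers.values());
        handlers.clear();
        for (H h : all) {
            try {
                h.close();
            } catch (Exception e) {
                if (first == null) first = e; else first.addSuppressed(e);
            }
        }
        if (first != null) throw first;
    }

    public static void main(String[] args) throws Exception {
        int[] created = {0};
        // Stand-in factory; a real one might be: path -> new FgHandler(path, 4, false, true)
        try (HandlerRegistry<AutoCloseable> registry = new HandlerRegistry<AutoCloseable>(path -> {
            created[0]++;
            return () -> System.out.println("closed handler for " + path);
        })) {
            registry.get("/path/to/fg.json");
            registry.get("/path/to/fg.json"); // reused, not recreated
            System.out.println("handlers created: " + created[0]);
        }
    }
}
```

ConcurrentHashMap.computeIfAbsent runs the factory at most once per key, but it may block other updates to the map while the handler initializes, which is acceptable when handlers are created at startup.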
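When an FgHandler is no longer used, you must call close() to release its resources. close() is:
Thread-safe: synchronized
Idempotent: repeated calls do not release the resources twice
After close(), do not call any native method (including Process and the query methods); doing so may crash the process (use-after-free).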
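The close() contract described above (synchronized, idempotent, no native calls afterwards) is a common pattern for Java wrappers around native pointers. The sketch below models it in plain Java: a long field stands in for the native pointer, and freeNative is a hypothetical placeholder for the real JNI release call. Unlike real native code, it turns use-after-close into an exception instead of a crash.

```java
final class NativeHandle implements AutoCloseable {
    private long ptr;          // stand-in for a native pointer; 0 means released
    private int releaseCount;  // only to show that the release happens once

    NativeHandle(long ptr) { this.ptr = ptr; }

    // Checks the pointer first, so use-after-close fails fast in this model.
    synchronized long process() {
        if (ptr == 0) throw new IllegalStateException("handle already closed");
        return ptr; // a real wrapper would call into native code with ptr
    }

    // synchronized: concurrent close() calls cannot both release the pointer.
    // Idempotent: after the first call ptr is 0, so later calls do nothing.
    @Override
    public synchronized void close() {
        if (ptr != 0) {
            freeNative(ptr);
            ptr = 0;
        }
    }

    private void freeNative(long p) { releaseCount++; } // hypothetical placeholder for the JNI free

    synchronized int releaseCount() { return releaseCount; }

    public static void main(String[] args) {
        NativeHandle h = new NativeHandle(42L);
        System.out.println(h.process()); // 42
        h.close();
        h.close(); // second call is a no-op
        System.out.println("released " + h.releaseCount() + " time(s)"); // released 1 time(s)
    }
}
```

Synchronizing process() as well keeps this model simple but serializes calls. The documentation only states that close() is synchronized, so do not assume the real Process is protected against a concurrent close().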
FgHandler handler = new FgHandler(cfg, 4);
try {
...
} finally {
handler.close();
}
During debugging, you can use try-with-resources:
try (FgHandler handler = new FgHandler(cfg, 4)) {
handler.setOutputs("f1", "f2");
try (VariantVectorMap out = handler.Process(in)) {
// check out for null, then read it
}
}
In production, do not repeatedly create the same FgHandler object
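Inside a service, do not create an FgHandler object for every request. Initializing this object is expensive, and frequent creation degrades service performance.
Create a global object in the service's initialization function and call close() in its shutdown function. Follow the same pattern when you create several different FgHandler objects.
Recommended request-handling patterns:
Pattern A: synchronous request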
public Result handle(Request req, FgHandler handler) {
try (VariantVectorMap inputs = buildInputs(req);
VariantVectorMap results = handler.Process(inputs)) {
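if (results == null || results.isNull()) {
throw new IllegalStateException("fg result is null");
}
Result r = new Result();
r.goodsIds = results.get("goods_id").toOptionalStringList();
r.dayHs = results.get("day_h").toOptionalInt64List(); // day_h has value_type int64 in fg.json
r.titles = results.get("title").toListStringList();
return r;
}
}
Characteristics: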
Clear scope for native resources
Released as soon as the request ends
Less prone to OOM
Pattern B: service-level singleton handler
public class FgService implements AutoCloseable {
private final FgHandler handler;
public FgService(String configPath) {
this.handler = new FgHandler(configPath, 4, false, true);
}
public Result process(Request req) {
try (VariantVectorMap inputs = buildInputs(req);
VariantVectorMap results = handler.Process(inputs)) {
return decode(results);
}
}
@Override
public void close() {
handler.close();
}
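}
This is the recommended approach for production.
Common usage examples
Prepare input fields according to the configuration before processing: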
FgHandler handler = new FgHandler(cfgJson, 8, false, false);
try {
// 1) Set the required outputs
handler.setOutputs("feature_a", "feature_b");
// 2) Find out which inputs must be prepared
Set<String> itemInputs = handler.GetItemInputNames();
Set<String> userInputs = handler.GetUserInputNames();
Set<String> ctxInputs = handler.GetContextInputNames();
// 3) Build inputs (illustrative; details depend on the VariantVectorMap API)
VariantVectorMap inputs = new VariantVectorMap();
// inputs.put("user_id", ...)
// inputs.put("item_id", ...)
// inputs.put("ts", ...)
VariantVectorMap outputs = handler.Process(inputs);
inputs.close();
if (outputs != null) { // Process returns null when the transformation fails
// read outputs...
outputs.close();
}
} finally {
handler.close();
}
Bucketize only:
FgHandler handler = new FgHandler(cfgJson, 4, true, false);
try {
if (!handler.IsOnlyBucketize()) {
// The configuration and the constructor arguments do not match; log a warning here
}
...
} finally {
handler.close();
}
Safe standard usage:
try (FgHandler handler = new FgHandler(configPath, 4, false, true)) {
handler.setOutputs("goods_id", "day_h", "title");
try (VariantVectorMap inputs = new VariantVectorMap.Builder()
.putOptionalString("goods_id", Arrays.asList("1", "2"))
.putOptionalInt32("day_h", Arrays.asList(6, 8))
.putListString("title", Arrays.asList(
Arrays.asList("a", "b"),
Arrays.asList("c", "d")))
.build();
VariantVectorMap results = handler.Process(inputs)) {
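if (results != null && !results.isNull()) {
List<String> goodsIds = results.get("goods_id").toOptionalStringList();
List<Long> dayHs = results.get("day_h").toOptionalInt64List(); // day_h has value_type int64 in fg.json
List<List<String>> titles = results.get("title").toListStringList();
// Use the values here; do not carry results or borrowed VariantVectors out of this scope
}
}
}
Usages that easily lead to OOM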
1. Not closing VariantVectorMap
Wrong:
for (...) {
VariantVectorMap inputs = new VariantVectorMap.Builder()...build();
VariantVectorMap results = handler.Process(inputs);
// no close()
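}
Consequences:
The Java wrapper objects are reclaimed by GC
The native memory is not released promptly
Under high QPS, large numbers of unreleased objects accumulate, eventually causing OOM or exhausting native RSS
Correct: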
for (...) {
try (VariantVectorMap inputs = new VariantVectorMap.Builder()...build();
VariantVectorMap results = handler.Process(inputs)) {
...
}
}
With try-with-resources, the resources are released automatically.
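Two Java language rules make this pattern safe even though Process may return null: resources declared in one try statement are closed in reverse order of declaration, and a resource whose initializer evaluates to null is not closed. The plain-Java sketch below demonstrates both with a hypothetical Tracked resource standing in for VariantVectorMap. The body still needs its own null check before reading results.

```java
import java.util.ArrayList;
import java.util.List;

final class TwrDemo {
    static final List<String> log = new ArrayList<>();

    // Hypothetical stand-in for VariantVectorMap: records when it is closed.
    static final class Tracked implements AutoCloseable {
        private final String name;
        Tracked(String name) { this.name = name; }
        @Override public void close() { log.add("close " + name); }
    }

    // Stand-in for handler.Process(inputs); returns null to simulate a failed transformation.
    static Tracked process(Tracked inputs, boolean fail) {
        return fail ? null : new Tracked("results");
    }

    static void handle(boolean fail) {
        try (Tracked inputs = new Tracked("inputs");
             Tracked results = process(inputs, fail)) {
            if (results == null) { // still required: a null resource does not skip the body
                log.add("null result");
                return;
            }
            log.add("use results");
        }
    }

    public static void main(String[] args) {
        handle(false);
        handle(true);
        System.out.println(log);
        // [use results, close results, close inputs, null result, close inputs]
    }
}
```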
2. Creating a new FgHandler for every request
Wrong:
for (...) {
FgHandler handler = new FgHandler(configPath, 4, false, true);
...
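}
This is expensive:
The configuration is loaded again and again
Thread pools and internal structures are initialized again and again
Native resources are allocated again and again
Correct: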
FgHandler handler = new FgHandler(configPath, 4, false, true);
try {
for (...) {
...
}
} finally {
handler.close();
}
3. Caching the return value of results.get(key)
Wrong:
VariantVector v = results.get("goods_id");
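cache.add(v); // wrong
Problems with borrowed objects:
After results.close(), the cached object becomes invalid
If results is not closed, its memory is never released
Correct:
Decode immediately into plain Java objects: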
List<String> goodsIds = results.get("goods_id").toOptionalStringList();
cache.add(goodsIds);
Or use getCopy() to keep a native copy, and take responsibility for closing it later:
VariantVector copy = results.getCopy("goods_id");
try {
...
} finally {
copy.close();
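}
Avoid caching native copies unless necessary.
4. Heavy use of toJavaMap() without releasing the values
toJavaMap() returns owned copies, so it can easily create a large number of native objects.
Wrong: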
Map<String, VariantVector> map = results.toJavaMap();
// left open, never closed
Correct:
Avoid toJavaMap() unless necessary.
Prefer reading each field directly into Java objects.
When you must use it:
Map<String, VariantVector> map = results.toJavaMap();
try {
...
} finally {
for (VariantVector v : map.values()) {
if (v != null) v.close();
}
}
5. Intermediate VariantVectors created by a Builder staying in its Java Map for a long time
For example:
VariantVectorMap.Builder builder = new VariantVectorMap.Builder();
builder.putOptionalString(...);
builder.putOptionalInt32(...);
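// builder and its data stay alive for a long time
The Builder internally holds multiple VariantVectors (owned native objects).
A Builder that lives for a long time keeps occupying native memory.
Correct usage:
Keep the Builder's lifetime short and use it to build a single request only.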