This topic describes how to use the Java SDK, and how to generate features offline in batches on the MaxCompute platform with a MapReduce job. Whether the generated features are bucketized (binned) is controlled by the configuration file and command-line arguments.
Limits
MapReduce does not support the ODPS 2.0 data types (such as LIST, MAP, FLOAT, and INT).
The MapReduce version of FG is no longer maintained; use PyFG instead.
The Java SDK currently runs only on the Linux x86_64 platform.
Use the Java SDK
The following steps use a Maven project as an example to show how to use the Java SDK.
1. Download the jar package to a local path such as /path/to/feature_generator-${version}-linux-x86_64.jar.
2. Install it into your local Maven repository with mvn install:install-file:
mvn install:install-file \
  -Dfile=/path/to/feature_generator-${version}-linux-x86_64.jar \
  -DgroupId=com.aliyun.pai \
  -DartifactId=feature_generator \
  -Dversion=${version} \
  -Dclassifier=linux-x86_64 \
  -Dpackaging=jar
Note: replace ${version} in the command above with the actual version number.
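If the command succeeds, the jar is placed under ~/.m2/repository/com/aliyun/pai/feature_generator/${version}/ (the default local repository location), which is where Maven resolves it from in the next step.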
3. Add the dependency to your pom.xml:
<dependencies>
  <dependency>
    <groupId>com.aliyun.pai</groupId>
    <artifactId>feature_generator</artifactId>
    <version>${version}</version>
    <classifier>linux-x86_64</classifier>
  </dependency>
</dependencies>
4. Create the fg.json configuration file.
Example:
{
  "features": [
    {
      "feature_name": "query_word",
      "feature_type": "id_feature",
      "value_type": "String",
      "expression": "user:query_word",
      "default_value": "",
      "combiner": "mean",
      "need_prefix": false,
      "is_multi": true
    },
    {
      "feature_name": "query_match",
      "feature_type": "lookup_feature",
      "map": "user:query_token",
      "key": "item:title",
      "needDiscrete": false,
      "needKey": false,
      "default_value": "0",
      "combiner": "sum",
      "need_prefix": false,
      "value_type": "double"
    },
    {
      "feature_name": "goods_id",
      "feature_type": "id_feature",
      "value_type": "String",
      "expression": "item:goods_id",
      "default_value": "-1024",
      "combiner": "mean",
      "need_prefix": false,
      "value_dimension": 1
    },
    {
      "feature_name": "filter_type",
      "feature_type": "id_feature",
      "value_type": "String",
      "expression": "item:filter_type",
      "default_value": "-1024",
      "combiner": "mean",
      "need_prefix": false,
      "value_dimension": 1
    },
    {
      "feature_name": "day_h",
      "feature_type": "id_feature",
      "value_type": "int64",
      "expression": "user:day_h",
      "default_value": "0",
      "combiner": "mean",
      "need_prefix": false,
      "value_dimension": 1
    },
    {
      "feature_name": "week_day",
      "feature_type": "id_feature",
      "value_type": "int64",
      "expression": "user:week_day",
      "default_value": "0",
      "combiner": "mean",
      "need_prefix": false,
      "value_dimension": 1
    },
    {
      "feature_name": "city",
      "feature_type": "id_feature",
      "value_type": "String",
      "expression": "user:city",
      "default_value": "-1024",
      "combiner": "mean",
      "need_prefix": false,
      "value_dimension": 1
    },
    {
      "feature_name": "province",
      "feature_type": "id_feature",
      "value_type": "String",
      "expression": "user:province",
      "default_value": "-1024",
      "combiner": "mean",
      "need_prefix": false,
      "value_dimension": 1
    },
    {
      "feature_name": "country",
      "feature_type": "id_feature",
      "value_type": "String",
      "expression": "user:country",
      "default_value": "-1024",
      "combiner": "mean",
      "need_prefix": false,
      "value_dimension": 1
    },
    {
      "feature_name": "is_new_user",
      "feature_type": "id_feature",
      "value_type": "int64",
      "expression": "user:is_new_user",
      "default_value": "-1024",
      "combiner": "mean",
      "need_prefix": false,
      "value_dimension": 1
    },
    {
      "feature_name": "focus_author",
      "feature_type": "id_feature",
      "value_type": "String",
      "expression": "user:focus_author",
      "separator": ",",
      "default_value": "-1024",
      "combiner": "mean",
      "need_prefix": false,
      "is_multi": true
    },
    {
      "feature_name": "title",
      "feature_type": "id_feature",
      "value_type": "String",
      "expression": "item:title",
      "default_value": "-1024",
      "combiner": "mean",
      "need_prefix": false,
      "is_multi": true
    }
  ],
  "reserves": [
    "request_id",
    "user_id",
    "is_click",
    "is_pay"
  ]
}
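A few notes on this configuration: each id_feature takes its values directly from the input named in expression, where the user: or item: prefix marks which side the input belongs to. query_match is a lookup_feature: the values of its key input (item:title) are used as lookup keys into its map input (user:query_token), and the matched weights are aggregated by the configured combiner (sum here); for instance, a title of [k2, k3, k5] matched against {k2: 0.8, k3: 0.5} is expected to produce 0.8 + 0.5 = 1.3. The columns listed in reserves are carried over from the input table to the output table unchanged.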
5. Use the Java API by referring to the following code:
package org.example;

import com.aliyun.pai.fg.*;

import java.net.URL;
import java.util.*;

public class Main {
    public static void main(String[] args) {
        String filePath = "/path/to/fg.json";
        FgHandler handler = new FgHandler(filePath, 4, false);

        List<String> outputs = new ArrayList<>();
        outputs.add("goods_id");
        outputs.add("is_new_user");
        outputs.add("day_h");
        outputs.add("query_match"); // set output feature_name
        outputs.add("title");
        outputs.add("filter_type");
        outputs.add("city");
        outputs.add("province");
        outputs.add("country");
        outputs.add("focus_author");
        handler.setOutputs(outputs.toArray(new String[0]));

        List<String> expectGoods = Arrays.asList("218687106", "1142068348", "1142068347");
        VariantVectorMap inputs = new VariantVectorMap.Builder()
                .putOptionalString("goods_id", expectGoods)
                .putOptionalInt32("is_new_user", Arrays.asList(0, 1, 0))
                .putOptionalInt32("day_h", Arrays.asList(6, 8, 11))
                .putListString("title", Arrays.asList(
                        Arrays.asList("k2", "k3", "k5"),
                        Arrays.asList("k1", "k2", "k3"),
                        Arrays.asList("k2", "k4")))
                .putMapStringFloat("query_token", Arrays.asList(
                        new HashMap<String, Float>() {{
                            put("k2", 0.8f);
                            put("k3", 0.5f);
                        }},
                        new HashMap<String, Float>() {{
                            put("k1", 0.9f);
                            put("k4", 0.2f);
                        }},
                        new HashMap<String, Float>() {{
                            put("k1", 0.7f);
                            put("k2", 0.3f);
                            put("k4", 0.6f);
                        }}))
                .putOptionalString("filter_type", Arrays.asList(null, "f1", null))
                .putOptionalString("city", Arrays.asList("hangzhou"))
                .putOptionalString("province", Arrays.asList("zhejiang"))
                .putOptionalString("country", Arrays.asList("china"))
                .putOptionalString("focus_author", Arrays.asList("2255010511022,14164467", "10511022,24164467", "550105110,34164467"))
                .build();

        VariantVectorMap results = handler.Process(inputs);
        if (results == null || results.isNull()) {
            System.out.println("fg result is null");
            return;
        }
        System.out.println("result size=" + results.size());
        List<String> features = results.getKeys();
        System.out.println("result features=" + features);
        List<String> goodsIds = results.getList("goods_id");
        System.out.println("goods_ids=" + String.join(", ", goodsIds));
        List<List<String>> titles = results.getList("title");
        System.out.println("titles=" + titles);
        List<Long> dayHours = results.getList("day_h");
        System.out.println("day_h=" + dayHours);
        List<String> filters = results.getList("filter_type");
        System.out.println("filter_type=" + String.join(", ", filters));
        List<List<String>> focus = results.getList("focus_author");
        System.out.println("focus_author=" + focus);
        List<String> citys = results.getList("city");
        System.out.println("city=" + String.join(", ", citys));
        List<String> provinces = results.getList("province");
        System.out.println("provinces=" + String.join(", ", provinces));
        List<String> countrys = results.getList("country");
        System.out.println("country=" + String.join(", ", countrys));
        List<Long> isNewUsers = results.getList("is_new_user");
        System.out.println("is_new_user=" + isNewUsers);
        List<Double> queryMatch = results.getList("query_match");
        System.out.println("query_match=" + queryMatch);
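        // Illustrative expectation for the first row: title = [k2, k3, k5] and
        // query_token = {k2: 0.8, k3: 0.5}, so the "sum" combiner should yield 0.8 + 0.5 = 1.3.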
        System.out.println("===========================================================");

        Set<String> itemInputs = handler.GetItemInputNames();
        System.out.println("item side inputs=" + itemInputs);
        Set<String> userInputs = handler.GetUserInputNames();
        System.out.println("user side inputs=" + userInputs);
        Set<String> ctxInputs = handler.GetContextInputNames();
        System.out.println("context side inputs=" + ctxInputs);
        Set<String> reserved = handler.GetReserveColumns();
        System.out.println("reserved columns =" + reserved);
        Set<String> qminputs = handler.GetFeatureInputs("query_match");
        System.out.println("inputs of query_match =" + qminputs);
        String defaultVal = handler.DefaultFeatureValue("query_match");
        System.out.println("default feature value of query_match =" + defaultVal);
        List<String> allFeatures = handler.GetAllFeatureNames();
        System.out.println("all feature names:" + allFeatures);
        Map<String, String> schema = handler.GetTableSchema();
        System.out.println("table schema:" + schema);
    }
}
Run the MapReduce job
Run the following command in DataWorks:
set odps.sql.planner.mode=sql;
set odps.isolation.session.enable=true;
set odps.sql.counters.dynamic.limit=true;
--@resource_reference{"feature_generator-${version}.jar"}
jar -resources feature_generator-${version}.jar,fg.json
-classpath feature_generator-${version}.jar
com.aliyun.pai.Main
-i ${input_table}/dt=${bdp.system.bizdate}
-o ${output_table}/dt=${bdp.system.bizdate}
-f fg.json -s false;

| Parameter | Default | Description |
| --- | --- | --- |
| -i | None | Input table (partition). |
| -o | None | Output table (partition); it must be created in advance. |
| -f | None | FG configuration file in JSON format. |
| -s | true | Whether to concatenate the FG results into one large string stored in the features column. |
| -b | false | Whether to apply bucketization. |
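As a concrete illustration (not a literal command to copy: the version number 1.0.0, the table names fg_input/fg_output, and the partition value 20240101 are placeholders), a run that writes each feature to its own column (-s false) and applies bucketization (-b true) might look like this:
set odps.sql.planner.mode=sql;
set odps.isolation.session.enable=true;
set odps.sql.counters.dynamic.limit=true;
--@resource_reference{"feature_generator-1.0.0.jar"}
jar -resources feature_generator-1.0.0.jar,fg.json
-classpath feature_generator-1.0.0.jar
com.aliyun.pai.Main
-i fg_input/dt=20240101
-o fg_output/dt=20240101
-f fg.json -s false -b true;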
Create the output MaxCompute table
The output table must exist before you run the command above. You can create a PyODPS 3 node in DataWorks and run the following script, which creates the output table automatically based on the contents of fg.json.
# coding: utf-8
# Copy the code below into a DataWorks PyODPS 3 node and run it there. Before running,
# set the parameters: input_table, output_table, force, fg_json_file, need_bucketize.
'''PyODPS 3
Please make sure not to process data by downloading it from MaxCompute. Downloading data
typically involves open_reader on a Table/Instance or the to_pandas method of a DataFrame.
It is recommended to process data with PyODPS DataFrame (created from a MaxCompute table)
and MaxCompute SQL.
For more details, see https://help.aliyun.com/document_detail/90481.html
'''
from odps.models import TableSchema, Column, Partition
import json
import sys

output_table = args['output_table']
force = args['force']
if eval(force):
    print("delete output table")
    o.delete_table(output_table, if_exists=True)
elif o.exist_table(output_table):
    sys.exit(0)

fg_json_file = args['fg_json_file']
with o.open_resource(fg_json_file, mode='r') as fp:
    features = json.load(fp)
reserve_columns = set(features["reserves"])

input_table = args['input_table']
t = o.get_table(input_table)
schema = t.table_schema

need_bucketize = args['need_bucketize']
if type(need_bucketize) == str:
    need_bucketize = eval(need_bucketize)

def is_sparse(feature):
    if 'boundaries' in feature and feature['boundaries']:
        return True
    if 'vocab_list' in feature and feature['vocab_list']:
        return True
    if 'hash_bucket_size' in feature and feature['hash_bucket_size'] > 0:
        return True
    if 'num_buckets' in feature and feature['num_buckets'] > 0:
        return True
    return False

columns = []
for column in schema.columns:
    if column.name in reserve_columns:
        columns.append(column)

for feature in features["features"]:
    name = feature.get("feature_name", None)
    feature_type = feature.get("feature_type", None)
    if feature_type in ("combo_feature",):
        value_type = 'string'
        columns.append(Column(name=name, type=value_type))
    elif feature_type == 'raw_feature':
        value_dim = feature.get("value_dim", 1)
        if need_bucketize and is_sparse(feature):
            value_type = 'bigint' if value_dim == 1 else 'string'
        elif value_dim != 1:
            value_type = 'string'
        else:
            value_type = feature.get('value_type', 'double')
            if value_type in ('int32', 'int64'):
                value_type = 'bigint'
            elif value_type in ('float', 'double'):
                value_type = 'double'
        columns.append(Column(name=name, type=value_type))
    elif feature_type in ("id_feature", "lookup_feature", "match_feature", "text_normalizer"):
        if need_bucketize and is_sparse(feature):
            value_type = 'string'
        else:
            value_type = feature.get('value_type', 'string')
            if value_type in ('int32', 'int64'):
                value_type = 'bigint'
            elif value_type in ('float', 'double'):
                value_type = 'double'
        columns.append(Column(name=name, type=value_type))
    elif feature_type in ("expr_feature", "overlap_feature", "bm25_feature"):
        value_type = 'bigint' if need_bucketize else 'double'
        columns.append(Column(name=name, type=value_type))
    elif "sequence_name" in feature:
        seq_name = feature["sequence_name"]
        for fea in feature["features"]:
            name = fea["feature_name"]
            value_type = 'string'
            columns.append(Column(name=seq_name + '__' + name, type=value_type))
    elif feature_type is None:
        raise KeyError("feature_type can not be none")

output_schema = TableSchema(columns=columns, partitions=schema.partitions)
o.create_table(output_table, output_schema, if_not_exists=True, lifecycle=t.lifecycle)  # create the table only if it does not already exist
Before running the node, configure the following parameters in its scheduling configuration: input_table, output_table, force, fg_json_file, and need_bucketize.
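For example (illustrative values only), the scheduling parameters could be set to input_table=fg_input output_table=fg_output force=False fg_json_file=fg.json need_bucketize=True. Note that fg_json_file is read with open_resource, so fg.json must be uploaded as a MaxCompute resource, that force and need_bucketize are parsed with eval and therefore should be Python literals such as True or False, and that need_bucketize should match the -b flag passed to the MapReduce job so that the output column types are consistent.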