This topic describes how to use the Java SDK, and how to generate features offline in batch on the MaxCompute platform through a MapReduce job. Whether the generated features are bucketized (binned) can be controlled through the configuration file and command-line parameters.
Limits
Use the Java SDK
The following uses a Maven project as an example to show how to use the Java SDK.
1. Download the JAR package to a local path, for example /path/to/feature_generator-${version}.jar.
2. Install it into your local Maven repository with mvn install:install-file:
mvn install:install-file \
-Dfile=/path/to/feature_generator-${version}.jar \
-DgroupId=com.aliyun.pai \
-DartifactId=feature_generator \
-Dversion=${version} \
-Dpackaging=jar
Note: Replace ${version} in the command above with the actual version number.
3. Add the dependencies to your pom.xml:
<dependencies>
    <dependency>
        <groupId>com.aliyun.odps</groupId>
        <artifactId>odps-sdk-mapred</artifactId>
        <version>0.43.4-public</version>
    </dependency>
    <dependency>
        <groupId>com.aliyun.pai</groupId>
        <artifactId>feature_generator</artifactId>
        <version>${version}</version>
    </dependency>
</dependencies>
4. Create the fg.json configuration file. A minimal structural sketch is shown below.
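The following is only a structural sketch of fg.json, not a complete configuration: the keys shown (reserves, features, feature_name, feature_type, value_type) are the ones read by the table-creation script later in this topic, and the feature and column names are taken from the test code in step 5. Other fields required by a real configuration (for example, the feature expression and bucketization settings such as boundaries or hash_bucket_size) are omitted here; refer to the feature configuration reference for the authoritative schema.
{
  "reserves": ["request_id", "user_id", "is_click", "is_pay"],
  "features": [
    {"feature_name": "goods_id", "feature_type": "id_feature", "value_type": "string"},
    {"feature_name": "day_h", "feature_type": "raw_feature", "value_type": "int64"},
    {"feature_name": "city", "feature_type": "id_feature", "value_type": "string"}
  ]
}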
5. Use the Java API as shown in the following sample code:
package com.aliyun.pai.fg;

import com.alibaba.fastjson2.JSON;
import com.aliyun.pai.FeatureConfig;
import com.aliyun.pai.FgHandler;
import org.junit.jupiter.api.Test;

import java.io.InputStream;
import java.util.*;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class FgHandlerTest {
    @Test
    public void testFgHandler() {
        // Load the FG configuration (fg.json) from the classpath.
        InputStream in = getClass().getClassLoader().getResourceAsStream("fg.json");
        if (in == null) {
            System.out.println("null input stream");
        }
        FeatureConfig fgConfig = JSON.parseObject(in, FeatureConfig.class);
        // System.out.println("config: " + JSON.toJSONString(fgConfig));
        assert fgConfig != null;
        FgHandler model = new FgHandler(fgConfig);

        // Build one input record; multi-value fields use the U+001D separator.
        Map<String, Object> record = new HashMap<>();
        record.put("request_id", "a6b67f9d-024d-43fe-bc5c-29d1e8e7e72a");
        record.put("user_id", "218687106163");
        record.put("is_click", 1L);
        record.put("is_pay", 0L);
        record.put("is_new_user", 1L);
        record.put("day_h", 6L);
        record.put("query_token", "k2:0.5\u001Dk3:0.5");
        record.put("title", "k1\u001Dk2\u001Dk3");
        record.put("goods_id", "1142068348");
        record.put("filter_type", null);
        record.put("city", "hz");
        record.put("province", "zj");
        record.put("country", "china");
        record.put("focus_author", "2255010511022,14164467");
        System.out.println("record query_token:" + record.get("query_token"));
        System.out.println("record title:" + record.get("title"));

        // Generate features for the record.
        Map<String, Object> result = model.Process(record);
        System.out.println("result:" + JSON.toJSONString(result));
        System.out.println("goods_id:" + result.get("goods_id"));
        System.out.println("is_new_user:" + result.get("is_new_user"));
        System.out.println("day_h:" + result.get("day_h"));
        System.out.println("week_day:" + result.get("week_day"));
        System.out.println("province:" + result.get("province"));
        System.out.println("city:" + result.get("city"));
        System.out.println("filter_type:" + result.get("filter_type"));
        System.out.println("query_word:" + result.get("query_word"));
        System.out.println("query_match:" + result.get("query_match"));
        System.out.println("title:" + result.get("title"));
        System.out.println("focus_author:" + result.get("focus_author"));

        // Multi-value outputs use the U+0003 separator.
        assertEquals("1142068348", result.get("goods_id"));
        assertEquals(1L, result.get("is_new_user"));
        assertEquals(6L, result.get("day_h"));
        assertEquals(0L, result.get("week_day"));
        assertEquals("zj", result.get("province"));
        assertEquals("hz", result.get("city"));
        assertEquals("-1024", result.get("filter_type"));
        assertEquals("", result.get("query_word"));
        assertEquals(1.0, result.get("query_match"));
        assertEquals("k1\u0003k2\u0003k3", result.get("title"));
        assertEquals("2255010511022\u000314164467", result.get("focus_author"));
    }
}
Run the MapReduce job
Run the following commands in DataWorks:
set odps.sql.planner.mode=sql;
set odps.isolation.session.enable=true;
set odps.sql.counters.dynamic.limit=true;
--@resource_reference{"feature_generator-${version}.jar"}
jar -resources feature_generator-${version}.jar,fg.json
-classpath feature_generator-${version}.jar
com.aliyun.pai.Main
-i ${input_table}/dt=${bdp.system.bizdate}
-o ${output_table}/dt=${bdp.system.bizdate}
-f fg.json -s false;
The command parameters are described in the following table:
| Parameter | Default value | Description |
| --- | --- | --- |
| -i | None | The input table (partition). |
| -o | None | The output table (partition). It must be created in advance. |
| -f | None | The FG configuration file, in JSON format. |
| -s | true | Whether to concatenate the FG results into one large string and store it in the features column. |
| -b | false | Whether to bucketize (bin) the generated features. |
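The table-creation script in the next section generates one output column per feature, which matches the -s false mode used in the command above. If you run the job with -s true instead, the output presumably consists of the reserved columns plus a single features string column; the DDL below is a hypothetical sketch under that assumption, with illustrative reserved columns taken from the test record, a placeholder table name, and a dt partition matching the command above.
-- Hypothetical layout for -s true: reserved columns plus one features string column.
CREATE TABLE IF NOT EXISTS your_output_table (
    request_id STRING,
    user_id    STRING,
    is_click   BIGINT,
    is_pay     BIGINT,
    features   STRING
)
PARTITIONED BY (dt STRING);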
Create the output MaxCompute table
Before you run the command above, the output table must already exist. You can create a PyODPS 3 node in DataWorks and run the following script, which automatically creates the output table based on the contents of fg.json.
# coding: utf-8
# Copy the code below into a DataWorks PyODPS 3 node and run it there.
# Before running, configure the parameters: input_table, output_table, force, fg_json_file, need_bucketize.
'''PyODPS 3
Please make sure not to process data by downloading it from MaxCompute. Download operations
typically include open_reader on a Table/Instance and the to_pandas method of a DataFrame.
It is recommended to process data with PyODPS DataFrame (created from a MaxCompute table) and MaxCompute SQL.
For more details, see: https://help.aliyun.com/document_detail/90481.html
'''
from odps.models import TableSchema, Column, Partition
import json
import sys

# Drop and recreate the output table if force is set; otherwise exit if it already exists.
output_table = args['output_table']
force = args['force']
if eval(force):
    print("delete output table")
    o.delete_table(output_table, if_exists=True)
elif o.exist_table(output_table):
    sys.exit(0)

# Load the FG configuration from a MaxCompute resource.
fg_json_file = args['fg_json_file']
with o.open_resource(fg_json_file, mode='r') as fp:
    features = json.load(fp)
reserve_columns = set(features["reserves"])

input_table = args['input_table']
t = o.get_table(input_table)
schema = t.table_schema

need_bucketize = args['need_bucketize']
if type(need_bucketize) == str:
    need_bucketize = eval(need_bucketize)


def is_sparse(feature):
    # A feature counts as sparse when any bucketization/vocabulary setting is present.
    if 'boundaries' in feature and feature['boundaries']:
        return True
    if 'vocab_list' in feature and feature['vocab_list']:
        return True
    if 'hash_bucket_size' in feature and feature['hash_bucket_size'] > 0:
        return True
    if 'num_buckets' in feature and feature['num_buckets'] > 0:
        return True
    return False


# Keep the reserved (pass-through) columns from the input table.
columns = []
for column in schema.columns:
    if column.name in reserve_columns:
        columns.append(column)

# Derive one output column per feature defined in fg.json.
for feature in features["features"]:
    name = feature.get("feature_name", None)
    feature_type = feature.get("feature_type", None)
    if feature_type in ("combo_feature",):
        value_type = 'string'
        columns.append(Column(name=name, type=value_type))
    elif feature_type == 'raw_feature':
        value_dim = feature.get("value_dim", 1)
        if need_bucketize and is_sparse(feature):
            value_type = 'bigint' if value_dim == 1 else 'string'
        elif value_dim != 1:
            value_type = 'string'
        else:
            value_type = feature.get('value_type', 'double')
        if value_type in ('int32', 'int64'):
            value_type = 'bigint'
        elif value_type in ('float', 'double'):
            value_type = 'double'
        columns.append(Column(name=name, type=value_type))
    elif feature_type in ("id_feature", "lookup_feature", "match_feature", "text_normalizer"):
        if need_bucketize and is_sparse(feature):
            value_type = 'string'
        else:
            value_type = feature.get('value_type', 'string')
        if value_type in ('int32', 'int64'):
            value_type = 'bigint'
        elif value_type in ('float', 'double'):
            value_type = 'double'
        columns.append(Column(name=name, type=value_type))
    elif feature_type in ("expr_feature", "overlap_feature", "bm25_feature"):
        value_type = 'bigint' if need_bucketize else 'double'
        columns.append(Column(name=name, type=value_type))
    elif "sequence_name" in feature:
        seq_name = feature["sequence_name"]
        for fea in feature["features"]:
            name = fea["feature_name"]
            value_type = 'string'
            columns.append(Column(name=seq_name + '__' + name, type=value_type))
    elif feature_type is None:
        raise KeyError("feature_type can not be none")

output_schema = TableSchema(columns=columns, partitions=schema.partitions)
o.create_table(output_table, output_schema, if_not_exists=True, lifecycle=t.lifecycle)  # created only if the table does not exist
Before running the script, configure the parameters in the node's scheduling configuration: input_table, output_table, force, fg_json_file, and need_bucketize. A hypothetical example of the parameter values is shown below.
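The following is a hypothetical example of the scheduling parameter assignments (space-separated key=value pairs); the project and table names are placeholders. Because the script parses force and need_bucketize with eval, pass Python literals such as True or False:
input_table=your_project.input_table output_table=your_project.output_table force=False fg_json_file=fg.json need_bucketize=False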