MaxCompute MapReduce Job可以离线批量生成特征,根据配置文件和命令行参数决定是否对生成的特征做分箱操作。
使用限制
MapReduce不支持ODPS2.0的复杂类型(list、map、float、int等)。
MapReduce版本的FG已停止维护,请使用PyFG代替。
Build MaxCompute MapReduce Jar
sudo yum install -y devtoolset-11 cmake3
bash package.sh
或者直接下载编译好的jar包。
执行命令
在DataWorks上执行如下命令:
set odps.sql.planner.mode=sql;
set odps.isolation.session.enable=true;
set odps.sql.counters.dynamic.limit=true;
--@resource_reference{"feature_generator-${version}.jar"}
jar -resources feature_generator-${version}.jar,fg.json
-classpath feature_generator-${version}.jar
com.aliyun.pai.Main
-i ${input_table}/dt=${bdp.system.bizdate}
-o ${output_table}/dt=${bdp.system.bizdate}
-f fg.json -s false;
参数 | 默认值 | 说明 |
-i | 无 | 输入表(分区)。 |
-o | 无 | 输出表(分区),需要预先创建好。 |
-f | 无 | fg配置文件,JSON格式。 |
-s | true | fg的结果是否拼接为一个大string,存储到features字段。 |
-b | false | 是否需要做分箱操作。 |
创建输出MaxCompute Table
在执行上述命令之前,需要预先创建好输出表,可以在DataWorks里创建一个PyOdps3节点,执行下面的脚本,根据fg.json的内容自动创建输出表。
# coding: utf-8
# 下面的代码请复制到 DataWorks 的 PyOdps3 节点后执行,执行之前需要设置好参数:input_table、output_table、force、fg_json_file、need_bucketize
'''PyODPS 3
请确保不要使用从 MaxCompute下载数据来处理。下载数据操作常包括Table/Instance的open_reader以及 DataFrame的to_pandas方法。
推荐使用 PyODPS DataFrame(从 MaxCompute 表创建)和MaxCompute SQL来处理数据。
更详细的内容可以参考:https://help.aliyun.com/document_detail/90481.html
'''
from odps.models import TableSchema, Column, Partition
import json
import sys
output_table = args['output_table']
force = args['force']
if eval(force):
print("delete output table")
o.delete_table(output_table, if_exists=True)
elif o.exist_table(output_table):
sys.exit(0)
fg_json_file = args['fg_json_file']
with o.open_resource(fg_json_file, mode='r') as fp:
features = json.load(fp)
reserve_columns = set(features["reserves"])
input_table = args['input_table']
t = o.get_table(input_table)
schema = t.table_schema
need_bucketize = args['need_bucketize']
if type(need_bucketize) == str:
need_bucketize = eval(need_bucketize)
def is_sparse(feature):
if 'boundaries' in feature and feature['boundaries']:
return True
if 'vocab_list' in feature and feature['vocab_list']:
return True
if 'hash_bucket_size' in feature and feature['hash_bucket_size'] > 0:
return True
if 'num_buckets' in feature and feature['num_buckets'] > 0:
return True
return False
columns = []
for column in schema.columns:
if column.name in reserve_columns:
columns.append(column)
for feature in features["features"]:
name = feature.get("feature_name", None)
feature_type = feature.get("feature_type", None)
if feature_type in ("combo_feature",):
value_type = 'string'
columns.append(Column(name=name, type=value_type))
elif feature_type == 'raw_feature':
value_dim = feature.get("value_dim", 1)
if need_bucketize and is_sparse(feature):
value_type = 'bigint' if value_dim == 1 else 'string'
elif value_dim != 1:
value_type = 'string'
else:
value_type = feature.get('value_type', 'double')
if value_type in ('int32', 'int64'):
value_type = 'bigint'
elif value_type in ('float', 'double'):
value_type = 'double'
columns.append(Column(name=name, type=value_type))
elif feature_type in ("id_feature", "lookup_feature", "match_feature", "text_normalizer"):
if need_bucketize and is_sparse(feature):
value_type = 'string'
else:
value_type = feature.get('value_type', 'string')
if value_type in ('int32', 'int64'):
value_type = 'bigint'
elif value_type in ('float', 'double'):
value_type = 'double'
columns.append(Column(name=name, type=value_type))
elif feature_type in ("expr_feature", "overlap_feature", "bm25_feature"):
value_type = 'bigint' if need_bucketize else 'double'
columns.append(Column(name=name, type=value_type))
elif "sequence_name" in feature:
seq_name = feature["sequence_name"]
for fea in feature["features"]:
name = fea["feature_name"]
value_type = 'string'
columns.append(Column(name=seq_name + '__' + name, type=value_type))
elif feature_type is None:
raise KeyError("feature_type can not be none")
output_schema = TableSchema(columns=columns, partitions=schema.partitions)
o.create_table(output_table, output_schema, if_not_exists=True, lifecycle=t.lifecycle) # 只有不存在表时才创建
执行之前需要在调度配置里配置好参数:input_table
、output_table
、force
、fg_json_file
、need_bucketize
。
文档内容是否对您有帮助?