MapReduce API

A MaxCompute MapReduce job can generate features offline in batch. Whether the generated features are bucketized is controlled by the configuration file and the command-line arguments.

Limitations

  • MapReduce does not support the new data types introduced in ODPS 2.0 (list, map, float, int, etc.).

  • The MapReduce version of FG is no longer maintained; please use PyFG instead.

Build MaxCompute MapReduce Jar

# Install the build toolchain (devtoolset-11 provides GCC 11; cmake3 provides CMake 3)
sudo yum install -y devtoolset-11 cmake3
# Compile and package the MapReduce jar
bash package.sh

Alternatively, download the prebuilt jar package directly.

Run the Command

Run the following command in DataWorks:

set odps.sql.planner.mode=sql;
set odps.isolation.session.enable=true;
set odps.sql.counters.dynamic.limit=true;
--@resource_reference{"feature_generator-${version}.jar"}
jar -resources feature_generator-${version}.jar,fg.json
    -classpath feature_generator-${version}.jar
    com.aliyun.pai.Main
    -i ${input_table}/dt=${bdp.system.bizdate}
    -o ${output_table}/dt=${bdp.system.bizdate}
    -f fg.json -s false;
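
For illustration, a filled-in invocation might look like the following; the project, table names, and jar version are hypothetical, the dt values are what `${bdp.system.bizdate}` would resolve to, and -b true additionally enables bucketization (see the parameter table below). The set statements and resource reference shown above are still required:

jar -resources feature_generator-1.0.0.jar,fg.json
    -classpath feature_generator-1.0.0.jar
    com.aliyun.pai.Main
    -i my_project.sample_table/dt=20240101
    -o my_project.feature_table/dt=20240101
    -f fg.json -s false -b true;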

| Parameter | Default | Description |
| --- | --- | --- |
| -i | | Input table (partition). |
| -o | | Output table (partition); must be created in advance. |
| -f | | FG configuration file, in JSON format. |
| -s | true | Whether to concatenate the FG results into one large string stored in the features column. |
| -b | false | Whether to perform bucketization. |
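
For orientation, below is a minimal sketch of an fg.json. It is an assumption assembled from the keys that the table-creation script in the next section actually reads (reserves, features, feature_name, feature_type, value_type, boundaries, hash_bucket_size, sequence_name); the expression values follow the usual FG column-mapping convention, and all names here are hypothetical. Refer to the FG configuration documentation for the authoritative schema.

{
  "reserves": ["user_id", "item_id", "label"],
  "features": [
    {"feature_name": "price", "feature_type": "raw_feature",
     "expression": "item:price", "value_type": "double",
     "boundaries": [9.9, 49.9, 99.9]},
    {"feature_name": "category", "feature_type": "id_feature",
     "expression": "item:category", "hash_bucket_size": 1000},
    {"sequence_name": "click_50_seq", "features": [
      {"feature_name": "item_id", "feature_type": "id_feature",
       "expression": "user:click_50_seq__item_id"}
    ]}
  ]
}

With need_bucketize=True, this sketch would yield a bigint column price (it has boundaries), a string column category (hash-bucketed), and a string column click_50_seq__item_id for the sequence sub-feature.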

Create the Output MaxCompute Table

The output table must exist before the command above is run. You can create a PyODPS 3 node in DataWorks and run the script below, which creates the output table automatically based on the contents of fg.json.

# coding: utf-8
# Copy the code below into a DataWorks PyODPS 3 node and run it there. Before
# running, set these parameters: input_table, output_table, force, fg_json_file, need_bucketize
'''PyODPS 3
Please avoid processing data by downloading it from MaxCompute. Download
operations typically include open_reader on a Table/Instance and the
to_pandas method of a DataFrame.
It is recommended to process data with PyODPS DataFrame (created from a
MaxCompute table) and MaxCompute SQL.
For more details, see: https://help.aliyun.com/document_detail/90481.html
'''
from odps.models import TableSchema, Column, Partition
import json
import sys

output_table = args['output_table']
# 'force' arrives as a string from the scheduling configuration; treat
# 'true'/'True'/'1' as truthy rather than calling eval() on user input.
force = str(args['force']).strip().lower() in ('true', '1')
if force:
    print("delete output table")
    o.delete_table(output_table, if_exists=True)
elif o.exist_table(output_table):
    sys.exit(0)

fg_json_file = args['fg_json_file']
# fg.json is uploaded as a MaxCompute resource; read it from there.
with o.open_resource(fg_json_file, mode='r') as fp:
    features = json.load(fp)
reserve_columns = set(features["reserves"])

input_table = args['input_table']
t = o.get_table(input_table)
schema = t.table_schema

# 'need_bucketize' also arrives as a string; parse it the same way.
need_bucketize = args['need_bucketize']
if isinstance(need_bucketize, str):
    need_bucketize = need_bucketize.strip().lower() in ('true', '1')

def is_sparse(feature):
    """A feature is sparse (categorical) if any bucketization config is set:
    boundaries, vocab_list, hash_bucket_size or num_buckets."""
    if 'boundaries' in feature and feature['boundaries']:
        return True
    if 'vocab_list' in feature and feature['vocab_list']:
        return True
    if 'hash_bucket_size' in feature and feature['hash_bucket_size'] > 0:
        return True
    if 'num_buckets' in feature and feature['num_buckets'] > 0:
        return True
    return False

# Keep the pass-through columns listed in "reserves", copying their
# types from the input table schema.
columns = []
for column in schema.columns:
    if column.name in reserve_columns:
        columns.append(column)

# Derive one output column per feature; the column type depends on the
# feature type and on whether bucketization is enabled.
for feature in features["features"]:
    name = feature.get("feature_name", None)
    feature_type = feature.get("feature_type", None)
    if feature_type in ("combo_feature",):
        value_type = 'string'
        columns.append(Column(name=name, type=value_type))
    elif feature_type == 'raw_feature':
        value_dim = feature.get("value_dim", 1)
        if need_bucketize and is_sparse(feature):
            value_type = 'bigint' if value_dim == 1 else 'string'
        elif value_dim != 1:
            value_type = 'string'
        else:
            value_type = feature.get('value_type', 'double')
            if value_type in ('int32', 'int64'):
                value_type = 'bigint'
            elif value_type in ('float', 'double'):
                value_type = 'double'
        columns.append(Column(name=name, type=value_type))
    elif feature_type in ("id_feature", "lookup_feature", "match_feature", "text_normalizer"):
        if need_bucketize and is_sparse(feature):
            value_type = 'string'
        else:
            value_type = feature.get('value_type', 'string')
            if value_type in ('int32', 'int64'):
                value_type = 'bigint'
            elif value_type in ('float', 'double'):
                value_type = 'double'
        columns.append(Column(name=name, type=value_type))
    elif feature_type in ("expr_feature", "overlap_feature", "bm25_feature"):
        value_type = 'bigint' if need_bucketize else 'double'
        columns.append(Column(name=name, type=value_type))
    elif "sequence_name" in feature:
        seq_name = feature["sequence_name"]
        for fea in feature["features"]:
            name = fea["feature_name"]
            value_type = 'string'
            columns.append(Column(name=seq_name + '__' + name, type=value_type))
    elif feature_type is None:
        raise KeyError("feature_type cannot be None")


output_schema = TableSchema(columns=columns, partitions=schema.partitions)
o.create_table(output_table, output_schema, if_not_exists=True, lifecycle=t.lifecycle)  # create only if the table does not exist
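
After the node has run, a quick sanity check (reusing the same o entry point as the script) is to print the schema that was just created:

# Print the created schema to verify column names and types.
print(o.get_table(output_table).table_schema)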

Before running the node, configure its parameters in the scheduling configuration: input_table, output_table, force, fg_json_file, need_bucketize.
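
In DataWorks, scheduling parameters are typically given as space-separated name=value pairs; a hypothetical assignment matching this script would be:

input_table=my_project.sample_table output_table=my_project.feature_table force=False fg_json_file=fg.json need_bucketize=True

All values arrive as strings; the script parses force and need_bucketize itself. need_bucketize should agree with the -b flag passed to the MapReduce job, since the output column types depend on it, and force=True drops and recreates an existing output table (useful after changing fg.json).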