Java SDK and MapReduce API

This topic describes how to use the Java SDK, and how to generate features offline in batches on MaxCompute through a MapReduce job. Whether the generated features are bucketized (binned) is controlled by the configuration file and command-line parameters.

Limitations

  • MapReduce does not support ODPS 2.0 data types (such as list, map, float, and int).

  • The MapReduce version of FG is no longer maintained; use PyFG instead.

  • The Java SDK currently runs only on the Linux x86_64 platform.

Using the Java SDK

The following uses a Maven project as an example to show how to use the Java SDK.

1. Download the JAR to a local path, for example /path/to/feature_generator-${version}-linux-x86_64.jar.

2. Install it into your local Maven repository with mvn install:install-file:

mvn install:install-file \
  -Dfile=/path/to/feature_generator-${version}-linux-x86_64.jar \
  -DgroupId=com.aliyun.pai \
  -DartifactId=feature_generator \
  -Dversion=${version} \
  -Dclassifier=linux-x86_64 \
  -Dpackaging=jar

Note: Replace ${version} in the command above with the actual version number.

3. Add the dependency to your pom.xml:

<dependencies>
<dependency>
    <groupId>com.aliyun.pai</groupId>
    <artifactId>feature_generator</artifactId>
    <version>${version}</version>
    <classifier>linux-x86_64</classifier>
</dependency>
</dependencies>

4. Create the fg.json file.

Example:

{
  "features": [
    {
      "feature_name": "query_word",
      "feature_type": "id_feature",
      "value_type": "String",
      "expression": "user:query_word",
      "default_value": "",
      "combiner": "mean",
      "need_prefix": false,
      "is_multi": true
    },
    {
      "feature_name": "query_match",
      "feature_type": "lookup_feature",
      "map": "user:query_token",
      "key": "item:title",
      "needDiscrete": false,
      "needKey": false,
      "default_value": "0",
      "combiner": "sum",
      "need_prefix": false,
      "value_type": "double"
    },
    {
      "feature_name": "goods_id",
      "feature_type": "id_feature",
      "value_type": "String",
      "expression": "item:goods_id",
      "default_value": "-1024",
      "combiner": "mean",
      "need_prefix": false,
      "value_dimension": 1
    },
    {
      "feature_name": "filter_type",
      "feature_type": "id_feature",
      "value_type": "String",
      "expression": "item:filter_type",
      "default_value": "-1024",
      "combiner": "mean",
      "need_prefix": false,
      "value_dimension": 1
    },
    {
      "feature_name": "day_h",
      "feature_type": "id_feature",
      "value_type": "int64",
      "expression": "user:day_h",
      "default_value": "0",
      "combiner": "mean",
      "need_prefix": false,
      "value_dimension": 1
    },
    {
      "feature_name": "week_day",
      "feature_type": "id_feature",
      "value_type": "int64",
      "expression": "user:week_day",
      "default_value": "0",
      "combiner": "mean",
      "need_prefix": false,
      "value_dimension": 1
    },
    {
      "feature_name": "city",
      "feature_type": "id_feature",
      "value_type": "String",
      "expression": "user:city",
      "default_value": "-1024",
      "combiner": "mean",
      "need_prefix": false,
      "value_dimension": 1
    },
    {
      "feature_name": "province",
      "feature_type": "id_feature",
      "value_type": "String",
      "expression": "user:province",
      "default_value": "-1024",
      "combiner": "mean",
      "need_prefix": false,
      "value_dimension": 1
    },
    {
      "feature_name": "country",
      "feature_type": "id_feature",
      "value_type": "String",
      "expression": "user:country",
      "default_value": "-1024",
      "combiner": "mean",
      "need_prefix": false,
      "value_dimension": 1
    },
    {
      "feature_name": "is_new_user",
      "feature_type": "id_feature",
      "value_type": "int64",
      "expression": "user:is_new_user",
      "default_value": "-1024",
      "combiner": "mean",
      "need_prefix": false,
      "value_dimension": 1
    },
    {
      "feature_name": "focus_author",
      "feature_type": "id_feature",
      "value_type": "String",
      "expression": "user:focus_author",
      "separator": ",",
      "default_value": "-1024",
      "combiner": "mean",
      "need_prefix": false,
      "is_multi": true
    },
    {
      "feature_name": "title",
      "feature_type": "id_feature",
      "value_type": "String",
      "expression": "item:title",
      "default_value": "-1024",
      "combiner": "mean",
      "need_prefix": false,
      "is_multi": true
    }
  ],
  "reserves": [
    "request_id",
    "user_id",
    "is_click",
    "is_pay"
  ]
}

5. Use the Java API as shown in the following example:

package org.example;

import com.aliyun.pai.fg.*;
import java.util.*;

public class Main {
    public static void main(String[] args) {
        String filePath = "/path/to/fg.json";
        FgHandler handler = new FgHandler(filePath, 4, false);

        List<String> outputs = new ArrayList<>();
        outputs.add("goods_id");
        outputs.add("is_new_user");
        outputs.add("day_h");
        outputs.add("query_match"); // set output feature_name
        outputs.add("title");
        outputs.add("filter_type");
        outputs.add("city");
        outputs.add("province");
        outputs.add("country");
        outputs.add("focus_author");
        handler.setOutputs(outputs.toArray(new String[0]));

        List<String> expectGoods = Arrays.asList("218687106", "1142068348", "1142068347");
        VariantVectorMap inputs = new VariantVectorMap.Builder()
                .putOptionalString("goods_id", expectGoods)
                .putOptionalInt32("is_new_user", Arrays.asList(0, 1, 0))
                .putOptionalInt32("day_h", Arrays.asList(6, 8, 11))
                .putListString("title", Arrays.asList(
                        Arrays.asList("k2", "k3", "k5"),
                        Arrays.asList("k1", "k2", "k3"),
                        Arrays.asList("k2", "k4")))
                .putMapStringFloat("query_token", Arrays.asList(
                        new HashMap<String, Float>() {{
                            put("k2", 0.8f);
                            put("k3", 0.5f);
                        }},
                        new HashMap<String, Float>() {{
                            put("k1", 0.9f);
                            put("k4", 0.2f);
                        }},
                        new HashMap<String, Float>() {{
                            put("k1", 0.7f);
                            put("k2", 0.3f);
                            put("k4", 0.6f);
                        }}))
                .putOptionalString("filter_type", Arrays.asList(null, "f1", null))
                .putOptionalString("city", Arrays.asList("hangzhou"))
                .putOptionalString("province", Arrays.asList("zhejiang"))
                .putOptionalString("country", Arrays.asList("china"))
                .putOptionalString("focus_author", Arrays.asList("2255010511022,14164467", "10511022,24164467", "550105110,34164467"))
                .build();

        VariantVectorMap results = handler.Process(inputs);
        if (results == null || results.isNull()) {
            System.out.println("fg result is null");
            return;
        }
        System.out.println("result size=" + results.size());
        List<String> features = results.getKeys();
        System.out.println("result features=" + features);
        List<String> goodsIds = results.getList("goods_id");
        System.out.println("goods_ids=" + String.join(", ", goodsIds));

        List<List<String>> titles = results.getList("title");
        System.out.println("titles=" + titles);

        List<Long> dayHours = results.getList("day_h");
        System.out.println("day_h=" + dayHours);

        List<String> filters = results.getList("filter_type");
        System.out.println("filter_type=" + String.join(", ", filters));

        List<List<String>> focus = results.getList("focus_author");
        System.out.println("focus_author=" + focus);

        List<String> citys = results.getList("city");
        System.out.println("city=" + String.join(", ", citys));

        List<String> provinces = results.getList("province");
        System.out.println("provinces=" + String.join(", ", provinces));

        List<String> countrys = results.getList("country");
        System.out.println("country=" + String.join(", ", countrys));

        List<Long> isNewUsers = results.getList("is_new_user");
        System.out.println("is_new_user=" + isNewUsers);

        List<Double> queryMatch = results.getList("query_match");
        System.out.println("query_match=" + queryMatch);
        System.out.println("===========================================================");

        Set<String> itemInputs = handler.GetItemInputNames();
        System.out.println("item side inputs=" + itemInputs);

        Set<String> userInputs = handler.GetUserInputNames();
        System.out.println("user side inputs=" + userInputs);

        Set<String> ctxInputs = handler.GetContextInputNames();
        System.out.println("context side inputs=" + ctxInputs);

        Set<String> reserved = handler.GetReserveColumns();
        System.out.println("reserved columns =" + reserved);

        Set<String> qminputs = handler.GetFeatureInputs("query_match");
        System.out.println("inputs of query_match =" + qminputs);

        String defaultVal = handler.DefaultFeatureValue("query_match");
        System.out.println("default feature value of query_match =" + defaultVal);

        List<String> allFeatures = handler.GetAllFeatureNames();
        System.out.println("all feature names:" + allFeatures);

        Map<String, String> schema = handler.GetTableSchema();
        System.out.println("table schema:" + schema);
    }
}
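
After adding the dependency, one way to compile and run the example above is via the exec-maven-plugin. This is only a sketch, not part of the SDK itself; any build setup that puts the JAR on the classpath works. It must be run on a Linux x86_64 machine, and the fg.json path in Main must point to an existing file.

mvn compile exec:java -Dexec.mainClass=org.example.Main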

Running the MapReduce Job

Run the following commands in DataWorks:

set odps.sql.planner.mode=sql;
set odps.isolation.session.enable=true;
set odps.sql.counters.dynamic.limit=true;
--@resource_reference{"feature_generator-${version}.jar"}
jar -resources feature_generator-${version}.jar,fg.json
    -classpath feature_generator-${version}.jar
    com.aliyun.pai.Main
    -i ${input_table}/dt=${bdp.system.bizdate}
    -o ${output_table}/dt=${bdp.system.bizdate}
    -f fg.json -s false;

Parameter   Default   Description
-i          -         Input table (partition).
-o          -         Output table (partition); it must be created in advance.
-f          -         The FG configuration file, in JSON format.
-s          true      Whether to concatenate the FG results into one large string stored in the features column.
-b          false     Whether to perform bucketization (binning) on the features.
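
When -b is set to true, bucketization is applied to features whose fg.json entries define bucketization parameters. Judging from the is_sparse check in the table-creation script below, these are features that specify boundaries, vocab_list, hash_bucket_size, or num_buckets. A hypothetical raw_feature entry that would be bucketized might look like the following (the feature name and values are illustrative, not taken from the demo above):

{
  "feature_name": "price",
  "feature_type": "raw_feature",
  "expression": "item:price",
  "value_type": "double",
  "default_value": "0",
  "boundaries": [10.0, 50.0, 100.0]
}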

Creating the Output MaxCompute Table

Before running the command above, the output table must be created in advance. You can create a PyODPS 3 node in DataWorks and run the following script, which creates the output table automatically based on the content of fg.json.

# coding: utf-8
# Copy the code below into a PyODPS 3 node in DataWorks and run it there.
# Before running, set the parameters: input_table, output_table, force, fg_json_file, need_bucketize
'''PyODPS 3
Please make sure not to process data by downloading it from MaxCompute. Downloading usually involves
open_reader on a Table/Instance or the to_pandas method of a DataFrame.
It is recommended to process data with PyODPS DataFrame (created from a MaxCompute table) and MaxCompute SQL.
For more details, see: https://help.aliyun.com/document_detail/90481.html
'''
from odps.models import TableSchema, Column, Partition
import json
import sys

output_table = args['output_table']
force = args['force']
if eval(force):
    print("delete output table")
    o.delete_table(output_table, if_exists=True)
elif o.exist_table(output_table):
    sys.exit(0)

fg_json_file = args['fg_json_file']
with o.open_resource(fg_json_file, mode='r') as fp:
    features = json.load(fp)
reserve_columns = set(features["reserves"])

input_table = args['input_table']
t = o.get_table(input_table)
schema = t.table_schema

need_bucketize = args['need_bucketize']
if type(need_bucketize) == str:
    need_bucketize = eval(need_bucketize)

def is_sparse(feature):
    if 'boundaries' in feature and feature['boundaries']:
        return True
    if 'vocab_list' in feature and feature['vocab_list']:
        return True
    if 'hash_bucket_size' in feature and feature['hash_bucket_size'] > 0:
        return True
    if 'num_buckets' in feature and feature['num_buckets'] > 0:
        return True
    return False 

columns = []
for column in schema.columns:
    if column.name in reserve_columns:
        columns.append(column)

for feature in features["features"]:
    name = feature.get("feature_name", None)
    feature_type = feature.get("feature_type", None)
    if feature_type in ("combo_feature",):
        value_type = 'string'
        columns.append(Column(name=name, type=value_type))
    elif feature_type == 'raw_feature':
        value_dim = feature.get("value_dim", 1)
        if need_bucketize and is_sparse(feature):
            value_type = 'bigint' if value_dim == 1 else 'string'
        elif value_dim != 1:
            value_type = 'string'
        else:
            value_type = feature.get('value_type', 'double')
            if value_type in ('int32', 'int64'):
                value_type = 'bigint'
            elif value_type in ('float', 'double'):
                value_type = 'double'
        columns.append(Column(name=name, type=value_type))
    elif feature_type in ("id_feature", "lookup_feature", "match_feature", "text_normalizer"):
        if need_bucketize and is_sparse(feature):
            value_type = 'string'
        else:
            value_type = feature.get('value_type', 'string')
            if value_type in ('int32', 'int64'):
                value_type = 'bigint'
            elif value_type in ('float', 'double'):
                value_type = 'double'
        columns.append(Column(name=name, type=value_type))
    elif feature_type in ("expr_feature", "overlap_feature", "bm25_feature"):
        value_type = 'bigint' if need_bucketize else 'double'
        columns.append(Column(name=name, type=value_type))
    elif "sequence_name" in feature:
        seq_name = feature["sequence_name"]
        for fea in feature["features"]:
            name = fea["feature_name"]
            value_type = 'string'
            columns.append(Column(name=seq_name + '__' + name, type=value_type))
    elif feature_type is None:
        raise KeyError("feature_type can not be none")


output_schema = TableSchema(columns=columns, partitions=schema.partitions)
o.create_table(output_table, output_schema, if_not_exists=True, lifecycle=t.lifecycle)  # create only if the table does not exist

Before running the script, configure the following parameters in the scheduling configuration: input_table, output_table, force, fg_json_file, need_bucketize.
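
For reference, with the demo fg.json shown earlier, -s false, and need_bucketize=False, the script would create a table equivalent to the following DDL. This is only a sketch: the reserve columns (request_id, user_id, is_click, is_pay) and the lifecycle are copied from the input table, so the types and lifecycle value shown here are assumptions, and the table name is hypothetical.

CREATE TABLE IF NOT EXISTS fg_output_demo (  -- hypothetical table name
    request_id   STRING,  -- reserve column; type copied from the input table (assumed here)
    user_id      STRING,  -- reserve column; assumed type
    is_click     BIGINT,  -- reserve column; assumed type
    is_pay       BIGINT,  -- reserve column; assumed type
    query_word   STRING,
    query_match  DOUBLE,
    goods_id     STRING,
    filter_type  STRING,
    day_h        BIGINT,
    week_day     BIGINT,
    city         STRING,
    province     STRING,
    country      STRING,
    is_new_user  BIGINT,
    focus_author STRING,
    title        STRING
)
PARTITIONED BY (dt STRING)
LIFECYCLE 90;  -- copied from the input table; 90 is illustrative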