Java SDK and MapReduce API

This article describes how to use the Java SDK, and how to generate features offline in batches on the MaxCompute platform with a MapReduce job. Whether the generated features are bucketized is controlled by the configuration file and command-line arguments.

Limitations

  • MapReduce does not support the ODPS 2.0 type system (complex types such as list and map, and new primitive types such as float and int).

  • The MapReduce version of FG is no longer maintained; use PyFG instead.

  • The Java SDK provides only part of the functionality and does not support feature dependencies (DAG mode); for the full feature set, use PyFG instead.

Using the Java SDK

Taking a Maven project as an example, this section shows how to use the Java SDK.

1. Download the JAR to a local path, e.g. /path/to/feature_generator-${version}.jar.

2. Install it into your local Maven repository with mvn install:install-file:

mvn install:install-file \
  -Dfile=/path/to/feature_generator-${version}.jar \
  -DgroupId=com.aliyun.pai \
  -DartifactId=feature_generator \
  -Dversion=${version} \
  -Dpackaging=jar

Note: replace ${version} in the commands above with the actual version number.

3. Add the dependencies to your pom.xml:

<dependencies>
    <dependency>
        <groupId>com.aliyun.odps</groupId>
        <artifactId>odps-sdk-mapred</artifactId>
        <version>0.43.4-public</version>
    </dependency>
    <dependency>
        <groupId>com.aliyun.pai</groupId>
        <artifactId>feature_generator</artifactId>
        <version>${version}</version>
    </dependency>
</dependencies>
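The test example in step 5 additionally uses fastjson2 (to parse fg.json) and JUnit 5, which are not listed above. If your project does not already include them, add dependencies along these lines (the version numbers here are only examples):

<dependency>
    <groupId>com.alibaba.fastjson2</groupId>
    <artifactId>fastjson2</artifactId>
    <version>2.0.32</version>
</dependency>
<dependency>
    <groupId>org.junit.jupiter</groupId>
    <artifactId>junit-jupiter</artifactId>
    <version>5.9.2</version>
    <scope>test</scope>
</dependency>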

4. Create an fg.json configuration file.
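For illustration, a minimal sketch of what fg.json might look like. Only the keys referenced elsewhere in this article (features, reserves, feature_name, feature_type, value_type) are taken from the document; the expression values follow the common side:column convention of FG configurations and are assumptions, so adapt everything to your actual schema:

{
  "features": [
    {"feature_name": "user_id",  "feature_type": "id_feature",  "expression": "user:user_id",  "value_type": "string"},
    {"feature_name": "goods_id", "feature_type": "id_feature",  "expression": "item:goods_id", "value_type": "string"},
    {"feature_name": "day_h",    "feature_type": "raw_feature", "expression": "user:day_h",    "value_type": "int64"}
  ],
  "reserves": ["request_id", "user_id", "is_click", "is_pay"]
}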

5. Use the Java API as shown in the following example:

package com.aliyun.pai.fg;
import com.alibaba.fastjson2.JSON;
import com.aliyun.pai.FeatureConfig;
import com.aliyun.pai.FgHandler;
import org.junit.jupiter.api.Test;
import java.io.InputStream;
import java.util.*;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class FgHandlerTest {
    @Test
    public void testFgHandler() {
        InputStream in = getClass().getClassLoader().getResourceAsStream("fg.json");
        if (in == null) {
            System.out.println("null input stream");
        }
        FeatureConfig fgConfig = JSON.parseObject(in, FeatureConfig.class);
        // System.out.println("config: " + JSON.toJSONString(fgConfig));
        assert fgConfig != null;
        FgHandler model = new FgHandler(fgConfig);
        Map<String, Object> record = new HashMap<>();
        record.put("request_id", "a6b67f9d-024d-43fe-bc5c-29d1e8e7e72a");
        record.put("user_id", "218687106163");
        record.put("is_click", 1L);
        record.put("is_pay", 0L);
        record.put("is_new_user", 1L);
        record.put("day_h", 6L);
        record.put("query_token", "k2:0.5\u001Dk3:0.5");
        record.put("title", "k1\u001Dk2\u001Dk3");
        
        record.put("goods_id", "1142068348");
        record.put("filter_type", null);
        record.put("city", "hz");
        record.put("province", "zj");
        record.put("country", "china");
        record.put("focus_author", "2255010511022,14164467");
        
        System.out.println("record query_token:" + record.get("query_token"));
        System.out.println("record title:" + record.get("title"));
        Map<String, Object> result = model.Process(record);
        System.out.println("result:" + JSON.toJSONString(result));
        System.out.println("goods_id:" + result.get("goods_id"));
        System.out.println("is_new_user:" + result.get("is_new_user"));
        System.out.println("day_h:" + result.get("day_h"));
        System.out.println("week_day:" + result.get("week_day"));
        System.out.println("province:" + result.get("province"));
        System.out.println("city:" + result.get("city"));
        System.out.println("filter_type:" + result.get("filter_type"));
        System.out.println("query_word:" + result.get("query_word"));
        System.out.println("query_match:" + result.get("query_match"));
        System.out.println("title:" + result.get("title"));
        System.out.println("focus_author:" + result.get("focus_author"));
                                                
        assertEquals("1142068348", result.get("goods_id"));
        assertEquals(1L, result.get("is_new_user"));
        assertEquals(6L, result.get("day_h"));
        assertEquals(0L, result.get("week_day"));
        assertEquals("zj", result.get("province"));
        assertEquals("hz", result.get("city"));
        assertEquals("-1024", result.get("filter_type"));
        assertEquals("", result.get("query_word"));
        assertEquals(1.0, result.get("query_match"));
        assertEquals("k1\u0003k2\u0003k3", result.get("title"));
        assertEquals("2255010511022\u000314164467", result.get("focus_author"));
    }
}

Running the MapReduce Job

Run the following commands in DataWorks:

set odps.sql.planner.mode=sql;
set odps.isolation.session.enable=true;
set odps.sql.counters.dynamic.limit=true;
--@resource_reference{"feature_generator-${version}.jar"}
jar -resources feature_generator-${version}.jar,fg.json
    -classpath feature_generator-${version}.jar
    com.aliyun.pai.Main
    -i ${input_table}/dt=${bdp.system.bizdate}
    -o ${output_table}/dt=${bdp.system.bizdate}
    -f fg.json -s false;
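The command references feature_generator-${version}.jar and fg.json as resources, so they must be uploaded to the workspace beforehand. If you run the job from the MaxCompute client (odpscmd) instead of DataWorks, a sketch of the resource upload, assuming local file paths:

add jar /path/to/feature_generator-${version}.jar -f;
add file fg.json -f;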

Parameter   Default   Description
-i                    Input table (partition).
-o                    Output table (partition); must be created in advance.
-f                    FG configuration file, in JSON format.
-s          true      Whether to concatenate the FG result into one big string and store it in the features column.
-b          false     Whether to bucketize the generated features.
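If -s is left at its default of true, the per-feature columns built in the next section are not needed; under the assumption that the output table then only has to hold the reserved columns plus a string column named features, a sketch of such a table (column names and partitioning are illustrative):

CREATE TABLE IF NOT EXISTS fg_output_table (
    request_id STRING,
    user_id    STRING,
    features   STRING
)
PARTITIONED BY (dt STRING);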

Creating the Output MaxCompute Table

Before running the command above, the output table must already exist. You can create a PyODPS 3 node in DataWorks and run the following script, which creates the output table automatically based on the contents of fg.json.

# coding: utf-8
# Paste the code below into a DataWorks PyODPS 3 node. Before running it, configure the parameters: input_table, output_table, force, fg_json_file, need_bucketize.
'''PyODPS 3
Make sure you do not process data by downloading it from MaxCompute. Download operations typically include
open_reader on Table/Instance and the DataFrame to_pandas method.
It is recommended to process data with PyODPS DataFrame (created from MaxCompute tables) and MaxCompute SQL.
For more details, see: https://help.aliyun.com/document_detail/90481.html
'''
from odps.models import TableSchema, Column, Partition
import json
import sys

output_table = args['output_table']
force = args['force']
if eval(force):
    print("delete output table")
    o.delete_table(output_table, if_exists=True)
elif o.exist_table(output_table):
    sys.exit(0)

fg_json_file = args['fg_json_file']
with o.open_resource(fg_json_file, mode='r') as fp:
    features = json.load(fp)
reserve_columns = set(features["reserves"])

input_table = args['input_table']
t = o.get_table(input_table)
schema = t.table_schema

need_bucketize = args['need_bucketize']
if type(need_bucketize) == str:
    need_bucketize = eval(need_bucketize)

def is_sparse(feature):
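    # A feature is treated as sparse when it will be discretized or encoded, i.e. it defines
    # bucket boundaries, a vocabulary list, a hash bucket size, or a fixed number of buckets.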
    if 'boundaries' in feature and feature['boundaries']:
        return True
    if 'vocab_list' in feature and feature['vocab_list']:
        return True
    if 'hash_bucket_size' in feature and feature['hash_bucket_size'] > 0:
        return True
    if 'num_buckets' in feature and feature['num_buckets'] > 0:
        return True
    return False 

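# Keep the pass-through columns listed in "reserves", with their original types from the input table.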
columns = []
for column in schema.columns:
    if column.name in reserve_columns:
        columns.append(column)

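# Derive one output column per feature; its type depends on the feature_type and on
# whether bucketization (need_bucketize) turns the value into a discrete id.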
for feature in features["features"]:
    name = feature.get("feature_name", None)
    feature_type = feature.get("feature_type", None)
    if feature_type in ("combo_feature",):
        value_type = 'string'
        columns.append(Column(name=name, type=value_type))
    elif feature_type == 'raw_feature':
        value_dim = feature.get("value_dim", 1)
        if need_bucketize and is_sparse(feature):
            value_type = 'bigint' if value_dim == 1 else 'string'
        elif value_dim != 1:
            value_type = 'string'
        else:
            value_type = feature.get('value_type', 'double')
            if value_type in ('int32', 'int64'):
                value_type = 'bigint'
            elif value_type in ('float', 'double'):
                value_type = 'double'
        columns.append(Column(name=name, type=value_type))
    elif feature_type in ("id_feature", "lookup_feature", "match_feature", "text_normalizer"):
        if need_bucketize and is_sparse(feature):
            value_type = 'string'
        else:
            value_type = feature.get('value_type', 'string')
            if value_type in ('int32', 'int64'):
                value_type = 'bigint'
            elif value_type in ('float', 'double'):
                value_type = 'double'
        columns.append(Column(name=name, type=value_type))
    elif feature_type in ("expr_feature", "overlap_feature", "bm25_feature"):
        value_type = 'bigint' if need_bucketize else 'double'
        columns.append(Column(name=name, type=value_type))
    elif "sequence_name" in feature:
        seq_name = feature["sequence_name"]
        for fea in feature["features"]:
            name = fea["feature_name"]
            value_type = 'string'
            columns.append(Column(name=seq_name + '__' + name, type=value_type))
    elif feature_type is None:
        raise KeyError("feature_type can not be none")


output_schema = TableSchema(columns=columns, partitions=schema.partitions)
o.create_table(output_table, output_schema, if_not_exists=True, lifecycle=t.lifecycle)  # created only if the table does not already exist

Before running the script, configure the following parameters in the node's scheduling configuration: input_table, output_table, force, fg_json_file, need_bucketize.
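For illustration, the scheduling parameters could be set like this (the values are assumptions; force and need_bucketize are parsed as Python literals by the script, and fg_json_file must be the name of an fg.json resource uploaded to the workspace):

input_table=fg_input_table output_table=fg_output_table force=False fg_json_file=fg.json need_bucketize=True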