在离线任务中使用FG

MaxCompute PyFg Job可以离线批量生成复杂特征,支持ODPS2.0的复杂类型(list、map、float、int等),根据配置文件和命令行参数决定是否对生成的特征做分箱操作。

安装依赖包

登录DataWorks控制台,创建独享调度资源组,使用运维助手安装pyfg包,示例如下:

/home/tops/bin/pip3 install -i https://pypi.tuna.tsinghua.edu.cn/simple --upgrade --force-reinstall http://tzrec.oss-cn-beijing.aliyuncs.com/third_party/pyfg038-0.3.8-cp37-cp37m-linux_x86_64.whl

上传资源文件

上传FG的配置文件(JSON格式)到MaxCompute的项目空间中。

某些FG算子需要额外的资源文件,用户需要手动上传到MaxCompute的项目空间中。

特征算子

描述

资源文件配置项

text_normalizer

文本归一化

停用词文件stop_char_file

tokenize_feature

文本分词特征

词汇表配置文件vocab_file

custom_feature

自定义算子

算子配置文件operator_lib_file

执行 FG 离线任务

在DataWorks里创建一个PyOdps3节点,执行下面的脚本,程序会根据fg.json的内容自动创建输出表。

from pyfg038 import offline_pyfg

offline_pyfg.run(o, 
    args['input_table'], 
    args['output_table'], 
    args['fg_json_file'], 
    args['partition_value'], 
    force_delete_output_table=False, 
    force_update_resource=False,
    batch_size=128)

0.3.8版本之后也可以使用新的接口,如下:

from pyfg038 import run_on_odps

fg_task = run_on_odps.FgTask(
    args['input_table'], 
    args['output_table'], 
    args['fg_json_file'], 
    args['partition_value'],
    batch_size=128,
    force_delete_output_table=False,
    force_update_resource=False)
fg_task.add_sql_setting('odps.stage.mapper.split.size', 64)
fg_task.run(o)

执行之前需要在调度配置里配置好参数:input_tableoutput_tablefg_json_filepartition_value

如果本地安装了PyODPS,pyfg也可以安装在本地,在本地提交任务。

参数说明

参数

默认值

说明

input_table

输入表。

output_table

输出表,会自动创建。

fg_json_file

FG配置文件,JSON格式。

partition_value

指定输入表的partition分区作为FG的输入,结果保存在输出表的partition分区中。

batch_size

128

批量处理的记录数。

force_delete_output_table

False

是否删除输出表,设置为True时会先自动删除输出表,再运行任务。

force_update_resource

False

是否更新资源,设置为True时会先自动更新资源,再运行任务。

debug

False

是否是调试模型,设置为True时会打印所有更新的资源的内容。

set_sql

'无'

offline_pyfg.run接口的参数。指定MaxCompute SQL,详细见Flag参数列表,例如指定:SETodps.stage.mapper.split.size=64;可以分配更多资源,加快 SQL 运行速度。

sql_setting

fg_task.add_sql_setting方法的key、value参数。指定MaxCompute SQL参数,详细见Flag参数列表,可以add多个flag。