在MaxCompute任务中使用FG

MaxCompute PyFg Job可以离线批量生成复杂特征,支持ODPS2.0的复杂类型(list、map、float、int等),根据配置文件和命令行参数决定是否对生成的特征做分箱操作。

方式一:使用通用资源组镜像

DataWorks调度配置-资源属性区块选择一个通用的资源组,选择最新的dataworks_pairec_task_pod镜像。

方式二:安装依赖包(适用于老版DataWorks

登录DataWorks控制台,创建独享调度资源组,使用运维助手安装pyfg的包。

DataWorks独享资源组中安装pyfg包,页面路径为:DataWorks->管理中心->资源组列表->运维助手,示例如下:

/home/tops/bin/pip3 install -i https://pypi.tuna.tsinghua.edu.cn/simple --upgrade --force-reinstall http://tzrec.oss-cn-beijing.aliyuncs.com/third_party/pyfg073-0.7.3-cp37-cp37m-linux_x86_64.whl

方式三:自定义资源组镜像(适用于新版DataWorks

参考文档:自定义镜像

上传资源文件

上传FG的配置文件(JSON格式)到MaxCompute的项目空间中。

某些FG算子需要额外的资源文件,用户需要手动上传到MaxCompute的项目空间中。

特征算子

描述

资源文件配置项

text_normalizer

文本归一化

停用词文件stop_char_file

tokenize_feature

文本分词特征

词汇表配置文件vocab_file

bm25_feature

文本相关性特征

词频配置文件 term_doc_freq_file

custom_feature

自定义算子

算子配置文件operator_lib_file

创建输出表

可以在DataWorks里创建一个PyOdps3节点,执行下面的脚本。程序会根据fg.json的内容创建输出表,同时创建后续执行任务时需要使用的各种资源。

from pyfg073 import run_on_odps

fg_task = run_on_odps.FgTask(
    args['input_table'], 
    args['output_table'], 
    args['fg_json_file'], 
    args['partition_value'],
    force_delete_output_table=True,
    force_update_resource=True)
fg_task.create_output_table(o)

执行之前需要在调度配置里配置好参数:input_tableoutput_tablefg_json_filepartition_value

虽然fg_task.run(o)本身也会在没有输出表时自动创建输出表,但还是建议先调用该接口预先创建好输出表,防止在并发补数据时引起冲突导致任务失败。

执行 FG 离线任务

DataWorks里创建一个PyOdps3节点,执行下面的脚本,程序会根据fg.json的内容自动创建输出表。

from pyfg073 import run_on_odps

fg_task = run_on_odps.FgTask(
    args['input_table'], 
    args['output_table'], 
    args['fg_json_file'], 
    args['partition_value'],
    batch_size=128,
    force_delete_output_table=False,
    force_update_resource=False)
fg_task.add_sql_setting('odps.stage.mapper.split.size', 256)
fg_task.run(o)

执行之前需要在调度配置里配置好参数:input_tableoutput_tablefg_json_filepartition_value

如果本地安装了PyODPS,pyfg也可以安装在本地,在本地提交任务。

参数说明

参数

默认值

说明

input_table

输入表。

output_table

输出表,会自动创建。

fg_json_file

FG配置文件,JSON格式。

partition_value

指定输入表的partition分区作为FG的输入,结果保存在输出表的partition分区中。

schema

指定[MaxComputeSchema](Schema操作)

batch_size

128

批量处理的记录数。

memory

1024

任务节点使用的内存大小,单位:M

force_delete_output_table

False

是否删除输出表,设置为True时会先自动删除输出表,再运行任务。

force_update_resource

False

是否更新资源,设置为True时会先自动更新资源,再运行任务;不要一直设置为True,会有并发冲突的问题。

output_merged_str

False

是否合并字符串,设置为 True 时会自动合并字符串,输出RTP格式的大String特征

debug

False

是否是调试模型,设置为True时会打印所有更新的资源的内容。

sql_setting

fg_task.add_sql_setting方法的key、value参数。指定MaxCompute SQL参数,详细见Flag参数列表,可以add多个flag。

fg_setting

fg_task.add_fg_setting方法的key、value参数。指定 FG 参数,详细见[全局配置](summary.md#id9),可以 add 多个配置项;从 v0.4.0 开始生效

直接按照示例修改代码中的参数默认值即可。

补充说明

pyfg的包是装在专有资源组的一台机器上(gateway);这台机器可以提交SQL任务到MaxCompute。pyfg的包也可以装在自己的任何一台机器上,这台机器通过pyodps这个工具就可以提交任务到MaxCompute集群。

SQL任务里的自定义UDF需要一些资源,包括FG的共享库,各种配置文件(如fg.json、词典、自定义OPlib等),以及UDF自己的代码文件(.py),这些资源都需要上传到MaxCompute集群,存储在MaxCompute的分布式文件系统中。执行任务时,每个worker都会从MaxCompute集群的分布式文件系统中下载这些资源,并加载到内存中。

有部分资源是多个任务共享的,比如FG的共享库,UDF的代码文件等。force_update_resource=True时,会先删除原来的资源,再上传新的资源。这里有个时间间隔,在这个时间间隔内,会影响到其他正在执行的任务。