在离线任务中使用FG

更新时间: 2025-04-23 20:05:16

MaxCompute PyFg Job可以离线批量生成复杂特征,支持ODPS2.0的复杂类型(list、map、float、int等),根据配置文件和命令行参数决定是否对生成的特征做分箱操作。

方式一:使用通用资源组镜像

在DataWorks的调度配置-资源属性区块选择一个通用的资源组,选择最新的dataworks_pairec_task_pod镜像。

方式二:安装依赖包

登录DataWorks控制台,创建独享调度资源组,使用运维助手安装pyfg的包。

在DataWorks独享资源组中安装pyfg包,页面路径为:DataWorks->管理中心->资源组列表->运维助手,示例如下:

/home/tops/bin/pip3 install -i https://pypi.tuna.tsinghua.edu.cn/simple --upgrade --force-reinstall http://tzrec.oss-cn-beijing.aliyuncs.com/third_party/pyfg059-0.5.9-cp37-cp37m-linux_x86_64.whl

上传资源文件

上传FG的配置文件(JSON格式)到MaxCompute的项目空间中。

某些FG算子需要额外的资源文件,用户需要手动上传到MaxCompute的项目空间中。

特征算子

描述

资源文件配置项

text_normalizer

文本归一化

停用词文件stop_char_file

tokenize_feature

文本分词特征

词汇表配置文件vocab_file

bm25_feature

文本相关性特征

词频配置文件 term_doc_freq_file

custom_feature

自定义算子

算子配置文件operator_lib_file

创建输出表

可以在DataWorks里创建一个PyOdps3节点,执行下面的脚本。程序会根据fg.json的内容创建输出表,同时创建后续执行任务时需要使用的各种资源。

from pyfg059 import offline_pyfg

offline_pyfg.create_output_table(o,
    args['input_table'],
    args['output_table'],
    args['fg_json_file'],
    args['partition_value'],
    force_delete_output_table=True,
    force_update_resource=True)

0.3.9版本之后也可以使用新的接口,如下:

from pyfg059 import run_on_odps

fg_task = run_on_odps.FgTask(
    args['input_table'], 
    args['output_table'], 
    args['fg_json_file'], 
    args['partition_value'],
    force_delete_output_table=True,
    force_update_resource=True)
fg_task.create_output_table(o)

执行之前需要在调度配置里配置好参数:input_tableoutput_tablefg_json_filepartition_value

执行 FG 离线任务

在DataWorks里创建一个PyOdps3节点,执行下面的脚本,程序会根据fg.json的内容自动创建输出表。

from pyfg059 import offline_pyfg

offline_pyfg.run(o, 
    args['input_table'], 
    args['output_table'], 
    args['fg_json_file'], 
    args['partition_value'], 
    force_delete_output_table=False, 
    force_update_resource=False,
    batch_size=128)

0.3.8版本之后也可以使用新的接口,如下:

from pyfg059 import run_on_odps

fg_task = run_on_odps.FgTask(
    args['input_table'], 
    args['output_table'], 
    args['fg_json_file'], 
    args['partition_value'],
    batch_size=128,
    force_delete_output_table=False,
    force_update_resource=False)
fg_task.add_sql_setting('odps.stage.mapper.split.size', 64)
# fg_task.add_fg_setting('USE_MULTIPLICATIVE_HASH', 'true')
fg_task.run(o)

执行之前需要在调度配置里配置好参数:input_tableoutput_tablefg_json_filepartition_value

如果本地安装了PyODPS,pyfg也可以安装在本地,在本地提交任务。

参数说明

参数

默认值

说明

input_table

输入表。

output_table

输出表,会自动创建。

fg_json_file

FG配置文件,JSON格式。

partition_value

指定输入表的partition分区作为FG的输入,结果保存在输出表的partition分区中。

batch_size

128

批量处理的记录数。

force_delete_output_table

False

是否删除输出表,设置为True时会先自动删除输出表,再运行任务。

force_update_resource

False

是否更新资源,设置为True时会先自动更新资源,再运行任务;不要一直设置为True,会有并发冲突的问题。

output_merged_str

False

是否合并字符串,设置为 True 时会自动合并字符串,输出RTP格式的大String特征

debug

False

是否是调试模型,设置为True时会打印所有更新的资源的内容。

set_sql

'无'

offline_pyfg.run接口的参数。指定MaxCompute SQL,详细见Flag参数列表,例如指定:SETodps.stage.mapper.split.size=64;可以分配更多资源,加快 SQL 运行速度。

sql_setting

fg_task.add_sql_setting方法的key、value参数。指定MaxCompute SQL参数,详细见Flag参数列表,可以add多个flag。

fg_setting

fg_task.add_fg_setting方法的key、value参数。指定 FG 参数,详细见[全局配置](summary.md#id9),可以 add 多个配置项;从 v0.4.0 开始生效

直接按照示例修改代码中的参数默认值即可。

补充说明

pyfg的包是装在专有资源组的一台机器上(gateway);这台机器可以提交SQL任务到MaxCompute。pyfg的包也可以装在自己的任何一台机器上,这台机器通过pyodps这个工具就可以提交任务到MaxCompute集群。

SQL任务里的自定义UDF需要一些资源,包括FG的共享库,各种配置文件(如fg.json、词典、自定义OP的lib等),以及UDF自己的代码文件(.py),这些资源都需要上传到MaxCompute集群,存储在MaxCompute的分布式文件系统中。执行任务时,每个worker都会从MaxCompute集群的分布式文件系统中下载这些资源,并加载到内存中。

每次升级pyfg的包后,第一次执行任务时,需要设置参数force_update_resource=True,触发一次资源update。但不要一直设置为True,会有并发冲突的问题。

有部分资源是多个任务共享的,比如FG的共享库,UDF的代码文件等。force_update_resource=True时,会先删除原来的资源,再上传新的资源。这里有个时间间隔,在这个时间间隔内,会影响到其他正在执行的任务。

上一篇: 自定义特征算子 下一篇: 内置特征算子
阿里云首页 智能推荐 AIRec 相关技术圈