在离线任务中使用FG
MaxCompute PyFg Job可以离线批量生成复杂特征,支持ODPS2.0的复杂类型(list、map、float、int等),根据配置文件和命令行参数决定是否对生成的特征做分箱操作。
方式一:使用通用资源组镜像
在DataWorks的调度配置
-资源属性
区块选择一个通用的资源组,选择最新的dataworks_pairec_task_pod
镜像。
方式二:安装依赖包
登录DataWorks控制台,创建独享调度资源组,使用运维助手安装pyfg的包。
在DataWorks独享资源组中安装pyfg
包,页面路径为:DataWorks->管理中心->资源组列表->运维助手
,示例如下:
/home/tops/bin/pip3 install -i https://pypi.tuna.tsinghua.edu.cn/simple --upgrade --force-reinstall http://tzrec.oss-cn-beijing.aliyuncs.com/third_party/pyfg059-0.5.9-cp37-cp37m-linux_x86_64.whl
上传资源文件
上传FG的配置文件(JSON格式)到MaxCompute的项目空间中。
某些FG算子需要额外的资源文件,用户需要手动上传到MaxCompute的项目空间中。
特征算子 | 描述 | 资源文件配置项 |
文本归一化 | 停用词文件 | |
文本分词特征 | 词汇表配置文件 | |
文本相关性特征 | 词频配置文件 | |
自定义算子 | 算子配置文件 |
创建输出表
可以在DataWorks里创建一个PyOdps3节点,执行下面的脚本。程序会根据fg.json
的内容创建输出表,同时创建后续执行任务时需要使用的各种资源。
from pyfg059 import offline_pyfg
offline_pyfg.create_output_table(o,
args['input_table'],
args['output_table'],
args['fg_json_file'],
args['partition_value'],
force_delete_output_table=True,
force_update_resource=True)
0.3.9
版本之后也可以使用新的接口,如下:
from pyfg059 import run_on_odps
fg_task = run_on_odps.FgTask(
args['input_table'],
args['output_table'],
args['fg_json_file'],
args['partition_value'],
force_delete_output_table=True,
force_update_resource=True)
fg_task.create_output_table(o)
执行之前需要在调度配置里配置好参数:input_table
、output_table
、fg_json_file
、partition_value
。
执行 FG 离线任务
在DataWorks里创建一个PyOdps3节点,执行下面的脚本,程序会根据fg.json的内容自动创建输出表。
from pyfg059 import offline_pyfg
offline_pyfg.run(o,
args['input_table'],
args['output_table'],
args['fg_json_file'],
args['partition_value'],
force_delete_output_table=False,
force_update_resource=False,
batch_size=128)
0.3.8版本之后也可以使用新的接口,如下:
from pyfg059 import run_on_odps
fg_task = run_on_odps.FgTask(
args['input_table'],
args['output_table'],
args['fg_json_file'],
args['partition_value'],
batch_size=128,
force_delete_output_table=False,
force_update_resource=False)
fg_task.add_sql_setting('odps.stage.mapper.split.size', 64)
# fg_task.add_fg_setting('USE_MULTIPLICATIVE_HASH', 'true')
fg_task.run(o)
执行之前需要在调度配置里配置好参数:input_table
、output_table
、fg_json_file
、partition_value
。
如果本地安装了PyODPS,pyfg也可以安装在本地,在本地提交任务。
参数说明
参数 | 默认值 | 说明 |
input_table | 无 | 输入表。 |
output_table | 无 | 输出表,会自动创建。 |
fg_json_file | 无 | FG配置文件,JSON格式。 |
partition_value | 无 | 指定输入表的partition分区作为FG的输入,结果保存在输出表的partition分区中。 |
batch_size | 128 | 批量处理的记录数。 |
force_delete_output_table | False | 是否删除输出表,设置为True时会先自动删除输出表,再运行任务。 |
force_update_resource | False | 是否更新资源,设置为True时会先自动更新资源,再运行任务;不要一直设置为True,会有并发冲突的问题。 |
output_merged_str | False | 是否合并字符串,设置为 True 时会自动合并字符串,输出RTP格式的大String特征 |
debug | False | 是否是调试模型,设置为True时会打印所有更新的资源的内容。 |
set_sql | '无' |
|
sql_setting | 无 |
|
fg_setting | 无 |
|
直接按照示例修改代码中的参数默认值即可。
补充说明
pyfg的包是装在专有资源组的一台机器上(gateway);这台机器可以提交SQL任务到MaxCompute。pyfg的包也可以装在自己的任何一台机器上,这台机器通过pyodps这个工具就可以提交任务到MaxCompute集群。
SQL任务里的自定义UDF需要一些资源,包括FG的共享库,各种配置文件(如fg.json、词典、自定义OP的lib等),以及UDF自己的代码文件(.py),这些资源都需要上传到MaxCompute集群,存储在MaxCompute的分布式文件系统中。执行任务时,每个worker都会从MaxCompute集群的分布式文件系统中下载这些资源,并加载到内存中。
每次升级pyfg的包后,第一次执行任务时,需要设置参数force_update_resource=True
,触发一次资源update。但不要一直设置为True,会有并发冲突的问题。
有部分资源是多个任务共享的,比如FG的共享库,UDF的代码文件等。force_update_resource=True
时,会先删除原来的资源,再上传新的资源。这里有个时间间隔,在这个时间间隔内,会影响到其他正在执行的任务。