本文为您介绍如何使用TensorFlow实现分布式DeepFM算法。
背景信息
DeepFM算法对应Wide&Deep部分,且将LR替换为FM,从而避免人工特征工程。
训练数据源为pai_online_project.dwd_avazu_ctr_deepmodel_train,测试数据源为pai_online_project.dwd_avazu_ctr_deepmodel_test,都是公开数据源,您可以直接使用。
操作步骤
- 下载模型文件。
- 修改模型配置代码。
- 修改特征参数,每个特征需要配置embedding_dim、hash_buckets及default_value。
self.fields_config_dict['hour'] = {'field_name': 'field1', 'embedding_dim': self.embedding_dim, 'hash_bucket': 50, 'default_value': '0'}
self.fields_config_dict['c1'] = {'field_name': 'field2', 'embedding_dim': self.embedding_dim, 'hash_bucket': 10, 'default_value': '0'}
DeepFM模型中,需要将所有特征的
embedding_dim配置为相同值,而
Wide&Deep模型无此限制。对于
user_id和
itemid,建议将
hash_buckets配置为较大值,而其他取值较少的特征建议将
hash_buckets配置为较小值。
- 配置模型,推荐使用DeepFM(deepfm)和Wide&Deep(wdl)。
tf.app.flags.DEFINE_string("model", 'deepfm', "model {'wdl', 'deepfm'}")
- 配置分布式参数。
tf.app.flags.DEFINE_string("job_name", "", "job name")
tf.app.flags.DEFINE_integer("task_index", None, "Worker or server index")
tf.app.flags.DEFINE_string("ps_hosts", "", "ps hosts")
tf.app.flags.DEFINE_string("worker_hosts", "", "worker hosts")
提交训练任务时,只需要配置
cluster(详情请参见下述提交训练任务步骤),系统自动生成分布式参数。
- 配置输入数据。
def _parse_batch_for_tabledataset(self, *args):
label = tf.reshape(args[0], [-1])
fields = [tf.reshape(v, [-1]) for v in args[1:]]
return dict(zip(self.feas_name, fields)), label
def train_input_fn_from_odps(self, data_path, epoch=10, batch_size=1024, slice_id=0, slice_count=1):
with tf.device('/cpu:0'):
dataset = tf.data.TableRecordDataset([data_path], record_defaults=self.record_defaults,
slice_count=slice_count, slice_id=slice_id)
dataset = dataset.batch(batch_size).repeat(epoch)
dataset = dataset.map(self._parse_batch_for_tabledataset, num_parallel_calls=8).prefetch(100)
return dataset
def val_input_fn_from_odps(self, data_path, epoch=1, batch_size=1024, slice_id=0, slice_count=1):
with tf.device('/cpu:0'):
dataset = tf.data.TableRecordDataset([data_path], record_defaults=self.record_defaults,
slice_count=slice_count, slice_id=slice_id)
dataset = dataset.batch(batch_size).repeat(epoch)
dataset = dataset.map(self._parse_batch_for_tabledataset, num_parallel_calls=8).prefetch(100)
return dataset
如果需要进行特征变换,建议在模型外通过MaxCompute实现,从而节约训练开销。如果在模型中进行特征变换,建议在
_parse_batch_for_tabledataset函数中实现。
- 上传已修改的模型文件至OSS。
- 可选:提交训练任务。
说明 如果没有训练完成的模型文件,则必须执行该步骤。反之,可以跳过该步骤,直接提交离线推理任务。
根据配置的模型类型,选择提交命令:
- DeepFM
pai -name tensorflow1120_ext
-project algo_public
-Dbuckets='oss://bucket_name.oss-cn-region-internal.aliyuncs.com/'
-Darn=''
-Dscript='oss://bucket_name.oss-cn-region-internal.aliyuncs.com/demo/deepfm_pai_ctr.py'
-Dtables='odps://pai_online_project/tables/dwd_avazu_ctr_predict_deepmodel_train,odps://pai_online_project/tables/dwd_avazu_ctr_predict_deepmodel_test'
-DuserDefinedParameters="--task_type='train' --model='deepfm' --checkpoint_dir='oss://bucket_name/path/' --output_dir='oss://bucket_name/path/'"
-Dcluster='{\"ps\":{\"count\":2,\"cpu\":1200,\"memory\":10000},\"worker\":{\"count\":8,\"cpu\":1200,\"memory\":30000}}';
需要根据实际情况修改project_name、bucket_name、账号arn(请参见角色ARN)及地域。
- Wide&Deep
pai -name tensorflow1120_ext
-project algo_public
-Dbuckets='oss://bucket_name.oss-cn-region-internal.aliyuncs.com/'
-Darn=''
-Dscript='oss://bucket_name.oss-cn-region-internal.aliyuncs.com/demo/deepfm_pai_ctr.py'
-Dtables='odps://pai_online_project/tables/dwd_avazu_ctr_predict_deepmodel_train,odps://pai_online_project/tables/dwd_avazu_ctr_predict_deepmodel_test'
-DuserDefinedParameters="--task_type='train' --model='wdl' --checkpoint_dir='oss://bucket_name/path/' --output_dir='oss://bucket_name/path/'"
-Dcluster='{\"ps\":{\"count\":2,\"cpu\":1200,\"memory\":10000},\"worker\":{\"count\":8,\"cpu\":1200,\"memory\":30000}}';
需要根据实际情况修改project_name、bucket_name、账号arn(请参见角色ARN)及地域。
因为该版本为PS分布式,所以需要配置cluster。示例中的cluster参数表示申请两个PS节点和8个Worker节点。同时,每个PS节点拥有12个CPU和10 GB内存,每个Worker节点拥有12个CPU及30 GB内存。各参数的具体介绍请参见PAI-TF任务参数介绍。
- 提交离线推理任务。
pai -name tensorflow1120_ext
-project algo_public
-Dbuckets='oss://bucket_name.oss-cn-region-internal.aliyuncs.com/'
-Darn=''
-Dscript='oss://bucket_name.oss-cn-region-internal.aliyuncs.com/demo/deepfm_pai_ctr.py'
-Dtables='odps://pai_online_project/tables/dwd_avazu_ctr_predict_deepmodel_train,odps://pai_online_project/tables/dwd_avazu_ctr_predict_deepmodel_test'
-DuserDefinedParameters="--task_type='predict' --model='deepfm' --checkpoint_dir='oss://bucket_name/path/' --output_dir='oss://bucket_name/path/'"
-Dcluster='{\"worker\":{\"count\":8,\"cpu\":1200,\"memory\":30000}}'
-Doutputs='odps://project_name/tables/output_table_name';
需要根据实际情况修改
project_name、
bucket_name、账号
arn(请参见
角色ARN)及地域。
离线推理需要一张已创建的
outputs表,每次执行推理任务的结果会覆盖该表。创建表的示例命令如下。
drop table project_name.output_table_name;
create table project_name.output_table_name
(
probabilities STRING
,logits STRING
)STORED AS ALIORC;
- 导出训练完成的模型文件。
pai -name tensorflow1120_ext
-project algo_public
-Dbuckets='oss://bucket_name.oss-cn-region-internal.aliyuncs.com/'
-Darn=''
-Dscript='oss://bucket_name.oss-cn-region-internal.aliyuncs.com/demo/deepfm_pai_ctr.py'
-Dtables='odps://pai_online_project/tables/dwd_avazu_ctr_predict_deepmodel_train,odps://pai_online_project/tables/dwd_avazu_ctr_predict_deepmodel_test'
-DuserDefinedParameters="--task_type='savemodel' --model='deepfm' --checkpoint_dir='oss://bucket_name/path/' --output_dir='oss://bucket_name/path/'"
-Dcluster='{\"ps\":{\"count\":2,\"cpu\":1200,\"memory\":10000},\"worker\":{\"count\":8,\"cpu\":1200,\"memory\":30000}}';
需要根据实际情况修改
project_name、
bucket_name、账号
arn(请参见
角色ARN)及地域。系统实际使用单Worker执行导出模型任务。