本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。
本文为您介绍如何使用TensorFlow实现分布式DeepFM算法。
公共云GPU服务器即将过保下线,您可以继续提交CPU版本的TensorFlow任务。如需使用GPU进行模型训练,请前往DLC提交任务,具体操作请参见创建训练任务。
前提条件
开通OSS,并创建Bucket,详情请参见开通OSS服务和控制台创建存储空间。
重要创建Bucket时,不要开通版本控制,否则同名文件无法覆盖。
完成OSS访问授权,详情请参见云产品依赖与授权:Designer。
背景信息
DeepFM算法对应Wide&Deep部分,且将LR替换为FM,从而避免人工特征工程。
训练数据源为pai_online_project.dwd_avazu_ctr_deepmodel_train,测试数据源为pai_online_project.dwd_avazu_ctr_deepmodel_test,都是公开数据源,您可以直接使用。
操作步骤
下载模型文件。
修改模型配置代码。
修改特征参数,每个特征需要配置embedding_dim、hash_buckets及default_value。
self.fields_config_dict['hour'] = {'field_name': 'field1', 'embedding_dim': self.embedding_dim, 'hash_bucket': 50, 'default_value': '0'} self.fields_config_dict['c1'] = {'field_name': 'field2', 'embedding_dim': self.embedding_dim, 'hash_bucket': 10, 'default_value': '0'}
DeepFM模型中,需要将所有特征的embedding_dim配置为相同值,而Wide&Deep模型无此限制。对于user_id和itemid,建议将hash_buckets配置为较大值,而其他取值较少的特征建议将hash_buckets配置为较小值。
配置模型,推荐使用DeepFM(deepfm)和Wide&Deep(wdl)。
tf.app.flags.DEFINE_string("model", 'deepfm', "model {'wdl', 'deepfm'}")
配置分布式参数。
tf.app.flags.DEFINE_string("job_name", "", "job name") tf.app.flags.DEFINE_integer("task_index", None, "Worker or server index") tf.app.flags.DEFINE_string("ps_hosts", "", "ps hosts") tf.app.flags.DEFINE_string("worker_hosts", "", "worker hosts")
提交训练任务时,只需要配置cluster(详情请参见下述提交训练任务步骤),系统自动生成分布式参数。
配置输入数据。
def _parse_batch_for_tabledataset(self, *args): label = tf.reshape(args[0], [-1]) fields = [tf.reshape(v, [-1]) for v in args[1:]] return dict(zip(self.feas_name, fields)), label def train_input_fn_from_odps(self, data_path, epoch=10, batch_size=1024, slice_id=0, slice_count=1): with tf.device('/cpu:0'): dataset = tf.data.TableRecordDataset([data_path], record_defaults=self.record_defaults, slice_count=slice_count, slice_id=slice_id) dataset = dataset.batch(batch_size).repeat(epoch) dataset = dataset.map(self._parse_batch_for_tabledataset, num_parallel_calls=8).prefetch(100) return dataset def val_input_fn_from_odps(self, data_path, epoch=1, batch_size=1024, slice_id=0, slice_count=1): with tf.device('/cpu:0'): dataset = tf.data.TableRecordDataset([data_path], record_defaults=self.record_defaults, slice_count=slice_count, slice_id=slice_id) dataset = dataset.batch(batch_size).repeat(epoch) dataset = dataset.map(self._parse_batch_for_tabledataset, num_parallel_calls=8).prefetch(100) return dataset
如果需要进行特征变换,建议在模型外通过MaxCompute实现,从而节约训练开销。如果在模型中进行特征变换,建议在_parse_batch_for_tabledataset函数中实现。
上传已修改的模型文件至OSS。
可选:提交训练任务。
说明如果没有训练完成的模型文件,则必须执行该步骤。反之,可以跳过该步骤,直接提交离线推理任务。
根据配置的模型类型,选择提交命令:
DeepFM
pai -name tensorflow1120_ext -project algo_public -Dbuckets='oss://bucket_name.oss-cn-region-internal.aliyuncs.com/' -Darn='' -Dscript='oss://bucket_name.oss-cn-region-internal.aliyuncs.com/demo/deepfm_pai_ctr.py' -Dtables='odps://pai_online_project/tables/dwd_avazu_ctr_predict_deepmodel_train,odps://pai_online_project/tables/dwd_avazu_ctr_predict_deepmodel_test' -DuserDefinedParameters="--task_type='train' --model='deepfm' --checkpoint_dir='oss://bucket_name/path/' --output_dir='oss://bucket_name/path/'" -Dcluster='{\"ps\":{\"count\":2,\"cpu\":1200,\"memory\":10000},\"worker\":{\"count\":8,\"cpu\":1200,\"memory\":30000}}';
需要根据实际情况修改project_name、bucket_name、账号arn(请参见角色ARN)及地域。
Wide&Deep
pai -name tensorflow1120_ext -project algo_public -Dbuckets='oss://bucket_name.oss-cn-region-internal.aliyuncs.com/' -Darn='' -Dscript='oss://bucket_name.oss-cn-region-internal.aliyuncs.com/demo/deepfm_pai_ctr.py' -Dtables='odps://pai_online_project/tables/dwd_avazu_ctr_predict_deepmodel_train,odps://pai_online_project/tables/dwd_avazu_ctr_predict_deepmodel_test' -DuserDefinedParameters="--task_type='train' --model='wdl' --checkpoint_dir='oss://bucket_name/path/' --output_dir='oss://bucket_name/path/'" -Dcluster='{\"ps\":{\"count\":2,\"cpu\":1200,\"memory\":10000},\"worker\":{\"count\":8,\"cpu\":1200,\"memory\":30000}}';
需要根据实际情况修改project_name、bucket_name、账号arn(请参见角色ARN)及地域。
因为该版本为PS分布式,所以需要配置cluster。示例中的cluster参数表示申请两个PS节点和8个Worker节点。同时,每个PS节点拥有12个CPU和10 GB内存,每个Worker节点拥有12个CPU及30 GB内存。各参数的具体介绍请参见PAI-TF任务参数介绍。
提交离线推理任务。
pai -name tensorflow1120_ext -project algo_public -Dbuckets='oss://bucket_name.oss-cn-region-internal.aliyuncs.com/' -Darn='' -Dscript='oss://bucket_name.oss-cn-region-internal.aliyuncs.com/demo/deepfm_pai_ctr.py' -Dtables='odps://pai_online_project/tables/dwd_avazu_ctr_predict_deepmodel_train,odps://pai_online_project/tables/dwd_avazu_ctr_predict_deepmodel_test' -DuserDefinedParameters="--task_type='predict' --model='deepfm' --checkpoint_dir='oss://bucket_name/path/' --output_dir='oss://bucket_name/path/'" -Dcluster='{\"worker\":{\"count\":8,\"cpu\":1200,\"memory\":30000}}' -Doutputs='odps://project_name/tables/output_table_name';
需要根据实际情况修改project_name、bucket_name、账号arn(请参见角色ARN)及地域。
离线推理需要一张已创建的outputs表,每次执行推理任务的结果会覆盖该表。创建表的示例命令如下。
drop table project_name.output_table_name; create table project_name.output_table_name ( probabilities STRING ,logits STRING )STORED AS ALIORC;
导出训练完成的模型文件。
pai -name tensorflow1120_ext -project algo_public -Dbuckets='oss://bucket_name.oss-cn-region-internal.aliyuncs.com/' -Darn='' -Dscript='oss://bucket_name.oss-cn-region-internal.aliyuncs.com/demo/deepfm_pai_ctr.py' -Dtables='odps://pai_online_project/tables/dwd_avazu_ctr_predict_deepmodel_train,odps://pai_online_project/tables/dwd_avazu_ctr_predict_deepmodel_test' -DuserDefinedParameters="--task_type='savemodel' --model='deepfm' --checkpoint_dir='oss://bucket_name/path/' --output_dir='oss://bucket_name/path/'" -Dcluster='{\"ps\":{\"count\":2,\"cpu\":1200,\"memory\":10000},\"worker\":{\"count\":8,\"cpu\":1200,\"memory\":30000}}';
需要根据实际情况修改project_name、bucket_name、账号arn(请参见角色ARN)及地域。系统实际使用单Worker执行导出模型任务。