文档

使用TensorFlow实现分布式DeepFM算法

更新时间:
重要

本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。

本文为您介绍如何使用TensorFlow实现分布式DeepFM算法。

警告

公共云GPU服务器即将过保下线,您可以继续提交CPU版本的TensorFlow任务。如需使用GPU进行模型训练,请前往DLC提交任务,具体操作请参见创建训练任务

前提条件

背景信息

DeepFM算法对应Wide&Deep部分,且将LR替换为FM,从而避免人工特征工程。

训练数据源为pai_online_project.dwd_avazu_ctr_deepmodel_train,测试数据源为pai_online_project.dwd_avazu_ctr_deepmodel_test,都是公开数据源,您可以直接使用。

操作步骤

  1. 下载模型文件

  2. 修改模型配置代码。

    1. 修改特征参数,每个特征需要配置embedding_dimhash_bucketsdefault_value

      self.fields_config_dict['hour'] = {'field_name': 'field1', 'embedding_dim': self.embedding_dim, 'hash_bucket': 50, 'default_value': '0'}
      self.fields_config_dict['c1'] = {'field_name': 'field2', 'embedding_dim': self.embedding_dim, 'hash_bucket': 10, 'default_value': '0'}

      DeepFM模型中,需要将所有特征的embedding_dim配置为相同值,而Wide&Deep模型无此限制。对于user_iditemid,建议将hash_buckets配置为较大值,而其他取值较少的特征建议将hash_buckets配置为较小值。

    2. 配置模型,推荐使用DeepFM(deepfm)和Wide&Deepwdl)。

      tf.app.flags.DEFINE_string("model", 'deepfm', "model {'wdl', 'deepfm'}")
    3. 配置分布式参数。

      tf.app.flags.DEFINE_string("job_name", "", "job name")
      tf.app.flags.DEFINE_integer("task_index", None, "Worker or server index")
      tf.app.flags.DEFINE_string("ps_hosts", "", "ps hosts")
      tf.app.flags.DEFINE_string("worker_hosts", "", "worker hosts")

      提交训练任务时,只需要配置cluster(详情请参见下述提交训练任务步骤),系统自动生成分布式参数。

    4. 配置输入数据。

        def _parse_batch_for_tabledataset(self, *args):
          label = tf.reshape(args[0], [-1])
          fields = [tf.reshape(v, [-1]) for v in args[1:]]
          return dict(zip(self.feas_name, fields)), label
      
        def train_input_fn_from_odps(self, data_path, epoch=10, batch_size=1024, slice_id=0, slice_count=1):
          with tf.device('/cpu:0'):
            dataset = tf.data.TableRecordDataset([data_path], record_defaults=self.record_defaults,
                               slice_count=slice_count, slice_id=slice_id)
            dataset = dataset.batch(batch_size).repeat(epoch)
            dataset = dataset.map(self._parse_batch_for_tabledataset, num_parallel_calls=8).prefetch(100)
            return dataset
      
        def val_input_fn_from_odps(self, data_path, epoch=1, batch_size=1024, slice_id=0, slice_count=1):
          with tf.device('/cpu:0'):
            dataset = tf.data.TableRecordDataset([data_path], record_defaults=self.record_defaults,
                               slice_count=slice_count, slice_id=slice_id)
            dataset = dataset.batch(batch_size).repeat(epoch)
            dataset = dataset.map(self._parse_batch_for_tabledataset, num_parallel_calls=8).prefetch(100)
            return dataset

      如果需要进行特征变换,建议在模型外通过MaxCompute实现,从而节约训练开销。如果在模型中进行特征变换,建议在_parse_batch_for_tabledataset函数中实现。

  3. 上传已修改的模型文件至OSS。

  4. 可选:提交训练任务。

    说明

    如果没有训练完成的模型文件,则必须执行该步骤。反之,可以跳过该步骤,直接提交离线推理任务。

    根据配置的模型类型,选择提交命令:

    • DeepFM

      pai -name tensorflow1120_ext
      -project algo_public
      -Dbuckets='oss://bucket_name.oss-cn-region-internal.aliyuncs.com/'
      -Darn=''
      -Dscript='oss://bucket_name.oss-cn-region-internal.aliyuncs.com/demo/deepfm_pai_ctr.py'
      -Dtables='odps://pai_online_project/tables/dwd_avazu_ctr_predict_deepmodel_train,odps://pai_online_project/tables/dwd_avazu_ctr_predict_deepmodel_test'
      -DuserDefinedParameters="--task_type='train' --model='deepfm' --checkpoint_dir='oss://bucket_name/path/' --output_dir='oss://bucket_name/path/'"
      -Dcluster='{\"ps\":{\"count\":2,\"cpu\":1200,\"memory\":10000},\"worker\":{\"count\":8,\"cpu\":1200,\"memory\":30000}}';

      需要根据实际情况修改project_namebucket_name、账号arn(请参见角色ARN)及地域。

    • Wide&Deep

      pai -name tensorflow1120_ext
      -project algo_public
      -Dbuckets='oss://bucket_name.oss-cn-region-internal.aliyuncs.com/'
      -Darn=''
      -Dscript='oss://bucket_name.oss-cn-region-internal.aliyuncs.com/demo/deepfm_pai_ctr.py'
      -Dtables='odps://pai_online_project/tables/dwd_avazu_ctr_predict_deepmodel_train,odps://pai_online_project/tables/dwd_avazu_ctr_predict_deepmodel_test'
      -DuserDefinedParameters="--task_type='train' --model='wdl' --checkpoint_dir='oss://bucket_name/path/' --output_dir='oss://bucket_name/path/'"
      -Dcluster='{\"ps\":{\"count\":2,\"cpu\":1200,\"memory\":10000},\"worker\":{\"count\":8,\"cpu\":1200,\"memory\":30000}}';

      需要根据实际情况修改project_namebucket_name、账号arn(请参见角色ARN)及地域。

    因为该版本为PS分布式,所以需要配置cluster。示例中的cluster参数表示申请两个PS节点和8个Worker节点。同时,每个PS节点拥有12个CPU和10 GB内存,每个Worker节点拥有12个CPU及30 GB内存。各参数的具体介绍请参见PAI-TF任务参数介绍

  5. 提交离线推理任务。

    pai -name tensorflow1120_ext
    -project algo_public
    -Dbuckets='oss://bucket_name.oss-cn-region-internal.aliyuncs.com/'
    -Darn=''
    -Dscript='oss://bucket_name.oss-cn-region-internal.aliyuncs.com/demo/deepfm_pai_ctr.py'
    -Dtables='odps://pai_online_project/tables/dwd_avazu_ctr_predict_deepmodel_train,odps://pai_online_project/tables/dwd_avazu_ctr_predict_deepmodel_test'
    -DuserDefinedParameters="--task_type='predict' --model='deepfm' --checkpoint_dir='oss://bucket_name/path/' --output_dir='oss://bucket_name/path/'"
    -Dcluster='{\"worker\":{\"count\":8,\"cpu\":1200,\"memory\":30000}}'
    -Doutputs='odps://project_name/tables/output_table_name';

    需要根据实际情况修改project_namebucket_name、账号arn(请参见角色ARN)及地域。

    离线推理需要一张已创建的outputs表,每次执行推理任务的结果会覆盖该表。创建表的示例命令如下。

    drop table project_name.output_table_name;
    create table project_name.output_table_name
    (
       probabilities STRING
       ,logits STRING
    )STORED AS ALIORC;
  6. 导出训练完成的模型文件。

    pai -name tensorflow1120_ext
    -project algo_public
    -Dbuckets='oss://bucket_name.oss-cn-region-internal.aliyuncs.com/'
    -Darn=''
    -Dscript='oss://bucket_name.oss-cn-region-internal.aliyuncs.com/demo/deepfm_pai_ctr.py'
    -Dtables='odps://pai_online_project/tables/dwd_avazu_ctr_predict_deepmodel_train,odps://pai_online_project/tables/dwd_avazu_ctr_predict_deepmodel_test'
    -DuserDefinedParameters="--task_type='savemodel' --model='deepfm' --checkpoint_dir='oss://bucket_name/path/' --output_dir='oss://bucket_name/path/'"
    -Dcluster='{\"ps\":{\"count\":2,\"cpu\":1200,\"memory\":10000},\"worker\":{\"count\":8,\"cpu\":1200,\"memory\":30000}}';

    需要根据实际情况修改project_namebucket_name、账号arn(请参见角色ARN)及地域。系统实际使用单Worker执行导出模型任务。