定制排序模型开发指南

本篇文档将详细介绍自定义排序模型用到的JSON文件配置以及提供给用户自行实现的代码示例。

介绍

本篇文档将详细介绍自定义排序模型用到的JSON文件配置以及提供给用户自行实现的代码示例。

快速入门

用户需要实现一个继承自 BaseModel 的 CustomModel 类。

框架会通过build函数触发构图,具体调用步骤如下:

def build(self):
    """Construct the model graph by running every build stage in a fixed order.

    The framework calls this function to trigger graph construction. The
    stages a user is expected to implement are ``build_model``,
    ``reg_loss``, ``loss_op``, ``update_op`` and ``training_op``.
    """

    self.build_placeholder()
    self.build_model()
    self.setup_global_step()
    # In the CustomModel example, loss_op adds self.reg_loss to the loss, so
    # reg_loss has to run before loss_op.
    self.reg_loss()
    self.loss_op()
    # In the CustomModel example, training_op consumes self.update_ops and
    # self.loss, so it runs after update_op and loss_op.
    self.update_op()
    self.training_op()
    self.predictions_op()
    self.mark_output()
    self.metrics_op()
    self.summary_op()
    self.trace_sample_op()

用户需要实现以下几个方法:

def build_model(self):
    """Build the network; the resulting logits must be assigned to ``self.logits``."""
    pass

def update_op(self):
    """Collect the update ops that should run with each training step.

    The CustomModel example stores them in ``self.update_ops``.
    """
    pass

def reg_loss(self):
    """Compute the regularization loss; it must be assigned to ``self.reg_loss``."""
    pass

def training_op(self):
    """Create the training op.

    The CustomModel example stores it in ``self.train_op``.
    """
    pass

def loss_op(self):
    """Compute the training loss; it must be assigned to ``self.loss``."""
    pass

CustomModel代码参考:

from collections import OrderedDict
import tensorflow as tf
from tensorflow.contrib import layers
from tensorflow.contrib.framework.python.ops import arg_scope
from tensorflow.python.framework import ops
from tensorflow.python.ops import variable_scope
from model_ops.tflog import tflogger as logging

import model_ops.optimizer_ops as myopt
from model.base_model import BaseModel

from model_ops import ops as base_ops
from model_ops import utils


class CustomModel(BaseModel):
    """Reference ranking model: embedding columns -> MLP with batch norm -> one logit.

    The framework drives graph construction through ``build()``; this class
    supplies the individual stages. It follows the conventions the serving
    side relies on: the base initializer is called first, the score is
    written to ``self.logits``, the loss to ``self.loss`` and the
    regularization loss to ``self.reg_loss``.
    """

    def __init__(self,
                 config,
                 name="CTR"):
        """Declare feature columns and hyper-parameters.

        Args:
            config: Framework configuration object; this class reads
                ``config.ps_num`` and ``config.predict``.
            name: Model name, used as prefix for scopes and collections.
        """
        super(CustomModel, self).__init__(config, name)

        # Graph collections that receive the dense-layer variables and
        # outputs; summary_op() reads them back.
        self.collections_dnn_hidden_layer = "{}_dnn_hidden_layer".format(self.name)
        self.collections_dnn_hidden_output = "{}_dnn_hidden_output".format(self.name)

        self.layer_dict = OrderedDict()

        self.embedding_columns = ['feature1', 'feature2']
        for feature_name in self.embedding_columns:
            self.generate_embedding_feature_column(
                feature_name,
                hash_bucket_size=1000,
                dimension=16,
                initializer=tf.zeros_initializer,
                combiner="sum",
                is_share_embedding=False,
                shared_embedding_name=None)

        # Disabled example: how the other column kinds would be declared.
        #
        # self.real_valued_columns = ['feature3','feature4']
        # for feature_name in self.real_valued_columns:
        #     self.generate_real_valued_feature_column(feature_name,dtype="Float",value_dimension=1)
        #
        # self.sparse_id_columns = ['feature5','feature6']
        # for feature_name in self.sparse_id_columns:
        #     self.generate_sparse_id_feature_column(feature_name,hash_bucket_size=1000,dimension=16,combiner="sum",is_share_embedding=False,shared_embedding_name=None)

        # NOTE: "partitino" is a misspelling of "partition"; the attribute
        # name is kept unchanged so existing references keep working.
        self.embedding_partitino_size = 4 * 1024 * 1024
        self.dnn_partition_size = 64 * 1024
        self.dnn_l2_reg = 1e-6
        self.clip_gradients = 5.0
        self.dnn_hidden_units = [1024, 512, 256]

    def build_placeholder(self):
        """Bind ``self.is_training`` to the boolean "training" tensor.

        Reuses the tensor named ``training:0`` when the default graph already
        has one; otherwise creates a new boolean placeholder with that name.

        NOTE(review): the guide's model conventions list build_placeholder
        among the methods that should normally be left to the framework
        default; this override is kept from the original example.
        """
        try:
            self.is_training = tf.get_default_graph().get_tensor_by_name("training:0")
        except KeyError:
            # No such tensor in the graph yet.
            self.is_training = tf.placeholder(tf.bool, name="training")

    def setup_global_step(self):
        """Create a non-trainable int64 ``global_step`` variable starting at 0.

        The variable is registered in both the GLOBAL_STEP and
        GLOBAL_VARIABLES collections and stored in ``self.global_step``.
        """
        self.global_step = tf.Variable(
            initial_value=0,
            name="global_step",
            trainable=False,
            dtype=tf.int64,
            collections=[tf.GraphKeys.GLOBAL_STEP, tf.GraphKeys.GLOBAL_VARIABLES])

    def embedding_layer(self):
        """Build the input layer from the embedding feature columns.

        The result is stored in ``self.layer_dict['dnn']``. Embeddings must go
        through ``input_from_feature_columns`` (see the guide's Feature
        section) so the model can be served online.
        """
        embedding_partitioner = base_ops.partitioner(self.config.ps_num,
                                                     self.embedding_partitino_size)
        with tf.variable_scope(name_or_scope="Embedding_Layer",
                               partitioner=embedding_partitioner,
                               reuse=tf.AUTO_REUSE) as scope:
            logging.info('ps num: {}, embedding prtition size: {} \n scope :{}'.format(self.config.ps_num,self.embedding_partitino_size,scope))
            feature_columns = self.feature_columns_from_column_names(self.embedding_columns)
            self.layer_dict['dnn'] = layers.input_from_feature_columns(
                self.features,
                feature_columns,
                scope=scope)

    def dnn_layer(self):
        """Stack the fully connected hidden layers on top of the embedding output.

        One layer is created per entry of ``self.dnn_hidden_units``, each with
        batch normalization. The output of the last layer is left in
        ``self.dnn_net``.
        """
        dnn_inputs = [self.layer_dict['dnn']]
        with tf.variable_scope(name_or_scope="{}_Score_Network".format(self.name),
                               partitioner=base_ops.partitioner(self.config.ps_num,
                                                                self.dnn_partition_size)):
            self.dnn_net = tf.concat(values=dnn_inputs, axis=1)
            with arg_scope(base_ops.model_arg_scope(weight_decay=self.dnn_l2_reg)):
                for layer_id, num_hidden_units in enumerate(self.dnn_hidden_units):
                    with variable_scope.variable_scope("hiddenlayer_{}".format(layer_id)) as dnn_hidden_layer_scope:
                        # NOTE(review): this passes the layer *input*
                        # activations (self.dnn_net) as weights_list, so the
                        # L2 penalty is on activations, not on the layer
                        # weights -- confirm this is intended.
                        tf.contrib.layers.apply_regularization(
                            regularizer=tf.contrib.layers.l2_regularizer(float(self.dnn_l2_reg)),
                            weights_list=[self.dnn_net])
                        # NOTE(review): "llrelu" is a key resolved by the
                        # project helper; verify it is not a typo of "lrelu".
                        self.dnn_net = layers.fully_connected(
                            self.dnn_net,
                            num_hidden_units,
                            utils.getActivationFunctionOp("llrelu"),
                            scope=dnn_hidden_layer_scope,
                            variables_collections=[self.collections_dnn_hidden_layer],
                            outputs_collections=[self.collections_dnn_hidden_output],
                            normalizer_fn=layers.batch_norm,
                            normalizer_params={"scale": True, "is_training": self.is_training})

    def logits_layer(self):
        """Project ``self.dnn_net`` to a single output stored in ``self.logits``."""
        with tf.variable_scope(name_or_scope="{}_Logits".format(self.name),
                               partitioner=base_ops.partitioner(self.config.ps_num,
                                                                self.dnn_partition_size)) as dnn_logits_scope:
            with arg_scope(base_ops.model_arg_scope(weight_decay=self.dnn_l2_reg)):
                self.logits = layers.linear(
                    self.dnn_net,
                    1,
                    scope=dnn_logits_scope,
                    variables_collections=[self.collections_dnn_hidden_layer],
                    outputs_collections=[self.collections_dnn_hidden_output])

    def build_model(self):
        """Build the network: embedding input, hidden layers, then logits."""
        self.embedding_layer()
        self.dnn_layer()
        self.logits_layer()

    def update_op(self):
        """Store in ``self.update_ops`` the UPDATE_OPS whose name starts with the model name."""
        self.update_ops = [
            op for op in tf.get_collection(tf.GraphKeys.UPDATE_OPS)
            if op.name.startswith(self.name)
        ]

    def reg_loss(self):
        """Sum the REGULARIZATION_LOSSES whose name starts with the model name.

        The selected tensors are kept in ``self.reg_losses``. The sum is
        assigned to ``self.reg_loss`` as the framework convention requires;
        that assignment shadows this method on the instance, so the method
        can only be called once per instance.
        """
        self.reg_losses = [
            loss for loss in tf.get_collection(tf.GraphKeys.REGULARIZATION_LOSSES)
            if loss.name.startswith(self.name)
        ]
        self.reg_loss = tf.reduce_sum(self.reg_losses)

    def training_op(self):
        """Create ``self.train_op``; it is ``None`` when ``config.predict`` is set."""
        if self.config.predict:
            self.train_op = None
            return

        # Single source for the rate handed to both optimize_loss and the
        # optimizer, so the two values cannot drift apart.
        learning_rate = 0.01
        with tf.variable_scope(name_or_scope="Optimize_Layer",
                               reuse=tf.AUTO_REUSE):
            gs = tf.train.get_or_create_global_step()
            logging.info("Global_step:{},{}".format(self.name, str(gs)))
            logging.info("Model_name:{},train_op_final_loss:{}".format(self.name, str(self.loss)))

            optimizer = tf.train.AdamAsyncOptimizer(learning_rate=learning_rate,
                                                    beta1=0.9,
                                                    beta2=0.999,
                                                    epsilon=1e-8,
                                                    use_locking=False)
            self.train_op, _, _ = myopt.optimize_loss(
                loss=self.loss,
                global_step=self.global_step,
                learning_rate=learning_rate,
                optimizer=optimizer,
                update_ops=self.update_ops,
                clip_gradients=self.clip_gradients,
                variables=ops.get_collection(ops.GraphKeys.TRAINABLE_VARIABLES),
                increment_global_step=True,
                summaries=myopt.OPTIMIZER_SUMMARIES)

    def loss_op(self):
        """Set ``self.loss`` to mean sigmoid cross-entropy plus ``self.reg_loss``.

        Reads ``self.label`` and ``self.logits``; ``reg_loss()`` must have run
        before so that ``self.reg_loss`` is already a tensor.
        """
        with tf.name_scope("{}_Loss_Op".format(self.name)):
            cross_entropy = tf.nn.sigmoid_cross_entropy_with_logits(
                logits=self.logits,
                labels=self.label)
            self.loss = tf.reduce_mean(cross_entropy)
            self.loss = self.loss + self.reg_loss

    def metrics_op(self):
        """Delegate to the base class, which sets up the framework's common metrics."""
        super(CustomModel, self).metrics_op()

    def summary_op(self):
        """Emit scalar summaries for every metric plus dense-layer summaries."""
        with tf.name_scope("{}_Metrics_Scalar".format(self.name)):
            for key, metric in self.metrics.items():
                tf.summary.scalar(name=key, tensor=metric)

        with tf.name_scope("{}_Layer_Summary".format(self.name)):
            base_ops.add_norm2_summary(self.collections_dnn_hidden_layer)
            base_ops.add_dense_output_summary(self.collections_dnn_hidden_output)
            base_ops.add_weight_summary(self.collections_dnn_hidden_layer)

Feature

样本输入可以通过self.features获取,key为用户配置的特征名。

重要

请一律使用 contrib.layers.input_from_feature_columns 来实现 embedding 功能,不要使用其他 embedding 函数,否则会出现无法线上预估的问题。目前只支持 sparse_column_with_hash_bucket、embedding_column、real_valued_column、shared_embedding_columns 这几种 column。注意同一个 embedding_column 不能使用两次,如需复用请使用 shared_embedding_columns。

开发建议:

为了避免线上模型兼容问题,我们封装了几个column接口,建议直接使用这几个接口:

# Create an embedding_column
self.generate_embedding_feature_column(
    feature_name,
    hash_bucket_size,
    dimension,
    initializer=tf.zeros_initializer,
    combiner="sum",
    is_share_embedding=False,
    shared_embedding_name=None
)

# Create a real_valued_column
self.generate_real_valued_feature_column(
    feature_name,
    dtype="Float",  # only "Float" and "Int" are supported
    value_dimension=1
)

# Create a sparse_column
# NOTE(review): the CustomModel example in this guide passes extra keyword
# arguments (dimension, is_share_embedding, shared_embedding_name) to this
# helper -- confirm which signature is the actual one.
self.generate_sparse_id_feature_column(
    feature_name,
    hash_bucket_size,
    combiner="sum"
)

# Look up the configured feature columns by name
self.feature_columns_from_column_names(
    feature_list
)

模型规范

为了适配线上Service,我们模型需要做以下规范:

初始化记得调用父类:super(CustomModel, self).__init__(config,name)。

logits:需要传给self.logits。我们最终会对logits做sigmoid操作作为最终预估分数。如果客户需要用其他方式算分,需要重写predictions_op方法。

loss:需要传给self.loss。

reg_loss:需要传给self.reg_loss。

metrics_op:要记得调用父类super(CustomModel, self).metrics_op(),我们会做一些系统通用指标监控。

以下方法不建议用户实现:

build_placeholder、mark_output、trace_sample_op 使用框架默认逻辑。

模型开发注意事项

variable 及权重

用户如果需要使用额外的 tf.Variable 或者用 contrib 之外的一些网络函数,注意把 variable 加到 MODEL_VARIABLES 这个 collection 里,我们会根据这个加载权重。请注意,需要在线加载权重的再加入,像global_step这种是不需要的!

示例:

from tensorflow.python.framework import ops
from tensorflow.python.ops import variable_scope as vs

# Register the variable in MODEL_VARIABLES (in addition to GLOBAL_VARIABLES):
# the framework uses that collection to decide which weights to load for
# online serving. Variables that do not need to be loaded online, such as
# global_step, should not be added to it.
# _WEIGHTS_VARIABLE_NAME, total_arg_size, output_size, dtype and
# kernel_initializer are placeholders supplied by the surrounding code.
self._weights = vs.get_variable(
          _WEIGHTS_VARIABLE_NAME, [total_arg_size, output_size],
          dtype=dtype,
          initializer=kernel_initializer,
          collections=[ops.GraphKeys.GLOBAL_VARIABLES, ops.GraphKeys.MODEL_VARIABLES])