Developer guide for custom sorting models

The custom sorting model feature lets you deploy a TensorFlow-based deep neural network (DNN) ranking model on OpenSearch. This guide covers how to implement the CustomModel class, configure feature columns, and meet the compatibility requirements for online inference.

How it works

The framework builds a TensorFlow computation graph by calling the build function, which runs the following methods in order:

  1. build_placeholder()

  2. build_model()

  3. setup_global_step()

  4. reg_loss()

  5. loss_op()

  6. update_op()

  7. training_op()

  8. predictions_op()

  9. mark_output()

  10. metrics_op()

  11. summary_op()

  12. trace_sample_op()

You must implement five of these methods (see Required methods). The framework supplies default logic for the others.
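The build sequence follows a template-method pattern, which can be modeled in a few lines of plain Python. In the sketch below, SketchBaseModel and SketchCustomModel are hypothetical stand-ins, not the framework's classes. The base class runs every step in the documented order and supplies a default for each one, and the subclass overrides only the five required methods. The sketch also shows why the sample's reg_loss() can store its result under its own name: the method runs once, before loss_op().

```python
# Toy model of the documented build sequence (plain Python, no TensorFlow).
# SketchBaseModel and SketchCustomModel are hypothetical stand-ins for the
# framework's BaseModel and your CustomModel.
BUILD_ORDER = [
    "build_placeholder", "build_model", "setup_global_step", "reg_loss",
    "loss_op", "update_op", "training_op", "predictions_op",
    "mark_output", "metrics_op", "summary_op", "trace_sample_op",
]


class SketchBaseModel(object):
    def __init__(self):
        self.calls = []  # records (owner, step) for each executed step

    def build(self):
        # Run every step exactly once, in the documented order.
        for step in BUILD_ORDER:
            getattr(self, step)()


def _make_default(step):
    def default(self):
        self.calls.append(("framework", step))
    return default


# The framework supplies a default implementation for every step.
for _step in BUILD_ORDER:
    setattr(SketchBaseModel, _step, _make_default(_step))


class SketchCustomModel(SketchBaseModel):
    # Override only the five required methods.
    def build_model(self):
        self.calls.append(("custom", "build_model"))
        self.logits = 0.5

    def reg_loss(self):
        self.calls.append(("custom", "reg_loss"))
        # Like the sample, store the result under the method's own name.
        # This works here because build() calls reg_loss() only once.
        self.reg_loss = 0.01

    def loss_op(self):
        self.calls.append(("custom", "loss_op"))
        # reg_loss() already ran (step 4), so self.reg_loss is a value here.
        self.loss = 1.0 + self.reg_loss

    def update_op(self):
        self.calls.append(("custom", "update_op"))
        self.update_ops = []

    def training_op(self):
        self.calls.append(("custom", "training_op"))
        self.train_op = ("minimize", self.loss)


model = SketchCustomModel()
model.build()
print([name for owner, name in model.calls if owner == "custom"])
# ['build_model', 'reg_loss', 'loss_op', 'update_op', 'training_op']
```

Because the steps always run in this order, each required method can rely on the attributes set by earlier steps: loss_op() reads self.reg_loss and training_op() reads self.loss and self.update_ops.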

Prerequisites

Before you begin, ensure that you have:

  • A working TensorFlow 1.x environment

  • Familiarity with TensorFlow feature columns and DNN model training

  • Access to your training data and feature schema

Implement the CustomModel class

CustomModel must inherit from BaseModel. The high-level workflow is:

  1. Define the class and call the parent constructor.

  2. Implement the five required methods.

  3. Configure feature columns.

  4. Assign outputs to the required attributes (self.logits, self.loss, self.reg_loss).

Required methods

Implement the following five methods. The framework provides default logic for the remaining steps.

| Method | What to implement |
|---|---|
| build_model() | Define the model architecture (embedding layer, DNN layers, logits layer) and assign the raw logits to self.logits |
| update_op() | Collect the batch normalization update ops from tf.GraphKeys.UPDATE_OPS |
| reg_loss() | Collect the regularization losses from tf.GraphKeys.REGULARIZATION_LOSSES and assign their sum to self.reg_loss |
| loss_op() | Define the training loss (for example, sigmoid cross-entropy plus the regularization loss) and assign it to self.loss |
| training_op() | Set up the optimizer and create self.train_op |
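In the sample below, both update_op() and reg_loss() filter a global graph collection by name prefix, keeping only entries whose name starts with self.name. The plain-Python sketch shows the effect of that filter. FakeOp and the op names are hypothetical placeholders for tf.Operation objects and real TensorFlow op names.

```python
from collections import namedtuple

# Hypothetical stand-in for a tf.Operation; only the .name attribute matters.
FakeOp = namedtuple("FakeOp", ["name"])


def filter_by_model(collection, model_name):
    """Keep entries whose name starts with model_name, as the sample does."""
    return [op for op in collection if op.name.startswith(model_name)]


update_ops = [
    FakeOp("CTR_Score_Network/hiddenlayer_0/BatchNorm/AssignMovingAvg"),
    FakeOp("CTR_Score_Network/hiddenlayer_1/BatchNorm/AssignMovingAvg"),
    FakeOp("Embedding_Layer/some_update"),  # scope does not start with "CTR"
    FakeOp("OtherModel_Score_Network/BatchNorm/AssignMovingAvg"),
]

kept = filter_by_model(update_ops, "CTR")
print([op.name.split("/")[1] for op in kept])
# ['hiddenlayer_0', 'hiddenlayer_1']
```

Anything created in a scope that does not start with self.name is silently dropped, so place layers that create batch normalization or regularization ops under scopes prefixed with self.name, as the sample's "{}_Score_Network" and "{}_Logits" scopes do. Note also that str.startswith matches any name beginning with the same characters, so a second model named, say, CTR2 in the same graph would pass the CTR filter.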

Methods to leave unimplemented

Do not implement build_placeholder, mark_output, or trace_sample_op. These rely on framework logic that ensures online compatibility.

Sample implementation

The following example shows a complete CustomModel implementation using embedding features and a three-layer DNN:

from collections import OrderedDict
import tensorflow as tf
from tensorflow.contrib import layers
from tensorflow.contrib.framework.python.ops import arg_scope
from tensorflow.python.framework import ops
from tensorflow.python.ops import variable_scope

from model_ops.tflog import tflogger as logging
import model_ops.optimizer_ops as myopt
from model.base_model import BaseModel
from model_ops import ops as base_ops
from model_ops import utils


class CustomModel(BaseModel):
    def __init__(self, config, name="CTR"):
        # Always call the parent constructor with this exact signature
        super(CustomModel, self).__init__(config, name)

        # TensorFlow variable collections for tracking DNN layers
        self.collections_dnn_hidden_layer = "{}_dnn_hidden_layer".format(self.name)
        self.collections_dnn_hidden_output = "{}_dnn_hidden_output".format(self.name)

        self.layer_dict = OrderedDict()

        # Configure embedding feature columns for sparse categorical features
        self.embedding_columns = ['feature1', 'feature2']
        for feature_name in self.embedding_columns:
            self.generate_embedding_feature_column(
                feature_name,
                hash_bucket_size=1000,
                dimension=16,
                initializer=tf.zeros_initializer,
                combiner="sum",
                is_share_embedding=False,
                shared_embedding_name=None
            )

        # Uncomment to add real-valued or sparse ID columns:
        # self.real_valued_columns = ['feature3', 'feature4']
        # for feature_name in self.real_valued_columns:
        #     self.generate_real_valued_feature_column(feature_name, dtype="Float", value_dimension=1)
        #
        # self.sparse_id_columns = ['feature5', 'feature6']
        # for feature_name in self.sparse_id_columns:
        #     self.generate_sparse_id_feature_column(feature_name, hash_bucket_size=1000, combiner="sum")

        # Model hyperparameters
        self.embedding_partition_size = 4 * 1024 * 1024
        self.dnn_partition_size = 64 * 1024
        self.dnn_l2_reg = 1e-6
        self.clip_gradients = 5.0
        self.dnn_hidden_units = [1024, 512, 256]

    def build_placeholder(self):
        try:
            self.is_training = tf.get_default_graph().get_tensor_by_name("training:0")
        except KeyError:
            self.is_training = tf.placeholder(tf.bool, name="training")

    def setup_global_step(self):
        global_step = tf.Variable(
            initial_value=0,
            name="global_step",
            trainable=False,
            dtype=tf.int64,
            collections=[tf.GraphKeys.GLOBAL_STEP, tf.GraphKeys.GLOBAL_VARIABLES])
        self.global_step = global_step

    def embedding_layer(self):
        with tf.variable_scope(
            name_or_scope="Embedding_Layer",
            partitioner=base_ops.partitioner(self.config.ps_num, self.embedding_partition_size),
            reuse=tf.AUTO_REUSE
        ) as scope:
            logging.info('ps num: {}, embedding partition size: {} \n scope: {}'.format(
                self.config.ps_num, self.embedding_partition_size, scope))
            # Use input_from_feature_columns to embed sparse features — do not use other embedding functions
            self.layer_dict['dnn'] = layers.input_from_feature_columns(
                self.features,
                self.feature_columns_from_column_names(self.embedding_columns),
                scope=scope
            )

    def dnn_layer(self):
        dnn_layer = [self.layer_dict['dnn']]
        with tf.variable_scope(
            name_or_scope="{}_Score_Network".format(self.name),
            partitioner=base_ops.partitioner(self.config.ps_num, self.dnn_partition_size)
        ):
            self.dnn_net = tf.concat(values=dnn_layer, axis=1)
            with arg_scope(base_ops.model_arg_scope(weight_decay=self.dnn_l2_reg)):
                for layer_id, num_hidden_units in enumerate(self.dnn_hidden_units):
                    with variable_scope.variable_scope("hiddenlayer_{}".format(layer_id)) as dnn_hidden_layer_scope:
                        tf.contrib.layers.apply_regularization(
                            regularizer=tf.contrib.layers.l2_regularizer(float(self.dnn_l2_reg)),
                            weights_list=[self.dnn_net])
                        self.dnn_net = layers.fully_connected(
                            self.dnn_net,
                            num_hidden_units,
                            utils.getActivationFunctionOp("llrelu"),
                            scope=dnn_hidden_layer_scope,
                            variables_collections=[self.collections_dnn_hidden_layer],
                            outputs_collections=[self.collections_dnn_hidden_output],
                            normalizer_fn=layers.batch_norm,
                            normalizer_params={"scale": True, "is_training": self.is_training})

    def logits_layer(self):
        with tf.variable_scope(
            name_or_scope="{}_Logits".format(self.name),
            partitioner=base_ops.partitioner(self.config.ps_num, self.dnn_partition_size)
        ) as dnn_logits_scope:
            with arg_scope(base_ops.model_arg_scope(weight_decay=self.dnn_l2_reg)):
                # Assign raw logits to self.logits — OpenSearch applies sigmoid automatically
                self.logits = layers.linear(
                    self.dnn_net,
                    1,
                    scope=dnn_logits_scope,
                    variables_collections=[self.collections_dnn_hidden_layer],
                    outputs_collections=[self.collections_dnn_hidden_output])

    def build_model(self):
        self.embedding_layer()
        self.dnn_layer()
        self.logits_layer()

    def update_op(self):
        update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS)
        self.update_ops = [op for op in update_ops if op.name.startswith(self.name)]

    def reg_loss(self):
        reg_losses = tf.get_collection(tf.GraphKeys.REGULARIZATION_LOSSES)
        self.reg_losses = [loss for loss in reg_losses if loss.name.startswith(self.name)]
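        # This replaces the bound method with the summed tensor that loss_op() reads;
        # it works because the framework calls reg_loss() once, before loss_op().
        self.reg_loss = tf.reduce_sum(self.reg_losses)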

    def training_op(self):
        if self.config.predict:
            self.train_op = None
            return
        with tf.variable_scope(name_or_scope="Optimize_Layer", reuse=tf.AUTO_REUSE):
            gs = tf.train.get_or_create_global_step()
            logging.info("Global_step: {}, {}".format(self.name, str(gs)))
            logging.info("Model_name: {}, train_op_final_loss: {}".format(self.name, str(self.loss)))

            self.train_op, _, _ = myopt.optimize_loss(
                loss=self.loss,
                global_step=self.global_step,
                learning_rate=0.01,
                # Note: AdamAsyncOptimizer is not part of open-source TensorFlow 1.x.
                optimizer=tf.train.AdamAsyncOptimizer(
                    learning_rate=0.01,
                    beta1=0.9,
                    beta2=0.999,
                    epsilon=1e-8,
                    use_locking=False),
                update_ops=self.update_ops,
                clip_gradients=self.clip_gradients,
                variables=ops.get_collection(ops.GraphKeys.TRAINABLE_VARIABLES),
                increment_global_step=True,
                summaries=myopt.OPTIMIZER_SUMMARIES)

    def loss_op(self):
        with tf.name_scope("{}_Loss_Op".format(self.name)):
            # Assign total loss (task loss + regularization) to self.loss
            self.loss = tf.reduce_mean(
                tf.nn.sigmoid_cross_entropy_with_logits(
                    logits=self.logits,
                    labels=self.label))
            self.loss = self.loss + self.reg_loss

    def metrics_op(self):
        # Call the parent to let OpenSearch track common system metrics
        super(CustomModel, self).metrics_op()

    def summary_op(self):
        with tf.name_scope("{}_Metrics_Scalar".format(self.name)):
            for key, metric in self.metrics.items():
                tf.summary.scalar(name=key, tensor=metric)

        with tf.name_scope("{}_Layer_Summary".format(self.name)):
            base_ops.add_norm2_summary(self.collections_dnn_hidden_layer)
            base_ops.add_dense_output_summary(self.collections_dnn_hidden_output)
            base_ops.add_weight_summary(self.collections_dnn_hidden_layer)
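As a sanity check on the sample's size, the arithmetic below counts its parameters. It rests on assumptions the framework does not confirm: input_from_feature_columns concatenates the two 16-dimensional embeddings into a 32-dimensional input, fully_connected creates no bias when normalizer_fn is set, batch_norm with scale=True adds trainable beta and gamma plus non-trainable moving statistics, and base_ops.model_arg_scope adds no variables of its own.

```python
# Parameter count for the sample CustomModel, under the assumptions above.
embedding_columns = ["feature1", "feature2"]
hash_bucket_size = 1000
embedding_dim = 16
dnn_hidden_units = [1024, 512, 256]

# One hash_bucket_size x embedding_dim table per (non-shared) embedding column.
embedding_params = len(embedding_columns) * hash_bucket_size * embedding_dim

trainable_dnn = 0
batch_norm_stats = 0
fan_in = len(embedding_columns) * embedding_dim  # concatenated embeddings
for units in dnn_hidden_units:
    trainable_dnn += fan_in * units  # weights; no bias because batch norm follows
    trainable_dnn += 2 * units       # batch norm beta (center) and gamma (scale=True)
    batch_norm_stats += 2 * units    # moving mean and moving variance (not trained)
    fan_in = units
trainable_dnn += fan_in * 1 + 1      # logits layer: weights plus one bias

print("embedding:", embedding_params)         # embedding: 32000
print("dnn trainable:", trainable_dnn)        # dnn trainable: 691969
print("batch norm stats:", batch_norm_stats)  # batch norm stats: 3584
```

Under these assumptions, the 1024 x 512 weight matrix of the second hidden layer holds about three quarters of the DNN's trainable parameters.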

Configure feature columns

Access each feature through self.features, using the feature name as the key. Configure all feature columns through the helper methods that OpenSearch provides. Calling raw TensorFlow feature column functions directly risks incompatibility with the online inference service.

Important

Use tf.contrib.layers.input_from_feature_columns to embed features. Other embedding functions are not supported by the online service.

Supported feature column types

| Type (helper method) | When to use |
|---|---|
| embedding_column (generate_embedding_feature_column) | Sparse categorical features that need an embedding lookup |
| real_valued_column (generate_real_valued_feature_column) | Continuous numeric features |
| sparse_column_with_hash_bucket (generate_sparse_id_feature_column) | Sparse ID features without an embedding |
| shared_embedding_columns (generate_embedding_feature_column with is_share_embedding=True) | Multiple features that must share the same embedding |

Important

An embedding column cannot be used twice in the same model. If two features need to share an embedding, configure them with is_share_embedding=True and the same shared_embedding_name.
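Sharing happens at the level of the embedding table: features that share an embedding look up rows in one table, so the same value arriving through either feature lands on the same vector. The sketch below models this in plain Python. EmbeddingTables, table_key, and the feature and table names (user_city, item_city, query_city, city_emb) are hypothetical, and MD5 stands in for TensorFlow's own hash function, so the bucket ids will not match what TensorFlow computes.

```python
import hashlib
import random


def hash_bucket(value, hash_bucket_size):
    # MD5 stands in for TensorFlow's hash; only the "value -> bucket" idea matters.
    digest = hashlib.md5(value.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "little") % hash_bucket_size


def table_key(feature_name, is_share_embedding=False, shared_embedding_name=None):
    # Shared columns resolve to their shared name; other columns get their own table.
    return shared_embedding_name if is_share_embedding else feature_name


class EmbeddingTables(object):
    """Hypothetical registry holding one randomly initialized table per key."""

    def __init__(self, hash_bucket_size, dimension, seed=0):
        self.hash_bucket_size = hash_bucket_size
        self.dimension = dimension
        self.rng = random.Random(seed)
        self.tables = {}

    def lookup(self, key, value):
        if key not in self.tables:
            self.tables[key] = [
                [self.rng.uniform(-0.05, 0.05) for _ in range(self.dimension)]
                for _ in range(self.hash_bucket_size)
            ]
        return self.tables[key][hash_bucket(value, self.hash_bucket_size)]


tables = EmbeddingTables(hash_bucket_size=1000, dimension=4)
user_key = table_key("user_city", is_share_embedding=True, shared_embedding_name="city_emb")
item_key = table_key("item_city", is_share_embedding=True, shared_embedding_name="city_emb")
query_key = table_key("query_city")

a = tables.lookup(user_key, "hangzhou")
b = tables.lookup(item_key, "hangzhou")
c = tables.lookup(query_key, "hangzhou")
print(a is b, len(tables.tables))  # True 2
```

The two shared features resolve to the single city_emb table and return the very same row, while query_city gets a separate table of its own.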

Helper methods

Use the following OpenSearch helper methods to configure feature columns:

# Embedding column — for sparse categorical features
self.generate_embedding_feature_column(
    feature_name,
    hash_bucket_size,       # Number of hash buckets for the sparse feature
    dimension,              # Embedding dimension
    initializer=tf.zeros_initializer,
    combiner="sum",         # How to combine multi-valued features: "sum", "mean", or "sqrtn"
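    is_share_embedding=False,  # Set to True to share this embedding with other features
    shared_embedding_name=None  # Features with the same shared name use one embedding table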
)

# Real-valued column — for continuous numeric features
self.generate_real_valued_feature_column(
    feature_name,
    dtype="Float",          # "Float" or "Int" only
    value_dimension=1
)

# Sparse ID column — for sparse features without embedding
self.generate_sparse_id_feature_column(
    feature_name,
    hash_bucket_size,
    combiner="sum"
)

# Retrieve configured feature columns by name
self.feature_columns_from_column_names(feature_list)
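The combiner argument decides how the embeddings of a multi-valued feature (for example, a list of tags) are reduced to a single vector. The plain-Python sketch below assumes every value has weight 1, in which case "mean" divides the sum by the number of values n and "sqrtn" divides it by sqrt(n). The combine function is illustrative and not part of the OpenSearch API.

```python
import math


def combine(embeddings, combiner="sum"):
    """Reduce the embeddings of one multi-valued feature to a single vector.

    Assumes every value has weight 1. This sketch simply rejects an empty feature.
    """
    if not embeddings:
        raise ValueError("need at least one embedding")
    n = len(embeddings)
    total = [sum(column) for column in zip(*embeddings)]
    if combiner == "sum":
        return total
    if combiner == "mean":
        return [x / n for x in total]
    if combiner == "sqrtn":
        return [x / math.sqrt(n) for x in total]
    raise ValueError("unknown combiner: %s" % combiner)


# A feature with three values, each already mapped to a 2-d embedding row.
rows = [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]]
for name in ("sum", "mean", "sqrtn"):
    print(name, combine(rows, name))
```

With "sum", the combined vector grows with the number of values; "mean" removes that dependence, and "sqrtn" sits between the two.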

Model specifications

Meet all of the following requirements to ensure compatibility with the online service:

| Attribute / method | Requirement |
|---|---|
| CustomModel.__init__ | Call super(CustomModel, self).__init__(config, name). |
| self.logits | Assign raw logits (pre-sigmoid scores). OpenSearch automatically applies the sigmoid function to convert them into the final ranking score. To use a different activation, override predictions_op. |
| self.loss | Assign the total training loss. |
| self.reg_loss | Assign the regularization loss. |
| metrics_op | Call super(CustomModel, self).metrics_op() so that OpenSearch can track system metrics. |
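The sketch below shows how self.logits, self.reg_loss, and self.loss relate, in plain Python that mirrors the sample's loss_op(). It computes sigmoid cross-entropy from raw logits with the numerically stable formula documented for tf.nn.sigmoid_cross_entropy_with_logits, adds an L2 penalty in the form tf.contrib.layers.l2_regularizer produces (scale * sum(x^2) / 2), and then applies the sigmoid that OpenSearch uses to turn logits into ranking scores. The input values are made up for illustration.

```python
import math


def sigmoid(x):
    # Numerically stable logistic function.
    if x >= 0:
        return 1.0 / (1.0 + math.exp(-x))
    z = math.exp(x)
    return z / (1.0 + z)


def sigmoid_cross_entropy_with_logits(logit, label):
    # Stable form: max(x, 0) - x * z + log(1 + exp(-|x|))
    return max(logit, 0.0) - logit * label + math.log1p(math.exp(-abs(logit)))


def l2_penalty(values, scale):
    # What an L2 regularizer with this scale adds: scale * sum(v^2) / 2
    return scale * sum(v * v for v in values) / 2.0


logits = [2.0, -1.0, 0.0]   # raw scores, as held in self.logits
labels = [1.0, 0.0, 1.0]
activations = [0.5, -0.5]   # illustrative tensor passed to the regularizer

task_loss = sum(sigmoid_cross_entropy_with_logits(x, z)
                for x, z in zip(logits, labels)) / len(logits)
reg_loss = l2_penalty(activations, scale=1e-6)  # plays the role of self.reg_loss
loss = task_loss + reg_loss                     # plays the role of self.loss
scores = [sigmoid(x) for x in logits]           # ranking scores after the sigmoid

print("loss:", loss)
print("scores:", scores)
```

Because OpenSearch applies the sigmoid itself, build_model() should leave self.logits unactivated. Applying a sigmoid there would squash the scores twice at serving time and would also feed probabilities into a loss that expects logits.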

Usage notes

Custom variables outside the contrib module

If you create variables with tf.Variable or use functions outside the contrib module, add those variables to the MODEL_VARIABLES collection. OpenSearch loads weights based on this collection.

  • Add only variables whose weights need to be loaded at serving time.

  • Do not add global_step to this collection.

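import tensorflow as tf
from tensorflow.python.framework import ops
from tensorflow.python.ops import variable_scope as vs

# Runs inside a CustomModel method; the name and sizes below are illustrative.
_WEIGHTS_VARIABLE_NAME = "kernel"
total_arg_size, output_size = 256, 1

self._weights = vs.get_variable(
    _WEIGHTS_VARIABLE_NAME,
    [total_arg_size, output_size],
    dtype=tf.float32,
    initializer=tf.glorot_uniform_initializer(),
    # MODEL_VARIABLES is the collection OpenSearch uses to load weights.
    # With tf.Variable, pass the same list through its collections argument.
    collections=[ops.GraphKeys.GLOBAL_VARIABLES, ops.GraphKeys.MODEL_VARIABLES]
)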
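The effect of the MODEL_VARIABLES rule can be modeled with a toy loader in plain Python. ToyGraph, serving_weights, and the variable names are hypothetical. The model assumes only what this section states: at serving time, weights are restored from the variables registered in MODEL_VARIABLES, so anything left out of that collection is not loaded.

```python
from collections import defaultdict

# Collection names used only inside this toy model.
GLOBAL_VARIABLES = "global_variables"
MODEL_VARIABLES = "model_variables"


class ToyGraph(object):
    """Hypothetical stand-in for a graph's named variable collections."""

    def __init__(self):
        self.collections = defaultdict(list)

    def add_variable(self, name, collections):
        for key in collections:
            self.collections[key].append(name)


def serving_weights(graph, checkpoint):
    # Toy loader: restore only variables registered in MODEL_VARIABLES.
    return {name: checkpoint[name] for name in graph.collections[MODEL_VARIABLES]}


graph = ToyGraph()
graph.add_variable("custom/kernel", [GLOBAL_VARIABLES, MODEL_VARIABLES])
graph.add_variable("custom/extra_bias", [GLOBAL_VARIABLES])  # MODEL_VARIABLES forgotten
graph.add_variable("global_step", [GLOBAL_VARIABLES])        # correctly left out

checkpoint = {"custom/kernel": [[0.3]], "custom/extra_bias": [0.1], "global_step": 1200}
print(sorted(serving_weights(graph, checkpoint)))  # ['custom/kernel']
```

In this toy model, custom/extra_bias would be trained but never loaded for serving, which is the failure the rule above guards against, while global_step stays out of the serving weights as intended.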