The custom sorting model feature lets you deploy a TensorFlow-based deep neural network (DNN) ranking model on OpenSearch. This guide covers how to implement the CustomModel class, configure feature columns, and meet the compatibility requirements for online inference.
How it works
The framework builds a TensorFlow computation graph by calling the build function, which runs the following methods in order:
build_placeholder() → build_model() → setup_global_step() → reg_loss() → loss_op() → update_op() → training_op() → predictions_op() → mark_output() → metrics_op() → summary_op() → trace_sample_op()
You implement five of these methods. The rest use the framework's default logic — do not override them.
Prerequisites
Before you begin, ensure that you have:
A working TensorFlow 1.x environment
Familiarity with TensorFlow feature columns and DNN model training
Access to your training data and feature schema
Implement the CustomModel class
CustomModel must inherit from BaseModel. The high-level workflow is:
Define the class and call the parent constructor.
Implement the five required methods.
Configure feature columns.
Assign outputs to the required attributes (self.logits, self.loss, self.reg_loss).
Required methods
Implement exactly these five methods — the framework handles everything else:
| Method | What to implement |
|---|---|
| build_model() | Define the model architecture (embedding layer, DNN layers, logits layer) |
| update_op() | Collect batch normalization update ops from tf.GraphKeys.UPDATE_OPS |
| reg_loss() | Collect regularization losses from tf.GraphKeys.REGULARIZATION_LOSSES |
| loss_op() | Define the training loss (e.g., sigmoid cross-entropy + reg loss) |
| training_op() | Set up the optimizer and create self.train_op |
Methods to leave unimplemented
Do not implement build_placeholder, mark_output, or trace_sample_op. These rely on framework logic that ensures online compatibility.
Sample implementation
The following example shows a complete CustomModel implementation using embedding features and a three-layer DNN:
from collections import OrderedDict
import tensorflow as tf
from tensorflow.contrib import layers
from tensorflow.contrib.framework.python.ops import arg_scope
from tensorflow.python.framework import ops
from tensorflow.python.ops import variable_scope
from model_ops.tflog import tflogger as logging
import model_ops.optimizer_ops as myopt
from model.base_model import BaseModel
from model_ops import ops as base_ops
from model_ops import utils
class CustomModel(BaseModel):
    """Sample ranking model: embedded sparse features fed to a DNN with one logit.

    The graph is assembled by ``build_model`` in three stages (embedding
    layer, hidden layers, logits layer). The remaining methods collect
    update ops and regularization losses, define the loss, and create the
    training op. Outputs are published through ``self.logits``,
    ``self.loss`` and ``self.reg_loss``.
    """

    def __init__(self, config, name="CTR"):
        """Register feature columns and set the model hyperparameters.

        Args:
            config: Framework configuration object; this class reads
                ``config.ps_num`` and ``config.predict`` from it.
            name: Model name, used as a prefix for scopes and collections.
        """
        # The parent constructor must be invoked with exactly (config, name).
        super(CustomModel, self).__init__(config, name)

        # Names of the graph collections that track the DNN layer variables
        # and the DNN layer outputs.
        self.collections_dnn_hidden_layer = "{}_dnn_hidden_layer".format(self.name)
        self.collections_dnn_hidden_output = "{}_dnn_hidden_output".format(self.name)
        self.layer_dict = OrderedDict()

        # Sparse categorical features; each one gets its own embedding column
        # configured with the same options.
        self.embedding_columns = ['feature1', 'feature2']
        embedding_options = dict(
            hash_bucket_size=1000,
            dimension=16,
            initializer=tf.zeros_initializer,
            combiner="sum",
            is_share_embedding=False,
            shared_embedding_name=None)
        for column_name in self.embedding_columns:
            self.generate_embedding_feature_column(column_name, **embedding_options)

        # Uncomment to add real-valued or sparse ID columns:
        # self.real_valued_columns = ['feature3', 'feature4']
        # for feature_name in self.real_valued_columns:
        #     self.generate_real_valued_feature_column(feature_name, dtype="Float", value_dimension=1)
        #
        # self.sparse_id_columns = ['feature5', 'feature6']
        # for feature_name in self.sparse_id_columns:
        #     self.generate_sparse_id_feature_column(feature_name, hash_bucket_size=1000, combiner="sum")

        # Model hyperparameters.
        self.embedding_partition_size = 4 * 1024 * 1024
        self.dnn_partition_size = 64 * 1024
        self.dnn_l2_reg = 1e-6
        self.clip_gradients = 5.0
        self.dnn_hidden_units = [1024, 512, 256]

    def build_placeholder(self):
        """Bind ``self.is_training`` to the boolean "training" tensor.

        Reuses the tensor named "training:0" when the default graph already
        has one; otherwise creates a new boolean placeholder named "training".
        """
        # NOTE(review): the surrounding guide says not to implement
        # build_placeholder, yet this sample defines it — confirm which applies.
        graph = tf.get_default_graph()
        try:
            training_flag = graph.get_tensor_by_name("training:0")
        except KeyError:
            training_flag = tf.placeholder(tf.bool, name="training")
        self.is_training = training_flag

    def setup_global_step(self):
        """Create the non-trainable int64 ``global_step`` variable."""
        self.global_step = tf.Variable(
            initial_value=0,
            name="global_step",
            trainable=False,
            dtype=tf.int64,
            collections=[tf.GraphKeys.GLOBAL_STEP, tf.GraphKeys.GLOBAL_VARIABLES])

    def embedding_layer(self):
        """Embed the configured sparse features into ``self.layer_dict['dnn']``."""
        embedding_partitioner = base_ops.partitioner(
            self.config.ps_num, self.embedding_partition_size)
        with tf.variable_scope(
                name_or_scope="Embedding_Layer",
                partitioner=embedding_partitioner,
                reuse=tf.AUTO_REUSE) as embedding_scope:
            logging.info('ps num: {}, embedding partition size: {} \n scope: {}'.format(
                self.config.ps_num, self.embedding_partition_size, embedding_scope))
            # Sparse features must be embedded with input_from_feature_columns;
            # other embedding functions are not supported by the online service.
            columns = self.feature_columns_from_column_names(self.embedding_columns)
            self.layer_dict['dnn'] = layers.input_from_feature_columns(
                self.features,
                columns,
                scope=embedding_scope)

    def dnn_layer(self):
        """Stack the batch-normalized hidden layers on top of the embeddings.

        The running activation is kept in ``self.dnn_net``; one fully
        connected layer is added per entry of ``self.dnn_hidden_units``.
        """
        dnn_inputs = [self.layer_dict['dnn']]
        score_scope_name = "{}_Score_Network".format(self.name)
        score_partitioner = base_ops.partitioner(
            self.config.ps_num, self.dnn_partition_size)
        with tf.variable_scope(name_or_scope=score_scope_name,
                               partitioner=score_partitioner):
            self.dnn_net = tf.concat(values=dnn_inputs, axis=1)
            with arg_scope(base_ops.model_arg_scope(weight_decay=self.dnn_l2_reg)):
                for idx, width in enumerate(self.dnn_hidden_units):
                    with variable_scope.variable_scope(
                            "hiddenlayer_{}".format(idx)) as hidden_scope:
                        # The L2 penalty here is applied to the tensor that
                        # feeds this layer (self.dnn_net), not to its weights.
                        tf.contrib.layers.apply_regularization(
                            regularizer=tf.contrib.layers.l2_regularizer(
                                float(self.dnn_l2_reg)),
                            weights_list=[self.dnn_net])
                        self.dnn_net = layers.fully_connected(
                            self.dnn_net,
                            width,
                            utils.getActivationFunctionOp("llrelu"),
                            scope=hidden_scope,
                            variables_collections=[self.collections_dnn_hidden_layer],
                            outputs_collections=[self.collections_dnn_hidden_output],
                            normalizer_fn=layers.batch_norm,
                            normalizer_params={"scale": True, "is_training": self.is_training})

    def logits_layer(self):
        """Project the last hidden layer to a single raw logit."""
        logits_scope_name = "{}_Logits".format(self.name)
        logits_partitioner = base_ops.partitioner(
            self.config.ps_num, self.dnn_partition_size)
        with tf.variable_scope(name_or_scope=logits_scope_name,
                               partitioner=logits_partitioner) as logits_scope:
            with arg_scope(base_ops.model_arg_scope(weight_decay=self.dnn_l2_reg)):
                # self.logits holds raw (pre-sigmoid) scores; OpenSearch applies
                # the sigmoid automatically.
                self.logits = layers.linear(
                    self.dnn_net,
                    1,
                    scope=logits_scope,
                    variables_collections=[self.collections_dnn_hidden_layer],
                    outputs_collections=[self.collections_dnn_hidden_output])

    def build_model(self):
        """Build the architecture: embedding layer, DNN layers, logits layer."""
        for build_stage in (self.embedding_layer, self.dnn_layer, self.logits_layer):
            build_stage()

    def update_op(self):
        """Collect the UPDATE_OPS whose names start with this model's name."""
        pending_updates = tf.get_collection(tf.GraphKeys.UPDATE_OPS)
        self.update_ops = [
            candidate for candidate in pending_updates
            if candidate.name.startswith(self.name)
        ]

    def reg_loss(self):
        """Sum the REGULARIZATION_LOSSES whose names start with this model's name."""
        collected = tf.get_collection(tf.GraphKeys.REGULARIZATION_LOSSES)
        self.reg_losses = [
            term for term in collected
            if term.name.startswith(self.name)
        ]
        # From here on, the instance attribute `reg_loss` is the summed tensor
        # and shadows this method on the instance.
        self.reg_loss = tf.reduce_sum(self.reg_losses)

    def training_op(self):
        """Create ``self.train_op``, or set it to None when ``config.predict`` is set."""
        if self.config.predict:
            self.train_op = None
            return

        with tf.variable_scope(name_or_scope="Optimize_Layer", reuse=tf.AUTO_REUSE):
            # Fetched for logging only; the optimizer below is driven by
            # self.global_step.
            current_step = tf.train.get_or_create_global_step()
            logging.info("Global_step: {}, {}".format(self.name, str(current_step)))
            logging.info("Model_name: {}, train_op_final_loss: {}".format(self.name, str(self.loss)))

            adam = tf.train.AdamAsyncOptimizer(
                learning_rate=0.01,
                beta1=0.9,
                beta2=0.999,
                epsilon=1e-8,
                use_locking=False)
            trainable = ops.get_collection(ops.GraphKeys.TRAINABLE_VARIABLES)
            self.train_op, _, _ = myopt.optimize_loss(
                loss=self.loss,
                global_step=self.global_step,
                learning_rate=0.01,
                optimizer=adam,
                update_ops=self.update_ops,
                clip_gradients=self.clip_gradients,
                variables=trainable,
                increment_global_step=True,
                summaries=myopt.OPTIMIZER_SUMMARIES)

    def loss_op(self):
        """Set ``self.loss`` to mean sigmoid cross-entropy plus ``self.reg_loss``."""
        with tf.name_scope("{}_Loss_Op".format(self.name)):
            per_example_loss = tf.nn.sigmoid_cross_entropy_with_logits(
                logits=self.logits,
                labels=self.label)
            task_loss = tf.reduce_mean(per_example_loss)
            # Total loss = task loss + regularization loss.
            self.loss = task_loss + self.reg_loss

    def metrics_op(self):
        """Delegate to the parent so OpenSearch can track common system metrics."""
        super(CustomModel, self).metrics_op()

    def summary_op(self):
        """Emit scalar summaries for every metric, plus DNN layer summaries."""
        with tf.name_scope("{}_Metrics_Scalar".format(self.name)):
            for metric_name, metric_tensor in self.metrics.items():
                tf.summary.scalar(name=metric_name, tensor=metric_tensor)
        with tf.name_scope("{}_Layer_Summary".format(self.name)):
            base_ops.add_norm2_summary(self.collections_dnn_hidden_layer)
            base_ops.add_dense_output_summary(self.collections_dnn_hidden_output)
            base_ops.add_weight_summary(self.collections_dnn_hidden_layer)
Configure feature columns
Access features through self.features using the feature name as the key. Configure all feature columns through the OpenSearch-provided API operations — using raw TensorFlow feature column functions directly risks incompatibility with the online inference service.
Use contrib.layers.input_from_feature_columns to embed features. Other embedding functions are not supported and will cause issues in the online service.
Supported feature column types
| Type | When to use |
|---|---|
| embedding_column (generate_embedding_feature_column) | Sparse categorical features that need an embedding lookup |
| real_valued_column (generate_real_valued_feature_column) | Continuous numeric features |
| sparse_column_with_hash_bucket (generate_sparse_id_feature_column) | Sparse ID features without embedding |
| shared_embedding_columns | When the same embedding must be shared across multiple features |
An embedding column cannot be used twice in the same model. If two features need to share an embedding, configure them with is_share_embedding=True and the same shared_embedding_name.
API operations
Use the following encapsulated API operations to configure feature columns:
# Embedding column — for sparse categorical features
# To share one embedding between features, pass is_share_embedding=True and
# the same shared_embedding_name for each of them.
self.generate_embedding_feature_column(
    feature_name,
    hash_bucket_size, # Number of hash buckets for the sparse feature
    dimension, # Embedding dimension
    initializer=tf.zeros_initializer,
    combiner="sum", # How to combine multi-valued features: "sum", "mean", or "sqrtn"
    is_share_embedding=False,
    shared_embedding_name=None
)
# Real-valued column — for continuous numeric features
self.generate_real_valued_feature_column(
    feature_name,
    dtype="Float", # "Float" or "Int" only
    value_dimension=1
)
# Sparse ID column — for sparse features without embedding
self.generate_sparse_id_feature_column(
    feature_name,
    hash_bucket_size,
    combiner="sum"
)
# Retrieve configured feature columns by name
self.feature_columns_from_column_names(feature_list)
Model specifications
Meet all of the following requirements to ensure compatibility with the online service:
| Attribute / method | Requirement |
|---|---|
| CustomModel.__init__ | Call super(CustomModel, self).__init__(config, name) |
| self.logits | Assign raw logits (pre-sigmoid scores). OpenSearch applies the sigmoid function automatically to convert them to the final ranking score. To use a different activation, override predictions_op. |
| self.loss | Assign the total training loss |
| self.reg_loss | Assign the regularization loss |
| metrics_op | Call super(CustomModel, self).metrics_op() so OpenSearch can track system metrics |
Usage notes
Custom variables outside the contrib module
If you create variables with tf.Variable or use functions outside the contrib module, add those variables to the MODEL_VARIABLES collection. OpenSearch loads weights based on this collection.
Add only variables whose weights need to be loaded at serving time.
Do not add global_step to this collection.
from tensorflow.python.framework import ops
from tensorflow.python.ops import variable_scope as vs
# Example: a variable created outside the contrib module. Listing
# MODEL_VARIABLES in `collections` registers it in the collection that
# OpenSearch uses to load weights at serving time.
self._weights = vs.get_variable(
    _WEIGHTS_VARIABLE_NAME,
    [total_arg_size, output_size],
    dtype=dtype,
    initializer=kernel_initializer,
    collections=[ops.GraphKeys.GLOBAL_VARIABLES, ops.GraphKeys.MODEL_VARIABLES]
)