本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。
使用EmbeddingVariable进行超大规模训练,不仅可以保证模型特征无损,而且可以节约内存资源。
公共云GPU服务器即将过保下线,您可以继续提交CPU版本的TensorFlow任务。如需使用GPU进行模型训练,请前往DLC提交任务,具体操作请参见创建训练任务。
背景信息
Embedding已成为深度学习领域处理Word及ID类特征的有效途径。作为一种“函数映射”,Embedding通常将高维稀疏特征映射为低维稠密向量,再进行模型端到端训练。在TensorFlow中,使用Variable定义模型或节点状态,其实现依赖于数据结构Tensor。Tensor是TensorFlow的抽象数据类型,包括标量、向量、矩阵及更高维的数据结构。Tensor作为数据载体在各算子间流通,任何支持Tensor作为输入和输出的算子,都可以加入Graph的计算过程中。因为Tensor采用连续存储,所以定义Variable时,必须指定类型(Type)和空间大小(Shape),且该空间大小不支持修改。
TensorFlow通过Variable的方式实现Embedding机制,用于存储Embedding的Variable空间大小为[vocabulary_size, embedding_dimension]。在大规模稀疏特征场景下,存在以下弊端:
vocabulary_size通常由ID空间决定,随着在线学习场景的ID不断增加,会导致vocabulary_size难以估计。
ID通常为字符串类型且规模庞大,进行Embedding之前,需要将其Hash到vocabulary_size范围内:
如果vocabulary_size过小,则导致Hash冲突率增加,不同特征可能查找到相同的Embedding,即特征减少。
如果vocabulary_size过大,则导致Variable存储永远不会被查找的Embedding,即内存冗余。
Embedding变量过大是导致模型增大的主要原因,即使通过正则手段降低某些特征的Embedding对整个模型效果的影响,也无法从模型中去除该Embedding。
为解决上述问题,PAI-TF推出动态Embedding语义的EmbeddingVariable,在特征无损训练的条件下,以经济的方式使用内存资源,从而实现超大规模特征的离线训练和模型上线。PAI-TF提供EmbeddingVariable(3.1)及Feature_Column(3.3)API,推荐使用Feature_Column API,它可以自动增加字符串的Feature ID化流程。
EmbeddingVariable特性
动态Embedding。
无需指定Vocabulary规模,只需要指定Embedding Dim,PAI-TF就可以根据训练,动态地扩展或缩减词典大小。适用于在线学习场景,同时省略了TensorFlow模型的数据预处理流程。
Group Lasso正则。
通常经过深度学习的Embedding变量规模超大,如果将其部署为在线服务,则会造成服务器压力。使用Group Lasso正则处理Embedding,可以减少模型部署的压力。
支持将原始特征值传入Embedding Lookup,省略了Hash等ID化操作,从而实现特征的无损训练。
支持Graph的Inference、Back Propagation及变量的导入导出,模型训练中通过Optimizer自动更新EmbeddingVariable。
tf.get_embedding_variable接口说明
tf.get_embedding_variable接口返回一个已有的EmbeddingVariable变量或新建的EmbeddingVariable变量,接口定义如下。
get_embedding_variable(
name,
embedding_dim,
key_dtype=dtypes.int64,
value_dtype=None,
initializer=None,
trainable=True,
collections=None,
partitioner=None,
custom_getter=None,
steps_to_live=None,
filter_options=variables.CounterFilterOptions()
)
name:Embedding Variable的名称。
embedding_dim:Embedding的维度。例如8或64。
key_dtype:Lookup时key的类型,默认值为int64。
value_dtype: EmbeddingVariable的类型,仅支持float类型。
initializer:EmbeddingVariable的初始值。
trainable:是否被添加到GraphKeys.TRAINABLE_VARIABLES的Collection。
collections:一个记录了collection的keys的列表,该Variable会被加入到列表中的collection,默认为[GraphKeys.GLOBAL_VARIABLES]。
partitioner:分区函数。
custom_getter:一个可调用的对象,将true getter作为它的第一个参数传入,并且允许覆盖内部的
get_variable
方法。应符合def custom_getter(getter,*args,**kwargs)
的形式,也可以通过def custom_getter(getter,name,*args,**kwargs)
的形式直接访问get_variable
中的所有参数。steps_to_live:全局步长,用于自动淘汰过期特征。系统会删除全局步数超过该参数值的未更新特征。
filter_options:准入策略,支持根据特征频次进行准入。
EmbeddingVariable
EmbeddingVariable的结构如下。
class EmbeddingVariable(ResourceVariable)
def total_count():
# 返回Embedding当前的total_count,[rowCount,EmbeddingDim]。
def read_value():
raise NotImplementedError("...")
def assign():
raise NotImplementedError("...")
def assign_add():
raise NotImplementedError("...")
def assign_sub():
raise NotImplementedError("...")
支持读取稀疏数据的
sparse_read()
方法。如果查询的key不存在,则返回EmbeddingVariable初始化时,该key对应的initializer。支持查询EmbeddingVariable词表总数的
total_count()
方法,该方法返回EmbeddingVariable的动态Shape值。不支持全量读取EmbeddingVariable的
read_value()
方法。不支持EmbeddingVariable的赋值方法,包括
assign()
、assign_add()
及assign_sub()
。CounterFilterOptions的结构如下。
@tf_export("CounterFilterOptions") class CounterFilterOptions(object): def __init__(self, filter_freq=0): pass
指定准入频次,默认值为0。
使用feature_column接口构建EmbeddingVariable
def tf.contrib.layers.sparse_column_with_embedding(column_name=column_name,
dtype=tf.string,
partition_num=None,
steps_to_live=None,
# 1120版本不支持以下两个参数,仅140lite版本支持。
steps_to_live_l2reg=None,
l2reg_theta=None)
# column_name: column name
# dtype: type, default is tf.string
示例
使用底层tf.get_embedding_variable接口构建包含EmbeddingVariable的TensorFlow Graph。
#!/usr/bin/python import tensorflow as tf var = tf.get_embedding_variable("var_0", embedding_dim=3, initializer=tf.ones_initializer(tf.float32), partitioner=tf.fixed_size_partitioner(num_shards=4)) shape = [var1.total_count() for var1 in var] emb = tf.nn.embedding_lookup(var, tf.cast([0,1,2,5,6,7], tf.int64)) fun = tf.multiply(emb, 2.0, name='multiply') loss = tf.reduce_sum(fun, name='reduce_sum') opt = tf.train.FtrlOptimizer(0.1, l1_regularization_strength=2.0, l2_regularization_strength=0.00001) g_v = opt.compute_gradients(loss) train_op = opt.apply_gradients(g_v) init = tf.global_variables_initializer() sess_config = tf.ConfigProto(allow_soft_placement=True, log_device_placement=False) with tf.Session(config=sess_config) as sess: sess.run([init]) print(sess.run([emb, train_op, loss])) print(sess.run([emb, train_op, loss])) print(sess.run([emb, train_op, loss])) print(sess.run([shape]))
将EmbeddingVariable保存为Checkpoint。
#!/usr/bin/python import tensorflow as tf var = tf.get_embedding_variable("var_0", embedding_dim=3, initializer=tf.ones_initializer(tf.float32), partitioner=tf.fixed_size_partitioner(num_shards=4)) emb = tf.nn.embedding_lookup(var, tf.cast([0,1,2,5,6,7], tf.int64)) init = tf.global_variables_initializer() saver = tf.train.Saver(sharded=True) print("GLOBAL_VARIABLES: ", tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES)) print("SAVEABLE_OBJECTS: ", tf.get_collection(tf.GraphKeys.SAVEABLE_OBJECTS)) checkpointDir = "/tmp/model_dir" sess_config = tf.ConfigProto(allow_soft_placement=True, log_device_placement=False) with tf.Session(config=sess_config) as sess: sess.run([init]) print(sess.run([emb])) save_path = saver.save(sess, checkpointDir + "/model.ckpt", global_step=666) tf.train.write_graph(sess.graph_def, checkpointDir, 'train.pbtxt') print("save_path", save_path) print("list_variables", tf.contrib.framework.list_variables(checkpointDir))
从Checkpoint中恢复EmbeddingVariable变量。
#!/usr/bin/python import tensorflow as tf var = tf.get_embedding_variable("var_0", embedding_dim=3, initializer=tf.ones_initializer(tf.float32), partitioner=tf.fixed_size_partitioner(num_shards=4)) emb = tf.nn.embedding_lookup(var, tf.cast([0,1,2,5,6,7], tf.int64)) init = tf.global_variables_initializer() saver = tf.train.Saver(sharded=True) print("GLOBAL_VARIABLES: ", tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES)) print("SAVEABLE_OBJECTS: ", tf.get_collection(tf.GraphKeys.SAVEABLE_OBJECTS)) checkpointDir = "/tmp/model_dir" sess_config = tf.ConfigProto(allow_soft_placement=True, log_device_placement=False) with tf.Session(config=sess_config) as sess: print("list_variables", tf.contrib.framework.list_variables(checkpointDir)) saver.restore(sess, checkpointDir + "/model.ckpt-666") print(sess.run([emb]))
使用feature_column接口构建包含EmbeddingVariable的TensorFlow Graph。
import tensorflow as tf import os columns_list=[] columns_list.append(tf.contrib.layers.sparse_column_with_embedding(column_name="col_emb", dtype=tf.string)) W = tf.contrib.layers.shared_embedding_columns(sparse_id_columns=columns_list, dimension=3, initializer=tf.ones_initializer(tf.float32), shared_embedding_name="xxxxx_shared") ids={} ids["col_emb"] = tf.SparseTensor(indices=[[0,0],[1,0],[2,0],[3,0],[4,0]], values=["aaaa","bbbbb","ccc","4nn","5b"], dense_shape=[5, 5]) emb = tf.contrib.layers.input_from_feature_columns(columns_to_tensors=ids, feature_columns=W) fun = tf.multiply(emb, 2.0, name='multiply') loss = tf.reduce_sum(fun, name='reduce_sum') opt = tf.train.FtrlOptimizer(0.1, l1_regularization_strength=2.0, l2_regularization_strength=0.00001) g_v = opt.compute_gradients(loss) train_op = opt.apply_gradients(g_v) init = tf.global_variables_initializer() init_local = tf.local_variables_initializer() sess_config = tf.ConfigProto(allow_soft_placement=True, log_device_placement=False) with tf.Session(config=sess_config) as sess: sess.run(init) print("init global done") sess.run(init_local) print("init local done") print(sess.run([emb, train_op,loss])) print(sess.run([emb, train_op,loss])) print(sess.run([emb, train_op,loss])) print(sess.run([emb]))
说明EmbeddingVariable仅支持Ftrl Optimizer、Adagrad Optimizer、Adam Optimizer及AdagradDecay Optimizer。