EmbeddingVariable

重要

本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。

使用EmbeddingVariable进行超大规模训练,不仅可以保证模型特征无损,而且可以节约内存资源。

警告

公共云GPU服务器即将过保下线,您可以继续提交CPU版本的TensorFlow任务。如需使用GPU进行模型训练,请前往DLC提交任务,具体操作请参见创建训练任务

背景信息

Embedding已成为深度学习领域处理WordID类特征的有效途径。作为一种“函数映射”,Embedding通常将高维稀疏特征映射为低维稠密向量,再进行模型端到端训练。在TensorFlow中,使用Variable定义模型或节点状态,其实现依赖于数据结构Tensor。TensorTensorFlow的抽象数据类型,包括标量、向量、矩阵及更高维的数据结构。Tensor作为数据载体在各算子间流通,任何支持Tensor作为输入和输出的算子,都可以加入Graph的计算过程中。因为Tensor采用连续存储,所以定义Variable时,必须指定类型(Type)和空间大小(Shape),且该空间大小不支持修改。

TensorFlow通过Variable的方式实现Embedding机制,用于存储EmbeddingVariable空间大小为[vocabulary_size, embedding_dimension]。在大规模稀疏特征场景下,存在以下弊端:

  • vocabulary_size通常由ID空间决定,随着在线学习场景的ID不断增加,会导致vocabulary_size难以估计。

  • ID通常为字符串类型且规模庞大,进行Embedding之前,需要将其Hashvocabulary_size范围内:

    • 如果vocabulary_size过小,则导致Hash冲突率增加,不同特征可能查找到相同的Embedding,即特征减少。

    • 如果vocabulary_size过大,则导致Variable存储永远不会被查找的Embedding,即内存冗余。

  • Embedding变量过大是导致模型增大的主要原因,即使通过正则手段降低某些特征的Embedding对整个模型效果的影响,也无法从模型中去除该Embedding。

为解决上述问题,PAI-TF推出动态Embedding语义的EmbeddingVariable,在特征无损训练的条件下,以经济的方式使用内存资源,从而实现超大规模特征的离线训练和模型上线。PAI-TF提供EmbeddingVariable(3.1)及Feature_Column(3.3)API,推荐使用Feature_Column API,它可以自动增加字符串的Feature ID化流程。

EmbeddingVariable特性

  • 动态Embedding。

    无需指定Vocabulary规模,只需要指定Embedding Dim,PAI-TF就可以根据训练,动态地扩展或缩减词典大小。适用于在线学习场景,同时省略了TensorFlow模型的数据预处理流程。

  • Group Lasso正则。

    通常经过深度学习的Embedding变量规模超大,如果将其部署为在线服务,则会造成服务器压力。使用Group Lasso正则处理Embedding,可以减少模型部署的压力。

  • 支持将原始特征值传入Embedding Lookup,省略了HashID化操作,从而实现特征的无损训练。

  • 支持GraphInference、Back Propagation及变量的导入导出,模型训练中通过Optimizer自动更新EmbeddingVariable。

tf.get_embedding_variable接口说明

tf.get_embedding_variable接口返回一个已有的EmbeddingVariable变量或新建的EmbeddingVariable变量,接口定义如下。

get_embedding_variable(
    name,
    embedding_dim,
    key_dtype=dtypes.int64,
    value_dtype=None,
    initializer=None,
    trainable=True,
    collections=None,
    partitioner=None,
    custom_getter=None,
    steps_to_live=None,
    filter_options=variables.CounterFilterOptions()
)
  • name:Embedding Variable的名称。

  • embedding_dim:Embedding的维度。例如864。

  • key_dtype:Lookupkey的类型,默认值为int64。

  • value_dtype: EmbeddingVariable的类型,仅支持float类型。

  • initializer:EmbeddingVariable的初始值。

  • trainable:是否被添加到GraphKeys.TRAINABLE_VARIABLESCollection。

  • collections:一个记录了collectionkeys的列表,该Variable会被加入到列表中的collection,默认为[GraphKeys.GLOBAL_VARIABLES]。

  • partitioner:分区函数。

  • custom_getter:一个可调用的对象,将true getter作为它的第一个参数传入,并且允许覆盖内部的get_variable方法。应符合def custom_getter(getter,*args,**kwargs)的形式,也可以通过def custom_getter(getter,name,*args,**kwargs)的形式直接访问get_variable中的所有参数。

  • steps_to_live:全局步长,用于自动淘汰过期特征。系统会删除全局步数超过该参数值的未更新特征。

  • filter_options:准入策略,支持根据特征频次进行准入。

EmbeddingVariable

EmbeddingVariable的结构如下。

class EmbeddingVariable(ResourceVariable)

  def total_count():
    # 返回Embedding当前的total_count,[rowCount,EmbeddingDim]。
  def read_value():
    raise NotImplementedError("...")
  def assign():
    raise NotImplementedError("...")
  def assign_add():
    raise NotImplementedError("...")
  def assign_sub():
    raise NotImplementedError("...")
  • 支持读取稀疏数据的sparse_read()方法。如果查询的key不存在,则返回EmbeddingVariable初始化时,该key对应的initializer

  • 支持查询EmbeddingVariable词表总数的total_count()方法,该方法返回EmbeddingVariable的动态Shape值。

  • 不支持全量读取EmbeddingVariableread_value()方法。

  • 不支持EmbeddingVariable的赋值方法,包括assign()assign_add()assign_sub()

  • CounterFilterOptions的结构如下。

    	@tf_export("CounterFilterOptions")
    	class CounterFilterOptions(object):
    		def __init__(self, filter_freq=0):
    			pass

    指定准入频次,默认值为0。

使用feature_column接口构建EmbeddingVariable

def tf.contrib.layers.sparse_column_with_embedding(column_name=column_name,
                                                   dtype=tf.string,
                                                   partition_num=None,
                                                   steps_to_live=None,
                                                   # 1120版本不支持以下两个参数,仅140lite版本支持。
                                                   steps_to_live_l2reg=None,
                                                   l2reg_theta=None)

  # column_name: column name
  # dtype: type, default is tf.string

示例

  • 使用底层tf.get_embedding_variable接口构建包含EmbeddingVariableTensorFlow Graph。

    #!/usr/bin/python
    import tensorflow as tf
    
    var = tf.get_embedding_variable("var_0",
                                    embedding_dim=3,
                                    initializer=tf.ones_initializer(tf.float32),
                                    partitioner=tf.fixed_size_partitioner(num_shards=4))
    
    shape = [var1.total_count() for var1 in var]
    
    emb = tf.nn.embedding_lookup(var, tf.cast([0,1,2,5,6,7], tf.int64))
    fun = tf.multiply(emb, 2.0, name='multiply')
    loss = tf.reduce_sum(fun, name='reduce_sum')
    opt = tf.train.FtrlOptimizer(0.1,
                                 l1_regularization_strength=2.0,
                                 l2_regularization_strength=0.00001)
    
    g_v = opt.compute_gradients(loss)
    train_op = opt.apply_gradients(g_v)
    
    init = tf.global_variables_initializer()
    
    sess_config = tf.ConfigProto(allow_soft_placement=True, log_device_placement=False)
    with tf.Session(config=sess_config) as sess:
      sess.run([init])
      print(sess.run([emb, train_op, loss]))
      print(sess.run([emb, train_op, loss]))
      print(sess.run([emb, train_op, loss]))
      print(sess.run([shape]))
  • EmbeddingVariable保存为Checkpoint。

    #!/usr/bin/python
    import tensorflow as tf
    
    var = tf.get_embedding_variable("var_0",
                                    embedding_dim=3,
                                    initializer=tf.ones_initializer(tf.float32),
                                    partitioner=tf.fixed_size_partitioner(num_shards=4))
    
    emb = tf.nn.embedding_lookup(var, tf.cast([0,1,2,5,6,7], tf.int64))
    
    init = tf.global_variables_initializer()
    saver = tf.train.Saver(sharded=True)
    print("GLOBAL_VARIABLES: ", tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES))
    print("SAVEABLE_OBJECTS: ", tf.get_collection(tf.GraphKeys.SAVEABLE_OBJECTS))
    
    checkpointDir = "/tmp/model_dir"
    sess_config = tf.ConfigProto(allow_soft_placement=True, log_device_placement=False)
    with tf.Session(config=sess_config) as sess:
      sess.run([init])
      print(sess.run([emb]))
    
      save_path = saver.save(sess, checkpointDir + "/model.ckpt", global_step=666)
      tf.train.write_graph(sess.graph_def, checkpointDir, 'train.pbtxt')
      print("save_path", save_path)
      print("list_variables", tf.contrib.framework.list_variables(checkpointDir))
  • Checkpoint中恢复EmbeddingVariable变量。

    #!/usr/bin/python
    import tensorflow as tf
    
    var = tf.get_embedding_variable("var_0",
                                    embedding_dim=3,
                                    initializer=tf.ones_initializer(tf.float32),
                                    partitioner=tf.fixed_size_partitioner(num_shards=4))
    
    emb = tf.nn.embedding_lookup(var, tf.cast([0,1,2,5,6,7], tf.int64))
    
    init = tf.global_variables_initializer()
    saver = tf.train.Saver(sharded=True)
    print("GLOBAL_VARIABLES: ", tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES))
    print("SAVEABLE_OBJECTS: ", tf.get_collection(tf.GraphKeys.SAVEABLE_OBJECTS))
    
    checkpointDir = "/tmp/model_dir"
    sess_config = tf.ConfigProto(allow_soft_placement=True, log_device_placement=False)
    with tf.Session(config=sess_config) as sess:
      print("list_variables", tf.contrib.framework.list_variables(checkpointDir))
      saver.restore(sess, checkpointDir + "/model.ckpt-666")
      print(sess.run([emb]))
  • 使用feature_column接口构建包含EmbeddingVariableTensorFlow Graph。

    import tensorflow as tf
    import os
    
    columns_list=[]
    columns_list.append(tf.contrib.layers.sparse_column_with_embedding(column_name="col_emb", dtype=tf.string))
    W = tf.contrib.layers.shared_embedding_columns(sparse_id_columns=columns_list,
            dimension=3,
            initializer=tf.ones_initializer(tf.float32),
            shared_embedding_name="xxxxx_shared")
    
    ids={}
    ids["col_emb"] = tf.SparseTensor(indices=[[0,0],[1,0],[2,0],[3,0],[4,0]], values=["aaaa","bbbbb","ccc","4nn","5b"], dense_shape=[5, 5])
    
    emb = tf.contrib.layers.input_from_feature_columns(columns_to_tensors=ids, feature_columns=W)
    
    fun = tf.multiply(emb, 2.0, name='multiply')
    loss = tf.reduce_sum(fun, name='reduce_sum')
    opt = tf.train.FtrlOptimizer(0.1, l1_regularization_strength=2.0, l2_regularization_strength=0.00001)
    g_v = opt.compute_gradients(loss)
    train_op = opt.apply_gradients(g_v)
    init = tf.global_variables_initializer()
    init_local = tf.local_variables_initializer()
    sess_config = tf.ConfigProto(allow_soft_placement=True, log_device_placement=False)
    with tf.Session(config=sess_config) as sess:
      sess.run(init)
      print("init global done")
      sess.run(init_local)
      print("init local done")
      print(sess.run([emb, train_op,loss]))
      print(sess.run([emb, train_op,loss]))
      print(sess.run([emb, train_op,loss]))
      print(sess.run([emb]))
    说明

    EmbeddingVariable仅支持Ftrl Optimizer、Adagrad Optimizer、Adam OptimizerAdagradDecay Optimizer。