本文为您介绍PAI-TF数据转换方法。

trans_csv_id2sparse Python接口

将标记有效位置的CSV字符串集合转换成为稀疏矩阵。
trans_csv_id2sparse(records, max_id, id_as_value=True, field_delim=”,”)
  • 输入以下参数:
    参数 是否必选 描述
    records 类型STRING数组待解析CSV字符串数组(列表),以CSV格式由分隔符分隔。
    max_id 类型INT64稀疏矩阵的最大列数,用于设定输出中的dense_shape值。如果实际ID大于或等于dense_shape值,则报错。
    id_as_value 类型BOOL,默认为True,将Index编号作为稀疏矩阵中有效点的值,类型为INT64。无特殊情况不建议更改为False。
    field_delim 类型STRING,默认为英文逗号(,)。CSV格式数据的分隔符。不支持数字、正负号、字母e和E、小数点(.)和多字节分隔符。当使用空格作为分隔符时,多个连续空格将被视作一个分隔符。
  • 输出:由Index CSV序列转换而得的Sparse Tensor,Value类型为INT64。
示例:将一个batch的存有Index数据的STRING转换成一个sparse tensor。
  • 输入:
    [“2,10”,“7”,“0,8”]
  • 需求:

    矩阵列宽度为20,有效点填入原Index。

  • 代码:
    outsparse = tf.trans_csv_id2sparse([“2,10”,“7”,“0,8”], 20)
  • 返回结果:
    SparseTensor(
    indices=[[0,2],[0,10],[1,7],[2,0],[2,8]],
    values=[2, 10, 7, 0, 8],
    dense_shape=[3,20])

trans_csv_kv2dense Python接口

将以Key/Value形式标记有效位置和值的CSV字符串集合转换成为稠密矩阵。
trans_csv_kv2dense(records, max_id, field_delim=”,”)
  • 输入以下参数:
    参数 是否必选 描述
    records 类型STRING数组待解析CSV字符串数组(列表),以CSV格式由分隔符分隔。每一个数据项均为以冒号(:)分隔的Key/Value形式数据,否则报错。
    max_id 类型INT64输出的稠密矩阵的列数。如果实际ID大于或等于列数,则报错。
    field_delim 类型STRING,默认为英文逗号(,)。CSV格式数据的分隔符。不支持数字、正负号、字母e和E、小数点(.)和多字节分隔符。当使用空格作为分隔符时,多个连续空格将被视作一个分隔符。
  • 输出:由Key/Value形式CSV序列转换而得的稠密矩阵,默认输出类型为FLOAT,空白处以0.0填充。
示例:将一个batch以Key/Value形式 Index: Value存储的STRING转换成为一个稠密矩阵。
  • 输入:
    [“1:0.1,2:0.2,4:0.4,10:1.0”,
    “0:0.22,3:0.33,9:0.99”,
    “2:0.24,7:0.84,8:0.96” ]
  • 需求:

    列宽设置为12。

  • 代码:
    outmatrix = tf.trans_csv_kv2dense(
    [“1:0.1,2:0.2,4:0.4,10:1.0”,
     “0:0.22,3:0.33,9:0.99”,
     “2:0.24,7:0.84,8:0.96” ] , 12)
  • 返回结果:
    [[0.0, 0.1, 0.2, 0.0, 0.4, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0]
    [0.22, 0.0, 0.0, 0.33, 0.0, 0.0, 0.0, 0.0, 0.0, 0.99, 0.0, 0.0]
    [0.0, 0.0, 0.24, 0.0, 0.0, 0.0, 0.0, 0.84, 0.96, 0.0, 0.0, 0.0]]

trans_csv_kv2sparse Python接口

将以Key/Value形式标记有效位置和值的CSV字符串集合转换成为稀疏矩阵。
trans_csv_kv2sparse(records, max_id, field_delim=”,”)
  • 输入以下参数:
    参数 是否必选 描述
    records 类型STRING数组待解析CSV字符串数组(列表),以CSV格式由分隔符分隔。每一个数据项均为以冒号(:)分隔的Key/Value形式数据,否则报错。当使用空格作为分隔符时,多个连续空格将被视作一个分隔符。
    max_id 类型INT64稀疏矩阵的最大列数,用于设定输出中的dense_shape值。如果实际ID大于或等于dense_shape值,则报错。
    field_delim 类型STRING,默认为英文逗号(,)。CSV格式数据的分隔符。不支持数字、正负号、字母e和E、小数点(.)和多字节分隔符。当使用空格作为分隔符时,多个连续空格将被视作一个分隔符。
  • 输出:由Key/Value形式CSV序列转换而得的稀疏矩阵,默认输出类型为FLOAT。
示例:将一个batch以Key/Value形式 Index: Value存储的STRING转换成为一个稀疏矩阵。
  • 输入:
    [“1:0.1,2:0.2,4:0.4,10:1.0”,
    “0:0.22,3:0.33,9:0.99”,
    “2:0.24,7:0.84,8:0.96” ]
  • 需求:

    列宽设置为20,生成稀疏矩阵Tensor。

  • 代码:
    outsparse = tf.trans_csv_kv2sparse(
    [“1:0.1,2:0.2,4:0.4,10:1.0”,
     “0:0.22,3:0.33,9:0.99”,
     “2:0.24,7:0.84,8:0.96” ] , 20)
  • 返回结果:
    SparseTensor(
    indices=[[0,1],[0,2],[0,4],[0,10],[1,0],[1,3],[1,9],[2,0],[2,7],[2,8]],
    values=[0.1, 0.2, 0.4, 1.0, 0.22, 0.33, 0.99, 0.24, 0.84, 0.96],
    dense_shape=[3,20])

trans_csv_id2dense Python接口

将标记有效位置的CSV字符串集合转换成为稠密矩阵。
trans_csv_id2dense(records, max_id, id_as_value=False, field_delim=”,”)
  • 输入以下参数:
    参数 是否必选 描述
    records 类型STRING数组待解析CSV字符串数组(列表),以CSV格式由分隔符分隔。
    max_id 类型INT64输出的稠密矩阵的列数。如果实际ID大于或等于列数,则报错。
    id_as_value 类型BOOL,默认为False,稀疏矩阵中有效点的值将会填入int64(1)。
    field_delim 类型STRING,默认为英文逗号(,)。CSV格式数据的分隔符。不支持数字、正负号、字母e和E、小数点(.)和多字节分隔符。当使用空格作为分隔符时,多个连续空格将被视作一个分隔符。
  • 输出:由Index CSV序列转换而得的稠密矩阵,类型为INT64,空白处以0值填充。
示例:将一个batch的存有Index数据的STRING转换成一个稠密矩阵:
  • 输入:
    [“2,10”,“7”,“0,8”]
  • 需求:

    需求:列宽设置为12,有效点填入1。

  • 代码:
    outmatrix = tf.trans_csv_id2dense(
    [“2,10”,“7”,“0,8”], 12)
  • 返回结果:
    [[0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0]
    [0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0]
    [1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0]]

trans_csv_to_dense Python接口

将由数值组成的CSV字符串集合转换成为稠密矩阵。
trans_csv_to_dense(records, max_id, field_delim=”,”)
  • 输入以下参数:
    参数 是否必选 描述
    records 类型STRING数组待解析CSV字符串数组(列表),以CSV格式由分隔符分隔。
    max_id 类型INT64输出的稠密矩阵的列数。如果实际CSV字符串的列数大于或等于列数,则报错。
    field_delim 类型STRING,默认为英文逗号(,)。CSV格式数据的分隔符。不支持数字、正负号、字母e和E、小数点(.)和多字节分隔符。当使用空格作为分隔符时,多个连续空格将被视作一个分隔符。
  • 输出:由Key/Value形式CSV序列转换而得的稠密矩阵,默认输出类型为FLOAT,空白处以0.0填充。
示例:将一个batch的CSV格式STRING转换成为一个稠密矩阵。
  • 输入:
    [“0.1,0.2,0.4,1.0”,
    “0.22,0.33,0.99”,
    “0.24,0.84,0.96” ]
  • 需求:

    列宽设置为6。

  • 代码:
    outmatrix = tf.trans_csv_to_dense(
    [“0.1,0.2,0.4,1.0”,
     “0.22,0.33,0.99”,
     “0.24,0.84,0.96” ] , 6)
  • 返回结果:
    [[0.1, 0.2, 0.4, 1.0, 0.0, 0.0]
    [0.22, 0.33, 0.99, 0.0, 0.0, 0.0]
    [0.24, 0.84, 0.96, 0.0, 0.0, 0.0]]

代码实例

以下代码通过TensorFlow从存放在ODPS的数据表中读取数据。数据共有6列,第1列为ID,第2列为Key/Value格式的CSV数据,后4列为Index格式的CSV数据。数据读取后调用TransCSV的ODPS,将这5列数据分别转换为1个稠密矩阵和4个稀疏矩阵,用于模型训练。
import tensorflow as tf
import numpy as np
def read_table(filename_queue):
    batch_size = 128
    reader = tf.TableRecordReader(csv_delimiter=';', num_threads=8, capacity=8*batch_size)
    key, value = reader.read_up_to(filename_queue, batch_size)
    values = tf.train.batch([value], batch_size=batch_size, capacity=8*capacity, enqueue_many=True, num_threads=8)
    record_defaults = [[1.0], [""], [""], [""], [""], [""]]
    feature_size = [1322,30185604,43239874,5758226,41900998]
    col1, col2, col3, col4, col5, col6 = tf.decode_csv(values, record_defaults=record_defaults, field_delim=';')
    col2 = tf.trans_csv_kv2dense(col2, feature_size[0])
    col3 = tf.trans_csv_id2sparse(col3, feature_size[1])
    col4 = tf.trans_csv_id2sparse(col4, feature_size[2])
    col5 = tf.trans_csv_id2sparse(col5, feature_size[3])
    col6 = tf.trans_csv_id2sparse(col6, feature_size[4])
    return [col1, col2, col3, col4, col5, col6]
if __name__ == '__main__':
    tf.app.flags.DEFINE_string("tables", "", "tables")
    tf.app.flags.DEFINE_integer("num_epochs", 1000, "number of epoches")
    FLAGS = tf.app.flags.FLAGS
    table_pattern = FLAGS.tables
    num_epochs = FLAGS.num_epochs
    filename_queue = tf.train.string_input_producer(table_pattern, num_epochs)
    train_data = read_table(filename_queue)
    init_global = tf.global_variables_initializer()
    init_local = tf.local_variables_initializer()
    with tf.Session() as sess:
      sess.run(init_global)
      sess.run(init_local)
      coord = tf.train.Coordinator()
      threads = tf.train.start_queue_runners(sess=sess, coord=coord)
      for i in range(1000):
        sess.run(train_data)
      coord.request_stop()
      coord.join(threads)