文档

TableRecordDataset

更新时间:
重要

本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。

您可以使用TableRecordDataset接口按照行读取MaxComepute表数据并构建数据流。

TensorFlow社区推荐在1.2及以上版本,使用Dataset接口代替线程和队列构建数据流。通过多个Dataset接口的组合变换生成计算数据,可以简化数据输入代码。

警告

公共云GPU服务器即将过保下线,您可以继续提交CPU版本的TensorFlow任务。如需使用GPU进行模型训练,请前往DLC提交任务,具体操作请参见创建训练任务

接口说明

PAI-TF提供的TableRecordDataset与原生TensorFlow RecordDataset相似,可以为数据变换(Transformation)的Dataset接口提供数据源。TableRecordDataset的接口定义如下。

class TableRecordDataset(Dataset):
  def __init__(self,
               filenames,
               record_defaults,
               selected_cols=None,
               excluded_cols=None,
                slice_id=0,
                slice_count=1,
               num_threads=0,
               capacity=0):

参数

描述

filenames

待读取的表名集合(列表),同一张表可以重复读取。

record_defaults

待读取列的数据类型或列为空时的默认数据类型。如果该类型与实际读取的列类型不符,或数据类型无法自动转换,则执行过程中系统会抛出异常。系统支持的数据类型包括FLOAT32、FLOAT64、INT32、INT64、BOOL及STRING。

selected_cols

选取的列,格式为英文逗号(,)分隔的字符串。

excluded_cols

排除的列,格式为英文逗号(,)分隔的字符串。不能同时使用excluded_colsselected_cols

slice_id

当前分区的编号。分布式读取时,系统根据slice_count将表平均分为多个分区,读取slice_id对应的分区。

slice_count

分布式读取时,总的分区数量,通常为Worker数量。

num_threads

预取数据时,每个访问表的内置Reader启用的线程(独立于计算线程)数量。取值范围为1~64。如果num_threads取值为0,则系统自动将新建的预取线程数配置为计算线程池线程数的1/4。

说明

因为I/O对每个模型的整体计算影响不同,所以提高预取线程数,不一定可以提升整体模型的训练速度。

capacity

读取表的总预取量,单位为行数。如果num_threads大于1,则每个线程的预取量为capacity/num_threads行(向上取整)。如果capacity0,则内置Reader根据所读表的前N行(系统默认N=256)平均值自动配置总预取量,使得每个线程的预取数据约占空间64 MB。

说明

如果手动配置预取量,当单线程的预取量大于1 GB,系统仅输出告警信息以提示您检查配置,而不会中断程序运行。

说明

如果MaxCompute表字段为DOUBLE类型,则TensorFlow中需要使用np.float格式与其对应。

返回值

TableRecordDataset返回一个新的Dataset对象,可以作为Pipeline工作流构建的输入。

### other definition codes was ignored here.

# Suppose an odps table named 'sample_table' was built in
# 'test' project, which includes 5 columns:
#   (itemid bigint, name string, price double,
#    virtual bool, tags string)

# Table name would be passed from run commands.
tables = ["odps://test/tables/sample_table"]

# Firstly, we define a new TableRecordDataset to read itemid and price.
dataset = tf.data.TableRecordDataset(tables,
                                     record_defaults = (0, 0.0),
                                     selected_cols = "itemid, price")
# Get a batch of 128
dataset = dataset.batch(128)
# Set epoch as 10
dataset = dataset.repeat(10)
# At last we got a batch of ids and prices.
[ids, prices] = dataset.make_one_shot_iterator().get_next()

### Then we do other graph construction and finally run the graph.

执行Session时调用get_next()方法,从表中读取128行数据,并根据record_defaults指定的类型将每列数据解析为对应类型的Tensor。其中get_next()返回的output_types需要与record_defaults的参数类型相同,output_shapes的Tensor Shape需要与record_defaults的元素数量一致。

Console参数

  • 如果将表作为输入,提交任务时,需要使用-Dtables配置待访问的表名。

    pai -name tensorflow1120_cpu_ext -Dtables=odps://algo_platform_dev/tables/sample;
  • 如果读取2张以上的表,则需要使用英文逗号(,)分隔多个表名。

    pai -name tensorflow1120_cpu_ext -Dtables=odps://algo_platform_dev/tables/sample,odps://algo_platform_dev/tables/sample2
  • 如果访问分区表,则需要在表名后添加分区。

    pai -name tensorflow1120_cpu_ext -Dtables=odps://algo_platform_dev/tables/sample/pt=1;

示例

以逻辑回归(Logistic Regression)为例,介绍如何使用TableRecordDataset读取表数据并进行模型训练。

  1. 数据准备。

    TableRecordReader是将整行数据作为一个字符串导入MaxCompute表,读取之后再进行解析。而使用TableRecordDataset时,建议MaxCompute数据表按照列存放相应的数据,Dataset接口会将表中的数据以指定类型的Tensor返回。

    1. 创建表。

      使用MaxCompute创建一个包含四列数据的表。

      odps@ algo_platform_dev>create table sample (col1 double, col2 double, col3 double, col4 double);
      Data Health Manager:Your health synthesize score is 5, so, your job priority is 7
      
      ID = 201803050245351****6Vgsxo2
      OK
      odps@ algo_platform_dev>read sample;
      +------------+------------+------------+------------+
      | col1       | col2       | col3       | col4       |
      +------------+------------+------------+------------+
      +------------+------------+------------+------------+
    2. 导入数据。

      下载测试数据,并使用MaxCompute Console Tunnel命令将其导入MaxCompute表。

      #查看下载的测试数据。
      $head -n 3 sample.csv
      0,0,0.017179100152531324,1
      0,1,0.823381420409002,1
      0,2,1.6488850495540865,1
      #将数据导入MaxCompute表。
      odps@ algo_platform_dev>tunnel upload /tmp/data/sample.csv sample -fd=,;
      Upload session: 20180305135640c8cc650a0000****
      Start upload:sample.csv
      Using \n to split records
      Upload in strict schema mode: true
      Total bytes:260093   Split input to 1 blocks
      2018-03-05 13:56:40 scan block: '1'
      2018-03-05 13:56:40 scan block complete, blockid=1
      2018-03-05 13:56:40 upload block: '1'
      2018-03-05 13:56:41 upload block complete, blockid=1
      upload complete, average speed is 254 KB/s
      OK
      odps@ algo_platform_dev>read sample 3;
      +------------+------------+------------+------------+
      | col1       | col2       | col3       | col4       |
      +------------+------------+------------+------------+
      | 0.0        | 0.0        | 0.017179100152531324 | 1.0        |
      | 0.0        | 1.0        | 0.823381420409002 | 1.0        |
      | 0.0        | 2.0        | 1.6488850495540865 | 1.0        |
      +------------+------------+------------+------------+
      说明

      因为该测试数据的每行内容使用英文逗号(,)分隔,所以使用-fd=,配置分隔符为英文逗号(,)才能将每行数据分为四列导入至相应的MaxCompute表。

  2. 构建输入数据和模型。

    构建输入数据的示例代码如下。除无需定义tf.train.Coordinator和运行start_queue_runners以外,其余代码与使用TableRecordReader的代码相同。

    #define the input
    def input_fn():
        dataset = tf.data.TableRecordDataset([FLAGS.tables], record_defaults=[1.0]*4).repeat().batch(128)
        v1, v2, v3, v4 = dataset.make_one_shot_iterator().get_next()
        labels = tf.reshape(tf.cast(v4, tf.int32), [128])
        features = tf.stack([v1, v2, v3], axis=1)
        return features, labels

    完整的示例代码lr_dataset.py如下。

    import tensorflow as tf
    
    tf.app.flags.DEFINE_string("tables", "", "tables info")
    FLAGS = tf.app.flags.FLAGS
    
    #define the input
    def input_fn():
        dataset = tf.data.TableRecordDataset([FLAGS.tables], record_defaults=[1.0]*4).repeat().batch(128)
        v1, v2, v3, v4 = dataset.make_one_shot_iterator().get_next()
        labels = tf.reshape(tf.cast(v4, tf.int32), [128])
        features = tf.stack([v1, v2, v3], axis=1)
        return features, labels
    
    #construct the model
    def model_fn(features, labels):
        W = tf.Variable(tf.zeros([3, 2]))
        b = tf.Variable(tf.zeros([2]))
        pred = tf.matmul(features, W) + b
    
        loss =  tf.reduce_mean(tf.nn.sparse_softmax_cross_entropy_with_logits(logits=pred,labels=labels))
    
        # Gradient Descent
        optimizer = tf.train.GradientDescentOptimizer(0.05).minimize(loss)
        return loss, optimizer
    
    features, labels = input_fn()
    loss, optimizer = model_fn(features, labels)
    
    init = tf.global_variables_initializer()
    local_init = tf.local_variables_initializer()
    
    sess = tf.Session()
    sess.run(init)
    sess.run(local_init)
    
    for step in range(10000):
        _, c = sess.run([optimizer, loss])
        if step % 2000 == 0:
            print("loss," , c)
  3. 提交任务。

    odps@ algo_platform_dev>pai -name tensorflow1120_cpu_ext -Dtables=odps://algo_platform_dev/tables/sample -Dscript=file:///tmp/lr_dataset.py;
  4. 查看执行结果。

    单击提交任务返回的Logview链接,查看执行结果。

    start launching tensorflow job
    ('loss,', 0.6931472)
    ('loss,', 0.007929571)
    ('loss,', 0.0016527221)
    ('loss,', 0.0023481336)
    ('loss,', 0.0011788738)