本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。
您可以使用TableRecordDataset接口按照行读取MaxComepute表数据并构建数据流。
TensorFlow社区推荐在1.2及以上版本,使用Dataset接口代替线程和队列构建数据流。通过多个Dataset接口的组合变换生成计算数据,可以简化数据输入代码。
公共云GPU服务器即将过保下线,您可以继续提交CPU版本的TensorFlow任务。如需使用GPU进行模型训练,请前往DLC提交任务,具体操作请参见创建训练任务。
接口说明
PAI-TF提供的TableRecordDataset与原生TensorFlow RecordDataset相似,可以为数据变换(Transformation)的Dataset接口提供数据源。TableRecordDataset的接口定义如下。
class TableRecordDataset(Dataset):
def __init__(self,
filenames,
record_defaults,
selected_cols=None,
excluded_cols=None,
slice_id=0,
slice_count=1,
num_threads=0,
capacity=0):
参数 | 描述 |
filenames | 待读取的表名集合(列表),同一张表可以重复读取。 |
record_defaults | 待读取列的数据类型或列为空时的默认数据类型。如果该类型与实际读取的列类型不符,或数据类型无法自动转换,则执行过程中系统会抛出异常。系统支持的数据类型包括FLOAT32、FLOAT64、INT32、INT64、BOOL及STRING。 |
selected_cols | 选取的列,格式为英文逗号(,)分隔的字符串。 |
excluded_cols | 排除的列,格式为英文逗号(,)分隔的字符串。不能同时使用excluded_cols和selected_cols。 |
slice_id | 当前分区的编号。分布式读取时,系统根据slice_count将表平均分为多个分区,读取slice_id对应的分区。 |
slice_count | 分布式读取时,总的分区数量,通常为Worker数量。 |
num_threads | 预取数据时,每个访问表的内置Reader启用的线程(独立于计算线程)数量。取值范围为1~64。如果num_threads取值为0,则系统自动将新建的预取线程数配置为计算线程池线程数的1/4。 说明 因为I/O对每个模型的整体计算影响不同,所以提高预取线程数,不一定可以提升整体模型的训练速度。 |
capacity | 读取表的总预取量,单位为行数。如果num_threads大于1,则每个线程的预取量为capacity/num_threads行(向上取整)。如果capacity为0,则内置Reader根据所读表的前N行(系统默认N=256)平均值自动配置总预取量,使得每个线程的预取数据约占空间64 MB。 说明 如果手动配置预取量,当单线程的预取量大于1 GB,系统仅输出告警信息以提示您检查配置,而不会中断程序运行。 |
如果MaxCompute表字段为DOUBLE类型,则TensorFlow中需要使用np.float格式与其对应。
返回值
TableRecordDataset返回一个新的Dataset对象,可以作为Pipeline工作流构建的输入。
### other definition codes was ignored here.
# Suppose an odps table named 'sample_table' was built in
# 'test' project, which includes 5 columns:
# (itemid bigint, name string, price double,
# virtual bool, tags string)
# Table name would be passed from run commands.
tables = ["odps://test/tables/sample_table"]
# Firstly, we define a new TableRecordDataset to read itemid and price.
dataset = tf.data.TableRecordDataset(tables,
record_defaults = (0, 0.0),
selected_cols = "itemid, price")
# Get a batch of 128
dataset = dataset.batch(128)
# Set epoch as 10
dataset = dataset.repeat(10)
# At last we got a batch of ids and prices.
[ids, prices] = dataset.make_one_shot_iterator().get_next()
### Then we do other graph construction and finally run the graph.
执行Session时调用get_next()
方法,从表中读取128行数据,并根据record_defaults指定的类型将每列数据解析为对应类型的Tensor。其中get_next()
返回的output_types需要与record_defaults的参数类型相同,output_shapes的Tensor Shape需要与record_defaults的元素数量一致。
Console参数
如果将表作为输入,提交任务时,需要使用-Dtables配置待访问的表名。
pai -name tensorflow1120_cpu_ext -Dtables=odps://algo_platform_dev/tables/sample;
如果读取2张以上的表,则需要使用英文逗号(,)分隔多个表名。
pai -name tensorflow1120_cpu_ext -Dtables=odps://algo_platform_dev/tables/sample,odps://algo_platform_dev/tables/sample2
如果访问分区表,则需要在表名后添加分区。
pai -name tensorflow1120_cpu_ext -Dtables=odps://algo_platform_dev/tables/sample/pt=1;
示例
以逻辑回归(Logistic Regression)为例,介绍如何使用TableRecordDataset读取表数据并进行模型训练。
数据准备。
TableRecordReader是将整行数据作为一个字符串导入MaxCompute表,读取之后再进行解析。而使用TableRecordDataset时,建议MaxCompute数据表按照列存放相应的数据,Dataset接口会将表中的数据以指定类型的Tensor返回。
创建表。
使用MaxCompute创建一个包含四列数据的表。
odps@ algo_platform_dev>create table sample (col1 double, col2 double, col3 double, col4 double); Data Health Manager:Your health synthesize score is 5, so, your job priority is 7 ID = 201803050245351****6Vgsxo2 OK odps@ algo_platform_dev>read sample; +------------+------------+------------+------------+ | col1 | col2 | col3 | col4 | +------------+------------+------------+------------+ +------------+------------+------------+------------+
导入数据。
下载测试数据,并使用MaxCompute Console Tunnel命令将其导入MaxCompute表。
#查看下载的测试数据。 $head -n 3 sample.csv 0,0,0.017179100152531324,1 0,1,0.823381420409002,1 0,2,1.6488850495540865,1
#将数据导入MaxCompute表。 odps@ algo_platform_dev>tunnel upload /tmp/data/sample.csv sample -fd=,; Upload session: 20180305135640c8cc650a0000**** Start upload:sample.csv Using \n to split records Upload in strict schema mode: true Total bytes:260093 Split input to 1 blocks 2018-03-05 13:56:40 scan block: '1' 2018-03-05 13:56:40 scan block complete, blockid=1 2018-03-05 13:56:40 upload block: '1' 2018-03-05 13:56:41 upload block complete, blockid=1 upload complete, average speed is 254 KB/s OK odps@ algo_platform_dev>read sample 3; +------------+------------+------------+------------+ | col1 | col2 | col3 | col4 | +------------+------------+------------+------------+ | 0.0 | 0.0 | 0.017179100152531324 | 1.0 | | 0.0 | 1.0 | 0.823381420409002 | 1.0 | | 0.0 | 2.0 | 1.6488850495540865 | 1.0 | +------------+------------+------------+------------+
说明因为该测试数据的每行内容使用英文逗号(,)分隔,所以使用
-fd=,
配置分隔符为英文逗号(,)才能将每行数据分为四列导入至相应的MaxCompute表。
构建输入数据和模型。
构建输入数据的示例代码如下。除无需定义tf.train.Coordinator和运行start_queue_runners以外,其余代码与使用TableRecordReader的代码相同。
#define the input def input_fn(): dataset = tf.data.TableRecordDataset([FLAGS.tables], record_defaults=[1.0]*4).repeat().batch(128) v1, v2, v3, v4 = dataset.make_one_shot_iterator().get_next() labels = tf.reshape(tf.cast(v4, tf.int32), [128]) features = tf.stack([v1, v2, v3], axis=1) return features, labels
完整的示例代码lr_dataset.py如下。
import tensorflow as tf tf.app.flags.DEFINE_string("tables", "", "tables info") FLAGS = tf.app.flags.FLAGS #define the input def input_fn(): dataset = tf.data.TableRecordDataset([FLAGS.tables], record_defaults=[1.0]*4).repeat().batch(128) v1, v2, v3, v4 = dataset.make_one_shot_iterator().get_next() labels = tf.reshape(tf.cast(v4, tf.int32), [128]) features = tf.stack([v1, v2, v3], axis=1) return features, labels #construct the model def model_fn(features, labels): W = tf.Variable(tf.zeros([3, 2])) b = tf.Variable(tf.zeros([2])) pred = tf.matmul(features, W) + b loss = tf.reduce_mean(tf.nn.sparse_softmax_cross_entropy_with_logits(logits=pred,labels=labels)) # Gradient Descent optimizer = tf.train.GradientDescentOptimizer(0.05).minimize(loss) return loss, optimizer features, labels = input_fn() loss, optimizer = model_fn(features, labels) init = tf.global_variables_initializer() local_init = tf.local_variables_initializer() sess = tf.Session() sess.run(init) sess.run(local_init) for step in range(10000): _, c = sess.run([optimizer, loss]) if step % 2000 == 0: print("loss," , c)
提交任务。
odps@ algo_platform_dev>pai -name tensorflow1120_cpu_ext -Dtables=odps://algo_platform_dev/tables/sample -Dscript=file:///tmp/lr_dataset.py;
查看执行结果。
单击提交任务返回的Logview链接,查看执行结果。
start launching tensorflow job ('loss,', 0.6931472) ('loss,', 0.007929571) ('loss,', 0.0016527221) ('loss,', 0.0023481336) ('loss,', 0.0011788738)