全部产品
云市场

PAI-TF数据IO方式介绍

更新时间:2020-02-25 11:55:16

PAI-TF支持读取OSS对象存储数据和MaxCompute表数据。

目录

读取OSS数据

(1)OSS上传数据说明

使用深度学习处理数据时,数据先存储到 OSS 的 Bucket 中。第一步要创建OSS Bucket。 GPU的计算集群区域和OSS所在区域需要相同。这样在数据传输时就可以使用阿里云经典网络,算法运行时不需要收取流量费用。Bucket 创建好之后,可以在OSS管理控制台 创建文件夹、组织数据目录、上传数据。

OSS 支持多种方式上传数据, API 或 SDK 详细见:https://help.aliyun.com/document_detail/31848.html?spm=5176.doc31848.6.580.a6es2a

OSS 还提供了大量的常用工具用来帮助用户更加高效的使用 OSS。工具列表请参见: https://help.aliyun.com/document_detail/44075.html?spm=5176.doc32184.6.1012.XlMMUx

建议您使用 ossutil 或 osscmd 这两个命令行工具,通过命令的方式来上传、下载文件,还支持断点续传。

注意:在使用工具时需要配置 AccessID 和 AccessKey,登录后,可以在Access Key 管理控制台创建或查看。

(2)读OSSBucket

用户在机器学习平台中使用“读OSS Bucket”组件时,需要授予一个名称为“AliyunODPSPAIDefaultRole” 的系统默认角色给数加的服务账号,当且仅当该角色被正确授权后,机器学习平台的算法才能正确地读、写OSS bucket。

注意:由于机器学习平台运行在MaxCompute框架之上,与MaxCompute共用服务账号。在授权时,默认的角色授予给MaxCompute服务账号。

RAM 授权可以使机器学习平台获得OSS的访问权限。在设置菜单完成对OSS读写权限的授权,详情见RAM授权。

(3)RAM 授权

  1. 进入机器学习控制台,单击左侧菜单栏的设置,选择基本设置
  2. OSS访问授权中勾选授权机器学习读取我的OSS中的数据
  3. 进入如下界面,单击点击前往RAM进行授权,进入RAM入口,如下图所示。

  4. 进入如下界面,单击同意授权

    注意:如果您想查看“AliyunODPSPAIDefaultRole”的相关详细策略信息,可以登录RAM控制台来查看。 默认角色“AliyunODPSPAIDefaultRole”包含的权限信息如下。

    权限名称(Action)权限说明
    oss:PutObject上传文件或文件夹对象
    oss:GetObject获取文件或文件夹对象
    oss:ListObjects查询文件列表信息
    oss:DeleteObjects删除对象
  5. 返回机器学习界面,单击刷新。RAM信息会自动录入组件中,如下图所示。

  1. 使用深度学习框架。将读OSSBucket组件与相应的深度学习组件连接,用来获得OSS的读写权限。

(4)TensorFlow读取OSS数据方法说明

低效的IO方式

本地执行TensorFlow代码和分布式云端执行TensorFlow的区别:

  • 本地读取数据:Server端直接从Client端获得graph进行计算。
  • 云端服务:Server在获得graph之后还需要将计算下发到各个Worker处理(具体原理可以参考视频教程-Tensorflow高级篇)。

本文档通过读取一个简单的CSV文件为例,帮您快速了解如何使用TensorFlow高效地读取数据。CSV文件如下:

  1. 1,1,1,1,1
  2. 2,2,2,2,2
  3. 3,3,3,3,3

容易产生问题的几个地方:

  • 不建议使用python本地读取文件的方式

    机器学习平台支持python的自带IO方式,但是需要将数据源和代码打包上传。这种读取方式是将数据写入内存之后再计算,效率比较低,不建议使用。示例代码如下。

    1. import csv
    2. csv_reader=csv.reader(open('csvtest.csv'))
    3. for row in csv_reader:
    4. print(row)
  • 不建议使用第三方库读取文件的方式

    通过第三方库(比如TFLearn、Panda)的一些数据IO的方式读取数据,是通过封装python的读取方式实现的,所以在机器学习平台使用时也会造成效率低下的问题。

  • 不建议使用 preload 读取文件的方式

    很多用户在使用机器学习服务的时候,发现 GPU 并没有比本地的 CPU 速度快的明显,主要问题可能就出在数据IO这块。
    preload 方式是先把数据全部都读到内存中,然后再通过 session 计算,比如feed的读取方式。这样要先进行数据读取,再计算,不同步造成性能浪费。同时因为内存限制也无法支持大数据量的计算。
    例如:假设硬盘中有一个图片数据集 0001.jpg,0002.jpg,0003.jpg,…… ,我们只需要把它们读取到内存中,然后提供给 GPU 或 CPU 计算就可以了。但并没有那么简单。事实上,我们必须把数据先读入后才能进行计算,假设读入用时0.1s,计算用时0.9s,那么就意味着每过1s,GPU都会有0.1s无事可做,这就大大降低了运算的效率。

高效的IO方式

高效的 TensorFlow 读取方式是将数据读取转换成 OP,通过 session run 的方式拉去数据。读取线程源源不断地将文件系统中的图片读入到一个内存的队列中,而负责计算的是另一个线程,计算需要数据时,直接从内存队列中取就可以了。这样就可以解决GPU因为IO而空闲的问题。

如下代码解释了如何在机器学习平台通过OP的方式读取数据。

  1. import argparse
  2. import tensorflow as tf
  3. import os
  4. FLAGS=None
  5. def main(_):
  6. dirname = os.path.join(FLAGS.buckets, "csvtest.csv")
  7. reader=tf.TextLineReader()
  8. filename_queue=tf.train.string_input_producer([dirname])
  9. key,value=reader.read(filename_queue)
  10. record_defaults=[[''],[''],[''],[''],['']]
  11. d1, d2, d3, d4, d5= tf.decode_csv(value, record_defaults, ',')
  12. init=tf.initialize_all_variables()
  13. with tf.Session() as sess:
  14. sess.run(init)
  15. coord = tf.train.Coordinator()
  16. threads = tf.train.start_queue_runners(sess=sess,coord=coord)
  17. for i in range(4):
  18. print(sess.run(d2))
  19. coord.request_stop()
  20. coord.join(threads)
  21. if __name__ == '__main__':
  22. parser = argparse.ArgumentParser()
  23. parser.add_argument('--buckets', type=str, default='',
  24. help='input data path')
  25. parser.add_argument('--checkpointDir', type=str, default='',
  26. help='output model path')
  27. FLAGS, _ = parser.parse_known_args()
  28. tf.app.run(main=main)
  • dirname:OSS文件路径,可以是数组,方便下一阶段 shuffle。
  • reader:TF内置各种reader API,可以根据需求选用。
  • tf.train.string_input_producer:将文件生成队列。
  • tf.decode_csv:是一个splite功能的OP,可以得到每一行的特定参数。
  • 通过OP获取数据,在session中需要tf.train.Coordinator()和tf.train.start_queue_runners(sess=sess,coord=coord)。

在代码中,输入得是3行5个字段:

  1. 1,1,1,1,1
  2. 2,2,2,2,2
  3. 3,3,3,3,3

循环输出4次,打印出第2个字段。结果如下图所示。

输出结果也证明了数据结构是成队列。

读取MaxCompute数据

PAI-Studio在支持OSS数据源的基础上,增加了对MaxCompute表的数据支持。用户可以直接使用PAI-Studio的Tensorflow组件读写MaxCompute数据,本教程将提供完整数据和代码供大家测试。

在开始实验之前,请先确保已开通了OSS存储服务用来存放训练代码。主账号如何开通和授权OSS,请参考:https://help.aliyun.com/document_detail/49571.html#h2-oss-4。

(1)详细流程

为了方便用户快速上手,本文档将以训练iris数据集为例,介绍如何跑通实验。

(2)读数据表组件

为了方便大家,我们提供了一份公共读的数据供大家测试,只要拖出读数据表组件,输入:

  1. pai_online_project.iris_data

即可获取数据,

数据格式如图:

(3)Tensorflow组件说明

3个输入桩从左到右分别是OSS输入、MaxCompute输入、模型输入。2个输出桩分别是模型输出、MaxCompute输出。如果输入是一个MaxCompute表,输出也是一个MaxCompute表,需要按下图方法连接。

读写MaxCompute表需要配置数据源、代码文件、输出模型路径、建表等操作。

  • Python代码文件:需要把执行代码放到OSS路径下(注意OSS需要与当前项目在同一区域),本文提供的代码可以在下方连接下载(代码需要按照下方代码说明文案调整):http://docs-aliyun.cn-hangzhou.oss.aliyun-inc.com/assets/attach/129749/cn_zh/1565333220966/iristest.py?spm=a2c4g.11186623.2.10.50c46b36PlNwcq&file=iristest.py
  • Checkpoint输出目录/模型输入目录:选择自己的OSS路径用来存放模型
  • MaxCompute输出表:写MaxCompute表要求输出表是已经存在的表,并且输出的表名需要跟代码中的输出表名一致。在本案例中需要填写“iris_output”(不需要填写odps://xxx,仅需填写表名称)
  • 建表SQL语句:如果代码中的输出表并不存在,可以通过这个输入框输入建表语句自动建表。本案例中建表语句“create table iris_output(f1 DOUBLE,f2 DOUBLE,f3 DOUBLE,f4 DOUBLE,f5 STRING);”

组件PAI命令

  1. PAI -name tensorflow180_ext -project algo_public -Doutputs="odps://${当前项目名}/tables/${输出表名}" -DossHost="${OSS的host}" -Dtables="odps://${当前项目名}/tables/${输入表名}" -DgpuRequired="${GPU卡数}" -Darn="${OSS访问RoleARN}" -Dscript="${执行的代码文件}";

上述命令中的${}需要替换成用户真实数据

(4)代码说明

  1. import tensorflow as tf
  2. tf.app.flags.DEFINE_string("tables", "", "tables info")
  3. FLAGS = tf.app.flags.FLAGS
  4. print("tables:" + FLAGS.tables)
  5. tables = [FLAGS.tables]
  6. filename_queue = tf.train.string_input_producer(tables, num_epochs=1)
  7. reader = tf.TableRecordReader()
  8. key, value = reader.read(filename_queue)
  9. record_defaults = [[1.0], [1.0], [1.0], [1.0], ["Iris-virginica"]]
  10. col1, col2, col3, col4, col5 = tf.decode_csv(value, record_defaults = record_defaults)
  11. # line 9 and 10 can be written like below for short. It will be helpful when too many columns exist.
  12. # record_defaults = [[1.0]] * 4 + [["Iris-virginica"]]
  13. # value_list = tf.decode_csv(value, record_defaults = record_defaults)
  14. writer = tf.TableRecordWriter("odps://{MaxCompute项目名}/tables/iris_output")
  15. write_to_table = writer.write([0, 1, 2, 3, 4], [col1, col2, col3, col4, col5])
  16. # line 16 can be written like below for short. It will be helpful when too many columns exist.
  17. # write_to_table = writer.write(range(5), value_list)
  18. close_table = writer.close()
  19. init = tf.global_variables_initializer()
  20. with tf.Session() as sess:
  21. sess.run(init)
  22. sess.run(tf.local_variables_initializer())
  23. coord = tf.train.Coordinator()
  24. threads = tf.train.start_queue_runners(coord=coord)
  25. try:
  26. step = 0
  27. while not coord.should_stop():
  28. step += 1
  29. sess.run(write_to_table)
  30. except tf.errors.OutOfRangeError:
  31. print('%d records copied' % step)
  32. finally:
  33. sess.run(close_table)
  34. coord.request_stop()
  35. coord.join(threads)

(5)读数据表

  1. tables = [FLAGS.tables]
  2. filename_queue = tf.train.string_input_producer(tables, num_epochs=1)
  3. reader = tf.TableRecordReader()
  4. key, value = reader.read(filename_queue)
  5. record_defaults = [[1.0], [1.0], [1.0], [1.0], ["Iris-virginica"]]

其中FLAGS.tables是前端配置的输入表名的传参变量,对应组件的MaxCompute输入桩:

在数据量较大时,尽量采用批量操作,减少op的执行次数。

(6)写数据表

  1. writer = tf.TableRecordWriter("odps://{MaxCompute项目名}/tables/iris_output")
  2. write_to_table = writer.write([0, 1, 2, 3, 4], [col1, col2, col3, col4, col5])
  • TableRecordWriter中的格式为”odps://当前项目名/tables/输出表名”
  • 读写分区表写法:”odps://当前项目名/tables/输出表名/pt=1”