PAI-TensorFlow支持读取OSS对象存储数据和MaxCompute表数据。

读取OSS数据

主流程 描述
上传数据至OSS

使用深度学习处理数据时,数据需要先存储到OSS的Bucket中。

  1. 创建OSS Bucket。
    创建的OSS Bucket的区域需与GPU的计算集群区域相同。数据传输时即可使用阿里经典网络,算法运行时不需要收取流量费用。
    注意 不要开通OSS版本控制功能。
  2. 创建文件夹、组织数据目录、上传数据。

    登录OSS控制台,创建文件夹、组织数据目录和上传数据。

OSS授权 当您在机器学习平台中读写OSS Bucket组件时,需要授予AliyunODPSPAIDefaultRole系统默认角色给数加服务账号,详情请参见授权
RAM授权

RAM授权可以使机器学习平台获得OSS的访问权限

获取机器学习平台访问OSS的权限:
  1. 在PAI-Studio项目空间,单击左侧菜单栏的设置,选择基本设置
  2. 基本设置页面的OSS访问授权区域,选中授权机器学习读取我的OSS中的数据复选框,其他参数采用默认配置。
  3. 确认对话框,单击点击前往RAM进行授权
  4. 单击同意授权tew
  5. 单击读OSS数据组件字段设置下的刷新,查看RAM信息自动录入组件。teshy
TensorFlow读取OSS数据 连接组件读OSS数据TensorFlow
默认角色AliyunODPSPAIDefaultRole包含权限信息如下。
权限名称(Action) 权限说明
oss:PutObject 上传文件或文件夹对象
oss:GetObject 获取文件或文件夹对象
oss:ListObjects 查询文件列表信息
oss:DeleteObjects 删除对象
TensorFlow读取OSS数据方法:
  • 低效的IO方式
    本地执行TensorFlow代码和分布式云端执行TensorFlow的区别:
    • 本地读取数据:Server端直接从Client端获取Graph进行计算。
    • 云端服务:Server获得Graph后,还需要将计算下发至各Worker处理。
    reat
    注意事项
    • 不建议使用Python本地读取文件的方式。
      机器学习平台支持Python自带IO方式,但需要将数据源和代码打包上传。这种读取方式是将数据写入内存之后再计算,效率比较低,不建议使用。示例代码如下。
      import csv
      csv_reader=csv.reader(open('csvtest.csv'))
      for row in csv_reader:
        print(row)
    • 不建议使用第三方库读取文件的方式。

      使用第三方库(如TFLearn、Panda)的数据IO方式读取数据。通常,第三方库是通过封装Python的读取方式实现,所以在机器学习平台使用时也会造成效率低下的问题。

    • 不建议使用Preload读取文件的方式。

      如果您发现GPU并没有比本地的CPU速度快很多,则有可能是数据IO,导致性能浪费。Preload读取方式先将数据全部都读到内存中,然后再通过Session计算,例如Feed读取方式。这样造成性能浪费,同时内存限制也无法计算大数据。

      例如,硬盘中有图片数据集0001.jpg,0002.jpg,0003.jpg,…… 。先读取数据后再计算,假设读入用时0.1s,计算用时0.9s,这样每过1s,GPU都会有0.1s空闲,降低了运算的效率。tewata
  • 高效的IO方式
    TensorFlow读取方式是将数据读取转换成OP,通过Session.run的方式读取数据。读取线程将文件系统中的图片读入到一个内存的队列中。计算是另一个线程,并直接从内存队列中读取进行计算。这样解决了GPU因IO空闲而浪费性能的问题。test
    在机器学习平台通过OP的方式读取数据的代码如下。
    import argparse
    import tensorflow as tf
    import os
    FLAGS=None
    def main(_):
        dirname = os.path.join(FLAGS.buckets, "csvtest.csv")
        reader=tf.TextLineReader()
        filename_queue=tf.train.string_input_producer([dirname])
        key,value=reader.read(filename_queue)
        record_defaults=[[''],[''],[''],[''],['']]
        d1, d2, d3, d4, d5= tf.decode_csv(value, record_defaults, ',')
        init=tf.initialize_all_variables()
        with tf.Session() as sess:
            sess.run(init)
            coord = tf.train.Coordinator()
            threads = tf.train.start_queue_runners(sess=sess,coord=coord)
            for i in range(4):
                print(sess.run(d2))
            coord.request_stop()
            coord.join(threads)
    if __name__ == '__main__':
        parser = argparse.ArgumentParser()
        parser.add_argument('--buckets', type=str, default='',
                            help='input data path')
        parser.add_argument('--checkpointDir', type=str, default='',
                            help='output model path')
        FLAGS, _ = parser.parse_known_args()
        tf.app.run(main=main)
    其中:
    • dirname:OSS文件路径,可以是数组。
    • reader:Tensorflow内置各种类型reader API,可以根据需求选用。
    • tf.train.string_input_producer:将文件生成队列。
    • tf.decode_csv:是一个Splite功能的OP,可以获取每一行的特定参数。
    • 通过OP获取数据,在Session中需要tf.train.Coordinator()和tf.train.start_queue_runners(sess=sess,coord=coord)。

读取MaxCompute数据

您可以直接使用PAI-Studio的Tensorflow组件读写MaxCompute数据。

下文以iris数据集为例,为您介绍如何读取MaxCompute数据。
主流程 描述
连接组件 拖动组件并连接。reag
配置读数据表组件 在读数据表组件中,输入如下代码。
pai_online_project.iris_data
即可获取数据。test
数据格式如下。test
配置Tensorflow组件 test
  • ①输入桩连接OSS输入
  • ②输入桩连接MaxCompute输入
  • ③输入桩连接模型输入
  • ④输出桩连接模型输出
  • ⑤输出桩连接MaxCompute输出
如果输入MaxCompute表,输出也是MaxCompute表,则只需要接入②和⑤。
读写MaxCompute表需要配置数据源、代码文件、输出模型路径、建表等操作。teswer
  • Python代码文件:将执行代码存放至OSS路径下。
    注意 OSS需要与当前项目在同一区域
  • Checkpoint输出目录/模型输入目录:选择您自己的OSS路径用来存放模型。
  • MaxCompute输出表:写MaxCompute表要求输出表是已经存在的表,并且输出的表名需要与代码中的输出表名一致。在本案例中需要填写iris_output
  • 建表SQL语句:如果代码中的输出表并不存在,可以通过这个输入框输入建表语句自动建表。本案例中建表语句create table iris_output(f1 DOUBLE,f2 DOUBLE,f3 DOUBLE,f4 DOUBLE,f5 STRING);
组件API命令
PAI -name tensorflow180_ext -project algo_public -Doutputs="odps://${当前项目名}/tables/${输出表名}" -DossHost="${OSS的host}" -Dtables="odps://${当前项目名}/tables/${输入表名}" -DgpuRequired="${GPU卡数}" -Darn="${OSS访问RoleARN}" -Dscript="${执行的代码文件}";
其中,${}类型的参数需要替换成您的真实数据。
代码说明
import tensorflow as tf
tf.app.flags.DEFINE_string("tables", "", "tables info")
FLAGS = tf.app.flags.FLAGS
print("tables:" + FLAGS.tables)
tables = [FLAGS.tables]
filename_queue = tf.train.string_input_producer(tables, num_epochs=1)
reader = tf.TableRecordReader()
key, value = reader.read(filename_queue)
record_defaults = [[1.0], [1.0], [1.0], [1.0], ["Iris-virginica"]]
col1, col2, col3, col4, col5 = tf.decode_csv(value, record_defaults = record_defaults)
# line 9 and 10 can be written like below for short. It will be helpful when too many columns exist.
# record_defaults = [[1.0]] * 4 + [["Iris-virginica"]]
# value_list = tf.decode_csv(value, record_defaults = record_defaults)
writer = tf.TableRecordWriter("odps://{MaxCompute项目名}/tables/iris_output")
write_to_table = writer.write([0, 1, 2, 3, 4], [col1, col2, col3, col4, col5])
# line 16 can be written like below for short. It will be helpful when too many columns exist.
# write_to_table = writer.write(range(5), value_list)
close_table = writer.close()
init = tf.global_variables_initializer()
with tf.Session() as sess:
    sess.run(init)
    sess.run(tf.local_variables_initializer())
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)
    try:
       step = 0
       while not coord.should_stop():
            step += 1
            sess.run(write_to_table)
    except tf.errors.OutOfRangeError:
        print('%d records copied' % step)
    finally:
        sess.run(close_table)
        coord.request_stop()
        coord.join(threads)
读数据表
tables = [FLAGS.tables]
filename_queue = tf.train.string_input_producer(tables, num_epochs=1)
reader = tf.TableRecordReader()
key, value = reader.read(filename_queue)
record_defaults = [[1.0], [1.0], [1.0], [1.0], ["Iris-virginica"]]
其中,FLAGS.tables是前端配置的输入表名的传参变量,对应组件的MaxCompute输入桩②。当数据量较大时,尽量采用批量操作,减少op的执行次数。
写数据表
writer = tf.TableRecordWriter("odps://{MaxCompute项目名}/tables/iris_output")
write_to_table = writer.write([0, 1, 2, 3, 4], [col1, col2, col3, col4, col5])
其中:
  • TableRecordWriter中的格式为odps://当前项目名/tables/输出表名
  • 读写分区表格式为odps://当前项目名/tables/输出表名/pt=1