全部产品
云市场

Tensorflow读写MaxCompute表

更新时间:2019-10-11 20:19:37

PAI-STUDIO通过Tensorflow处理MaxCompute表数据

PAI-STUDIO在支持OSS数据源的基础上,增加了对MaxCompute表的数据支持。用户可以直接使用PAI-STUDIO的Tensorflow组件读写MaxCompute数据,本教程将提供完整数据和代码供大家测试。

在开始实验之前,请先确保已开通了OSS存储服务用来存放训练代码。主账号如何开通和授权OSS,请参考:https://help.aliyun.com/document_detail/49571.html#h2-oss-4。

详细流程

为了方便用户快速上手,本文档将以训练iris数据集为例,介绍如何跑通实验。

1.读数据表组件

为了方便大家,我们提供了一份公共读的数据供大家测试,只要拖出读数据表组件,输入:

  1. pai_online_project.iris_data

即可获取数据,

数据格式如图:

2.Tensorflow组件说明

3个输入桩从左到右分别是OSS输入、MaxCompute输入、模型输入。2个输出桩分别是模型输出、MaxCompute输出。如果输入是一个MaxCompute表,输出也是一个MaxCompute表,需要按下图方法连接。

读写MaxCompute表需要配置数据源、代码文件、输出模型路径、建表等操作。

  • Python代码文件:需要把执行代码放到OSS路径下(注意OSS需要与当前项目在同一区域),本文提供的代码可以在下方连接下载(代码需要按照下方代码说明文案调整):http://docs-aliyun.cn-hangzhou.oss.aliyun-inc.com/assets/attach/129749/cn_zh/1565333220966/iristest.py?spm=a2c4g.11186623.2.10.50c46b36PlNwcq&file=iristest.py
  • Checkpoint输出目录/模型输入目录:选择自己的OSS路径用来存放模型
  • MaxCompute输出表:写MaxCompute表要求输出表是已经存在的表,并且输出的表名需要跟代码中的输出表名一致。在本案例中需要填写“iris_output”(不需要填写odps://xxx,仅需填写表名称)
  • 建表SQL语句:如果代码中的输出表并不存在,可以通过这个输入框输入建表语句自动建表。本案例中建表语句“create table iris_output(f1 DOUBLE,f2 DOUBLE,f3 DOUBLE,f4 DOUBLE,f5 STRING);”

组件PAI命令

  1. PAI -name tensorflow180_ext -project algo_public -Doutputs="odps://${当前项目名}/tables/${输出表名}" -DossHost="${OSS的host}" -Dtables="odps://${当前项目名}/tables/${输入表名}" -DgpuRequired="${GPU卡数}" -Darn="${OSS访问RoleARN}" -Dscript="${执行的代码文件}";

上述命令中的${}需要替换成用户真实数据

3.代码说明

  1. import tensorflow as tf
  2. tf.app.flags.DEFINE_string("tables", "", "tables info")
  3. FLAGS = tf.app.flags.FLAGS
  4. print("tables:" + FLAGS.tables)
  5. tables = [FLAGS.tables]
  6. filename_queue = tf.train.string_input_producer(tables, num_epochs=1)
  7. reader = tf.TableRecordReader()
  8. key, value = reader.read(filename_queue)
  9. record_defaults = [[1.0], [1.0], [1.0], [1.0], ["Iris-virginica"]]
  10. col1, col2, col3, col4, col5 = tf.decode_csv(value, record_defaults = record_defaults)
  11. # line 9 and 10 can be written like below for short. It will be helpful when too many columns exist.
  12. # record_defaults = [[1.0]] * 4 + [["Iris-virginica"]]
  13. # value_list = tf.decode_csv(value, record_defaults = record_defaults)
  14. writer = tf.TableRecordWriter("odps://pai_bj_test2/tables/iris_output")
  15. write_to_table = writer.write([0, 1, 2, 3, 4], [col1, col2, col3, col4, col5])
  16. # line 16 can be written like below for short. It will be helpful when too many columns exist.
  17. # write_to_table = writer.write(range(5), value_list)
  18. close_table = writer.close()
  19. init = tf.global_variables_initializer()
  20. with tf.Session() as sess:
  21. sess.run(init)
  22. sess.run(tf.local_variables_initializer())
  23. coord = tf.train.Coordinator()
  24. threads = tf.train.start_queue_runners(coord=coord)
  25. try:
  26. step = 0
  27. while not coord.should_stop():
  28. step += 1
  29. sess.run(write_to_table)
  30. except tf.errors.OutOfRangeError:
  31. print('%d records copied' % step)
  32. finally:
  33. sess.run(close_table)
  34. coord.request_stop()
  35. coord.join(threads)

读数据表

  1. tables = [FLAGS.tables]
  2. filename_queue = tf.train.string_input_producer(tables, num_epochs=1)
  3. reader = tf.TableRecordReader()
  4. key, value = reader.read(filename_queue)
  5. record_defaults = [[1.0], [1.0], [1.0], [1.0], ["Iris-virginica"]]

其中FLAGS.tables是前端配置的输入表名的传参变量,对应组件的MaxCompute输入桩:

在数据量较大时,尽量采用批量操作,减少op的执行次数。

写数据表

  1. writer = tf.TableRecordWriter("odps://pai_bj_test2/tables/iris_output")
  2. write_to_table = writer.write([0, 1, 2, 3, 4], [col1, col2, col3, col4, col5])
  • TableRecordWriter中的格式为”odps://当前项目名/tables/输出表名”
  • 读写分区表写法:”odps://当前项目名/tables/输出表名/pt=1”