为了在DLC任务中方便地读写MaxCompute表数据,PAI团队开发了paiio模块。paiio支持TableRecordDataset、TableReader及TableWriter三种接口,本文详细介绍这三种接口的使用说明及读写MaxCompute表数据的使用示例。
使用限制
-
paiio模块支持TensorFlow 1.12、1.15和2.0版本。仅在分布式训练(DLC)任务中选择这些版本对应的镜像时,才可使用paiio模块。
-
paiio模块不支持自定义镜像。
准备工作:配置账户信息
使用paiio模块读写MaxCompute表数据之前,需要配置MaxCompute账户的AccessKey信息。PAI支持从配置文件读取配置信息,您可以将配置文件放置在挂载的文件系统中,然后在代码中通过环境变量引用。
-
编写配置文件,内容如下。
access_id=xxxx access_key=xxxx end_point=http://xxxx参数
描述
access_id
阿里云账号的AccessKey ID。
access_key
阿里云账号的AccessKey Secret。
end_point
MaxCompute的Endpoint,例如华东2(上海)配置为
http://service.cn-shanghai.maxcompute.aliyun.com/api。详情请参见Endpoint。 -
在代码中指定配置文件路径,方式如下。
os.environ['ODPS_CONFIG_FILE_PATH'] = '<your MaxCompute config file path>'其中<your MaxCompute config file path>表示配置文件的路径。
TableRecordDataset使用说明
接口说明
TensorFlow社区推荐在1.2及以上版本中使用Dataset接口(详情请参见Dataset)替代原有的线程和队列接口构建数据流。通过多个Dataset接口的组合变换生成计算数据,可以简化数据输入部分的代码。
-
接口定义(Python)
class TableRecordDataset(Dataset): def __init__(self, filenames, record_defaults, selected_cols=None, excluded_cols=None, slice_id=0, slice_count=1, num_threads=0, capacity=0): -
参数
参数
是否必选
类型
默认值
描述
filenames
是
STRING
无
待读取的表名集合(列表),多张表的Schema必须一致。表名格式为
odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/...。record_defaults
是
LIST或TUPLE
无
用于读出列的数据类型转换及列为空时的默认值。如果该值与实际读出的列数不符,或数据类型无法自动转换,则执行过程中系统会抛出异常。
系统支持的数据类型包括FLOAT32、FLOAT64、INT32、INT64、BOOL及STRING,INT64类型的默认值请参见
np.array(0, np.int64)。selected_cols
否
STRING
None
选取的列,格式为半角逗号(,)分隔的字符串。默认值None表示读取所有列。该参数与excluded_cols不能同时使用。
excluded_cols
否
STRING
None
排除的列,格式为半角逗号(,)分隔的字符串。默认值None表示读取所有列。该参数与selected_cols不能同时使用。
slice_id
否
INT
0
在分布式读取场景下,当前分片的编号(从0开始编号)。分布式读取时,系统根据slice_count将表均分为多个分片,读取slice_id对应的分片。
slice_id为默认值0时,如果slice_count取值为1,则表示读取整张表。如果slice_count大于1,则表示读取第0个分片。
slice_count
否
INT
1
在分布式读取场景下,总的分片数量,通常为Worker数量。默认值1表示不分片,即读取整张表,
num_threads
否
INT
0
预取数据时,每个访问表的内置Reader启用的线程(独立于计算线程)数量。取值范围为1~64。如果num_threads取值为0,则系统自动将新建的预取线程数配置为计算线程池线程数的1/4。
说明因为I/O对每个模型的整体计算影响不同,所以提高预取线程数,不一定可以提升整体模型的训练速度。
capacity
否
INT
0
读取表的总预取量,单位为行数。如果num_threads大于1,则每个线程的预取量为capacity/num_threads行(向上取整)。如果capacity为0,则内置Reader根据所读表的前N行(系统默认N=256)平均值自动配置总预取量,使得每个线程的预取数据约占空间64 MB。
说明如果MaxCompute表字段为DOUBLE类型,则TensorFlow中需要使用np.float64格式与其对应。
-
返回值
返回Dataset对象,可以作为Pipeline工作流构建的输入。
使用示例
假设在myproject项目中存储了一张名为test的表,其部分内容如下所示。
|
itemid(BIGINT) |
name(STRING) |
price(DOUBLE) |
virtual(BOOL) |
|
25 |
"Apple" |
5.0 |
False |
|
38 |
"Pear" |
4.5 |
False |
|
17 |
"Watermelon" |
2.2 |
False |
以下代码实现了使用TableRecordDataset接口读取test表itemid和price列的数据。
import os
import tensorflow as tf
import paiio
# 指定配置文件路径。请替换为配置文件实际存放的路径。
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
# 定义要读取的Table, 可以是多个。请替换为需要访问的表名称和相应的MaxCompute项目名称。
table = ["odps://${your_projectname}/tables/${table_name}"]
# 定义TableRecordDataset, 读取表的itemid和price列。
dataset = paiio.data.TableRecordDataset(table,
record_defaults=[0, 0.0],
selected_cols="itemid,price",
num_threads=1,
capacity=10)
# 设置epoch 2, batch size 3, prefetch 100 batch。
dataset = dataset.repeat(2).batch(3).prefetch(100)
ids, prices = tf.compat.v1.data.make_one_shot_iterator(dataset).get_next()
with tf.compat.v1.Session() as sess:
sess.run(tf.compat.v1.global_variables_initializer())
sess.run(tf.compat.v1.local_variables_initializer())
try:
while True:
batch_ids, batch_prices = sess.run([ids, prices])
print("batch_ids:", batch_ids)
print("batch_prices:", batch_prices)
except tf.errors.OutOfRangeError:
print("End of dataset")
TableReader使用说明
接口说明
TableReader基于MaxCompute SDK实现,不依赖TensorFlow框架,可以直接访问MaxCompute表并即时获取I/O结果。
-
创建Reader并打开表
-
接口定义
reader = paiio.python_io.TableReader(table, selected_cols="", excluded_cols="", slice_id=0, slice_count=1): -
-
参数
-
返回值
Reader对象。
|
参数 |
是否必选 |
类型 |
默认值 |
描述 |
|
table |
是 |
STRING |
无 |
需要打开的MaxCompute表名,格式为 |
|
selected_cols |
否 |
STRING |
空字符串("") |
选取的列,格式为英文逗号(,)分隔的字符串。默认值空字符串("")表示读取所有列。该参数与excluded_cols不能同时使用。 |
|
excluded_cols |
否 |
STRING |
空字符串("") |
排除的列,格式为英文逗号(,)分隔的字符串。默认值空字符串("")表示读取所有列。该参数与selected_cols不能同时使用。 |
|
slice_id |
否 |
INT |
0 |
在分布式读取场景下,当前分片的编号,取值范围为[0, slice_count-1]。分布式读取时,系统根据slice_count将表均分为多个分片,读取slice_id对应的分片。默认值0表示不分片,即读取表的所有行。 |
|
slice_count |
否 |
INT |
1 |
在分布式读取场景下,总的分片数量,通常为Worker数量。 |
读取记录
-
接口定义
reader.read(num_records=1)
参数
num_records表示顺序读取的行数。默认值为1,即读取1行。如果num_records参数超出未读的行数,则返回读取到的所有行。如果未读取到记录,则抛出OutOfRange异常(paiio.python_io.OutOfRangeException)。
返回值
返回一个numpy ndarray数组(或称为recarray),数组中每个元素为表的一行数据组成的一个TUPLE。
定位到相应行
-
接口定义
reader.seek(offset=0)
参数
offset表示定位到的行(行从0开始编号),下一个Read操作将从定位的行开始。如果配置了slice_id和slice_count,则按分片位置进行相对行的定位。如果offset超出表的总行数,则系统抛出OutOfRange异常。如果之前的读取位置已经超出表尾,则继续进行seek系统会抛出OutOfRange异常(paiio.python_io.OutOfRangeException)。
读取一个batch_size时,如果剩余行数不足一个batch_size,则read操作会返回剩余行且不抛异常。此时,如果继续进行seek操作,则系统会抛异常。
返回值
无返回值。如果操作出错,则系统抛出异常。
获取表的总记录数
-
接口定义
reader.get_row_count()
参数
无
返回值
返回表的行数。如果配置了slice_id和slice_count,则返回分片大小。
获取表的Schema
-
接口定义
reader.get_schema()
参数
无
返回值
返回1D-stuctured ndarray,每个元素对应reader中选定的MaxCompute表中一列的Schema,包括如下三个元素。
|
参数 |
描述 |
|
colname |
列名。 |
|
typestr |
MaxCompute数据类型名称。 |
|
pytype |
typestr对应的Python数据类型。 |
typestr和pytype的对应关系如下表所示。
|
typestr |
pytype |
|
BIGINT |
INT |
|
DOUBLE |
FLOAT |
|
BOOLEAN |
BOOL |
|
STRING |
OBJECT |
|
DATETIME |
INT |
|
MAP 说明
PAI-TensorFlow不支持对MAP类型数据进行操作。 |
OBJECT |
关闭表
-
接口定义
reader.close()
参数
无
返回值
无返回值。如果操作出错,则系统抛出异常。
使用示例
假设在myproject项目中存储了一张名为test的表,其内容如下所示。
|
uid(BIGINT) |
name(STRING) |
price(DOUBLE) |
virtual(BOOL) |
|
25 |
"Apple" |
5.0 |
False |
|
38 |
"Pear" |
4.5 |
False |
|
17 |
"Watermelon" |
2.2 |
False |
以下代码实现了使用TableReader读取uid、name及price列的数据。
import os
import paiio
# 指定配置文件路径。请替换为配置文件实际存放的路径。
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
# 打开一张表,返回reader对象。请替换为需要访问的表名称和相应的MaxCompute项目名称。
reader = paiio.python_io.TableReader("odps://myproject/tables/test", selected_cols="uid,name,price")
# 获得表的总行数。
total_records_num = reader.get_row_count() # return 3
batch_size = 2
# 读表,返回值将是一个recarray数组,形式为[(uid, name, price)*2]。
records = reader.read(batch_size) # 返回[(25, "Apple", 5.0), (38, "Pear", 4.5)]
records = reader.read(batch_size) # 返回[(17, "Watermelon", 2.2)]
# 继续读取将抛出OutOfRange异常。
# Close the reader.
reader.close()
TableWriter使用说明
TableWriter基于MaxCompute SDK实现,不依赖TensorFlow框架,可以直接对MaxCompute表进行写入并返回。
接口说明
-
创建Writer并打开表
-
接口定义
writer = paiio.python_io.TableWriter(table, slice_id=0)说明-
该接口不会清空原表中的数据,采用追加的方式写入数据。
-
对于新写入的数据,关闭表之后才能对其进行读取。
-
-
参数
参数
是否必选
类型
默认值
描述
table
是
STRING
无
待打开的MaxCompute表名,格式为
odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/...slice_id
否
INT
0
在分布式场景,写表至不同的分片,从而避免写冲突。在单机场景,使用默认值0即可。在多机场景,如果多个Worker(包括PS)同时使用同一个slice_id写表,则会导致写入失败。
-
返回值
返回Writer对象。
-
-
写入记录
-
接口定义
writer.write(values, indices) -
参数
参数
是否必选
类型
默认值
描述
values
是
STRING
无
待写入的数据。支持写入单行数据或多行数据:
-
如果仅写入单行数据,则向values参数传入一个由标量组成的TUPLE、LIST或1D-ndarray。如果传入的是LIST或ndarray,则说明写入的各列数据类型一致。
-
如果写入N行数据(N>=1),可以向values参数传入一个LIST或1D-ndarray,参数中的每个元素都应该对应一个单行的数据(用TUPLE或LIST表示,也可以通过Structure形式存放于ndarray中)。
indices
是
INT
无
指定数据写入的列,支持传入由INT类型Index组成的TUPLE、LIST或1D-ndarray。indices中每个数(i)对应表中相应的第i列(列数从0开始编号)。
-
-
返回值
无返回值。如果写过程出错,则系统会抛出异常并退出。
-
-
关闭表
-
接口定义
writer.close()说明在with语句的区块中,无需显示调用close()接口关闭表。
-
参数
无
-
返回值
无返回值。如果操作出错,则系统抛出异常。
-
示例
通过with语句使用TableWriter,代码如下。
with paiio.python_io.TableWriter(table) as writer: # Prepare values for writing. writer.write(values, indices) # Table would be closed automatically outside this section.
-
使用示例
import paiio
import os
# 指定配置文件路径。请替换为配置文件实际存放的路径。
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
# 准备数据。
values = [(25, "Apple", 5.0, False),
(38, "Pear", 4.5, False),
(17, "Watermelon", 2.2, False)]
# 打开一个表,返回writer对象。请替换为需要访问的表名称和相应的MaxCompute项目名称。
writer = paiio.python_io.TableWriter("odps://project/tables/test")
# Write records to the 0-3 columns of the table. 将数据写至表中的第0-3列。
records = writer.write(values, indices=[0, 1, 2, 3])
# 关闭writer。
writer.close()
后续操作
代码配置完成后,您可以参照以下步骤使用paiio进行MaxCompute表的读写操作:
-
创建数据集,上传您已准备的配置和代码文件至数据源。如何创建数据集,请参见创建及管理数据集。
-
创建分布式训练(DLC)任务,其中关键参数配置说明如下,其他参数配置说明,请参见创建训练任务。
-
节点镜像:PAI官方镜像选择TensorFlow 1.12、TensorFlow 1.15或TensorFlow 2.0版本对应的镜像。
-
数据集配置:数据集选择步骤1创建的数据集;挂载路径配置为
/mnt/data/。 -
执行命令:配置为
python /mnt/data/xxx.py。其中xxx.py即为步骤1上传的代码文件。
-
-
单击确定。
成功提交训练任务后,您可以在实例日志中查看运行结果,详情请参见查看任务日志。