MaxCompute使用

PAI子产品(DLCDSW)中,您可以通过阿里云MaxCompute提供的PyODPS或人工智能平台PAI自主研发的paiio,实现MaxCompute数据的读写操作。针对不同的应用场景,您可以选择合适的MaxCompute数据读取方式。

功能介绍

  • PyODPS

    PyODPS是阿里云MaxComputePython版本的SDK,提供简单方便的Python编程接口。您可以使用PyODPS完成上传和下载文件、创建表、运行ODPS SQL查询等操作。详情请参见PyODPS概述

  • paiio

    为了在PAI子产品中方便地读写MaxCompute表数据,PAI团队自主研发了paiio模块。paiio支持以下三种接口:

    接口

    区别

    功能描述

    TableRecordDataset

    依赖TensorFlow框架,推荐在1.2及以上版本中使用Dataset接口(详情请参见Dataset),替代原有的线程和队列接口构建数据流。

    读取MaxCompute表数据。

    TableReader

    不依赖TensorFlow框架,基于MaxCompute SDK实现,可以直接访问MaxCompute表并即时获取I/O结果。

    读取MaxCompute表数据。

    TableWriter

    不依赖TensorFlow框架,基于MaxCompute SDK实现,可以直接对MaxCompute表进行写入并返回。

    MaxCompute表中写入数据。

前提条件

使用限制

paiio模块不支持使用自定义镜像,仅选择TensorFlow 1.12、1.152.0版本对应的镜像时,才可使用paiio模块。

PyODPS

您可以使用PyODPS实现MaxCompute数据的读写操作。

  1. 执行如下命令,安装PyODPS。

    pip install pyodps
  2. 执行如下命令检查安装是否成功。若无返回值和报错信息表示安装成功。

    python -c "from odps import ODPS"
  3. 如果您使用的Python不是系统默认的Python版本,安装完PIP后,您可以执行如下命令进行Python版本切换。

    /home/tops/bin/python3.7 -m pip install setuptools>=3.0
    #/home/tops/bin/python3.7为安装的python路径
  4. 通过PyODPS读写MaxCompute数据。

    import numpy as np
    import pandas as pd
    import os
    
    from odps import ODPS
    from odps.df import DataFrame
    # 建立链接。
    o = ODPS(
        os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
        os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
        project='your-default-project',
        endpoint='your-end-point',
    )
    
    # 以直接指定字段名以及字段类型的方式创建非分区表my_new_table。
    
    table = o.create_table('my_new_table', 'num bigint, id string', if_not_exists=True)
    
    # 向非分区表my_new_table中插入数据。
    records = [[111, 'aaa'],
              [222, 'bbb'],
              [333, 'ccc'],
              [444, '中文']]
    o.write_table(table, records)
    
    # 读取数据。
    sql = '''
    SELECT  
        *
    FROM
        your-default-project.<table>
    LIMIT 100
    ;
    '''
    query_job = o.execute_sql(sql)
    result = query_job.open_reader(tunnel=True)
    df = result.to_pandas(n_process=1) # n_process配置可参考机器配置,取值大于1时可以开启多线程加速。
    

    其中:

    • ALIBABA_CLOUD_ACCESS_KEY_IDALIBABA_CLOUD_ACCESS_KEY_SECRET:需将该环境变量设置为您的阿里云账号的AccessKey ID和 AccessKey Secret。

      说明

      不建议直接使用AccessKey IDAccessKey Secret字符串。

    • your-default-projectyour-end-point:需替换为您设置的默认项目名称与Endpoint信息,各地域的Endpoint请参见Endpoint

    更多关于如何使用PyODPSMaxCompute表进行其他操作,详情请参见

paiio

准备工作:配置账户信息

使用paiio模块读写MaxCompute表数据之前,需要配置MaxCompute账户的AccessKey信息。PAI支持从配置文件读取配置信息,您可以将配置文件放置在挂载的文件系统中,然后在代码中通过环境变量引用。

  1. 编写配置文件,内容如下。

    access_id=xxxx
    access_key=xxxx
    end_point=http://xxxx

    参数

    描述

    access_id

    阿里云账号的AccessKey ID。

    access_key

    阿里云账号的AccessKey Secret。

    end_point

    MaxComputeEndpoint,例如华东2(上海)配置为http://service.cn-shanghai.maxcompute.aliyun.com/api。详情请参见Endpoint

  2. 在代码中指定配置文件路径,方式如下。

    os.environ['ODPS_CONFIG_FILE_PATH'] = '<your MaxCompute config file path>'

    其中<your MaxCompute config file path>表示配置文件的路径。

TableRecordDataset使用说明

接口说明

TensorFlow社区推荐在1.2及以上版本中使用Dataset接口(详情请参见Dataset)替代原有的线程和队列接口构建数据流。通过多个Dataset接口的组合变换生成计算数据,可以简化数据输入部分的代码。

  • 接口定义(Python)

    class TableRecordDataset(Dataset):
      def __init__(self,
                   filenames,
                   record_defaults,
                   selected_cols=None,
                   excluded_cols=None,
                   slice_id=0,
                   slice_count=1,
                   num_threads=0,
                   capacity=0):
  • 参数

    参数

    是否必选

    类型

    默认值

    描述

    filenames

    STRING

    待读取的表名集合(列表),多张表的Schema必须一致。表名格式为odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/...

    record_defaults

    LISTTUPLE

    用于读取列的数据类型转换及列为空时的默认值。如果该值与实际读取的列数不符,或数据类型无法自动转换,则执行过程中系统会抛出异常。

    系统支持的数据类型包括FLOAT32、FLOAT64、INT32、INT64、BOOLSTRING,INT64类型的默认值请参见np.array(0, np.int64)

    selected_cols

    STRING

    None

    选取的列,格式为半角逗号(,)分隔的字符串。默认值None表示读取所有列。该参数与excluded_cols不能同时使用。

    excluded_cols

    STRING

    None

    排除的列,格式为半角逗号(,)分隔的字符串。默认值None表示读取所有列。该参数与selected_cols不能同时使用。

    slice_id

    INT

    0

    在分布式读取场景下,当前分片的编号(从0开始编号)。分布式读取时,系统根据slice_count将表均分为多个分片,读取slice_id对应的分片。

    slice_id为默认值0时,如果slice_count取值为1,则表示读取整张表。如果slice_count大于1,则表示读取第0个分片。

    slice_count

    INT

    1

    在分布式读取场景下,总的分片数量,通常为Worker数量。默认值1表示不分片,即读取整张表,

    num_threads

    INT

    0

    预取数据时,每个访问表的内置Reader启用的线程(独立于计算线程)数量。取值范围为1~64。如果num_threads取值为0,则系统自动将新建的预取线程数配置为计算线程池线程数的1/4。

    说明

    因为I/O对每个模型的整体计算影响不同,所以提高预取线程数,不一定可以提升整体模型的训练速度。

    capacity

    INT

    0

    读取表的总预取量,单位为行数。如果num_threads大于1,则每个线程的预取量为capacity/num_threads行(向上取整)。如果capacity0,则内置Reader根据所读表的前N行(系统默认N=256)平均值自动配置总预取量,使得每个线程的预取数据约占空间64 MB。

    说明

    如果MaxCompute表字段为DOUBLE类型,则TensorFlow中需要使用np.float64格式与其对应。

  • 返回值

    返回Dataset对象,可以作为Pipeline工作流构建的输入。

使用示例

假设在myproject项目中存储了一张名为test的表,其部分内容如下所示。

itemid(BIGINT)

name(STRING)

price(DOUBLE)

virtual(BOOL)

25

"Apple"

5.0

False

38

"Pear"

4.5

False

17

"Watermelon"

2.2

False

以下代码实现了使用TableRecordDataset接口读取testitemidprice列的数据。

import os
import tensorflow as tf
import paiio

# 指定配置文件路径。请替换为配置文件实际存放的路径。
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
# 定义要读取的Table, 可以是多个。请替换为需要访问的表名称和相应的MaxCompute项目名称。
table = ["odps://${your_projectname}/tables/${table_name}"]
# 定义TableRecordDataset, 读取表的itemid和price列。
dataset = paiio.data.TableRecordDataset(table,
                                       record_defaults=[0, 0.0],
                                       selected_cols="itemid,price",
                                       num_threads=1,
                                       capacity=10)
# 设置epoch 2, batch size 3, prefetch 100 batch。
dataset = dataset.repeat(2).batch(3).prefetch(100)

ids, prices = tf.compat.v1.data.make_one_shot_iterator(dataset).get_next()

with tf.compat.v1.Session() as sess:
    sess.run(tf.compat.v1.global_variables_initializer())
    sess.run(tf.compat.v1.local_variables_initializer())
    try:
        while True:
            batch_ids, batch_prices = sess.run([ids, prices])
            print("batch_ids:", batch_ids)
            print("batch_prices:", batch_prices)
    except tf.errors.OutOfRangeError:
        print("End of dataset")

TableReader使用说明

接口说明

TableReader基于MaxCompute SDK实现,不依赖TensorFlow框架,可以直接访问MaxCompute表并即时获取I/O结果。

  • 创建Reader并打开表

    • 接口定义

    • reader = paiio.python_io.TableReader(table,
                           selected_cols="",
                          excluded_cols="",
                           slice_id=0,
                          slice_count=1):
    • 参数

    • 参数

      是否必选

      类型

      默认值

      描述

      table

      STRING

      需要打开的MaxCompute表名,格式为odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/...

      selected_cols

      STRING

      空字符串("")

      选取的列,格式为英文逗号(,)分隔的字符串。默认值空字符串("")表示读取所有列。该参数与excluded_cols不能同时使用。

      excluded_cols

      STRING

      空字符串("")

      排除的列,格式为英文逗号(,)分隔的字符串。默认值空字符串("")表示读取所有列。该参数与selected_cols不能同时使用。

      slice_id

      INT

      0

      在分布式读取场景下,当前分片的编号,取值范围为[0, slice_count-1]。分布式读取时,系统根据slice_count将表均分为多个分片,读取slice_id对应的分片。默认值0表示不分片,即读取表的所有行。

      slice_count

      INT

      1

      在分布式读取场景下,总的分片数量,通常为Worker数量。

    • 返回值

      Reader对象。

  • 读取记录

    • 接口定义

    • reader.read(num_records=1)
    • 参数

      num_records表示顺序读取的行数。默认值为1,即读取1行。如果num_records参数超出未读的行数,则返回读取到的所有行。如果未读取到记录,则抛出OutOfRange异常(paiio.python_io.OutOfRangeException)。

    • 返回值

      返回一个numpy ndarray数组(或称为recarray),数组中每个元素为表的一行数据组成的一个TUPLE。

  • 定位到相应行

    • 接口定义

    • reader.seek(offset=0)
    • 参数

    • offset表示定位到的行(行从0开始编号),下一个Read操作将从定位的行开始。如果配置了slice_idslice_count,则按分片位置进行相对行的定位。如果offset超出表的总行数,则系统抛出OutOfRange异常。如果之前的读取位置已经超出表尾,则继续进行seek系统会抛出OutOfRange异常(paiio.python_io.OutOfRangeException)。

      重要

      读取一个batch_size时,如果剩余行数不足一个batch_size,则read操作会返回剩余行且不抛异常。此时,如果继续进行seek操作,则系统会抛异常。

    • 返回值

      无返回值。如果操作出错,则系统抛出异常。

  • 获取表的总记录数

    • 接口定义

    • reader.get_row_count()
    • 参数

    • 返回值

      返回表的行数。如果配置了slice_idslice_count,则返回分片大小。

  • 获取表的Schema

    • 接口定义

    • reader.get_schema()
    • 参数

    • 返回值

    • 返回1D-stuctured ndarray,每个元素对应reader中选定的MaxCompute表中一列的Schema,包括如下三个元素。

      参数

      描述

      colname

      列名。

      typestr

      MaxCompute数据类型名称。

      pytype

      typestr对应的Python数据类型。

      typestrpytype的对应关系如下表所示。

      typestr

      pytype

      BIGINT

      INT

      DOUBLE

      FLOAT

      BOOLEAN

      BOOL

      STRING

      OBJECT

      DATETIME

      INT

      MAP

      说明

      PAI-TensorFlow不支持对MAP类型数据进行操作。

      OBJECT

  • 关闭表

    • 接口定义

    • reader.close()
    • 参数

    • 返回值

      无返回值。如果操作出错,则系统抛出异常。

使用示例

假设在myproject项目中存储了一张名为test的表,其内容如下所示。

uid(BIGINT)

name(STRING)

price(DOUBLE)

virtual(BOOL)

25

"Apple"

5.0

False

38

"Pear"

4.5

False

17

"Watermelon"

2.2

False

以下代码实现了使用TableReader读取uidnameprice列的数据。

    import os
    import paiio
    
    # 指定配置文件路径。请替换为配置文件实际存放的路径。
    os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
    # 打开一张表,返回reader对象。请替换为需要访问的表名称和相应的MaxCompute项目名称。
    reader = paiio.python_io.TableReader("odps://myproject/tables/test", selected_cols="uid,name,price")
    
    # 获得表的总行数。
    total_records_num = reader.get_row_count() # return 3
    
    batch_size = 2
    # 读表,返回值将是一个recarray数组,形式为[(uid, name, price)*2]。
    records = reader.read(batch_size) # 返回[(25, "Apple", 5.0), (38, "Pear", 4.5)]
    records = reader.read(batch_size) # 返回[(17, "Watermelon", 2.2)]
    # 继续读取将抛出OutOfRange异常。
    
    # Close the reader.
    reader.close()

TableWriter使用说明

TableWriter基于MaxCompute SDK实现,不依赖TensorFlow框架,可以直接对MaxCompute表进行写入并返回。

接口说明

  • 创建Writer并打开表

    • 接口定义

      writer = paiio.python_io.TableWriter(table, slice_id=0)
      说明
      • 该接口不会清空原表中的数据,采用追加的方式写入数据。

      • 对于新写入的数据,关闭表之后才能对其进行读取。

    • 参数

      参数

      是否必选

      类型

      默认值

      描述

      table

      STRING

      待打开的MaxCompute表名,格式为odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/...

      slice_id

      INT

      0

      在分布式场景,写表至不同的分片,从而避免写冲突。在单机场景,使用默认值0即可。在多机场景,如果多个Worker(包括PS)同时使用同一个slice_id写表,则会导致写入失败。

    • 返回值

      返回Writer对象。

  • 写入记录

    • 接口定义

      writer.write(values, indices)
    • 参数

      参数

      是否必选

      类型

      默认值

      描述

      values

      STRING

      待写入的数据。支持写入单行数据或多行数据:

      • 如果仅写入单行数据,则向values参数传入一个由标量组成的TUPLE、LIST1D-ndarray。如果传入的是LISTndarray,则说明写入的各列数据类型一致。

      • 如果写入N行数据(N>=1),可以向values参数传入一个LIST1D-ndarray,参数中的每个元素都应该对应一个单行的数据(用TUPLELIST表示,也可以通过Structure形式存放于ndarray中)。

      indices

      INT

      指定数据写入的列,支持传入由INT类型Index组成的TUPLE、LIST1D-ndarray。indices中每个数(i)对应表中相应的第i列(列数从0开始编号)。

    • 返回值

      无返回值。如果写过程出错,则系统会抛出异常并退出。

  • 关闭表

    • 接口定义

      writer.close()
      说明

      with语句的区块中,无需显示调用close()接口关闭表。

    • 参数

    • 返回值

      无返回值。如果操作出错,则系统抛出异常。

    • 示例

      通过with语句使用TableWriter,代码如下。

      with paiio.python_io.TableWriter(table) as writer:
        # Prepare values for writing.
          writer.write(values, incides)
          # Table would be closed automatically outside this section.

使用示例

import paiio
import os

# 指定配置文件路径。请替换为配置文件实际存放的路径。
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
# 准备数据。
values = [(25, "Apple", 5.0, False),
          (38, "Pear", 4.5, False),
          (17, "Watermelon", 2.2, False)]

# 打开一个表,返回writer对象。请替换为需要访问的表名称和相应的MaxCompute项目名称。
writer = paiio.python_io.TableWriter("odps://project/tables/test")

# Write records to the 0-3 columns of the table. 将数据写至表中的第0-3列。
records = writer.write(values, indices=[0, 1, 2, 3])

# 关闭writer。
writer.close()