PyODPS支持对MaxCompute表的基本操作,包括创建表、创建表的Schema、同步表更新、获取表数据、删除表、表分区操作以及如何将表转换为DataFrame对象。

背景信息

PyODPS提供对MaxCompute表的基本操作方法。

操作

说明

基本操作

列出项目空间下的所有表、判断表是否存在、获取表等基本操作。

创建表的Schema

使用PyODPS创建表的Schema。

创建表

使用PyODPS创建表。

同步表更新

使用PyODPS同步表更新。

表记录类型(Record)

使用PyODPS读取/写入的记录数据结构。

写入表数据

使用PyODPS向表中写入数据。

获取表数据

使用PyODPS获取表中数据。

删除表

使用PyODPS删除表。

转换表为DataFrame

使用PyODPS转换表为DataFrame。

表分区

使用PyODPS判断是否为分区表、遍历表全部分区、判断分区是否存在、创建分区等。

数据上传下载通道

使用PyODPS操作TunnelMaxCompute中上传或者下载数据。

说明

更多PyODPS方法说明,请参见Python SDK方法说明

前提条件:准备运行环境

PyODPS支持在DataWorks的PyODPS节点或本地PC环境中运行,运行前您需先选择运行工具并准备好运行环境。
  • 使用DataWorks:创建好PyODPS 2节点或PyODPS 3节点,详情请参见通过DataWorks使用PyODPS
  • 使用本地PC环境:安装好PyODPS并初始化ODPS入口对象。

基本操作

当前项目内的表操作

  • 列出项目空间下的所有表:

    o.list_tables()方法可以列出项目空间下的所有表。

    for table in o.list_tables():
        print(table)

    可以通过prefix参数只列举给定前缀的表:

    for table in o.list_tables(prefix="table_prefix"):
        print(table.name)

    通过该方法获取的 Table 对象不会自动加载表名以外的属性,此时获取这些属性(例如table_schema或者creation_time)可能导致额外的请求并造成额外的时间开销。如果需要在列举表的同时读取这些属性,在 PyODPS 0.11.5 及后续版本中,可以为list_tables添加extended=True参数:

    for table in o.list_tables(extended=True):
        print(table.name, table.creation_time)

    如果需要按类型列举表,可以指定type参数。不同类型的表列举方法如下:

    managed_tables = list(o.list_tables(type="managed_table"))  # 列举内置表
    external_tables = list(o.list_tables(type="external_table"))  # 列举外表
    virtual_views = list(o.list_tables(type="virtual_view"))  # 列举视图
    materialized_views = list(o.list_tables(type="materialized_view"))  # 列举物化视图
  • 判断表是否存在:

    o.exist_table()方法可以判断表是否存在。

    print(o.exist_table('pyodps_iris'))
    # 返回True表示表pyodps_iris存在。
  • 获取表:

    入口对象的o.get_table()方法可以获取表。

    • 获取表的schema信息。

      t = o.get_table('pyodps_iris')
      print(t.schema)  # 获取表pyodps_irisschema

      返回值示例如下。

      odps.Schema {
        sepallength           double      # 片长度(cm)
        sepalwidth            double      # 片宽度(cm)
        petallength           double      # 瓣长度(cm)
        petalwidth            double      # 瓣宽度(cm)
        name                  string      # 种类
      }
    • 获取表列信息。

      t = o.get_table('pyodps_iris')
      print(t.schema.columns)  # 获取表pyodps_irisschema中的列信息

      返回值示例如下。

      [<column sepallength, type double>,
       <column sepalwidth, type double>,
       <column petallength, type double>,
       <column petalwidth, type double>,
       <column name, type string>]
    • 获取表的某个列信息。

      t = o.get_table('pyodps_iris')
      print(t.schema['sepallength'])  # 获取表pyodps_irissepallength列信息

      返回值示例如下。

      <column sepallength, type double>
    • 获取表的某个列的备注信息。

      t = o.get_table('pyodps_iris')
      print(t.schema['sepallength'].comment)  # 获取表pyodps_irissepallength列的备注信息

      返回示例如下。

      片长度(cm)
    • 获取表的生命周期。

      t = o.get_table('pyodps_iris')
      print(t.lifecycle)  # 获取表pyodps_iris的生命周期

      返回值示例如下。

      -1
    • 获取表的创建时间。

      t = o.get_table('pyodps_iris')
      print(t.creation_time)  # 获取表pyodps_iris的创建时间
    • 获取表是否是虚拟视图。

      t = o.get_table('pyodps_iris')
      print(t.is_virtual_view)  # 获取表pyodps_iris是否是虚拟视图,返回False,表示不是。

    与上述示例类似,您也可以通过t.sizet.comment来获取表的大小、表备注等信息。

    跨项目的表操作

    您可以通过project参数,跨项目获取表。

    t = o.get_table('table_name', project='other_project')

    其中other_project为所跨的项目,table_name为跨项目获取的表名称。

创建表的Schema

初始化方法有如下两种:

  • 通过表的列以及可选的分区进行初始化。

    from odps.models import Schema, Column, Partition
    columns = [
        Column(name='num', type='bigint', comment='the column'),
        Column(name='num2', type='double', comment='the column2'),
    ]
    partitions = [Partition(name='pt', type='string', comment='the partition')]
    schema = Schema(columns=columns, partitions=partitions)

    初始化后,您可获取字段信息、分区信息等。

    • 获取所有字段信息。

      print(schema.columns)

      返回示例如下。

      [<column num, type bigint>,
       <column num2, type double>,
       <partition pt, type string>]
    • 获取分区字段。

      print(schema.partitions)

      返回示例如下。

      [<partition pt, type string>]
    • 获取非分区字段名称。

      print(schema.names)

      返回示例如下。

      ['num', 'num2']
    • 获取非分区字段类型。

      print(schema.types)

      返回示例如下。

      [bigint, double]
  • 使用Schema.from_lists()方法。该方法更容易调用,但无法直接设置列和分区的注释。

    from odps.models import Schema
    schema = Schema.from_lists(['num', 'num2'], ['bigint', 'double'], ['pt'], ['string'])
    print(schema.columns)

    返回值示例如下。

    [<column num, type bigint>,
     <column num2, type double>,
     <partition pt, type string>]

创建表

您可以使用o.create_table()方法创建表,使用方式有两种:使用表Schema方式、使用字段名和字段类型方式。同时创建表时表字段的数据类型有一定的限制条件,详情如下。

使用表Schema创建表

使用表Schema创建表时,您需要先创建表的Schema,然后通过Schema创建表。

#创建表的schema
from odps.models import Schema
schema = Schema.from_lists(['num', 'num2'], ['bigint', 'double'], ['pt'], ['string'])

#通过schema创建表
table = o.create_table('my_new_table', schema)

#只有不存在表时,才创建表。
table = o.create_table('my_new_table', schema, if_not_exists=True)

#设置生命周期。
table = o.create_table('my_new_table', schema, lifecycle=7)

表创建完成后,您可以通过print(o.exist_table('my_new_table'))验证表是否创建成功,返回True表示表创建成功。

使用字段名及字段类型创建表

#创建分区表my_new_table,可传入(表字段列表,分区字段列表)。
table = o.create_table('my_new_table', ('num bigint, num2 double', 'pt string'), if_not_exists=True)

#创建非分区表my_new_table02。
table = o.create_table('my_new_table02', 'num bigint, num2 double', if_not_exists=True)

表创建完成后,您可以通过print(o.exist_table('my_new_table'))验证表是否创建成功,返回True表示表创建成功。

使用字段名及字段类型创建表:新数据类型

未打开新数据类型开关时(默认关闭),创建表的数据类型只允许为BIGINT、DOUBLE、DECIMAL、STRING、DATETIME、BOOLEAN、MAPARRAY类型。如果您需要创建TINYINTSTRUCT等新数据类型字段的表,可以打开options.sql.use_odps2_extension = True开关,示例如下。

from odps import options
options.sql.use_odps2_extension = True
table = o.create_table('my_new_table', 'cat smallint, content struct<title:varchar(100), body:string>')

同步表更新

当一个表被其他程序更新,例如改变了Schema,可以调用reload()方法同步表的更新。

#表schema变更
from odps.models import Schema
schema = Schema.from_lists(['num', 'num2'], ['bigint', 'double'], ['pt'], ['string'])

#通过reload()同步表更新
table = o.create_table('my_new_table', schema)
table.reload()

表记录类型(Record)

Record类型表示表的一行记录,为Table.open_reader或者Table.open_writer接口在读写时所使用的数据结构,也用于Tunnel接口TableDownloadSession.open_record_reader或者TableUploadSession.open_record_writer 。在Table对象上调用new_record方法即可创建一个新的Record实例。

下述示例中,假定表结构为:

odps.Schema {
  c_int_a                 bigint
  c_string_a              string
  c_bool_a                boolean
  c_datetime_a            datetime
  c_array_a               array<string>
  c_map_a                 map<bigint,string>
  c_struct_a              struct<a:bigint,b:string>
}

该表对应的Record相关操作实例为:

import datetime
t = o.get_table('mytable')  # o 为 MaxCompute 入口对象
r = t.new_record([1024, 'val1', False, datetime.datetime.now(), None, None])  # 值的个数必须等于表schema的字段数
r2 = t.new_record()  # 初始化时也可以不传入值
r2[0] = 1024  # 可以通过偏移设置值
r2['c_string_a'] = 'val1'  # 也可以通过字段名设置值
r2.c_string_a = 'val1'  # 通过属性设置值
r2.c_array_a = ['val1', 'val2']  # 设置 array 类型的值
r2.c_map_a = {1: 'val1'}  # 设置 map 类型的值
r2.c_struct_a = (1, 'val1')  # 使用 tuple 设置 struct 类型的值,当 PyODPS >= 0.11.5
r2.c_struct_a = {"a": 1, "b": 'val1'}  # 也可以使用 dict 设置 struct 类型的值

print(r[0])  # 取第0个位置的值
print(r['c_string_a'])  # 通过字段取值
print(r.c_string_a)  # 通过属性取值
print(r[0: 3])  # 切片操作
print(r[0, 2, 3])  # 取多个位置的值
print(r['c_int_a', 'c_double_a'])  # 通过多个字段取值

MaxCompute数据类型与Python类型在Record中的对应关系如下:

MaxCompute类型

Python类型

说明

TINYINT、SMALLINT、INT、BIGINT

int

FLOAT、DOUBLE

float

STRING

str

详情请参见注释1

BINARY

bytes

DATETIME

datetime.datetime

详情请参见注释2

DATE

datetime.date

BOOLEAN

bool

DECIMAL

decimal.Decimal

详情请参见注释3

MAP

dict

ARRAY

list

STRUCT

tuple/namedtuple

详情请参见注释4

TIMESTAMP

pandas.Timestamp

详情请参见注释2,需要安装Pandas。

TIMESTAMP_NTZ

pandas.Timestamp

结果不受时区设置影响,需要安装Pandas。

INTERVAL_DAY_TIME

pandas.Timedelta

需要安装Pandas。

对部分类型的说明如下。

  • PyODPS默认字符串类型对应UNICODE字符串,在Python 3中表示为str,在 Python 2 中为unicode。对于某些在字符串中存储二进制值的场景,需要设置options.tunnel.string_as_binary = True;,以避免可能出现的编码问题。

  • PyODPS默认使用本地时间作为时区。若使用UTC时区,需要设置 options.local_timezone = False;选项。若使用其他时区,则需要将该参数设置为指定时区,例如Asia/Shanghai。MaxCompute不存储时区值,因此在写入数据时,会将该时间转换为UNIX时间戳进行存储。

  • 对于Python 2,当安装cdecimal包时,会使用cdecimal.Decimal类。

  • 对于0.11.5以下版本的PyODPS,MaxCompute中的STRUCT类型对应Python中的dict类型。当PyODPS0.11.5及以上版本时,则默认对应namedtuple类型。如果希望使用旧版行为,则需要设置选项options.struct_as_dict = True;。DataWorks环境下,为保持历史兼容性,该值默认为False。为Record设置STRUCT类型的字段值时,0.11.5及以上版本的PyODPS可同时接受dicttuple类型,而旧版则只接受dict类型。

  • 关于如何设置选项,请参考配置选项

写入表数据

  • 使用入口对象的write_table()方法写入数据。

    重要

    对于分区表,如果分区不存在,可以使用create_partition参数指定创建分区。

    records = [[111, 1.0],                 # 此处可以是list。
              [222, 2.0],
              [333, 3.0],
              [444, 4.0]]
    o.write_table('my_new_table', records, partition='pt=test', create_partition=True)  #创建pt=test分区并写入数据
    说明
    • 每次调用write_table()方法,MaxCompute都会在服务端生成一个文件。该操作耗时较长,同时文件过多会降低后续的查询效率。因此,建议您在使用此方法时,一次性写入多组数据,或者传入一个生成器对象。

    • 调用write_table()方法向表中写入数据时会追加到原有数据中。PyODPS不提供覆盖数据的选项,如果需要覆盖数据,请手动清除原有数据。对于非分区表,需要调用table.truncate()方法;对于分区表,需要删除分区后再建立新的分区。

  • 对表对象调用open_writer()方法写入数据。

    t = o.get_table('my_new_table')
    with t.open_writer(partition='pt=test02', create_partition=True) as writer:  #创建pt=test02分区并写入数据
        records = [[1, 1.0],                 # 此处可以是List。
                  [2, 2.0],
                  [3, 3.0],
                  [4, 4.0]]
        writer.write(records)  # 这里Records可以是可迭代对象。

    如果是多级分区表,写入示例如下。

    t = o.get_table('test_table')
    with t.open_writer(partition='pt1=test1,pt2=test2') as writer:  # 多级分区写法。
        records = [t.new_record([111, 'aaa', True]),   # 也可以是Record对象。
                   t.new_record([222, 'bbb', False]),
                   t.new_record([333, 'ccc', True]),
                   t.new_record([444, '中文', False])]
        writer.write(records)
  • 使用多进程并行写数据。

    每个进程写数据时共享同一个Session_ID,但是有不同的Block_ID。每个Block对应服务端的一个文件。主进程执行Commit,完成数据上传。

    import random
    from multiprocessing import Pool
    from odps.tunnel import TableTunnel
    def write_records(tunnel, table, session_id, block_id):
        # 对使用指定的ID创建Session。
        local_session = tunnel.create_upload_session(table.name, upload_id=session_id)
        # 创建Writer时指定Block_ID。
        with local_session.open_record_writer(block_id) as writer:
            for i in range(5):
                # 生成数据并写入对应Block。
                record = table.new_record([random.randint(1, 100), random.random()])
                writer.write(record)
    
    if __name__ == '__main__':
        N_WORKERS = 3
    
        table = o.create_table('my_new_table', 'num bigint, num2 double', if_not_exists=True)
        tunnel = TableTunnel(o)
        upload_session = tunnel.create_upload_session(table.name)
    
        # 每个进程使用同一个Session_ID。
        session_id = upload_session.id
    
        pool = Pool(processes=N_WORKERS)
        futures = []
        block_ids = []
        for i in range(N_WORKERS):
            futures.append(pool.apply_async(write_records, (tunnel, table, session_id, i)))
            block_ids.append(i)
        [f.get() for f in futures]
    
        # 最后执行Commit,并指定所有Block。
        upload_session.commit(block_ids)

获取表数据

获取表数据的方法有多种,常用方法如下:

  • 使用入口对象的read_table()方法。

    # 处理一条记录。
    for record in o.read_table('my_new_table', partition='pt=test'):
        print(record)
  • 如果您仅需要查看每个表最开始的小于1万条数据,可以对表对象调用head()方法。

    t = o.get_table('my_new_table')
    # 处理每个Record对象。
    for record in t.head(3):
        print(record)
  • 调用open_reader()方法读取数据。

    • 使用with表达式的写法如下。

      t = o.get_table('my_new_table')
      with t.open_reader(partition='pt=test') as reader:
      count = reader.count
      for record in reader[5:10]:  # 可以执行多次,直到将Count数量的Record读完,此处可以改造成并行操作。
          print(record)  # 处理一条记录,例如打印记录本身
    • 不使用with表达式的写法如下。

      reader = t.open_reader(partition='pt=test')
      count = reader.count
      for record in reader[5:10]:  # 可以执行多次,直到将Count数量的Record读完,此处可以改造成并行操作。
          print(record)  # 处理一条记录,例如打印记录本身

删除表

使用delete_table()方法删除已经存在的表。

o.delete_table('my_table_name', if_exists=True)  # 只有表存在时,才删除表。
t.drop()  # Table对象存在时,直接调用Drop方法删除。

转换表为DataFrame

PyODPS提供了DataFrame框架,支持以更方便的方式查询和操作MaxCompute数据。使用to_df()方法,即可转化为DataFrame对象。

table = o.get_table('my_table_name')
df = table.to_df()

表分区

  • 判断是否为分区表。

    table = o.get_table('my_new_table')
    if table.schema.partitions:
        print('Table %s is partitioned.' % table.name)
  • 遍历表全部分区。

    table = o.get_table('my_new_table')
    for partition in table.partitions:  # 遍历所有分区
        print(partition.name)  # 具体的遍历步骤,这里是打印分区名
    for partition in table.iterate_partitions(spec='pt=test'):  # 遍历 pt=test 分区下的二级分区
        print(partition.name)  # 具体的遍历步骤,这里是打印分区名
    for partition in table.iterate_partitions(spec='dt>20230119'):  # 遍历 dt>20230119 分区下的二级分区
        print(partition.name)  # 具体的遍历步骤,这里是打印分区名
    重要

    PyODPS0.11.3版本开始,支持为iterate_partitions指定逻辑表达式,如上述示例中的dt>20230119

  • 判断分区是否存在。

    table = o.get_table('my_new_table')
    table.exist_partition('pt=test,sub=2015')
  • 获取分区。

    table = o.get_table('my_new_table')
    partition = table.get_partition('pt=test')
    print(partition.creation_time)
    partition.size
  • 创建分区。

    t = o.get_table('my_new_table')
    t.create_partition('pt=test', if_not_exists=True)  # 指定if_not_exists参数,分区不存在时才创建分区。
  • 删除分区。

    t = o.get_table('my_new_table')
    t.delete_partition('pt=test', if_exists=True)  # 自定if_exists参数,分区存在时才删除分区。
    partition.drop()  # 分区对象存在时,直接对分区对象调用Drop方法删除。

数据上传下载通道

TunnelMaxCompute的数据通道,用户可以通过TunnelMaxCompute中上传或者下载数据。

  • 上传数据示例

    from odps.tunnel import TableTunnel
    
    table = o.get_table('my_table')
    
    tunnel = TableTunnel(odps)
    upload_session = tunnel.create_upload_session(table.name, partition_spec='pt=test')
    
    with upload_session.open_record_writer(0) as writer:
        record = table.new_record()
        record[0] = 'test1'
        record[1] = 'id1'
        writer.write(record)
    
        record = table.new_record(['test2', 'id2'])
        writer.write(record)
    
    # 需要在 with 代码块外 commit,否则数据未写入即 commit,会导致报错
    upload_session.commit([0])
  • 下载数据示例

    from odps.tunnel import TableTunnel
    
    tunnel = TableTunnel(odps)
    download_session = tunnel.create_download_session('my_table', partition_spec='pt=test')
    # 处理每条记录。
    with download_session.open_record_reader(0, download_session.count) as reader:
        for record in reader:
            print(record)  # 具体的遍历步骤,这里是打印记录对象
说明
  • PyODPS不支持上传外部表,例如OSSOTS的表。

  • 不推荐直接使用Tunnel接口,推荐您直接使用对象的写和读接口。

  • 如果您安装了CPython,在安装PyODPS时会编译C代码,加速Tunnel的上传和下载。