更新时间:2025-01-24 08:55:49

PyODPS支持对MaxCompute表的基本操作,包括创建表、创建表的Schema、同步表更新、获取表数据、删除表、表分区操作以及如何将表转换为DataFrame对象。

背景信息

PyODPS提供对MaxCompute表的基本操作方法。

操作

说明

操作

说明

基本操作

列出项目空间下的所有表、判断表是否存在、获取表等基本操作。

创建表的Schema

使用PyODPS创建表的Schema。

创建表

使用PyODPS创建表。

同步表更新

使用PyODPS同步表更新。

表记录类型(Record)

使用PyODPS读取/写入的记录数据结构。

写入表数据

使用PyODPS向表中写入数据。

获取表数据

使用PyODPS获取表中数据。

删除表

使用PyODPS删除表。

转换表为DataFrame

使用PyODPS转换表为DataFrame。

表分区

使用PyODPS判断是否为分区表、遍历表全部分区、判断分区是否存在、创建分区等。

数据上传下载通道

使用PyODPS操作TunnelMaxCompute中上传或者下载数据。

说明

更多PyODPS方法说明,请参见Python SDK方法说明

前提条件:准备运行环境

PyODPS支持在DataWorks的PyODPS节点或本地PC环境中运行,运行前您需先选择运行工具并准备好运行环境。
  • 使用DataWorks:创建好PyODPS 2节点或PyODPS 3节点,详情请参见通过DataWorks使用PyODPS
  • 使用本地PC环境:安装好PyODPS并初始化ODPS入口对象。

基本操作

当前项目内的表操作

  • 列出项目空间下的所有表:

    o.list_tables()方法可以列出项目空间下的所有表。

    for table in o.list_tables():
        print(table)

    可以通过prefix参数只列举给定前缀的表:

    for table in o.list_tables(prefix="table_prefix"):
        print(table.name)

    通过该方法获取的 Table 对象不会自动加载表名以外的属性,此时获取这些属性(例如table_schema或者creation_time)可能导致额外的请求并造成额外的时间开销。如果需要在列举表的同时读取这些属性,在 PyODPS 0.11.5 及后续版本中,可以为list_tables添加extended=True参数:

    for table in o.list_tables(extended=True):
        print(table.name, table.creation_time)

    如果需要按类型列举表,可以指定type参数。不同类型的表列举方法如下:

    managed_tables = list(o.list_tables(type="managed_table"))  # 列举内置表
    external_tables = list(o.list_tables(type="external_table"))  # 列举外表
    virtual_views = list(o.list_tables(type="virtual_view"))  # 列举视图
    materialized_views = list(o.list_tables(type="materialized_view"))  # 列举物化视图
  • 判断表是否存在:

    o.exist_table()方法可以判断表是否存在。

    print(o.exist_table('pyodps_iris'))
    # 返回True表示表pyodps_iris存在。
  • 获取表:

    入口对象的o.get_table()方法可以获取表。

    • 获取表的schema信息。

      t = o.get_table('pyodps_iris')
      print(t.schema)  # 获取表pyodps_iris的schema

      返回值示例如下。

      odps.Schema {
        sepallength           double      # 片长度(cm)
        sepalwidth            double      # 片宽度(cm)
        petallength           double      # 瓣长度(cm)
        petalwidth            double      # 瓣宽度(cm)
        name                  string      # 种类
      }
    • 获取表列信息。

      t = o.get_table('pyodps_iris')
      print(t.schema.columns)  # 获取表pyodps_iris的schema中的列信息

      返回值示例如下。

      [<column sepallength, type double>,
       <column sepalwidth, type double>,
       <column petallength, type double>,
       <column petalwidth, type double>,
       <column name, type string>]
    • 获取表的某个列信息。

      t = o.get_table('pyodps_iris')
      print(t.schema['sepallength'])  # 获取表pyodps_iris的sepallength列信息

      返回值示例如下。

      <column sepallength, type double>
    • 获取表的某个列的备注信息。

      t = o.get_table('pyodps_iris')
      print(t.schema['sepallength'].comment)  # 获取表pyodps_iris的sepallength列的备注信息

      返回示例如下。

      片长度(cm)
    • 获取表的生命周期。

      t = o.get_table('pyodps_iris')
      print(t.lifecycle)  # 获取表pyodps_iris的生命周期

      返回值示例如下。

      -1
    • 获取表的创建时间。

      t = o.get_table('pyodps_iris')
      print(t.creation_time)  # 获取表pyodps_iris的创建时间
    • 获取表是否是虚拟视图。

      t = o.get_table('pyodps_iris')
      print(t.is_virtual_view)  # 获取表pyodps_iris是否是虚拟视图,返回False,表示不是。

    与上述示例类似,您也可以通过t.sizet.comment来获取表的大小、表备注等信息。

    跨项目的表操作

    您可以通过project参数,跨项目获取表。

    t = o.get_table('table_name', project='other_project')

    其中other_project为所跨的项目,table_name为跨项目获取的表名称。

创建表的Schema

初始化方法有如下两种:

  • 通过表的列以及可选的分区进行初始化。

    from odps.models import Schema, Column, Partition
    columns = [
        Column(name='num', type='bigint', comment='the column'),
        Column(name='num2', type='double', comment='the column2'),
    ]
    partitions = [Partition(name='pt', type='string', comment='the partition')]
    schema = Schema(columns=columns, partitions=partitions)

    初始化后,您可获取字段信息、分区信息等。

    • 获取所有字段信息。

      print(schema.columns)

      返回示例如下。

      [<column num, type bigint>,
       <column num2, type double>,
       <partition pt, type string>]
    • 获取分区字段。

      print(schema.partitions)

      返回示例如下。

      [<partition pt, type string>]
    • 获取非分区字段名称。

      print(schema.names)

      返回示例如下。

      ['num', 'num2']
    • 获取非分区字段类型。

      print(schema.types)

      返回示例如下。

      [bigint, double]
  • 使用Schema.from_lists()方法。该方法更容易调用,但无法直接设置列和分区的注释。

    from odps.models import Schema
    schema = Schema.from_lists(['num', 'num2'], ['bigint', 'double'], ['pt'], ['string'])
    print(schema.columns)

    返回值示例如下。

    [<column num, type bigint>,
     <column num2, type double>,
     <partition pt, type string>]

创建表

您可以使用o.create_table()方法创建表,使用方式有两种:使用表Schema方式、使用字段名和字段类型方式。同时创建表时表字段的数据类型有一定的限制条件,详情如下。

使用表Schema创建表

使用表Schema创建表时,您需要先创建表的Schema,然后通过Schema创建表。

#创建表的schema
from odps.models import Schema
schema = Schema.from_lists(['num', 'num2'], ['bigint', 'double'], ['pt'], ['string'])

#通过schema创建表
table = o.create_table('my_new_table', schema)

#只有不存在表时,才创建表。
table = o.create_table('my_new_table', schema, if_not_exists=True)

#设置生命周期。
table = o.create_table('my_new_table', schema, lifecycle=7)

表创建完成后,您可以通过print(o.exist_table('my_new_table'))验证表是否创建成功,返回True表示表创建成功。

使用字段名及字段类型创建表

#创建分区表my_new_table,可传入(表字段列表,分区字段列表)。
table = o.create_table('my_new_table', ('num bigint, num2 double', 'pt string'), if_not_exists=True)

#创建非分区表my_new_table02。
table = o.create_table('my_new_table02', 'num bigint, num2 double', if_not_exists=True)

表创建完成后,您可以通过print(o.exist_table('my_new_table'))验证表是否创建成功,返回True表示表创建成功。

使用字段名及字段类型创建表:新数据类型

未打开新数据类型开关时(默认关闭),创建表的数据类型只允许为BIGINT、DOUBLE、DECIMAL、STRING、DATETIME、BOOLEAN、MAPARRAY类型。如果您需要创建TINYINTSTRUCT等新数据类型字段的表,可以打开options.sql.use_odps2_extension = True开关,示例如下。

from odps import options
options.sql.use_odps2_extension = True
table = o.create_table('my_new_table', 'cat smallint, content struct<title:varchar(100), body:string>')

同步表更新

当一个表被其他程序更新,例如改变了Schema,可以调用reload()方法同步表的更新。

#表schema变更
from odps.models import Schema
schema = Schema.from_lists(['num', 'num2'], ['bigint', 'double'], ['pt'], ['string'])

#通过reload()同步表更新
table = o.create_table('my_new_table', schema)
table.reload()

表记录类型(Record)

Record类型表示表的一行记录,为Table.open_reader或者Table.open_writer接口在读写时所使用的数据结构,也用于Tunnel接口TableDownloadSession.open_record_reader或者TableUploadSession.open_record_writer 。在Table对象上调用new_record方法即可创建一个新的Record实例。

下述示例中,假定表结构为:

odps.Schema {
  c_int_a                 bigint
  c_string_a              string
  c_bool_a                boolean
  c_datetime_a            datetime
  c_array_a               array<string>
  c_map_a                 map<bigint,string>
  c_struct_a              struct<a:bigint,b:string>
}

该表对应的Record相关操作实例为:

import datetime
t = o.get_table('mytable')  # o 为 MaxCompute 入口对象
r = t.new_record([1024, 'val1', False, datetime.datetime.now(), None, None])  # 值的个数必须等于表schema的字段数
r2 = t.new_record()  # 初始化时也可以不传入值
r2[0] = 1024  # 可以通过偏移设置值
r2['c_string_a'] = 'val1'  # 也可以通过字段名设置值
r2.c_string_a = 'val1'  # 通过属性设置值
r2.c_array_a = ['val1', 'val2']  # 设置 array 类型的值
r2.c_map_a = {1: 'val1'}  # 设置 map 类型的值
r2.c_struct_a = (1, 'val1')  # 使用 tuple 设置 struct 类型的值,当 PyODPS >= 0.11.5
r2.c_struct_a = {"a": 1, "b": 'val1'}  # 也可以使用 dict 设置 struct 类型的值

print(r[0])  # 取第0个位置的值
print(r['c_string_a'])  # 通过字段取值
print(r.c_string_a)  # 通过属性取值
print(r[0: 3])  # 切片操作
print(r[0, 2, 3])  # 取多个位置的值
print(r['c_int_a', 'c_double_a'])  # 通过多个字段取值

MaxCompute数据类型与Python类型在Record中的对应关系如下:

MaxCompute类型

Python类型

说明

MaxCompute类型

Python类型

说明

TINYINT、SMALLINT、INT、BIGINT

int

FLOAT、DOUBLE

float

STRING

str

详情请参见注释1

BINARY

bytes

DATETIME

datetime.datetime

详情请参见注释2

DATE

datetime.date

BOOLEAN

bool

DECIMAL

decimal.Decimal

详情请参见注释3

MAP

dict

ARRAY

list

STRUCT

tuple/namedtuple

详情请参见注释4

TIMESTAMP

pandas.Timestamp

详情请参见注释2,需要安装Pandas。

TIMESTAMP_NTZ

pandas.Timestamp

结果不受时区设置影响,需要安装Pandas。

INTERVAL_DAY_TIME

pandas.Timedelta

需要安装Pandas。

对部分类型的说明如下。

  • PyODPS默认字符串类型对应UNICODE字符串,在Python 3中表示为str,在 Python 2 中为unicode。对于某些在字符串中存储二进制值的场景,需要设置options.tunnel.string_as_binary = True;,以避免可能出现的编码问题。

  • PyODPS默认使用本地时间作为时区。若使用UTC时区,需要设置 options.local_timezone = False;选项。若使用其他时区,则需要将该参数设置为指定时区,例如Asia/Shanghai。MaxCompute不存储时区值,因此在写入数据时,会将该时间转换为UNIX时间戳进行存储。

  • 对于Python 2,当安装cdecimal包时,会使用cdecimal.Decimal类。

  • 对于0.11.5以下版本的PyODPS,MaxCompute中的STRUCT类型对应Python中的dict类型。当PyODPS0.11.5及以上版本时,则默认对应namedtuple类型。如果希望使用旧版行为,则需要设置选项options.struct_as_dict = True;。DataWorks环境下,为保持历史兼容性,该值默认为False。为Record设置STRUCT类型的字段值时,0.11.5及以上版本的PyODPS可同时接受dicttuple类型,而旧版则只接受dict类型。

  • 关于如何设置选项,请参考配置选项

写入表数据

  • 使用入口对象的write_table()方法写入数据。

    重要

    对于分区表,如果分区不存在,可以使用create_partition参数指定创建分区。

    records = [[111, 1.0],                 # 此处可以是list。
              [222, 2.0],
              [333, 3.0],
              [444, 4.0]]
    o.write_table('my_new_table', records, partition='pt=test', create_partition=True)  #创建pt=test分区并写入数据
    说明
    • 每次调用write_table()方法,MaxCompute都会在服务端生成一个文件。该操作耗时较长,同时文件过多会降低后续的查询效率。因此,建议您在使用此方法时,一次性写入多组数据,或者传入一个生成器对象。

    • 调用write_table()方法向表中写入数据时会追加到原有数据中。PyODPS不提供覆盖数据的选项,如果需要覆盖数据,请手动清除原有数据。对于非分区表,需要调用table.truncate()方法;对于分区表,需要删除分区后再建立新的分区。

  • 对表对象调用open_writer()方法写入数据。

    t = o.get_table('my_new_table')
    with t.open_writer(partition='pt=test02', create_partition=True) as writer:  #创建pt=test02分区并写入数据
        records = [[1, 1.0],                 # 此处可以是List。
                  [2, 2.0],
                  [3, 3.0],
                  [4, 4.0]]
        writer.write(records)  # 这里Records可以是可迭代对象。

    如果是多级分区表,写入示例如下。

    t = o.get_table('test_table')
    with t.open_writer(partition='pt1=test1,pt2=test2') as writer:  # 多级分区写法。
        records = [t.new_record([111, 'aaa', True]),   # 也可以是Record对象。
                   t.new_record([222, 'bbb', False]),
                   t.new_record([333, 'ccc', True]),
                   t.new_record([444, '中文', False])]
        writer.write(records)
  • 使用多进程并行写数据。

    每个进程写数据时共享同一个Session_ID,但是有不同的Block_ID。每个Block对应服务端的一个文件。主进程执行Commit,完成数据上传。

    import random
    from multiprocessing import Pool
    from odps.tunnel import TableTunnel
    def write_records(tunnel, table, session_id, block_id):
        # 对使用指定的ID创建Session。
        local_session = tunnel.create_upload_session(table.name, upload_id=session_id)
        # 创建Writer时指定Block_ID。
        with local_session.open_record_writer(block_id) as writer:
            for i in range(5):
                # 生成数据并写入对应Block。
                record = table.new_record([random.randint(1, 100), random.random()])
                writer.write(record)
    
    if __name__ == '__main__':
        N_WORKERS = 3
    
        table = o.create_table('my_new_table', 'num bigint, num2 double', if_not_exists=True)
        tunnel = TableTunnel(o)
        upload_session = tunnel.create_upload_session(table.name)
    
        # 每个进程使用同一个Session_ID。
        session_id = upload_session.id
    
        pool = Pool(processes=N_WORKERS)
        futures = []
        block_ids = []
        for i in range(N_WORKERS):
            futures.append(pool.apply_async(write_records, (tunnel, table, session_id, i)))
            block_ids.append(i)
        [f.get() for f in futures]
    
        # 最后执行Commit,并指定所有Block。
        upload_session.commit(block_ids)

获取表数据

获取表数据的方法有多种,常用方法如下:

  • 使用入口对象的read_table()方法。

    # 处理一条记录。
    for record in o.read_table('my_new_table', partition='pt=test'):
        print(record)
  • 如果您仅需要查看每个表最开始的小于1万条数据,可以对表对象调用head()方法。

    t = o.get_table('my_new_table')
    # 处理每个Record对象。
    for record in t.head(3):
        print(record)
  • 调用open_reader()方法读取数据。

    • 使用with表达式的写法如下。

      t = o.get_table('my_new_table')
      with t.open_reader(partition='pt=test') as reader:
      count = reader.count
      for record in reader[5:10]:  # 可以执行多次,直到将Count数量的Record读完,此处可以改造成并行操作。
          print(record)  # 处理一条记录,例如打印记录本身
    • 不使用with表达式的写法如下。

      reader = t.open_reader(partition='pt=test')
      count = reader.count
      for record in reader[5:10]:  # 可以执行多次,直到将Count数量的Record读完,此处可以改造成并行操作。
          print(record)  # 处理一条记录,例如打印记录本身

删除表

使用delete_table()方法删除已经存在的表。

o.delete_table('my_table_name', if_exists=True)  # 只有表存在时,才删除表。
t.drop()  # Table对象存在时,直接调用Drop方法删除。

转换表为DataFrame

PyODPS提供了DataFrame框架,支持以更方便的方式查询和操作MaxCompute数据。使用to_df()方法,即可转化为DataFrame对象。

table = o.get_table('my_table_name')
df = table.to_df()

表分区

  • 判断是否为分区表。

    table = o.get_table('my_new_table')
    if table.schema.partitions:
        print('Table %s is partitioned.' % table.name)
  • 遍历表全部分区。

    table = o.get_table('my_new_table')
    for partition in table.partitions:  # 遍历所有分区
        print(partition.name)  # 具体的遍历步骤,这里是打印分区名
    for partition in table.iterate_partitions(spec='pt=test'):  # 遍历 pt=test 分区下的二级分区
        print(partition.name)  # 具体的遍历步骤,这里是打印分区名
    for partition in table.iterate_partitions(spec='dt>20230119'):  # 遍历 dt>20230119 分区下的二级分区
        print(partition.name)  # 具体的遍历步骤,这里是打印分区名
    重要

    PyODPS0.11.3版本开始,支持为iterate_partitions指定逻辑表达式,如上述示例中的dt>20230119

  • 判断分区是否存在。

    table = o.get_table('my_new_table')
    table.exist_partition('pt=test,sub=2015')
  • 获取分区。

    table = o.get_table('my_new_table')
    partition = table.get_partition('pt=test')
    print(partition.creation_time)
    partition.size
  • 创建分区。

    t = o.get_table('my_new_table')
    t.create_partition('pt=test', if_not_exists=True)  # 指定if_not_exists参数,分区不存在时才创建分区。
  • 删除分区。

    t = o.get_table('my_new_table')
    t.delete_partition('pt=test', if_exists=True)  # 自定if_exists参数,分区存在时才删除分区。
    partition.drop()  # 分区对象存在时,直接对分区对象调用Drop方法删除。

数据上传下载通道

TunnelMaxCompute的数据通道,用户可以通过TunnelMaxCompute中上传或者下载数据。

  • 上传数据示例

    from odps.tunnel import TableTunnel
    
    table = o.get_table('my_table')
    
    tunnel = TableTunnel(odps)
    upload_session = tunnel.create_upload_session(table.name, partition_spec='pt=test')
    
    with upload_session.open_record_writer(0) as writer:
        record = table.new_record()
        record[0] = 'test1'
        record[1] = 'id1'
        writer.write(record)
    
        record = table.new_record(['test2', 'id2'])
        writer.write(record)
    
    # 需要在 with 代码块外 commit,否则数据未写入即 commit,会导致报错
    upload_session.commit([0])
  • 下载数据示例

    from odps.tunnel import TableTunnel
    
    tunnel = TableTunnel(odps)
    download_session = tunnel.create_download_session('my_table', partition_spec='pt=test')
    # 处理每条记录。
    with download_session.open_record_reader(0, download_session.count) as reader:
        for record in reader:
            print(record)  # 具体的遍历步骤,这里是打印记录对象
说明
  • PyODPS不支持上传外部表,例如OSSOTS的表。

  • 不推荐直接使用Tunnel接口,推荐您直接使用对象的写和读接口。

  • 如果您安装了CPython,在安装PyODPS时会编译C代码,加速Tunnel的上传和下载。

  • 本页导读 (1)
  • 背景信息
  • 前提条件:准备运行环境
  • 基本操作
  • 当前项目内的表操作
  • 跨项目的表操作
  • 创建表的Schema
  • 创建表
  • 使用表Schema创建表
  • 使用字段名及字段类型创建表
  • 使用字段名及字段类型创建表:新数据类型
  • 同步表更新
  • 表记录类型(Record)
  • 写入表数据
  • 获取表数据
  • 删除表
  • 转换表为DataFrame
  • 表分区
  • 数据上传下载通道