Python SDK示例:Table

本文为您介绍Python SDK中表相关的典型场景操作示例。

列出所有表

通过调用入口对象的list_tables()方法可以列出项目空间下的所有表。

for table in odps.list_tables():
    # 处理每张表。

判断表是否存在

通过调用入口对象的exist_table()方法判断表是否存在;通过调用get_table()方法获取表。

t = odps.get_table('table_name')
t.schema
odps.Schema {
  c_int_a                 bigint
  c_int_b                 bigint
  c_double_a              double
  c_double_b              double
  c_string_a              string
  c_string_b              string
  c_bool_a                boolean
  c_bool_b                boolean
  c_datetime_a            datetime
  c_datetime_b            datetime
}
t.lifecycle
-1
print(t.creation_time)
2014-05-15 14:58:43
t.is_virtual_view
False
t.size
1408
t.schema.columns
[<column c_int_a, type bigint>,
 <column c_int_b, type bigint>,
 <column c_double_a, type double>,
 <column c_double_b, type double>,
 <column c_string_a, type string>,
 <column c_string_b, type string>,
 <column c_bool_a, type boolean>,
 <column c_bool_b, type boolean>,
 <column c_datetime_a, type datetime>,
 <column c_datetime_b, type datetime>]            

创建表的Schema

初始化方法有如下两种:

  • 通过表的列以及可选的分区进行初始化。

    from odps.models import Schema, Column, Partition
    columns = [
        Column(name='num', type='bigint', comment='the column'),
        Column(name='num2', type='double', comment='the column2'),
    ]
    partitions = [Partition(name='pt', type='string', comment='the partition')]
    schema = Schema(columns=columns, partitions=partitions)

    初始化后,您可获取字段信息、分区信息等。

    • 获取所有字段信息。

      print(schema.columns)

      返回示例如下。

      [<column num, type bigint>,
       <column num2, type double>,
       <partition pt, type string>]
    • 获取分区字段。

      print(schema.partitions)

      返回示例如下。

      [<partition pt, type string>]
    • 获取非分区字段名称。

      print(schema.names)

      返回示例如下。

      ['num', 'num2']
    • 获取非分区字段类型。

      print(schema.types)

      返回示例如下。

      [bigint, double]
  • 使用Schema.from_lists()方法。该方法更容易调用,但无法直接设置列和分区的注释。

    from odps.models import Schema
    schema = Schema.from_lists(['num', 'num2'], ['bigint', 'double'], ['pt'], ['string'])
    print(schema.columns)

    返回值示例如下。

    [<column num, type bigint>,
     <column num2, type double>,
     <partition pt, type string>]

创建表

您可以使用o.create_table()方法创建表,使用方式有两种:使用表Schema方式、使用字段名和字段类型方式。同时创建表时表字段的数据类型有一定的限制条件,详情如下。

使用表Schema创建表

使用表Schema创建表时,您需要先创建表的Schema,然后通过Schema创建表。

#创建表的schema
from odps.models import Schema
schema = Schema.from_lists(['num', 'num2'], ['bigint', 'double'], ['pt'], ['string'])

#通过schema创建表
table = o.create_table('my_new_table', schema)

#只有不存在表时,才创建表。
table = o.create_table('my_new_table', schema, if_not_exists=True)

#设置生命周期。
table = o.create_table('my_new_table', schema, lifecycle=7)

表创建完成后,您可以通过print(o.exist_table('my_new_table'))验证表是否创建成功,返回True表示表创建成功。

使用字段名及字段类型创建表

#创建分区表my_new_table,可传入(表字段列表,分区字段列表)。
table = o.create_table('my_new_table', ('num bigint, num2 double', 'pt string'), if_not_exists=True)

#创建非分区表my_new_table02。
table = o.create_table('my_new_table02', 'num bigint, num2 double', if_not_exists=True)

表创建完成后,您可以通过print(o.exist_table('my_new_table'))验证表是否创建成功,返回True表示表创建成功。

使用字段名及字段类型创建表:新数据类型

未打开新数据类型开关时(默认关闭),创建表的数据类型只允许为BIGINT、DOUBLE、DECIMAL、STRING、DATETIME、BOOLEAN、MAP和ARRAY类型。如果您需要创建TINYINT和STRUCT等新数据类型字段的表,可以打开options.sql.use_odps2_extension = True开关,示例如下。

from odps import options
options.sql.use_odps2_extension = True
table = o.create_table('my_new_table', 'cat smallint, content struct<title:varchar(100), body:string>')

删除表

使用delete_table()方法删除已经存在的表。

o.delete_table('my_table_name', if_exists=True)  # 只有表存在时,才删除表。
t.drop()  # Table对象存在时,直接调用Drop方法删除。

表分区

  • 判断是否为分区表。

    table = o.get_table('my_new_table')
    if table.schema.partitions:
        print('Table %s is partitioned.' % table.name)
  • 遍历表全部分区。

    table = o.get_table('my_new_table')
    for partition in table.partitions:  # 遍历所有分区
        print(partition.name)  # 具体的遍历步骤,这里是打印分区名
    for partition in table.iterate_partitions(spec='pt=test'):  # 遍历 pt=test 分区下的二级分区
        print(partition.name)  # 具体的遍历步骤,这里是打印分区名
    for partition in table.iterate_partitions(spec='dt>20230119'):  # 遍历 dt>20230119 分区下的二级分区
        print(partition.name)  # 具体的遍历步骤,这里是打印分区名
    重要

    PyODPS自0.11.3版本开始,支持为iterate_partitions指定逻辑表达式,如上述示例中的dt>20230119

  • 判断分区是否存在。

    table = o.get_table('my_new_table')
    table.exist_partition('pt=test,sub=2015')
  • 获取分区。

    table = o.get_table('my_new_table')
    partition = table.get_partition('pt=test')
    print(partition.creation_time)
    partition.size
  • 创建分区。

    t = o.get_table('my_new_table')
    t.create_partition('pt=test', if_not_exists=True)  # 指定if_not_exists参数,分区不存在时才创建分区。
  • 删除分区。

    t = o.get_table('my_new_table')
    t.delete_partition('pt=test', if_exists=True)  # 自定if_exists参数,分区存在时才删除分区。
    partition.drop()  # 分区对象存在时,直接对分区对象调用Drop方法删除。

读取表数据

有若干种方法能够获取表数据。

  • 如果只是查看每个表的开始的小于1万条数据,则可以使用head方法。

    from odps import ODPS
    t = o.get_table('dual')
    for record in t.head(3):
        # 处理每个Record对象
  • 使用 with 表达式的写法:

    with t.open_reader(partition='pt=test') as reader:
    count = reader.count
    for record in reader[5:10]:  # 可以执行多次,直到将count数量的record读完,这里可以改造成并行操作
        # 处理一条记录
  • 不使用 with 表达式的写法:

    reader = t.open_reader(partition='pt=test')
    count = reader.count
    for record in reader[5:10]:  # 可以执行多次,直到将count数量的record读完,这里可以改造成并行操作
        # 处理一条记录
  • 直接读取成 Pandas DataFrame:

    with t.open_reader(partition='pt=test') as reader:
    pd_df = reader.to_pandas()

写入表数据

类似于open_reader,table对象同样能执行open_writer来打开writer,并写数据。

  • 使用with写法:

    with t.open_writer(partition='pt=test') as writer:
    	  records = [[111, 'aaa', True],                 # 这里可以是list
    	             [222, 'bbb', False],
    	             [333, 'ccc', True],
    	             [444, '中文', False]]
        writer.write(records)  # 这里records可以是可迭代对象
    
    records = [t.new_record([111, 'aaa', True]),   # 也可以是Record对象
               t.new_record([222, 'bbb', False]),
               t.new_record([333, 'ccc', True]),
               t.new_record([444, '中文', False])]
    writer.write(records)
  • 如果分区不存在,可以使用 create_partition 参数指定创建分区,如:

    with t.open_writer(partition='pt=test', create_partition=True) as writer:
        records = [[111, 'aaa', True],                 # 这里可以是list
                   [222, 'bbb', False],
                   [333, 'ccc', True],
                   [444, '中文', False]]
        writer.write(records)  # 这里records可以是可迭代对象
  • 更简单的写数据方法是使用ODPS对象的write_table方法,例如:

    records = [[111, 'aaa', True],                 # 这里可以是list
               [222, 'bbb', False],
               [333, 'ccc', True],
               [444, '中文', False]]
    o.write_table('test_table', records, partition='pt=test', create_partition=True)
    说明
    • 每次调用write_table,MaxCompute 都会在服务端生成一个文件。这一操作需要较大的时间开销, 同时过多的文件会降低后续的查询效率。因此,建议您在使用write_table方法时,一次性写入多组数据, 或者传入一个generator对象。

    • write_table写表时会追加到原有数据。PyODPS不提供覆盖数据的选项,如果需要覆盖数据,需要手动清除原有数据。对于非分区表,需要调用table.truncate(),对于分区表,需要删除分区后再建立。

使用Arrow格式读写数据

Apache Arrow是一种跨语言的通用数据读写格式,支持在各种不同平台间进行数据交换。 自2021年起, MaxCompute支持使用Arrow格式读取表数据,PyODPS则从0.11.2版本开始支持该功能。具体来说,如果在Python环境中安装pyarrow后,在调用open_writer时增加arrow=True参数,即可读写Arrow RecordBatch 。

import pandas as pd
import pyarrow as pa
with t.open_writer(partition='pt=test', create_partition=True, arrow=True) as writer:
    records = [[111, 'aaa', True],
               [222, 'bbb', False],
               [333, 'ccc', True],
               [444, '中文', False]]
    df = pd.DataFrame(records, columns=["int_val", "str_val", "bool_val"])
    # 写入 RecordBatch
    batch = pa.RecordBatch.from_pandas(df)
    writer.write(batch)
    # 也可以直接写入 Pandas DataFrame
    writer.write(df)