本文为您介绍Python SDK中表相关的典型场景操作示例。
列出所有表
通过调用入口对象的
list_tables()
方法可以列出项目空间下的所有表。# 处理每张表。
for table in odps.list_tables():
判断表是否存在
通过调用入口对象的
exist_table()
方法判断表是否存在;通过调用get_table()
方法获取表。t = odps.get_table('table_name')
t.schema
odps.Schema {
c_int_a bigint
c_int_b bigint
c_double_a double
c_double_b double
c_string_a string
c_string_b string
c_bool_a boolean
c_bool_b boolean
c_datetime_a datetime
c_datetime_b datetime
}
t.lifecycle
-1
print(t.creation_time)
2014-05-15 14:58:43
t.is_virtual_view
False
t.size
1408
t.schema.columns
[<column c_int_a, type bigint>,
<column c_int_b, type bigint>,
<column c_double_a, type double>,
<column c_double_b, type double>,
<column c_string_a, type string>,
<column c_string_b, type string>,
<column c_bool_a, type boolean>,
<column c_bool_b, type boolean>,
<column c_datetime_a, type datetime>,
<column c_datetime_b, type datetime>]
创建表的Schema
初始化方法有如下两种:
- 通过表的列以及可选的分区进行初始化。
from odps.models import Schema, Column, Partition columns = [Column(name='num', type='bigint', comment='the column'), Column(name='num2', type='double', comment='the column2')] partitions = [Partition(name='pt', type='string', comment='the partition')] schema = Schema(columns=columns, partitions=partitions) schema.columns [<column num, type bigint>, <column num2, type double>, <partition pt, type string>] schema.partitions [<partition pt, type string>] schema.names #获取非分区字段的字段名。 ['num', 'num2'] schema.types #获取非分区字段的字段类型。 [bigint, double]
- 使用
Schema.from_lists()
方法。该方法更容易调用,但无法直接设置列和分区的注释。schema = Schema.from_lists(['num', 'num2'], ['bigint', 'double'], ['pt'], ['string']) schema.columns [<column num, type bigint>, <column num2, type double>, <partition pt, type string>]
创建表
使用
create_table()
方法创建表的方式有如下两种:
- 使用表Schema创建表,方法如下。
table = o.create_table('my_new_table', schema) #只有不存在表时,才创建表。 table = o.create_table('my_new_table', schema, if_not_exists=True) #设置生命周期。 table = o.create_table('my_new_table', schema, lifecycle=7)
- 采用字段名及字段类型字符串创建表,方法如下。
#创建非分区表。 table = o.create_table('my_new_table', 'num bigint, num2 double', if_not_exists=True) #创建分区表可传入(表字段列表,分区字段列表)。 table = o.create_table('my_new_table', ('num bigint, num2 double', 'pt string'), if_not_exists=True)
未打开新数据类型开关时,创建表的数据类型只允许为BIGINT、DOUBLE、DECIMAL、STRING、DATETIME、BOOLEAN、MAP和ARRAY类型。如果您需要支持TINYINT和STRUCT等新数据类型,可以打开options.sql.use_odps2_extension = True
开关,示例如下。from odps import options options.sql.use_odps2_extension = True table = o.create_table('my_new_table', 'cat smallint, content struct<title:varchar(100), body:string>')
删除表
使用
delete_table()
方法删除已经存在的表。o.delete_table('my_table_name', if_exists=True) # 只有表存在时,才删除表。
t.drop() # Table对象存在时,直接调用Drop方法删除。
表分区
- 判断是否为分区表。
if table.schema.partitions: print('Table %s is partitioned.' % table.name)
- 遍历表全部分区。
for partition in table.partitions: print(partition.name) for partition in table.iterate_partitions(spec='pt=test'): # 遍历二级分区。
- 判断分区是否存在。
table.exist_partition('pt=test,sub=2015')
- 获取分区。
partition = table.get_partition('pt=test') print(partition.creation_time) partition.size
- 创建分区。
t.create_partition('pt=test', if_not_exists=True) # 指定if_not_exists参数,分区不存在时才创建分区。
- 删除分区。
t.delete_partition('pt=test', if_exists=True) # 指定if_exists参数,分区存在时才删除分区。 partition.drop() # 分区对象存在时,直接对分区对象调用Drop方法删除。
读取表数据
有若干种方法能够获取表数据。
- 如果只是查看每个表的开始的小于1万条数据,则可以使用head方法。
from odps import ODPS o = ODPS('**your-access-id**', '**your-secret-access-key**', '**your-default-project**', endpoint='**your-end-point**') t = o.get_table('dual') for record in t.head(3): # 处理每个Record对象
- 使用 with 表达式的写法:
with t.open_reader(partition='pt=test') as reader: count = reader.count for record in reader[5:10]: # 可以执行多次,直到将count数量的record读完,这里可以改造成并行操作 # 处理一条记录
- 不使用 with 表达式的写法:
reader = t.open_reader(partition='pt=test') count = reader.count for record in reader[5:10]: # 可以执行多次,直到将count数量的record读完,这里可以改造成并行操作 # 处理一条记录
- 直接读取成 Pandas DataFrame:
with t.open_reader(partition='pt=test') as reader: pd_df = reader.to_pandas()
写入表数据
类似于
open_reader
,table对象同样能执行open_writer
来打开writer,并写数据。
- 使用with写法:
with t.open_writer(partition='pt=test') as writer: records = [[111, 'aaa', True], # 这里可以是list [222, 'bbb', False], [333, 'ccc', True], [444, '中文', False]] writer.write(records) # 这里records可以是可迭代对象 records = [t.new_record([111, 'aaa', True]), # 也可以是Record对象 t.new_record([222, 'bbb', False]), t.new_record([333, 'ccc', True]), t.new_record([444, '中文', False])] writer.write(records)
- 如果分区不存在,可以使用 create_partition 参数指定创建分区,如:
with t.open_writer(partition='pt=test', create_partition=True) as writer: records = [[111, 'aaa', True], # 这里可以是list [222, 'bbb', False], [333, 'ccc', True], [444, '中文', False]] writer.write(records) # 这里records可以是可迭代对象
- 更简单的写数据方法是使用ODPS对象的write_table方法,例如:
ecords = [[111, 'aaa', True], # 这里可以是list [222, 'bbb', False], [333, 'ccc', True], [444, '中文', False]] o.write_table('test_table', records, partition='pt=test', create_partition=True)
说明- 每次调用
write_table
,MaxCompute 都会在服务端生成一个文件。这一操作需要较大的时间开销, 同时过多的文件会降低后续的查询效率。因此,建议您在使用write_table
方法时,一次性写入多组数据, 或者传入一个generator对象。 write_table
写表时会追加到原有数据。PyODPS不提供覆盖数据的选项,如果需要覆盖数据,需要手动清除原有数据。对于非分区表,需要调用table.truncate()
,对于分区表,需要删除分区后再建立。
- 每次调用
使用Arrow格式读写数据
Apache Arrow是一种跨语言的通用数据读写格式,支持在各种不同平台间进行数据交换。 自2021年起, MaxCompute支持使用Arrow格式读取表数据,PyODPS则从0.11.2版本开始支持该功能。具体来说,如果在Python环境中安装pyarrow后,在调用
open_writer
时增加arrow=True
参数,即可读写Arrow RecordBatch 。import pandas as pd
import pyarrow as pa
with t.open_writer(partition='pt=test', create_partition=True, arrow=True) as writer:
records = [[111, 'aaa', True],
[222, 'bbb', False],
[333, 'ccc', True],
[444, '中文', False]]
df = pd.DataFrame(records, columns=["int_val", "str_val", "bool_val"])
# 写入 RecordBatch
batch = pa.RecordBatch.from_pandas(df)
writer.write(batch)
# 也可以直接写入 Pandas DataFrame
writer.write(df)