PyODPS支持对MaxCompute表的基本操作,包括创建表、创建表的Schema、同步表更新、获取表数据、删除表、表分区操作以及如何将表转换为DataFrame对象。
背景信息
PyODPS提供对MaxCompute表的基本操作方法。
操作 | 说明 |
---|---|
基本操作 | 列出项目空间下的所有表、判断表是否存在、获取表等基本操作。 |
创建表 | 使用PyODPS创建表。 |
创建表的Schema | 使用PyODPS创建表的Schema。 |
同步表更新 | 使用PyODPS同步表更新。 |
向表中插入一行记录 | 使用PyODPS向表中插入一行记录。 |
写入数据 | 使用PyODPS向表中写入数据。 |
获取表数据 | 使用PyODPS获取表中数据。 |
删除表 | 使用PyODPS删除表。 |
转换表为DataFrame | 使用PyODPS转换表为DataFrame。 |
表分区 | 使用PyODPS判断是否为分区表、遍历表全部分区、判断分区是否存在、创建分区等。 |
数据上传下载通道 | 使用PyODPS操作Tunnel向MaxCompute中上传或者下载数据。 |
说明 更多PyODPS方法说明,请参见Python SDK方法说明。
基本操作
- 列出项目空间下的所有表:入口对象的
list_tables()
方法可以列出项目空间下的所有表。#列出项目空间下的所有表。 for table in o.list_tables():
- 判断表是否存在:入口对象的
exist_table()
方法可以判断表是否存在。 - 获取表:入口对象的
get_table()
方法可以获取表。t = o.get_table('table_name') t.schema odps.Schema { c_int_a bigint c_int_b bigint c_double_a double c_double_b double c_string_a string c_string_b string c_bool_a boolean c_bool_b boolean c_datetime_a datetime c_datetime_b datetime } t.lifecycle -1 print(t.creation_time) 2014-05-15 14:58:43 t.is_virtual_view False t.size 1408 t.comment 'table_name Table Comment' t.schema.columns [<column c_int_a, type bigint>, <column c_int_b, type bigint>, <column c_double_a, type double>, <column c_double_b, type double>, <column c_string_a, type string>, <column c_string_b, type string>, <column c_bool_a, type boolean>, <column c_bool_b, type boolean>, <column c_datetime_a, type datetime>, <column c_datetime_b, type datetime>] t.schema['c_int_a'] <column c_int_a, type bigint> t.schema['c_int_a'].comment 'Comment of column c_int_a'
您可以通过project
参数,跨项目获取表。t = o.get_table('table_name', project='other_project')
说明 表是MaxCompute的数据存储单元,详情请参见表。
创建表的Schema
初始化方法有如下两种:
- 通过表的列以及可选的分区进行初始化。
from odps.models import Schema, Column, Partition columns = [Column(name='num', type='bigint', comment='the column'), Column(name='num2', type='double', comment='the column2')] partitions = [Partition(name='pt', type='string', comment='the partition')] schema = Schema(columns=columns, partitions=partitions) schema.columns [<column num, type bigint>, <column num2, type double>, <partition pt, type string>] schema.partitions [<partition pt, type string>] schema.names #获取非分区字段的字段名。 ['num', 'num2'] schema.types #获取非分区字段的字段类型。 [bigint, double]
- 使用
Schema.from_lists()
方法。该方法更容易调用,但无法直接设置列和分区的注释。schema = Schema.from_lists(['num', 'num2'], ['bigint', 'double'], ['pt'], ['string']) schema.columns [<column num, type bigint>, <column num2, type double>, <partition pt, type string>]
创建表
使用
create_table()
方法创建表的方式有如下两种:
- 使用表Schema创建表,方法如下。
table = o.create_table('my_new_table', schema) #只有不存在表时,才创建表。 table = o.create_table('my_new_table', schema, if_not_exists=True) #设置生命周期。 table = o.create_table('my_new_table', schema, lifecycle=7)
- 采用字段名及字段类型字符串创建表,方法如下。
#创建非分区表。 table = o.create_table('my_new_table', 'num bigint, num2 double', if_not_exists=True) #创建分区表可传入(表字段列表,分区字段列表)。 table = o.create_table('my_new_table', ('num bigint, num2 double', 'pt string'), if_not_exists=True)
未打开新数据类型开关时,创建表的数据类型只允许为BIGINT、DOUBLE、DECIMAL、STRING、DATETIME、BOOLEAN、MAP和ARRAY类型。如果您需要支持TINYINT和STRUCT等新数据类型,可以打开options.sql.use_odps2_extension = True
开关,示例如下。from odps import options options.sql.use_odps2_extension = True table = o.create_table('my_new_table', 'cat smallint, content struct<title:varchar(100), body:string>')
同步表更新
当一个表被其他程序更新,例如改变了Schema,可以调用reload()
方法同步表的更新。
table.reload()
向表中插入一行记录
Record表示表的一行记录,对表对象调用
new_record()
方法即可创建一个新的Record。t = o.get_table('mytable')
r = t.new_record(['val0', 'val1']) # 值的个数必须等于表Schema的字段数。
r2 = t.new_record() # 可以不传入值。
r2[0] = 'val0' # 通过偏移设置值。
r2['field1'] = 'val1' # 通过字段名设置值。
r2.field1 = 'val1' # 通过属性设置值。
print(record[0]) # 取第0个位置的值。
print(record['c_double_a']) # 通过字段取值。
print(record.c_double_a) # 通过属性取值。
print(record[0: 3]) # 切片操作。
print(record[0, 2, 3]) # 取多个位置的值。
print(record['c_int_a', 'c_double_a']) # 通过多个字段取值。
写入数据
- 使用入口对象的
write_table()
方法写入数据。records = [[111, 'aaa', True], # 此处可以是list。 [222, 'bbb', False], [333, 'ccc', True], [444, '中文', False]] o.write_table('test_table', records, partition='pt=test', create_partition=True)
说明- 每次调用
write_table()
方法,MaxCompute都会在服务端生成一个文件。该操作耗时较长,同时文件过多会降低后续的查询效率。因此,建议您在使用此方法时,一次性写入多组数据,或者传入一个生成器对象。 - 调用
write_table()
方法向表中写入数据时会追加到原有数据中。PyODPS不提供覆盖数据的选项,如果需要覆盖数据,请手动清除原有数据。对于非分区表,需要调用table.truncate()
方法;对于分区表,需要删除分区后再建立新的分区。
- 每次调用
- 对表对象调用
open_writer()
方法写入数据。with t.open_writer(partition='pt=test') as writer: records = [[111, 'aaa', True], #此处可以是List。 [222, 'bbb', False], [333, 'ccc', True], [444, '中文', False]] writer.write(records) #这里Records可以是可迭代对象。 with t.open_writer(partition='pt1=test1,pt2=test2') as writer: #多级分区写法。 records = [t.new_record([111, 'aaa', True]), #也可以是Record对象。 t.new_record([222, 'bbb', False]), t.new_record([333, 'ccc', True]), t.new_record([444, '中文', False])] writer.write(records)
如果分区不存在,可以使用create_partition参数指定创建分区,如下所示。with t.open_writer(partition='pt=test', create_partition=True) as writer: records = [[111, 'aaa', True], # 这里可以是List。 [222, 'bbb', False], [333, 'ccc', True], [444, '中文', False]] writer.write(records) # 此处的Records可以是可迭代对象。
- 使用多进程并行写数据。
每个进程写数据时共享同一个Session_ID,但是有不同的Block_ID。每个Block对应服务端的一个文件。主进程执行Commit,完成数据上传。
import random from multiprocessing import Pool from odps.tunnel import TableTunnel def write_records(tunnel, table, session_id, block_id): # 对使用指定的ID创建Session。 local_session = tunnel.create_upload_session(table.name, upload_id=session_id) # 创建Writer时指定Block_ID。 with local_session.open_record_writer(block_id) as writer: for i in range(5): # 生成数据并写入对应Block。 record = table.new_record([random.randint(1, 100), random.random()]) writer.write(record) if __name__ == '__main__': N_WORKERS = 3 table = o.create_table('my_new_table', 'num bigint, num2 double', if_not_exists=True) tunnel = TableTunnel(o) upload_session = tunnel.create_upload_session(table.name) # 每个进程使用同一个Session_ID。 session_id = upload_session.id pool = Pool(processes=N_WORKERS) futures = [] block_ids = [] for i in range(N_WORKERS): futures.append(pool.apply_async(write_records, (tunnel, table, session_id, i))) block_ids.append(i) [f.get() for f in futures] # 最后执行Commit,并指定所有Block。 upload_session.commit(block_ids)
获取表数据
获取表数据的方法有多种,常用方法如下:
- 使用入口对象的
read_table()
方法。# 处理一条记录。 for record in o.read_table('test_table', partition='pt=test'):
- 如果您仅需要查看每个表最开始的小于1万条数据,可以对表对象调用
head()
方法。t = o.get_table('table_name') # 处理每个Record对象。 for record in t.head(3):
- 调用
open_reader()
方法读取数据。- 使用
with
表达式的写法如下。with t.open_reader(partition='pt=test') as reader: count = reader.count for record in reader[5:10] # 可以执行多次,直到将Count数量的Record读完,此处可以改造成并行操作。 # 处理一条记录。
- 不使用
with
表达式的写法如下。reader = t.open_reader(partition='pt=test') count = reader.count for record in reader[5:10] # 可以执行多次,直到将Count数量的Record读完,此处可以改造成并行操作。 # 处理一条记录。
- 使用
删除表
使用
delete_table()
方法删除已经存在的表。o.delete_table('my_table_name', if_exists=True) # 只有表存在时,才删除表。
t.drop() # Table对象存在时,直接调用Drop方法删除。
转换表为DataFrame
PyODPS提供了DataFrame框架,支持以更方便的方式查询和操作MaxCompute数据。使用to_df()
方法,即可转化为DataFrame对象。
table = o.get_table('my_table_name')
df = table.to_df()
表分区
- 判断是否为分区表。
if table.schema.partitions: print('Table %s is partitioned.' % table.name)
- 遍历表全部分区。
for partition in table.partitions: print(partition.name) for partition in table.iterate_partitions(spec='pt=test'): # 遍历二级分区。
- 判断分区是否存在。
table.exist_partition('pt=test,sub=2015')
- 获取分区。
partition = table.get_partition('pt=test') print(partition.creation_time) partition.size
- 创建分区。
t.create_partition('pt=test', if_not_exists=True) # 指定if_not_exists参数,分区不存在时才创建分区。
- 删除分区。
t.delete_partition('pt=test', if_exists=True) # 指定if_exists参数,分区存在时才删除分区。 partition.drop() # 分区对象存在时,直接对分区对象调用Drop方法删除。
数据上传下载通道
Tunnel是MaxCompute的数据通道,用户可以通过Tunnel向MaxCompute中上传或者下载数据。
- 上传数据示例
from odps.tunnel import TableTunnel table = o.get_table('my_table') tunnel = TableTunnel(odps) upload_session = tunnel.create_upload_session(table.name, partition_spec='pt=test') with upload_session.open_record_writer(0) as writer: record = table.new_record() record[0] = 'test1' record[1] = 'id1' writer.write(record) record = table.new_record(['test2', 'id2']) writer.write(record) upload_session.commit([0])
- 下载数据示例
from odps.tunnel import TableTunnel tunnel = TableTunnel(odps) download_session = tunnel.create_download_session('my_table', partition_spec='pt=test') # 处理每条记录。 with download_session.open_record_reader(0, download_session.count) as reader: for record in reader:
说明
- PyODPS不支持上传外部表,例如OSS和OTS的表。
- 不推荐直接使用Tunnel接口,推荐您直接使用表对象的写和读接口。
- 如果您安装了CPython,在安装PyODPS时会编译C代码,加速Tunnel的上传和下载。