本文向您介绍MaxCompute表的基本操作。包括创建表、创建表的Schema、同步表更新、获取表数据、删除表、表分区操作以及如何转换为DataFrame对象。

基本操作

是MaxCompute的数据存储单元。您可以使用ODPS入口对象的list_tables来列出项目空间下的所有表。

for table in o.list_tables():
# 处理每个表。

您可以通过调用exist_table来判断表是否存在,通过调用get_table来获取表。

t = o.get_table('dual')
t.schema
odps.Schema {
  c_int_a                 bigint
  c_int_b                 bigint
  c_double_a              double
  c_double_b              double
  c_string_a              string
  c_string_b              string
  c_bool_a                boolean
  c_bool_b                boolean
  c_datetime_a            datetime
  c_datetime_b            datetime
}
t.lifecycle
-1
print(t.creation_time)
2014-05-15 14:58:43
t.is_virtual_view
False
t.size
1408
t.comment
'Dual Table Comment'
t.schema.columns
[<column c_int_a, type bigint>,
 <column c_int_b, type bigint>,
 <column c_double_a, type double>,
 <column c_double_b, type double>,
 <column c_string_a, type string>,
 <column c_string_b, type string>,
 <column c_bool_a, type boolean>,
 <column c_bool_b, type boolean>,
 <column c_datetime_a, type datetime>,
 <column c_datetime_b, type datetime>]
t.schema['c_int_a']
<column c_int_a, type bigint>
t.schema['c_int_a'].comment
'Comment of column c_int_a'

通过提供project参数,您可以实现跨Project获取表。

t = o.get_table('dual', project='other_project')

创建表的Schema

有两种方法完成初始化:
  • 第一种方式即通过表的列、以及可选的分区来进行初始化。
    from odps.models import Schema, Column, Partition
    columns = [Column(name='num', type='bigint', comment='the column'),
               Column(name='num2', type='double', comment='the column2')]
    partitions = [Partition(name='pt', type='string', comment='the partition')]
    schema = Schema(columns=columns, partitions=partitions)
    schema.columns
    [<column num, type bigint>,
     <column num2, type double>,
     <partition pt, type string>]
    schema.partitions
    [<partition pt, type string>]
    schema.names  # 获取非分区字段的字段名
    ['num', 'num2']
    schema.types  # 获取非分区字段的字段类型
    [bigint, double]
  • 第二种方法是使用Schema.from_lists。这种方法更容易调用,但无法直接设置列和分区的注释。
    schema = Schema.from_lists(['num', 'num2'], ['bigint', 'double'], ['pt'], ['string'])
    schema.columns
    [<column num, type bigint>,
     <column num2, type double>,
     <partition pt, type string>]

创建表

您可以使用表Schema来创建表,方法如下。

table = o.create_table('my_new_table', schema)
table = o.create_table('my_new_table', schema, if_not_exists=True)  #只有不存在表时才创建。
table = o.create_table('my_new_table', schema, lifecycle=7)  #设置生命周期。

更简单的方式是采用 字段名及字段类型 字符串来创建表,方法如下。

table = o.create_table('my_new_table', 'num bigint, num2 double', if_not_exists=True)
# 创建分区表可传入 (表字段列表, 分区字段列表)
table = o.create_table('my_new_table', ('num bigint, num2 double', 'pt string'), if_not_exists=True)

在未经设置的情况下,创建表时,只允许使用BIGINT、DOUBLE、DECIMAL、STRING、DATETIME、BOOLEAN、MAP和ARRAY类型。如果您需要支持TINYINT、STRUCT等新类型,可以设置 options.sql.use_odps2_extension = True打开这些类型的支持,示例如下。

from odps import options
options.sql.use_odps2_extension = True
table = o.create_table('my_new_table', 'cat smallint, content struct<title:varchar(100), body string>')

同步表更新

当一个表被其他程序更新,例如改变了Schema之后,可以调用 reload 方法同步表的更新。

table.reload()

行记录Record

Record表示表的一行记录,您在Table对象上调用new_record即可创建一个新的Record。

t = o.get_table('mytable')
r = t.new_record(['val0', 'val1'])  # 值的个数必须等于表schema的字段数。
r2 = t.new_record()  #  您也可以不传入值。
r2[0] = 'val0' # 可以通过偏移设置值。
r2['field1'] = 'val1'  # 也可以通过字段名设置值。
r2.field1 = 'val1'  # 通过属性设置值。

print(record[0])  # 取第0个位置的值。
print(record['c_double_a'])  # 通过字段取值。
print(record.c_double_a)  # 通过属性取值。
print(record[0: 3])  # 切片操作。
print(record[0, 2, 3])  # 取多个位置的值。
print(record['c_int_a', 'c_double_a'])  # 通过多个字段取值。

获取表数据

获取表数据的方法很多,如果您仅需要查看每个表的最开始的小于1万条数据,可以使用head方法。

t = o.get_table('dual')
for record in t.head(3):
# 处理每个Record对象。

或在Table上执行open_reader操作来读取数据。

使用with表达式的写法如下所示。

with t.open_reader(partition='pt=test') as reader:
count = reader.count
for record in reader[5:10]  # 可以执行多次,直到将count数量的record读完,此处可以改造成并行操作。
# 处理一条记录。

不使用with表达式的写法如下所示。

reader = t.open_reader(partition='pt=test')
count = reader.count
for record in reader[5:10]  # 可以执行多次,直到将count数量的record读完,这里可以改造成并行操作。
# 处理一条记录。

更简单的调用方法是使用ODPS对象的read_table方法,举例如下。

for record in o.read_table('test_table', partition='pt=test'):
# 处理一条记录。

写入数据

类似于open_reader,Table对象同样能执行open_writer来开启writer,并写入数据。

使用with表达式的写法如下所示。

with t.open_writer(partition='pt=test') as writer:
records = [[111, 'aaa', True],                 # 这里可以是list。
          [222, 'bbb', False],
          [333, 'ccc', True],
          [444, '中文', False]]
writer.write(records)  # 这里records可以是可迭代对象。

with t.open_writer(partition='pt1=test1,pt2=test2') as writer: #多级分区写法

records = [t.new_record([111, 'aaa', True]),   # 也可以是Record对象。
           t.new_record([222, 'bbb', False]),
           t.new_record([333, 'ccc', True]),
           t.new_record([444, '中文', False])]
writer.write(records)

with t.open_writer(partition='pt=test', blocks=[0, 1]) as writer:  # 此处同时打开两个block。
     writer.write(0, gen_records(block=0))
     writer.write(1, gen_records(block=1))  # 此处两个写操作可以多线程并行,各个block间是独立的。  

不使用with表达式的写法如下所示。

writer = t.open_writer(partition='pt=test', blocks=[0, 1])
writer.write(0, gen_records(block=0))
writer.write(1, gen_records(block=1))
writer.close()  # 请不要忘记关闭writer,否则数据可能写入不完全。

如果分区不存在,可以使用create_partition参数指定创建分区,举例如下。

with t.open_writer(partition='pt=test', create_partition=True) as writer:
    records = [[111, 'aaa', True],                 # 这里可以是list。
               [222, 'bbb', False],
               [333, 'ccc', True],
               [444, '中文', False]]
    writer.write(records)  # 这里records可以是可迭代对象。
说明 每次调用write_table,MaxCompute都会在服务端生成一个文件。这一操作需要较大的时间开销, 同时过多的文件会降低后续的查询效率。因此,建议在使用write_table方法时,一次性写入多组数据, 或者传入一个Generator对象。

删除表

o.delete_table('my_table_name', if_exists=True)  # 只有表存在时删除。
 t.drop()  # Table对象存在的时候可以直接执行drop函数。

创建DataFrame

PyODPS提供了DataFrame框架DataFrame框架 ,支持更方便地方式来查询和操作ODPS数据。 使用to_df方法,即可转化为DataFrame对象。

table = o.get_table('my_table_name')
 df = table.to_df()

表分区

  • 基本操作

    判断是否为分区表:

    if table.schema.partitions:
     print('Table %s is partitioned.' % table.name)

    遍历表全部分区:

    for partition in table.partitions:
         print(partition.name)
    for partition in table.iterate_partitions(spec='pt=test'):
    # 遍历二级分区。

    判断分区是否存在:

    table.exist_partition('pt=test,sub=2015')

    获取分区:

    partition = table.get_partition('pt=test')
     print(partition.creation_time)
    2015-11-18 22:22:27
     partition.size
    0
  • 创建分区
    t.create_partition('pt=test', if_not_exists=True)  # 分区。不存在时候才创建。
  • 删除分区
    t.delete_partition('pt=test', if_exists=True)  # 存在时才删除。
    partition.drop()  # Partition对象存在的时候直接drop。

数据上传下载通道

ODPS Tunnel是MaxCompute的数据通道,用户可以通过Tunnel向ODPS中上传或者下载数据。

说明 不推荐直接使用Tunnel接口,推荐您直接使用表的写和读接口。 如果安装了 Cython,在安装PyODPS时会编译C代码,加速Tunnel的上传和下载。
  • 上传示例
    from odps.tunnel import TableTunnel
    
    table = o.get_table('my_table')
    
    tunnel = TableTunnel(odps)
    upload_session = tunnel.create_upload_session(table.name, partition_spec='pt=test')
    
    with upload_session.open_record_writer(0) as writer:
        record = table.new_record()
        record[0] = 'test1'
        record[1] = 'id1'
        writer.write(record)
    
        record = table.new_record(['test2', 'id2'])
        writer.write(record)
    
    upload_session.commit([0])
  • 下载示例
    from odps.tunnel import TableTunnel
    
    tunnel = TableTunnel(odps)
    download_session = tunnel.create_download_session('my_table', partition_spec='pt=test')
    
    with download_session.open_record_reader(0, download_session.count) as reader:
        for record in reader:
    # 处理每条记录。
说明 PyODPS不支持上传外部表(OSS/OTS)。