全部产品
MaxCompute

Python SDK

更新时间:2017-06-27 15:58:29   分享:   

PyODPS 是 MaxCompute 的 Python 版本的 SDK,它提供了对 MaxCompute 对象的基本操作和 DataFrame 框架,可以轻松在 MaxCompute 上进行数据分析。相关内容请参见: Github 项目 和包括所有接口,类的细节等内容的 详细文档

安装

PyODPS 支持 Python 2.6 以上(包括 Python 3),系统安装 pip 后,只需运行下 pip install pyodps,PyODPS 的相关依赖便会自动安装。

快速开始

首先,用阿里云账号初始化一个 MaxCompute 的入口,如下所示:

  1. from odps import ODPS
  2. odps = ODPS('**your-access-id**', '**your-secret-access-key**', '**your-default-project**',
  3. endpoint='**your-end-point**')

根据上述操作初始化后,便可对表、资源、函数等进行操作。

项目空间

项目空间是 MaxCompute 的基本组织单元, 类似于 Database 的概念。

通过 get_project 获取到某个项目空间,如下所示:

  1. project = odps.get_project('my_project') # 取到某个项目
  2. project = odps.get_project() # 取到默认项目

注意:

  • 如果不提供参数,则取到默认项目空间;
  • exist_project 方法可以检验某个项目空间是否存在;

  • 表是 MaxCompute 的数据存储单元。

表操作

通过调用 list_tables 可以列出项目空间下的所有表,如下所示:

  1. for table in odps.list_tables():
  2. # 处理每张表

通过调用 exist_table 可以判断表是否存在,通过调用 get_table 可以获取表。

  1. >>> t = odps.get_table('dual')
  2. >>> t.schema
  3. odps.Schema {
  4. c_int_a bigint
  5. c_int_b bigint
  6. c_double_a double
  7. c_double_b double
  8. c_string_a string
  9. c_string_b string
  10. c_bool_a boolean
  11. c_bool_b boolean
  12. c_datetime_a datetime
  13. c_datetime_b datetime
  14. }
  15. >>> t.lifecycle
  16. -1
  17. >>> print(t.creation_time)
  18. 2014-05-15 14:58:43
  19. >>> t.is_virtual_view
  20. False
  21. >>> t.size
  22. 1408
  23. >>> t.schema.columns
  24. [<column c_int_a, type bigint>,
  25. <column c_int_b, type bigint>,
  26. <column c_double_a, type double>,
  27. <column c_double_b, type double>,
  28. <column c_string_a, type string>,
  29. <column c_string_b, type string>,
  30. <column c_bool_a, type boolean>,
  31. <column c_bool_b, type boolean>,
  32. <column c_datetime_a, type datetime>,
  33. <column c_datetime_b, type datetime>]

创建表的 Schema

初始化的方法有以下两种:

  • 通过表的列、以及可选的分区来初始化,如下所示:
  1. >>> from odps.models import Schema, Column, Partition
  2. >>> columns = [Column(name='num', type='bigint', comment='the column')]
  3. >>> partitions = [Partition(name='pt', type='string', comment='the partition')]
  4. >>> schema = Schema(columns=columns, partitions=partitions)
  5. >>> schema.columns
  6. [<column num, type bigint>, <partition pt, type string>]
  • 通过调用 Schema.from_lists,虽然调用更加方便,但显然无法直接设置列和分区的注释。如下所示:
  1. >>> schema = Schema.from_lists(['num'], ['bigint'], ['pt'], ['string'])
  2. >>> schema.columns
  3. [<column num, type bigint>, <partition pt, type string>]

创建表

创建表的操作如下所示:

  1. >>> table = odps.create_table('my_new_table', schema)
  2. >>> table = odps.create_table('my_new_table', schema, if_not_exists=True) # 只有不存在表时才创建

其他还可以设置 lifecycle 等参数。

获取表数据

获取表数据的方法有以下三种:

  • 通过调用 head 获取表数据,但仅限于查看每张表开始的小于 1 万条的数据,如下所示:

    1. >>> t = odps.get_table('dual')
    2. >>> for record in t.head(3):
    3. >>> print(record[0]) # 取第0个位置的值
    4. >>> print(record['c_double_a']) # 通过字段取值
    5. >>> print(record[0: 3]) # 切片操作
    6. >>> print(record[0, 2, 3]) # 取多个位置的值
    7. >>> print(record['c_int_a', 'c_double_a']) # 通过多个字段取值
  • 通过在 table 上执行 open_reader 操作,打开一个 reader 来读取数据。注意:此方法需要使用 with 表达式。如下所示:

  1. >>> with t.open_reader(partition='pt=test') as reader:
  2. >>> count = reader.count
  3. >>> for record in reader[5:10] # 可以执行多次,直到将count数量的record读完,这里可以改造成并行操作
  4. >>> # 处理一条记录
  • 通过使用 Tunnel API 读取表数据,open_reader 操作其实也是对 Tunnel API 的封装。

写入数据

类似于 open_reader,table 对象同样可以执行 open_writer 来打开 writer,并写数据,这里同样需要使用 with 表达式。如下所示:

  1. >>> with t.open_writer(partition='pt=test') as writer:
  2. >>> writer.write(records) # 这里records可以是任意可迭代的records,默认写到block 0
  3. >>>
  4. >>> with t.open_writer(partition='pt=test', blocks=[0, 1]) as writer: # 这里同是打开两个block
  5. >>> writer.write(0, gen_records(block=0))
  6. >>> writer.write(1, gen_records(block=1)) # 这里两个写操作可以多线程并行,各个block间是独立的

同样,向表中写入数据也是对 Tunnel API 的封装,详细内容请参见:数据上传下载通道

删除表

删除表的操作如下所示:

  1. >>> odps.delete_table('my_table_name', if_exists=True) # 只有表存在时删除
  2. >>> t.drop() # Table对象存在的时候可以直接执行drop函数

表分区

基本操作

遍历表全部分区:

  1. >>> for partition in table.partitions:
  2. >>> print(partition.name)
  3. >>> for partition in table.iterate_partitions(spec='pt=test'):
  4. >>> # 遍历二级分区

判断分区是否存在:

  1. >>> table.exist_partition('pt=test,sub=2015')

获取分区:

  1. >>> partition = table.get_partition('pt=test')
  2. >>> print(partition.creation_time)
  3. 2015-11-18 22:22:27
  4. >>> partition.size
  5. 0

创建分区

  1. >>> t.create_partition('pt=test', if_not_exists=True) # 不存在的时候才创建

删除分区

  1. >>> t.delete_partition('pt=test', if_exists=True) # 存在的时候才删除
  2. >>> partition.drop() # Partition对象存在的时候直接drop

SQL

PyODPS 支持 MaxCompute SQL 的查询,并可以读取执行的结果。

执行 SQL

  1. >>> odps.execute_sql('select * from dual') # 同步的方式执行,会阻塞直到SQL执行完成
  2. >>> instance = odps.run_sql('select * from dual') # 异步的方式执行
  3. >>> instance.wait_for_success() # 阻塞直到完成

读取 SQL 执行结果

运行 SQL 的 instance 能够直接执行 open_reader 的操作,一种情况是 SQL 返回了结构化的数据。

  1. >>> with odps.execute_sql('select * from dual').open_reader() as reader:
  2. >>> for record in reader:
  3. >>> # 处理每一个record

另一种情况是 SQL 可能执行的比如 desc,这时通过 reader.raw 属性取到原始的 SQL 执行结果。

  1. >>> with odps.execute_sql('desc dual').open_reader() as reader:
  2. >>> print(reader.raw)

Resource

资源在 MaxCompute 上常用在 UDF 和 MapReduce 中。

列出所有资源还是可以使用 list_resources,判断资源是否存在使用 exist_resource。 删除资源时,可以调用 delete_resource,或者直接对于 Resource 对象调用 drop 方法。

在 PyODPS 中,主要支持两种资源类型,一种是文件,另一种是表。详细介绍如下:

文件资源

文件资源包括基础的 file 类型、以及 pyjararchive

创建文件资源

创建文件资源可以通过给定资源名、文件类型、以及一个 file-like 的对象(或者是字符串对象)来创建,示例如下:

  1. resource = odps.create_resource('test_file_resource', 'file', file_obj=open('/to/path/file')) # 使用file-like的对象
  2. resource = odps.create_resource('test_py_resource', 'py', file_obj='import this') # 使用字符串

读取和修改文件资源

对文件资源调用 open 方法,或者在 MaxCompute 入口调用 open_resource 都能打开一个资源, 打开后的对象会是 file-like 的对象。类似于 Python 内置的 open 方法,文件资源也支持打开的模式。示例如下:

  1. >>> with resource.open('r') as fp: # 以读模式打开
  2. >>> content = fp.read() # 读取全部的内容
  3. >>> fp.seek(0) # 回到资源开头
  4. >>> lines = fp.readlines() # 读成多行
  5. >>> fp.write('Hello World') # 报错,读模式下无法写资源
  6. >>>
  7. >>> with odps.open_resource('test_file_resource', mode='r+') as fp: # 读写模式打开
  8. >>> fp.read()
  9. >>> fp.tell() # 当前位置
  10. >>> fp.seek(10)
  11. >>> fp.truncate() # 截断后面的内容
  12. >>> fp.writelines(['Hello\n', 'World\n']) # 写入多行
  13. >>> fp.write('Hello World')
  14. >>> fp.flush() # 手动调用会将更新提交到ODPS

所有支持的打开类型包括:

  • r,读模式,只能打开不能写;
  • w,写模式,只能写入而不能读文件,注意用写模式打开,文件内容会被先清空;
  • a,追加模式,只能写入内容到文件末尾;
  • r+,读写模式,能任意读写内容;
  • w+,类似于 r+,但会先清空文件内容;
  • a+,类似于 r+,但写入时只能写入文件末尾。

同时,PyODPS 中,文件资源支持以二进制模式打开,打开如说一些压缩文件等等就需要以这种模式, 因此 rb 就是指以二进制读模式打开文件,r+b 是指以二进制读写模式打开。

表资源

创建表资源

  1. >>> odps.create_resource('test_table_resource', 'table', table_name='my_table', partition='pt=test')

更新表资源

  1. >>> table_resource = odps.get_resource('test_table_resource')
  2. >>> table_resource.update(partition='pt=test2', project_name='my_project2')

DataFrame

PyODPS 提供了 DataFrame API,它提供了类似 pandas 的接口,但是能充分利用 MaxCompute 的计算能力。完整的 DataFrame 文档请参见: DataFrame

DataFrame 的示例如下,在所有步骤开始前,需要创建 MaxCompute 对象。

  1. >>> o = ODPS('**your-access-id**', '**your-secret-access-key**',
  2. project='**your-project**', endpoint='**your-end-point**'))

此处以 movielens 100K 作为示例,假设已经有三张表,分别是pyodps_ml_100k_movies(电影相关的数据),pyodps_ml_100k_users(用户相关的数据),pyodps_ml_100k_ratings(评分有关的数据)。

只需传入 Table 对象,便可创建一个 DataFrame 对象。如下所示:

  1. >>> from odps.df import DataFrame
  1. >>> users = DataFrame(o.get_table('pyodps_ml_100k_users'))

通过 dtypes 属性来查看这个 DataFrame 有哪些字段,分别是什么类型,如下所示:

  1. >>> users.dtypes

通过 head 方法,可以获取前 N 条数据,方便快速预览数据。如下所示:

  1. >>> users.head(10)
  user_id age sex occupation zip_code
0 1 24 M technician 85711
1 2 53 F other 94043
2 3 23 M writer 32067
3 4 24 M technician 43537
4 5 33 F other 15213
5 6 42 M executive 98101
6 7 57 M administrator 91344
7 8 36 M administrator 05201
8 9 29 M student 01002
9 10 53 M lawyer 90703

有时候,并不需要都看到所有字段,便可以从中筛选出一部分。如下所示:

  1. >>> users[['user_id', 'age']].head(5)
  user_id age
0 1 24
1 2 53
2 3 23
3 4 24
4 5 33

有时候只是排除个别字段。如下所示:

  1. >>> users.exclude('zip_code', 'age').head(5)
  user_id sex occupation
0 1 M technician
1 2 F other
2 3 M writer
3 4 M technician
4 5 F other

排除掉一些字段的同时,想要通过计算得到一些新的列,比如将 sex 为 M 的置为 True,否则为 False,并取名叫 sex_bool。如下所示:

  1. >>> users.select(users.exclude('zip_code', 'sex'), sex_bool=users.sex == 'M').head(5)
  user_id age occupation sex_bool
0 1 24 technician True
1 2 53 other False
2 3 23 writer True
3 4 24 technician True
4 5 33 other False

若想知道年龄在 20 到 25 岁之间的人有多少个,如下所示:

  1. >>> users.age.between(20, 25).count().rename('count')
  2. 943

若想知道男女用户分别有多少,如下所示:

  1. >>> users.groupby(users.sex).count()
  sex count
0 F 273
1 M 670

若想将用户按职业划分,从高到底,获取人数最多的前 10 个职业,如下所示:

  1. >>> df = users.groupby('occupation').agg(count=users['occupation'].count())
  2. >>> df.sort(df['count'], ascending=False)[:10]
  occupation count
0 student 196
1 other 105
2 educator 95
3 administrator 79
4 engineer 67
5 programmer 66
6 librarian 51
7 writer 45
8 executive 32
9 scientist 31

DataFrame API 提供了 value_counts 这个方法来快速达到同样的目的。如下所示:

  1. >>> users.occupation.value_counts()[:10]
  occupation count
0 student 196
1 other 105
2 educator 95
3 administrator 79
4 engineer 67
5 programmer 66
6 librarian 51
7 writer 45
8 executive 32
9 scientist 31

使用更直观的图来查看这份数据。

  1. >>> %matplotlib inline

使用横向的柱状图来可视化,如下所示:

  1. >>> users['occupation'].value_counts().plot(kind='barh', x='occupation', ylabel='prefession')

image

将年龄分成 30 组,查看各年龄分布的直方图,如下所示:

  1. >>> users.age.hist(bins=30, title="Distribution of users' ages", xlabel='age', ylabel='count of users')

image

使用 join 把这三张表进行联合后,把它保存成一张新的表。如下所示:

  1. >>> movies = DataFrame(o.get_table('pyodps_ml_100k_movies'))
  2. >>> ratings = DataFrame(o.get_table('pyodps_ml_100k_ratings'))
  3. >>> o.delete_table('pyodps_ml_100k_lens', if_exists=True)
  4. >>> lens = movies.join(ratings).join(users).persist('pyodps_ml_100k_lens')
  5. >>> lens.dtypes
  1. odps.Schema {
  2. movie_id int64
  3. title string
  4. release_date string
  5. video_release_date string
  6. imdb_url string
  7. user_id int64
  8. rating int64
  9. unix_timestamp int64
  10. age int64
  11. sex string
  12. occupation string
  13. zip_code string
  14. }

把 0 到 80 岁的年龄,分成 8 个年龄段,如下所示:

  1. >>> labels = ['0-9', '10-19', '20-29', '30-39', '40-49', '50-59', '60-69', '70-79']
  2. >>> cut_lens = lens[lens, lens.age.cut(range(0, 81, 10), right=False, labels=labels).rename('年龄分组')]

取分组和年龄唯一的前 10 条数据来进行查看,如下所示:

  1. >>> cut_lens['年龄分组', 'age'].distinct()[:10]
  年龄分组 age
0 0-9 7
1 10-19 10
2 10-19 11
3 10-19 13
4 10-19 14
5 10-19 15
6 10-19 16
7 10-19 17
8 10-19 18
9 10-19 19

对各个年龄分组下,用户的评分总数和评分均值进行查看,如下所示:

  1. >>> cut_lens.groupby('年龄分组').agg(cut_lens.rating.count().rename('评分总数'), cut_lens.rating.mean().rename('评分均值'))
  年龄分组 评分均值 评分总数
0 0-9 3.767442 43
1 10-19 3.486126 8181
2 20-29 3.467333 39535
3 30-39 3.554444 25696
4 40-49 3.591772 15021
5 50-59 3.635800 8704
6 60-69 3.648875 2623
7 70-79 3.649746 197

Configuration

配置选项

PyODPS 提供了一系列的配置选项,可通过 odps.options 获得。可配置的 MaxCompute 选项,如下所示:

通用配置

选项 说明 默认值
end_point MaxCompute Endpoint None
default_project 默认 Project None
log_view_host LogView 主机名 None
log_view_hours LogView 保持时间(小时) 24
local_timezone 使用的时区,True 表示本地时间,False 表示 UTC,也可用 pytz 的时区 1
lifecycle 所有表生命周期 None
temp_lifecycle 临时表生命周期 1
biz_id 用户 ID None
verbose 是否打印日志 False
verbose_log 日志接收器 None
chunk_size 写入缓冲区大小 1496
retry_times 请求重试次数 4
pool_connections 缓存在连接池的连接数 10
pool_maxsize 连接池最大容量 10
connect_timeout 连接超时 5
read_timeout 读取超时 120
completion_size 对象补全列举条数限制 10
notebook_repr_widget 使用交互式图表 True
sql.settings ODPS SQL运行全局hints None

数据上传/下载配置

选项 说明 默认值
tunnel.endpoint Tunnel Endpoint None
tunnel.use_instance_tunnel 使用 Instance Tunnel 获取执行结果 True
tunnel.limited_instance_tunnel 限制 Instance Tunnel 获取结果的条数 True
tunnel.string_as_binary 在 string 类型中使用 bytes 而非 unicode False

DataFrame 配置

选项 说明 默认值
interactive 是否在交互式环境 根据检测值
df.analyze 是否启用非 ODPS 内置函数 True
df.optimize 是否开启DataFrame全部优化 True
df.optimizes.pp 是否开启DataFrame谓词下推优化 True
df.optimizes.cp 是否开启DataFrame列剪裁优化 True
df.optimizes.tunnel 是否开启DataFrame使用tunnel优化执行 True
df.quote ODPS SQL后端是否用``来标记字段和表名 True
df.libraries DataFrame运行使用的第三方库(资源名) None

PyODPS ML 配置

选项 说明 默认值
ml.xflow_project 默认 Xflow 工程名 algo_public
ml.use_model_transfer 是否使用 ModelTransfer 获取模型 PMML True
ml.model_volume 在使用 ModelTransfer 时使用的 Volume 名称 pyodps_volume
本文导读目录
本文导读目录
以上内容是否对您有帮助?