全部产品
弹性计算 会员服务 网络 安全 移动云 数加·大数据分析及展现 数加·大数据应用 管理与监控 云通信 阿里云办公 培训与认证 更多
存储与CDN 数据库 域名与网站(万网) 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网 智能硬件
MaxCompute

Python SDK

更新时间:2018-04-26 11:44:18

PyODPS是MaxCompute的Python版本的SDK,它提供了对MaxCompute对象的基本操作和DataFrame框架,可以轻松地在MaxCompute上进行数据分析。更多详情请参见Github项目和包括所有接口、类的细节等内容的详细文档

  • 关于PyODPS的更多详情请参见PyODPS云栖社区专辑

  • 欢迎各位开发者参与到PyODPS的生态开发中,详情请参见GitHub文档

  • 欢迎提交issue和merge request,加快PyODPS生态成长,更多详情请参见代码

  • 钉钉技术交流群:11701793

安装

PyODPS支持Python2.6以上(包括Python3),系统安装pip后,只需运行pip install pyodps,PyODPS的相关依赖便会自动安装。

快速开始

首先,用阿里云账号初始化一个MaxCompute的入口,如下所示:

  1. from odps import ODPS
  2. odps = ODPS('**your-access-id**', '**your-secret-access-key**', '**your-default-project**',
  3. endpoint='**your-end-point**')

根据上述操作初始化后,便可对表、资源、函数等进行操作。

项目空间

项目空间是MaxCompute的基本组织单元,类似于Database的概念。

您可通过get_project获取到某个项目空间,如下所示:

  1. project = odps.get_project('my_project') # 取到某个项目
  2. project = odps.get_project() # 取到默认项目

注意:

  • 如果不提供参数,则获取到默认项目空间。
  • 通过exist_project,可以查看某个项目空间是否存在。

  • 表是MaxCompute的数据存储单元。

表操作

通过调用list_tables可以列出项目空间下的所有表,如下所示:

  1. for table in odps.list_tables():
  2. # 处理每张表

通过调用exist_table可以判断表是否存在,通过调用get_table可以获取表。

  1. >>> t = odps.get_table('dual')
  2. >>> t.schema
  3. odps.Schema {
  4. c_int_a bigint
  5. c_int_b bigint
  6. c_double_a double
  7. c_double_b double
  8. c_string_a string
  9. c_string_b string
  10. c_bool_a boolean
  11. c_bool_b boolean
  12. c_datetime_a datetime
  13. c_datetime_b datetime
  14. }
  15. >>> t.lifecycle
  16. -1
  17. >>> print(t.creation_time)
  18. 2014-05-15 14:58:43
  19. >>> t.is_virtual_view
  20. False
  21. >>> t.size
  22. 1408
  23. >>> t.schema.columns
  24. [<column c_int_a, type bigint>,
  25. <column c_int_b, type bigint>,
  26. <column c_double_a, type double>,
  27. <column c_double_b, type double>,
  28. <column c_string_a, type string>,
  29. <column c_string_b, type string>,
  30. <column c_bool_a, type boolean>,
  31. <column c_bool_b, type boolean>,
  32. <column c_datetime_a, type datetime>,
  33. <column c_datetime_b, type datetime>]

创建表的Schema

初始化的方法有两种,如下所示:

  • 通过表的列和可选的分区来初始化。

    1. >>> from odps.models import Schema, Column, Partition
    2. >>> columns = [Column(name='num', type='bigint', comment='the column')]
    3. >>> partitions = [Partition(name='pt', type='string', comment='the partition')]
    4. >>> schema = Schema(columns=columns, partitions=partitions)
    5. >>> schema.columns
    6. [<column num, type bigint>, <partition pt, type string>]
  • 通过调用Schema.from_lists,虽然调用更加方便,但显然无法直接设置列和分区的注释。

    1. >>> schema = Schema.from_lists(['num'], ['bigint'], ['pt'], ['string'])
    2. >>> schema.columns
    3. [<column num, type bigint>, <partition pt, type string>]

创建表

您可以使用表的Schema来创建表,操作如下所示:

  1. >>> table = odps.create_table('my_new_table', schema)
  2. >>> table = odps.create_table('my_new_table', schema, if_not_exists=True) # 只有不存在表时才创建
  3. >>> table = o.create_table('my_new_table', schema, lifecycle=7) # 设置生命周期

也可以使用逗号连接的字段名 字段类型字符串组合来创建表,操作如下所示:

  1. >>> # 创建非分区表
  2. >>> table = o.create_table('my_new_table', 'num bigint, num2 double', if_not_exists=True)
  3. >>> # 创建分区表可传入 (表字段列表, 分区字段列表)
  4. >>> table = o.create_table('my_new_table', ('num bigint, num2 double', 'pt string'), if_not_exists=True)

在未经设置的情况下,创建表时,只允许使用bigint、double、decimal、string、datetime、boolean、map和array类型。

如果您的服务位于公共云,或者支持tinyint、struct等新类型,可以设置options.sql.use_odps2_extension = True,以打开这些类型的支持,示例如下:

  1. >>> from odps import options
  2. >>> options.sql.use_odps2_extension = True
  3. >>> table = o.create_table('my_new_table', 'cat smallint, content struct<title:varchar(100), body string>')

获取表数据

您可通过以下三种方法获取表数据。

  • 通过调用head获取表数据,但仅限于查看每张表开始的小于1万条的数据,如下所示:

    1. >>> t = odps.get_table('dual')
    2. >>> for record in t.head(3):
    3. >>> print(record[0]) # 取第0个位置的值
    4. >>> print(record['c_double_a']) # 通过字段取值
    5. >>> print(record[0: 3]) # 切片操作
    6. >>> print(record[0, 2, 3]) # 取多个位置的值
    7. >>> print(record['c_int_a', 'c_double_a']) # 通过多个字段取值
  • 通过在table上执行open_reader操作,打开一个reader来读取数据。您可以使用with表达式,也可以不使用。

    1. # 使用with表达式
    2. >>> with t.open_reader(partition='pt=test') as reader:
    3. >>> count = reader.count
    4. >>> for record in reader[5:10] # 可以执行多次,直到将count数量的record读完,这里可以改造成并行操作
    5. >>> # 处理一条记录
    6. >>>
    7. >>> # 不使用with表达式
    8. >>> reader = t.open_reader(partition='pt=test')
    9. >>> count = reader.count
    10. >>> for record in reader[5:10]
    11. >>> # 处理一条记录
  • 通过使用Tunnel API读取表数据,open_reader操作其实也是对Tunnel API的封装。

写入数据

类似于open_reader,table对象同样可以执行open_writer来打开writer,并写数据。如下所示:

  1. >>> # 使用 with 表达式
  2. >>> with t.open_writer(partition='pt=test') as writer:
  3. >>> writer.write(records) # 这里records可以是任意可迭代的records,默认写到block 0
  4. >>>
  5. >>> with t.open_writer(partition='pt=test', blocks=[0, 1]) as writer: # 这里同是打开两个block
  6. >>> writer.write(0, gen_records(block=0))
  7. >>> writer.write(1, gen_records(block=1)) # 这里两个写操作可以多线程并行,各个block间是独立的
  8. >>>
  9. >>> # 不使用 with 表达式
  10. >>> writer = t.open_writer(partition='pt=test', blocks=[0, 1])
  11. >>> writer.write(0, gen_records(block=0))
  12. >>> writer.write(1, gen_records(block=1))
  13. >>> writer.close() # 不要忘记关闭 writer,否则数据可能写入不完全

同样,向表中写入数据也是对Tunnel API的封装,更多详情请参见数据上传下载通道

删除表

删除表的操作,如下所示:

  1. >>> odps.delete_table('my_table_name', if_exists=True) # 只有表存在时删除
  2. >>> t.drop() # Table对象存在的时候可以直接执行drop函数

表分区

基本操作

遍历表的全部分区,如下所示:

  1. >>> for partition in table.partitions:
  2. >>> print(partition.name)
  3. >>> for partition in table.iterate_partitions(spec='pt=test'):
  4. >>> # 遍历二级分区

判断分区是否存在,如下所示:

  1. >>> table.exist_partition('pt=test,sub=2015')

获取分区,如下所示:

  1. >>> partition = table.get_partition('pt=test')
  2. >>> print(partition.creation_time)
  3. 2015-11-18 22:22:27
  4. >>> partition.size
  5. 0

创建分区

  1. >>> t.create_partition('pt=test', if_not_exists=True) # 不存在的时候才创建

删除分区

  1. >>> t.delete_partition('pt=test', if_exists=True) # 存在的时候才删除
  2. >>> partition.drop() # Partition对象存在的时候直接drop

SQL

PyODPS支持MaxCompute SQL的查询,并可以读取执行的结果。

执行SQL

  1. >>> odps.execute_sql('select * from dual') # 同步的方式执行,会阻塞直到SQL执行完成
  2. >>> instance = odps.run_sql('select * from dual') # 异步的方式执行
  3. >>> instance.wait_for_success() # 阻塞直到完成

读取SQL执行结果

运行SQL的instance能够直接执行open_reader的操作,一种情况是SQL返回了结构化的数据。

  1. >>> with odps.execute_sql('select * from dual').open_reader() as reader:
  2. >>> for record in reader:
  3. >>> # 处理每一个record

另一种情况是SQL可能执行的比如desc,这时通过reader.raw属性取到原始的SQL执行结果。

  1. >>> with odps.execute_sql('desc dual').open_reader() as reader:
  2. >>> print(reader.raw)

Resource

资源在MaxCompute上常用在UDF和MapReduce中。

列出所有资源还是可以使用list_resources,判断资源是否存在使用exist_resource。删除资源时,可以调用delete_resource,或者直接对于Resource对象调用drop方法。

在PyODPS中,主要支持两种资源类型,一种是文件,另一种是表。

文件资源

文件资源包括基础的file类型、以及pyjararchive

注意

在DataWorks中,py格式的文件资源请以file形式上传,详情请参见Python UDF文档

创建文件资源

创建文件资源可以通过给定资源名、文件类型、以及一个file-like的对象(或者是字符串对象)来创建,示例如下:

  1. resource = odps.create_resource('test_file_resource', 'file', file_obj=open('/to/path/file')) # 使用file-like的对象
  2. resource = odps.create_resource('test_py_resource', 'py', file_obj='import this') # 使用字符串

读取和修改文件资源

对文件资源调用open方法,或者在MaxCompute入口调用open_resource都能打开一个资源, 打开后的对象会是file-like的对象。类似于Python内置的open方法,文件资源也支持打开的模式。示例如下:

  1. >>> with resource.open('r') as fp: # 以读模式打开
  2. >>> content = fp.read() # 读取全部的内容
  3. >>> fp.seek(0) # 回到资源开头
  4. >>> lines = fp.readlines() # 读成多行
  5. >>> fp.write('Hello World') # 报错,读模式下无法写资源
  6. >>>
  7. >>> with odps.open_resource('test_file_resource', mode='r+') as fp: # 读写模式打开
  8. >>> fp.read()
  9. >>> fp.tell() # 当前位置
  10. >>> fp.seek(10)
  11. >>> fp.truncate() # 截断后面的内容
  12. >>> fp.writelines(['Hello\n', 'World\n']) # 写入多行
  13. >>> fp.write('Hello World')
  14. >>> fp.flush() # 手动调用会将更新提交到ODPS

所有支持的打开类型包括:

  • r:读模式,只能打开不能写。

  • w:写模式,只能写入而不能读文件,注意用写模式打开,文件内容会被先清空。

  • a:追加模式,只能写入内容到文件末尾。

  • r+:读写模式,能任意读写内容。

  • w+:类似于r+,但会先清空文件内容。

  • a+:类似于r+,但写入时只能写入文件末尾。

同时,PyODPS中,文件资源支持以二进制模式打开,打开如说一些压缩文件等等就需要以这种模式, 因此rb就是指以二进制读模式打开文件,r+b是指以二进制读写模式打开。

表资源

创建表资源

  1. >>> odps.create_resource('test_table_resource', 'table', table_name='my_table', partition='pt=test')

更新表资源

  1. >>> table_resource = odps.get_resource('test_table_resource')
  2. >>> table_resource.update(partition='pt=test2', project_name='my_project2')

DataFrame

PyODPS提供了DataFrame API,它提供了类似pandas的接口,但是能充分利用MaxCompute的计算能力。完整的DataFrame文档请参见DataFrame

DataFrame的示例如下:

注意

在所有步骤开始前,需要创建MaxCompute对象。

  1. >>> o = ODPS('**your-access-id**', '**your-secret-access-key**',
  2. project='**your-project**', endpoint='**your-end-point**'))

此处以movielens 100K作为示例,假设已经有三张表,分别是pyodps_ml_100k_movies(电影相关的数据),pyodps_ml_100k_users(用户相关的数据),pyodps_ml_100k_ratings(评分有关的数据)。

只需传入Table对象,便可创建一个DataFrame对象。如下所示:

  1. >>> from odps.df import DataFrame
  1. >>> users = DataFrame(o.get_table('pyodps_ml_100k_users'))

通过dtypes属性来查看这个DataFrame有哪些字段,分别是什么类型,如下所示:

  1. >>> users.dtypes

通过head方法,可以获取前N条数据,方便快速预览数据。如下所示:

  1. >>> users.head(10)
  user_id age sex occupation zip_code
0 1 24 M technician 85711
1 2 53 F other 94043
2 3 23 M writer 32067
3 4 24 M technician 43537
4 5 33 F other 15213
5 6 42 M executive 98101
6 7 57 M administrator 91344
7 8 36 M administrator 05201
8 9 29 M student 01002
9 10 53 M lawyer 90703

有时候,并不需要都看到所有字段,便可以从中筛选出一部分。如下所示:

  1. >>> users[['user_id', 'age']].head(5)
  user_id age
0 1 24
1 2 53
2 3 23
3 4 24
4 5 33

有时候只是排除个别字段。如下所示:

  1. >>> users.exclude('zip_code', 'age').head(5)
  user_id sex occupation
0 1 M technician
1 2 F other
2 3 M writer
3 4 M technician
4 5 F other

排除掉一些字段的同时,想要通过计算得到一些新的列,比如将sex为M的置为True,否则为False,并取名叫sex_bool。如下所示:

  1. >>> users.select(users.exclude('zip_code', 'sex'), sex_bool=users.sex == 'M').head(5)
  user_id age occupation sex_bool
0 1 24 technician True
1 2 53 other False
2 3 23 writer True
3 4 24 technician True
4 5 33 other False

若想知道年龄在20到25岁之间的人有多少个,如下所示:

  1. >>> users.age.between(20, 25).count().rename('count')
  2. 943

若想知道男女用户分别有多少,如下所示:

  1. >>> users.groupby(users.sex).count()
  sex count
0 F 273
1 M 670

若想将用户按职业划分,从高到底,获取人数最多的前10个职业,如下所示:

  1. >>> df = users.groupby('occupation').agg(count=users['occupation'].count())
  2. >>> df.sort(df['count'], ascending=False)[:10]
  occupation count
0 student 196
1 other 105
2 educator 95
3 administrator 79
4 engineer 67
5 programmer 66
6 librarian 51
7 writer 45
8 executive 32
9 scientist 31

DataFrame API提供了value_counts方法来快速达到同样的目的。如下所示:

  1. >>> users.occupation.value_counts()[:10]
  occupation count
0 student 196
1 other 105
2 educator 95
3 administrator 79
4 engineer 67
5 programmer 66
6 librarian 51
7 writer 45
8 executive 32
9 scientist 31

使用更直观的图来查看这份数据,如下所示:

  1. >>> %matplotlib inline

使用横向的柱状图来可视化,如下所示:

  1. >>> users['occupation'].value_counts().plot(kind='barh', x='occupation', ylabel='prefession')

image

将年龄分成30组,查看各年龄分布的直方图,如下所示:

  1. >>> users.age.hist(bins=30, title="Distribution of users' ages", xlabel='age', ylabel='count of users')

image

使用join把这三张表进行联合后,把它保存成一张新的表。如下所示:

  1. >>> movies = DataFrame(o.get_table('pyodps_ml_100k_movies'))
  2. >>> ratings = DataFrame(o.get_table('pyodps_ml_100k_ratings'))
  3. >>> o.delete_table('pyodps_ml_100k_lens', if_exists=True)
  4. >>> lens = movies.join(ratings).join(users).persist('pyodps_ml_100k_lens')
  5. >>> lens.dtypes
  1. odps.Schema {
  2. movie_id int64
  3. title string
  4. release_date string
  5. video_release_date string
  6. imdb_url string
  7. user_id int64
  8. rating int64
  9. unix_timestamp int64
  10. age int64
  11. sex string
  12. occupation string
  13. zip_code string
  14. }

把0到80岁的年龄,分成8个年龄段,如下所示:

  1. >>> labels = ['0-9', '10-19', '20-29', '30-39', '40-49', '50-59', '60-69', '70-79']
  2. >>> cut_lens = lens[lens, lens.age.cut(range(0, 81, 10), right=False, labels=labels).rename('年龄分组')]

取分组和年龄唯一的前10条数据来进行查看,如下所示:

  1. >>> cut_lens['年龄分组', 'age'].distinct()[:10]
  年龄分组 age
0 0-9 7
1 10-19 10
2 10-19 11
3 10-19 13
4 10-19 14
5 10-19 15
6 10-19 16
7 10-19 17
8 10-19 18
9 10-19 19

对各个年龄分组下,用户的评分总数和评分均值进行查看,如下所示:

  1. >>> cut_lens.groupby('年龄分组').agg(cut_lens.rating.count().rename('评分总数'), cut_lens.rating.mean().rename('评分均值'))
  年龄分组 评分均值 评分总数
0 0-9 3.767442 43
1 10-19 3.486126 8181
2 20-29 3.467333 39535
3 30-39 3.554444 25696
4 40-49 3.591772 15021
5 50-59 3.635800 8704
6 60-69 3.648875 2623
7 70-79 3.649746 197

Configuration

配置选项

PyODPS提供了一系列的配置选项,可通过odps.options获得。可配置的MaxCompute选项,如下所示:

通用配置

选项 说明 默认值
end_point MaxCompute Endpoint None
default_project 默认Project None
log_view_host LogView主机名 None
log_view_hours LogView保持时间(小时) 24
local_timezone 使用的时区,True表示本地时间,False表示UTC,也可用pytz 的时区 1
lifecycle 所有表生命周期 None
temp_lifecycle 临时表生命周期 1
biz_id 用户ID None
verbose 是否打印日志 False
verbose_log 日志接收器 None
chunk_size 写入缓冲区大小 1496
retry_times 请求重试次数 4
pool_connections 缓存在连接池的连接数 10
pool_maxsize 连接池最大容量 10
connect_timeout 连接超时 5
read_timeout 读取超时 120
completion_size 对象补全列举条数限制 10
notebook_repr_widget 使用交互式图表 True
sql.settings ODPS SQL运行全局hints None
sql.use_odps2_extension 启用MaxCompute2.0语言扩展 False

数据上传/下载配置

选项 说明 默认值
tunnel.endpoint Tunnel Endpoint None
tunnel.use_instance_tunnel 使用Instance Tunnel获取执行结果 True
tunnel.limited_instance_tunnel 限制Instance Tunnel获取结果的条数 True
tunnel.string_as_binary 在string类型中使用bytes而非unicode False

DataFrame配置

选项 说明 默认值
interactive 是否在交互式环境 根据检测值
df.analyze 是否启用非ODPS内置函数 True
df.optimize 是否开启DataFrame全部优化 True
df.optimizes.pp 是否开启DataFrame谓词下推优化 True
df.optimizes.cp 是否开启DataFrame列剪裁优化 True
df.optimizes.tunnel 是否开启DataFrame使用tunnel优化执行 True
df.quote ODPS SQL后端是否用``来标记字段和表名 True
df.libraries DataFrame运行使用的第三方库(资源名) None

PyODPS ML配置

选项 说明 默认值
ml.xflow_project 默认Xflow工程名 algo_public
ml.use_model_transfer 是否使用ModelTransfer获取模型PMML True
ml.model_volume 在使用ModelTransfer时使用的Volume名称 pyodps_volume
本文导读目录