PyODPS是MaxCompute的Python版本的SDK。提供简单方便的Python编程接口,以便您使用Python编写MaxCompute作业、查询MaxCompute表和视图,以及管理MaxCompute资源。PyODPS提供了与ODPS命令行工具类似的功能,例如上传和下载文件、创建表、运行ODPS SQL查询等,同时提供了一些高级功能,如提交MapReduce任务、使用ODPS UDF等。本文为您介绍PyODPS的应用场景、支持的工具,及使用过程中需要关注的注意事项
随着MaxCompute MaxFrame的上线发布,将逐步替换PyODPS DataFrame及Mars接口,同时在算子兼容性以及分布式能力上有明显提升,建议新用户/新作业直接基于MaxFrame进行Python开发工作。
功能介绍
PyODPS应用场景请参见:
DataFrame操作:DataFrame快速入门。
读取分区表数据:PyODPS读取分区表数据。
参数传递:PyODPS参数传递。
使用第三方包:PyODPS使用第三方包。
查看一级分区:PyODPS查看一级分区。
条件查询:PyODPS条件查询。
DataFrame Sequence及执行:PyODPS的Sequence及执行操作。
支持的工具
PyODPS支持在本地环境、DataWorks、PAI Notebooks中使用。
无论您通过何种工具使用PyODPS,建议您尽量避免将全量数据下载到本地直接运行PyODPS任务,容易占用大量内存造成OOM,建议您将任务提交到MaxCompute进行分布式运行,对比介绍请参见下文的注意事项:请勿下载全量数据到本地并运行PyODPS。
本地环境:您可以在本地环境安装并使用PyODPS,操作指导可参见通过本地环境使用PyODPS。
DataWorks:DataWorks的PyODPS节点已安装好了PyODPS,您可以直接在DataWorks的PyODPS节点上开发PyODPS任务并周期性运行,操作指导请参见通过DataWorks使用PyODPS。
PAI Notebooks:PAI的Python环境也可安装运行PyODPS,其中PAI的内置镜像均已安装好了PyODPS可直接使用,如PAI-Designer的自定义Python组件,在PAI Notebooks中使用PyODPS的方式与通用的使用方式基本一致,可参考基本操作概述、DataFrame概述。
注意事项:请勿下载全量数据到本地并运行PyODPS
PyODPS作为一个SDK,本身运行于各种客户端,包括PC、DataWorks(数据开发的PyODPS节点)或PAI Notebooks的运行环境。需要注意的是,PyODPS提供了多种方便拉取数据到本地的操作,如tunnel下载操作、execute操作、to_pandas操作等,因此,很多初始使用PyODPS的用户会试图把数据拉取到本地,处理完成后再上传到 MaxCompute上,很多时候这种方式是十分低效的,拉取数据到本地彻底丧失了MaxCompute的大规模并行能力的优势。
数据处理方式 | 描述 | 场景示例 |
拉取到本地处理(不推荐,易OOM) | 例如DataWorks中的PyODPS节点,内置了PyODPS包以及必要的Python环境,是一个资源非常受限的客户端运行容器,并不使用MaxCompute计算资源,有较强的内存限制。 | PyODPS提供了 |
提交到MaxCompute分布式执行(推荐) | 推荐您合理利用PyODPS提供的分布式DataFrame功能,将主要的计算提交到MaxCompute分布式执行而不是在PyODPS客户端节点下载处理,这是正确使用PyODPS的关键。 | 推荐使用PyODPS DataFrame接口来完成数据处理。常见的需求,比如需要对每一行数据处理然后写回表,或者一行数据要拆成多行,都可以通过PyODPS DataFrame中的 使用这些接口最终都会翻译成SQL到MaxCompute计算集群做分布式计算,并且本地几乎没有任何的内存消耗,相比于单机有很大的性能提升。 |
以下以一个分词的示例为例,为您对比两种方式的代码区别。
示例场景
用户需要通过分析每天产生的日志字符串来提取一些信息,有一个只有一列的表,类型是string,通过jieba分词可以将中文语句分词,然后再找到想要的关键词存储到信息表里。
低效处理代码demo
import jieba t = o.get_table('word_split') out = [] with t.open_reader() as reader: for r in reader: words = list(jieba.cut(r[0])) # # 处理逻辑,产生出 processed_data # out.append(processed_data) out_t = o.get_table('words') with out_t.open_writer() as writer: writer.write(out)
单机处理数据的思维,逐行读取数据,然后逐行处理数据,再逐行写入目标表。整个流程中,下载上传数据消耗了大量的时间,并且在执行脚本的机器上需要很大的内存处理所有的数据,特别是对于使用DataWorks节点的用户来说,很容易因为超过默认分配的内存值而导致OOM运行报错。
高效处理代码demo
from odps.df import output out_table = o.get_table('words') df = o.get_table('word_split').to_df() # 假定需要返回的字段及类型如下 out_names = ["word", "count"] out_types = ["string", "int"] @output(out_names, out_types) def handle(row): import jieba words = list(jieba.cut(row[0])) # # 处理逻辑,产生出 processed_data # yield processed_data df.apply(handle, axis=1).persist(out_table.name)
利用apply实现分布式执行:
复杂逻辑都放在handle这个函数里,这个函数会被自动序列化到服务端作为UDF使用,在服务端调用执行,且因为handle服务端实际执行时也是对每一行进行处理的,所以逻辑上是没有区别的。不同的是,这样写的程序在提交到MaxCompute端执行时,有多台机器同时处理数据,可以节约很多时间。
调用persist接口会将产生的数据直接写到另一张MaxCompute表中,所有的数据产生与消费都在 MaxCompute集群完成,也节约了本地的网络与内存。
在这个例子中也使用到了三方包,MaxCompute是支持自定义函数中使用三方包的(示例中的
jieba
),所以无需担心代码改动带来的成本,您可以几乎不需要改动主要逻辑就可以享受到MaxCompute的大规模计算能力。
使用限制
由于沙箱的限制,部分Pandas计算后端执行本地调试通过的程序,无法在MaxCompute上调试通过。