PyODPS可在DataWorks等数据开发平台中作为数据开发节点调用。这些平台提供了PyODPS运行环境和调度执行的能力,无需您手动创建ODPS入口对象。
功能介绍
PyODPS应用场景请参见:
- DataFrame操作:DataFrame快速入门。
- 读取分区表数据:PyODPS读取分区表数据。
- 参数传递:PyODPS参数传递。
- 使用第三方包:PyODPS使用第三方包。
- 查看一级分区:PyODPS查看一级分区。
- 条件查询:PyODPS条件查询。
- DataFrame Sequence及执行:PyODPS的Sequence及执行操作。
支持的工具
PyODPS支持在本地环境、DataWorks、PAI Notebooks中使用。
重要 无论您使用何种工具使用PyODPS,建议您尽量避免将全量数据下载到本地直接运行PyODPS任务,容易占用大量内存造成OOM,建议您将任务提交到MaxCompute进行分布式运行,对比介绍请参见下文的注意事项:请勿下载全量数据到本地并运行PyODPS。
- 本地环境:您可以在本地环境安装并使用PyODPS,操作指导可参见通过本地环境使用PyODPS。
- DataWorks:DataWorks的PyODPS节点已安装好了PyODPS,您可以直接在DataWorks的PyODPS节点上开发PyODPS任务并周期性运行,操作指导请参见通过DataWorks使用PyODPS。
- PAI Notebooks:PAI的Python环境也可安装运行PyODPS,其中PAI的内置镜像均已安装好了PyODPS可直接使用,如PAI-Designer的自定义python组件,在PAI Notebooks中使用PyODPS的方式与通用的使用方式基本一致,可参考基本操作概述、DataFrame概述。
注意事项:请勿下载全量数据到本地并运行PyODPS
PyODPS作为一个SDK,本身运行于各种客户端,包括PC、DataWorks(数据开发的PyODPS节点)或PAI Notebooks的运行环境。
需要注意的是,PyODPS提供了多种方便拉取数据到本地的操作,如tunnel下载操作、execute操作、to_pandas操作等,因此,很多初始使用PyODPS的用户会试图把数据拉取到本地,处理完成后再上传到 MaxCompute上,很多时候这种方式是十分低效的,拉取数据到本地彻底丧失了MaxCompute的大规模并行能力的优势。
以下以一个分词的示例为例,为您对比两种方式的代码区别。

数据处理方式 | 描述 | 场景示例 |
---|---|---|
拉取到本地处理(不推荐,易OOM) | 例如DataWorks中的PyODPS节点,内置了PyODPS包以及必要的Python环境,是一个资源非常受限的客户端运行容器,并不使用MaxCompute计算资源,有较强的内存限制。 | PyODPS提供了to_pandas 接口,可以直接将MaxCompute数据转化成Pandas DataFrame数据结构,但这个接口只应该被用于获取小规模数据做本地开发调试使用,而不是用来大规模处理数据,因为使用这个接口会触发下载行为,将位于MaxCompute中的海量数据下载到本地,如果后续操作的都是本地的DataFrame,则丧失了MaxCompute 的大规模并行计算能力,且数据量稍大时,单机内存就很容易产生OOM。 |
提交到MaxCompute分布式执行(推荐) | 推荐您合理利用PyODPS提供的分布式DataFrame功能,将主要的计算提交到MaxCompute分布式执行而不是在PyODPS客户端节点下载处理,这是正确使用PyODPS的关键。 | 推荐使用PyODPS DataFrame接口来完成数据处理。常见的需求,比如需要对每一行数据处理然后写回表,或者一行数据要拆成多行,都可以通过PyODPS DataFrame中的map 或者apply 实现,有些甚至只需要一行代码,足够高效与简洁,案例可参见使用自定义函数及Python第三方库。使用这些接口最终都会翻译成SQL到MaxCompute计算集群做分布式计算,并且本地几乎没有任何的内存消耗,相比于单机有很大的性能提升。 |
- 示例场景
用户需要通过分析每天产生的日志字符串来提取一些信息,有一个只有一列的表,类型是string,通过jieba分词可以将中文语句分词,然后再找到想要的关键词存储到信息表里。
- 低效处理代码demo
单机处理数据的思维,一行一行读出数据,然后一行一行处理数据,然后再一行一行写入目标表。整个流程中,下载上传数据消耗了大量的时间,并且在执行脚本的机器上需要很大的内存处理所有的数据,特别对于使用DataWorks节点的用户,很容易因为超过默认分配的内存值,导致OOM运行报错。import jieba t = o.get_table('word_split') out = [] with t.open_reader() as reader: for r in reader: words = list(jieba.cut(r[0])) # # 处理逻辑,产生出 processed_data # out.append(processed_data) out_t = o.get_table('words') with out_t.open_writer() as writer: writer.write(out)
- 高效处理代码demo
利用apply实现分布式执行:from odps.df import output out_table = o.get_table('words') df = o.get_table('word_split').to_df() # 假定需要返回的字段及类型如下 out_names = ["word", "count"] out_types = ["string", "int"] @output(out_names, out_types) def handle(row): import jieba words = list(jieba.cut(r[0])) # # 处理逻辑,产生出 processed_data # yield processed_data df.apply(handle, axis=1).persist(out_table.name)
- 复杂逻辑都放在handle这个函数里,这个函数会被自动序列化到服务端作为UDF使用,在服务端调用执行,且因为handle服务端实际执行时也是对每一行进行处理的,所以逻辑上是没有区别的。不同的是,这样写的程序在提交到MaxCompute端执行时,有多台机器同时处理数据,可以节约很多时间。
- 调用persist接口会将产生的数据直接写到另一张MaxCompute表中,所有的数据产生与消费都在 MaxCompute集群完成,也节约了本地的网络与内存。
- 在这个例子中也使用到了三方包,MaxCompute是支持自定义函数中使用三方包的(示例中的
jieba
),所以无需担心代码改动带来的成本,您可以几乎不需要改动主要逻辑就可以享受到MaxCompute的大规模计算能力。
使用限制
- SQL使用限制项。
- DataWorks PyODPS节点使用限制。
- 由于沙箱的限制,部分Pandas计算后端执行本地调试通过的程序,无法在MaxCompute上调试通过。