PyODPS概述

PyODPS是MaxCompute的Python版本的SDK。提供简单方便的Python编程接口,以便您使用Python编写MaxCompute作业、查询MaxCompute表和视图,以及管理MaxCompute资源。PyODPS提供了与ODPS命令行工具类似的功能,例如上传和下载文件、创建表、运行ODPS SQL查询等,同时提供了一些高级功能,如提交MapReduce任务、使用ODPS UDF等。本文为您介绍PyODPS的应用场景、支持的工具,及使用过程中需要关注的注意事项

说明

随着MaxCompute MaxFrame的上线发布,将逐步替换PyODPS DataFrame及Mars接口,同时在算子兼容性以及分布式能力上有明显提升,建议新用户/新作业直接基于MaxFrame进行Python开发工作。

功能介绍

PyODPS应用场景请参见:

支持的工具

PyODPS支持在本地环境、DataWorks、PAI Notebooks中使用。

重要

无论您通过何种工具使用PyODPS,建议您尽量避免将全量数据下载到本地直接运行PyODPS任务,容易占用大量内存造成OOM,建议您将任务提交到MaxCompute进行分布式运行,对比介绍请参见下文的注意事项:请勿下载全量数据到本地并运行PyODPS

  • 本地环境:您可以在本地环境安装并使用PyODPS,操作指导可参见通过本地环境使用PyODPS

  • DataWorks:DataWorks的PyODPS节点已安装好了PyODPS,您可以直接在DataWorks的PyODPS节点上开发PyODPS任务并周期性运行,操作指导请参见通过DataWorks使用PyODPS

  • PAI Notebooks:PAI的Python环境也可安装运行PyODPS,其中PAI的内置镜像均已安装好了PyODPS可直接使用,如PAI-Designer的自定义Python组件,在PAI Notebooks中使用PyODPS的方式与通用的使用方式基本一致,可参考基本操作概述DataFrame概述

注意事项:请勿下载全量数据到本地并运行PyODPS

PyODPS作为一个SDK,本身运行于各种客户端,包括PC、DataWorks(数据开发的PyODPS节点)或PAI Notebooks的运行环境。pyodps环境需要注意的是,PyODPS提供了多种方便拉取数据到本地的操作,如tunnel下载操作、execute操作、to_pandas操作等,因此,很多初始使用PyODPS的用户会试图把数据拉取到本地,处理完成后再上传到 MaxCompute上,很多时候这种方式是十分低效的,拉取数据到本地彻底丧失了MaxCompute的大规模并行能力的优势。

数据处理方式

描述

场景示例

拉取到本地处理(不推荐,易OOM)

例如DataWorks中的PyODPS节点,内置了PyODPS包以及必要的Python环境,是一个资源非常受限的客户端运行容器,并不使用MaxCompute计算资源,有较强的内存限制。

PyODPS提供了to_pandas接口,可以直接将MaxCompute数据转化成Pandas DataFrame数据结构,但这个接口只应该被用于获取小规模数据做本地开发调试使用,而不是用来大规模处理数据,因为使用这个接口会触发下载行为,将位于MaxCompute中的海量数据下载到本地,如果后续操作的都是本地的DataFrame,则丧失了MaxCompute 的大规模并行计算能力,且数据量稍大时,单机内存就很容易产生OOM。

提交到MaxCompute分布式执行(推荐)

推荐您合理利用PyODPS提供的分布式DataFrame功能,将主要的计算提交到MaxCompute分布式执行而不是在PyODPS客户端节点下载处理,这是正确使用PyODPS的关键。

推荐使用PyODPS DataFrame接口来完成数据处理。常见的需求,比如需要对每一行数据处理然后写回表,或者一行数据要拆成多行,都可以通过PyODPS DataFrame中的map或者apply实现,有些甚至只需要一行代码,足够高效与简洁,案例可参见使用自定义函数及Python第三方库

使用这些接口最终都会翻译成SQL到MaxCompute计算集群做分布式计算,并且本地几乎没有任何的内存消耗,相比于单机有很大的性能提升。

以下以一个分词的示例为例,为您对比两种方式的代码区别。

  • 示例场景

    用户需要通过分析每天产生的日志字符串来提取一些信息,有一个只有一列的表,类型是string,通过jieba分词可以将中文语句分词,然后再找到想要的关键词存储到信息表里。

  • 低效处理代码demo

    import jieba
    t = o.get_table('word_split')
    out = []
    with t.open_reader() as reader:
        for r in reader:
            words = list(jieba.cut(r[0]))
            #
            # 处理逻辑,产生出 processed_data
            #
            out.append(processed_data)
    out_t = o.get_table('words')
    with out_t.open_writer() as writer:
        writer.write(out)

    单机处理数据的思维,逐行读取数据,然后逐行处理数据,再逐行写入目标表。整个流程中,下载上传数据消耗了大量的时间,并且在执行脚本的机器上需要很大的内存处理所有的数据,特别是对于使用DataWorks节点的用户来说,很容易因为超过默认分配的内存值而导致OOM运行报错。

  • 高效处理代码demo

    from odps.df import output
    out_table = o.get_table('words')
    df = o.get_table('word_split').to_df()
    
    # 假定需要返回的字段及类型如下
    out_names = ["word", "count"]
    out_types = ["string", "int"]
    
    @output(out_names, out_types)
    def handle(row):
        import jieba
        words = list(jieba.cut(row[0]))
        #
        # 处理逻辑,产生出 processed_data
        #
        yield processed_data
    df.apply(handle, axis=1).persist(out_table.name)

    利用apply实现分布式执行:

    • 复杂逻辑都放在handle这个函数里,这个函数会被自动序列化到服务端作为UDF使用,在服务端调用执行,且因为handle服务端实际执行时也是对每一行进行处理的,所以逻辑上是没有区别的。不同的是,这样写的程序在提交到MaxCompute端执行时,有多台机器同时处理数据,可以节约很多时间。

    • 调用persist接口会将产生的数据直接写到另一张MaxCompute表中,所有的数据产生与消费都在 MaxCompute集群完成,也节约了本地的网络与内存。

    • 在这个例子中也使用到了三方包,MaxCompute是支持自定义函数中使用三方包的(示例中的jieba),所以无需担心代码改动带来的成本,您可以几乎不需要改动主要逻辑就可以享受到MaxCompute的大规模计算能力。

使用限制