PyODPS为MaxCompute的Python版SDK,支持在DataWorks中开发运行PyODPS任务。本文为您介绍在DataWorks上使用PyODPS的使用限制、主要流程和部分简单应用示例。

使用限制

  • 使用方式限制

    如果您发现有Got killed报错,即表明内存使用超限,进程被中止。请避免在PyODPS节点中直接下载数据并在DataWorks中处理数据,建议将数据处理任务提交到MaxCompute进行分布式执行处理,两种方式的对比详情请参见注意事项:请勿下载全量数据到本地并运行PyODPS

  • 包支持限制
    • DataWorks的PyODPS节点缺少matplotlib等包,如下功能可能受限:
      • DataFrame的plot函数。
      • DataFrame自定义函数需要提交到MaxCompute执行。由于Python沙箱限制,第三方库只支持所有的纯粹Python库以及Numpy,因此不能直接使用Pandas。
      • DataWorks中执行的非自定义函数代码可以使用平台预装的Numpy和Pandas。不支持其他带有二进制代码的三方包。
    • DataWorks的PyODPS节点不支持Python的atexit包,请使用try-finally结构实现相关功能。
  • 读取数据记录数限制

    DataWorks的PyODPS节点中,options.tunnel.use_instance_tunnel默认设置为False,即默认情况下,最多读取一万条数据记录。如果需要读取更多数据记录,需全局开启instance tunnel,即需要手动将options.tunnel.use_instance_tunnel设置为True。

主要流程

  1. 创建PyODPS节点。
    您可以进入DataWorks的数据开发页面创建PyODPS节点。PyODPS节点分为PyODPS 2和PyODPS 3两种:
    • PyODPS 2底层Python语言版本为Python 2。
    • PyODPS 3底层Python语言版本为Python 3。
    您可根据实际使用的Python语言版本创建PyODPS节点,创建PyODPS节点的详细操作步骤请参见创建PyODPS 2节点创建PyODPS 3节点新建节点
  2. 开发PyODPS任务代码。
    创建完成后,您可参考下文内容进行简单示例的操作学习,了解PyODPS的主要能力维度。更多PyODPS的使用指导请参见基本操作概述DataFrame概述。您也可以参考示例文档:PyODPS节点实现结巴中文分词,进行一个端到端的简单操作。
  3. 进行调度配置,完成后保存、提交、发布节点,后续即可周期性运行任务。

ODPS入口

DataWorks的PyODPS节点中,将会包含一个全局变量odps或者o,即为ODPS入口。您不需要手动定义ODPS入口。 命令示例如下。
#查看表pyodps_iris是否存在
print(o.exist_table('pyodps_iris'))
返回True,表示表存在。

执行SQL

通用能力

  • 在PyODPS节点中运行SQL命令:例如使用execute_sql()/run_sql()来执行SQL命令,当前主要支持运行DDL、DML类型的SQL命令。
    说明 可以执行的SQL语句并非都可以通过入口对象的execute_sql()run_sql()方法执行。在调用非DDL或非DML语句时,请使用其它方法。例如,调用GRANT或REVOKE语句时,请使用run_security_query方法;调用API命令时,请使用run_xflowexecute_xflow方法。
  • 在PyODPS节点中读取SQL运行结果:例如使用open_reader()来读取SQL命令运行结果。
更多PyODPS节点的SQL相关操作详情请参见SQL

注意事项:数据记录数量限制

DataWorks上默认没有开启instance tunnel,即instance.open_reader默认使用Result接口(存在limit限制,最多读取一万条数据记录)。如果您需要迭代获取全部数据,则需要开启instance tunnel并关闭limit限制。
  • 全局关闭limit限制
    您可以通过下列语句在全局范围内打开Instance Tunnel并关闭limit限制。
    options.tunnel.use_instance_tunnel = True
    options.tunnel.limit_instance_tunnel = False  # 关闭limit限制,读取全部数据。
    
    with instance.open_reader() as reader:
    # 通过Instance Tunnel可读取全部数据。
    # 您可以通过reader.count获取记录数。
  • 仅本次运行关闭limit限制
    您也可以通过在open_reader上添加tunnel=True,实现仅对本次open_reader开启instance tunnel。同时,您还可以添加 limit=False,实现仅对本次关闭limit限制。
    重要 若您未开启Instance Tunnel,可能导致获取数据格式错误,解决方法请参见Python SDK常见问题
    with instance.open_reader(tunnel=True, limit=False) as reader:
    # 本次open_reader使用Instance Tunnel接口,且能读取全部数据。

DataFrame

  • 执行
    在DataWorks的环境里,DataFrame的执行需要显式调用立即执行的方法(如executepersist等)。示例代码如下。
    # 调用立即执行的方法,处理每条Record,打印出表pyodps_irisiris.sepalwidth小于3的所有数据。
    from odps.df import DataFrame
    iris = DataFrame(o.get_table('pyodps_iris'))
    for record in iris[iris.sepalwidth < 3].execute():  
        print(record)
  • 打印详细信息

    在DataWorks上默认打开options.verbose选项,即默认情况下,DataWorks的PyODPS节点运行过程会打印Logview等详细过程。您可以手动设置此选项,指定运行过程是否会打印Logview等详细过程。

更多DataFrame的操作示例请参见DataFrame概述

获取调度参数

使用DataWorks的PyODPS节点开发任务代码时,您也可以使用调度参数,例如,需要通过调度参数获取任务运行的业务日期等场景。PyODPS节点与DataWorks中的SQL节点在调度参数的定义参数操作方面一致,但是在代码中的引用方式不同。
  • SQL节点会在代码中直接使用 ${param_name}这样的字符串。
  • 为了避免影响代码,PyODPS节点在执行代码前,在全局变量中增加了一个名为args的dict,代码中使用args[param_name]的方式获取调度参数取值,而非在代码中替换 ${param_name}
例如,在节点基本属性 > 参数中设置了调度参数ds=${yyyymmdd},则可以通过以下方式在代码中获取该参数。
  • 获取参数ds的取值。
    print('ds=' + args['ds'])
    #返回ds的时间,如ds=20161116
  • 获取名为ds=${yyyymmdd}的分区的表数据。
    o.get_table('table_name').get_partition('ds=' + args['ds'])
    #获取ds分区下表table_name的数据
更多调度参数详情可参见配置并使用调度参数

设置运行参数hints

运行任务时如果需要设置运行时参数,可以通过设置hints参数来实现,参数类型是dict。
o.execute_sql('select * from pyodps_iris', hints={'odps.sql.mapper.split.size': 16})
您也可以对全局设置sql.setting,设置后后续每次运行时都会添加相关的运行时参数。
from odps import options
options.sql.settings = {'odps.sql.mapper.split.size': 16}
o.execute_sql('select * from pyodps_iris')  #会根据全局配置添加hints

使用三方包

DataWorks节点预装了以下三方包,版本列表如下:
包名Python 2节点版本Python 3节点版本
requests2.11.12.26.0
numpy1.16.61.18.1
pandas0.24.21.0.5
scipy0.19.01.3.0
scikit_learn0.18.10.22.1
pyarrow0.16.02.0.0
lz42.1.43.1.10
zstandard0.14.10.17.0
如果您需要使用上面列表中不存在的包,DataWorks节点提供了load_resource_package方法,支持从MaxCompute资源下载三方包。使用pyodps-pack打包后,可以直接使用load_resource_package方法加载三方包,之后就可以导入包中的内容。关于pyodps-pack的使用方法请参见PyODPS制作第三方包PyODPS使用第三方包
说明 如果为Python 2节点打包,请在打包时为pyodps-pack增加--dwpy27参数。
示例:
  1. 使用以下命令打包ipaddress。
    pyodps-pack -o ipaddress-bundle.tar.gz ipaddress
  2. 上传并提交ipaddress-bundle.tar.gz为资源后,可以在PyODPS 3节点中按照下面的方法使用ipaddress包。
    load_resource_package("ipaddress-bundle.tar.gz")
    import ipaddress
DataWorks限制下载的包总大小为100 MB。如果您需要跳过预装包的打包,可以在打包时使用pyodps-pack提供的--exclude参数。例如:下面打包方法排除了DataWork环境中存在的numpy包和pandas包。
pyodps-pack -o bundle.tar.gz --exclude numpy --exclude pandas <your_package>

附录:示例数据

您可参考PyODPS条件查询文档中的步骤1:创建表并导入数据在DataWorks中建好表pyodps_iris并写入数据,文档中的操作示例即以此表作为示例数据,为您演示基本操作。