DataWorks为您提供PyODPS 3节点,您可以在该节点中直接使用Python代码编写MaxCompute作业,并进行作业的周期性调度。本文为您介绍如何通过DataWorks实现Python任务的配置与调度。
前提条件
已创建PyODPS 3节点,详情请参见创建并管理MaxCompute节点。
背景信息
PyODPS是MaxCompute的Python版本的SDK,提供简单方便的Python编程接口,以便您使用Python编写MaxCompute作业、查询MaxCompute表和视图,以及管理MaxCompute资源,详情请参见PyODPS。在DataWorks中,您可通过PyODPS节点实现Python任务的调度运行,以及与其他作业的集成操作。
注意事项
- 在DataWorks资源组本地运行PyODPS节点代码时,若代码中需要调用第三方包,Serverless资源组可通过自定义镜像安装第三方包。 说明- 如果代码中存在UDF引用第三方包的情况,不支持使用上述方式,具体配置方法,请参见UDF示例:Python UDF使用第三方包。 
- 如需升级PyODPS版本,Serverless资源组可通过自定义镜像执行 - /home/tops/bin/pip3 install pyodps==0.12.1进行升级(可以将- 0.12.1替换为您要升级的PyODPS版本),独享调度资源组则通过运维助手执行相同命令进行升级。
- 如果您的PyODPS任务需要访问特殊的网络环境(如VPC网络或IDC网络中的数据源或服务等),请使用Serverless调度资源组,并参考网络连通解决方案打通Serverless资源组与目标环境的网络连通。 
- PyODPS语法及更多信息请参见PyODPS文档。 
- PyODPS节点分为PyODPS 2和PyODPS 3两种,二者的区别在于底层Python语言版本不同。PyODPS 2底层Python语言版本为Python 2,PyODPS 3底层Python语言版本为Python 3,请您根据实际使用的Python语言版本创建PyODPS节点。 
- 若通过PyODPS节点执行SQL无法正常产生数据血缘关系,即数据血缘在数据地图无法正常展示,您可在任务代码处通过手动设置DataWorks调度运行的相关参数解决。查看数据血缘,详情请参见查看血缘信息;参数设置,详情请参见设置运行参数hints。任务运行时所需参数可参考如下代码获取。 - import os ... # get DataWorks sheduler runtime parameters skynet_hints = {} for k, v in os.environ.items(): if k.startswith('SKYNET_'): skynet_hints[k] = v ... # setting hints while submiting a task o.execute_sql('INSERT OVERWRITE TABLE XXXX SELECT * FROM YYYY WHERE ***', hints=skynet_hints) ...
- Pyodps节点的输出日志最大支持4MB。建议您尽量避免在日志中直接输出大量的数据结果。相反,建议您多输出告警日志和正常进度的日志,以提供更有价值的信息。 
使用限制
- 使用独享调度资源组执行PyODPS节点时,建议在节点内获取到独享资源组本地处理的数据不超过50MB,该操作受限于独享调度资源组的规格,处理的本地数据过多并超出操作系统阈值时可能发生OOM(Got Killed)错误。请避免在PyODPS节点中写入过多的数据处理代码。详情请参见高效使用PyODPS最佳实践。 
- 使用Serverless资源组执行PYODPS节点时,您可根据节点内需要处理的数据量合理配置PyODPS节点的CU。 说明- 使用Serverless资源组运行该任务时,单任务支持最大配置 - 64CU,但建议不超过- 16CU,以避免CU过大导致资源不足,影响任务启动。
- 如果您发现有Got killed报错,即表明内存使用超限,进程被中止。因此,请尽量避免本地的数据操作。通过PyODPS发起的SQL和DataFrame任务(除to_pandas外)不受此限制。 
- 非自定义函数代码可以使用平台预装的Numpy和Pandas。不支持其他带有二进制代码的三方包。 
- 由于兼容性原因,在DataWorks中,options.tunnel.use_instance_tunnel默认设置为False。如果需要全局开启instance tunnel,需要手动将该值设置为True。 
- 当Python 3的子版本号不同(例如Python 3.8和Python 3.7)时,字节码的定义有所不同。 - 目前MaxCompute使用的Python 3版本为3.7,当使用其它版本Python 3中的部分语法(例如Python 3.8中的finally block)时,执行会报错,建议您选择Python 3.7。 
- PyODPS 3支持运行在Serverless资源组上。如需购买使用,请参见新增和使用Serverless资源组。 
- 不支持在PyODPS节点内配置多Python任务并发执行。 
编辑代码:简单示例
创建PyODPS节点后,您可以进行代码编辑及运行,更多关于PyODPS语法说明,请参见基本操作概述。
- ODPS入口- DataWorks的PyODPS节点中,将会包含一个全局的变量odps或o,即ODPS入口,您无需手动定义ODPS入口。 - print(odps.exist_table('PyODPS_iris'))
- 执行SQL- 您可以在PyODPS节点中执行SQL,详情请参见SQL。 - DataWorks上默认未开启instance tunnel,即instance.open_reader默认使用Result接口(最多一万条记录)。您可以通过reader.count获取记录数。如果您需要迭代获取全部数据,则需要关闭 - limit限制。您可以通过下列语句在全局范围内打开Instance Tunnel并关闭- limit限制。- options.tunnel.use_instance_tunnel = True options.tunnel.limit_instance_tunnel = False # 关闭limit限制,读取全部数据。 with instance.open_reader() as reader: # 通过Instance Tunnel可读取全部数据。
- 您也可以通过在上添加 - tunnel=True,实现仅对本次open_reader开启instance tunnel。同时,您还可以添加- limit=False,实现仅对本次关闭- limit限制。- # 本次open_reader使用Instance Tunnel接口,且能读取全部数据。 with instance.open_reader(tunnel=True, limit=False) as reader:
 说明- 若您未开启Instance Tunnel,可能导致获取数据格式错误,解决方法请参见Python SDK常见问题。 
- 设置运行参数- 您可以通过设置hints参数,来设置运行时的参数,参数类型是dict。 Hints参数的详情请参见SET操作。 - o.execute_sql('select * from PyODPS_iris', hints={'odps.sql.mapper.split.size': 16})
- 对全局配置设置sql.settings后,每次运行时,都需要添加相关的运行时的参数。 - from odps import options options.sql.settings = {'odps.sql.mapper.split.size': 16} o.execute_sql('select * from PyODPS_iris') # 根据全局配置添加hints。
 
- 读取运行结果- 运行SQL的实例能够直接执行open_reader的操作,有以下两种情况: - SQL返回了结构化的数据。 - with o.execute_sql('select * from dual').open_reader() as reader: for record in reader: # 处理每一个record。
- 可能执行的是desc等SQL语句,通过reader.raw属性,获取到原始的SQL执行结果。 - with o.execute_sql('desc dual').open_reader() as reader: print(reader.raw)说明- 如果使用了自定义调度参数,页面上直接触发运行PyODPS 3节点时,需要写死时间,PyODPS节点无法直接替换。 
 
- DataFrame- 您还可以通过DataFrame的方式处理数据。 - 执行 - 在DataWorks的环境里,DataFrame的执行需要显式调用立即执行的方法。 - from odps.df import DataFrame iris = DataFrame(o.get_table('pyodps_iris')) for record in iris[iris.sepal_width < 3].execute(): # 调用立即执行的方法,处理每条Record。- 如果您需要在Print时调用立即执行,需要开启 - options.interactive。- from odps import options from odps.df import DataFrame options.interactive = True # 在开始处打开开关。 iris = DataFrame(o.get_table('pyodps_iris')) print(iris.sepal_width.sum()) # Print时会立即执行。
- 打印详细信息 - 通过设置 - options.verbose选项。在DataWorks上,默认已经处于打开状态,运行过程会打印Logview等详细过程。
 
示例
以下以一个简单示例为您介绍PyODPS节点的使用:
- 准备数据集,创建pyodps_iris示例表,具体操作请参见Dataframe数据处理。 
- 创建DataFrame,详情请参见从MaxCompute表创建DataFrame。 
- 在PyODPS节点中输入以下代码并运行。 - from odps.df import DataFrame # 从ODPS表创建DataFrame。 iris = DataFrame(o.get_table('pyodps_iris')) print(iris.sepallength.head(5))- 返回结果: - sepallength 0 4.5 1 5.5 2 4.9 3 5.0 4 6.0
编辑代码:进阶示例
若节点需要周期性调度,您需要定义节点调度时的相关属性,调度配置详情请参见任务调度属性配置概述。
使用调度参数
单击节点编辑区域右侧的调度配置,在参数区域配置自定义参数,PyODPS节点与SQL节点定义变量的方式不同,详情请参见调度参数配置。
与DataWorks中的SQL节点不同,为了避免影响代码,PyODPS节点不会在代码中替换类似 ${param_name}的字符串,而是在执行代码前,在全局变量中增加一个名为args的dict,调度参数可以在此获取。例如,在参数中设置ds=${yyyymmdd},则可以通过以下方式在代码中获取该参数。
print('ds=' + args['ds'])
ds=20161116如果您需要获取名为ds的分区,则可以使用如下方法。
o.get_table('table_name').get_partition('ds=' + args['ds'])更多场景的PyODPS任务开发,请参考:
后续步骤
- 如何判断Shell自定义脚本任务的成功完成:Python自定义脚本任务的成功完成的判断逻辑与Shell节点一致,您可通过该方法进行判断。 
- 发布任务:如果您使用的是标准模式的工作空间,需要通过任务发布流程,将任务发布至生产环境后,任务才会周期调度运行。 
- 周期任务运维:任务提交发布至生产运维中心调度后,您可通过DataWorks运维中心进行相关运维操作。 
- PyODPS常见问题:您可了解PyODPS执行过程中的常见问题,便于出现异常时快速排查解决。 
常见问题
Q:使用PyODPS3节点通过代码采集第三方接口数据(例如飞书)导入到DataWorks中,在本地开发没有问题,可以采集到数据,但提交到生产环境,在运维中心执行时却报错响应超时,为什么?
A:请在中的安全设置区域,配置沙箱白名单,添加相应第三方接口的白名单信息,允许PyOSPS3中的任务可以访问到目标第三方接口。例如:
