本文为您介绍如何在PyODPS中使用第三方包。PyODPS制作第三方包的操作请参见PyODPS制作第三方包。
前提条件
已开通MaxCompute产品。如何开通请参见开通MaxCompute。
已开通DataWorks产品。如何开通请参见开通DataWorks。
上传三方包
使用三方包前,请确保您生成的包已被上传至MaxCompute Archive资源。上传方式如下:
使用代码上传资源。您需要将
packages.tar.gz
替换成目标包所在的路径和文件名:import os from odps import ODPS # 确保 ALIBABA_CLOUD_ACCESS_KEY_ID 环境变量设置为用户 Access Key ID, # ALIBABA_CLOUD_ACCESS_KEY_SECRET 环境变量设置为用户 Access Key Secret, # 不建议直接使用 Access Key ID / Access Key Secret 字符串 o = ODPS( os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'), os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'), project='<your-default-project>', endpoint='<your-end-point>', ) o.create_resource("test_packed.tar.gz", "archive", fileobj=open("packages.tar.gz", "rb"))
使用DataWorks上传资源。具体操作请参见步骤一:创建或上传资源。
在Python UDF中使用三方包
您需要对UDF进行修改以使用上传的三方包。具体如下:
在UDF类的
_init_
方法中添加对三方包的引用。在UDF代码(例如evaluate或process方法)中调用三方包。
示例
下面以实现scipy中的psi函数为例,为您介绍如何在Python UDF中使用三方包。
使用以下命令打包scipy。
pyodps-pack -o scipy-bundle.tar.gz scipy
编写以下代码,并将其保存为
test_psi_udf.py
。import sys from odps.udf import annotate @annotate("double->double") class MyPsi(object): def __init__(self): # 将路径增加到引用路径 sys.path.insert(0, "work/scipy-bundle.tar.gz/packages") def evaluate(self, arg0): # 将 import 语句保持在 evaluate 函数内部 from scipy.special import psi return float(psi(arg0))
代码解释:
__init__
函数中将work/scipy-bundle.tar.gz/packages
添加至sys.path
,因为MaxCompute会将所有UDF引用的Archive资源以资源名称为目录解压至work
目录下,而packages
则是pyodps-pack
生成包的子目录。而将对scipy的import放在evaluate函数体内部的原因是三方包仅在执行时可用,当UDF在MaxCompute服务端被解析时,解析环境不包含三方包,函数体外的三方包import会导致报错。将
test_psi_udf.py
上传为MaxCompute Python资源,并将scipy-bundle.tar.gz
上传为Archive资源。创建UDF名为
test_psi_udf
,引用上述两个资源文件,并指定类名为test_psi_udf.MyPsi
。步骤3~4中,可以使用PyODPS或者MaxCompute客户端的方式执行。
使用PyODPS执行方法:
import os from odps import ODPS # 确保 ALIBABA_CLOUD_ACCESS_KEY_ID 环境变量设置为用户 Access Key ID, # ALIBABA_CLOUD_ACCESS_KEY_SECRET 环境变量设置为用户 Access Key Secret, # 不建议直接使用 Access Key ID / Access Key Secret 字符串 o = ODPS( os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'), os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'), project='<your-default-project>', endpoint='<your-end-point>', ) bundle_res = o.create_resource( "scipy-bundle.tar.gz", "archive", fileobj=open("scipy-bundle.tar.gz", "rb") ) udf_res = o.create_resource( "test_psi_udf.py", "py", fileobj=open("test_psi_udf.py", "rb") ) o.create_function( "test_psi_udf", class_type="test_psi_udf.MyPsi", resources=[bundle_res, udf_res] )
使用MaxCompute客户端执行方法:
add archive scipy-bundle.tar.gz; add py test_psi_udf.py; create function test_psi_udf as test_psi_udf.MyPsi using test_psi_udf.py,scipy-bundle.tar.gz;
完成以上操作后,即可使用UDF执行SQL。
set odps.pypy.enabled=false; set odps.isolation.session.enable=true; select test_psi_udf(sepal_length) from iris;
在PyODPS DataFrame中使用三方包
PyODPS DataFrame支持在execute或persist时使用libraries
参数使用上面的第三方库。 下面以map方法为例,apply或map_reduce方法的过程类似。
使用以下命令打包scipy。
pyodps-pack -o scipy-bundle.tar.gz scipy
假定表名为
test_float_col
,内容只包含一列FLOAT值:col1 0 3.75 1 2.51
计算
psi(col1)
的值,代码如下:import os from odps import ODPS, options def my_psi(v): from scipy.special import psi return float(psi(v)) # 如果 Project 开启了 Isolation,下面的选项不是必需的 options.sql.settings = {"odps.isolation.session.enable": True} # 确保 ALIBABA_CLOUD_ACCESS_KEY_ID 环境变量设置为用户 Access Key ID, # ALIBABA_CLOUD_ACCESS_KEY_SECRET 环境变量设置为用户 Access Key Secret, # 不建议直接使用 Access Key ID / Access Key Secret 字符串 o = ODPS( os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'), os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'), project='<your-default-project>', endpoint='<your-end-point>', ) df = o.get_table("test_float_col").to_df() # 直接执行并取得结果 df.col1.map(my_psi).execute(libraries=["scipy-bundle.tar.gz"]) # 保存到另一张表 df.col1.map(my_psi).persist("result_table", libraries=["scipy-bundle.tar.gz"])
(可选)如果希望在整个执行过程中使用相同的三方包,可以设置全局选项:
from odps import options options.df.libraries = ["scipy-bundle.tar.gz"]
完成以上操作后,即可在DataFrame执行时使用相关的三方包。
在DataWorks中使用三方包
DataWorks PyODPS节点预置了若干三方包,同时提供了load_resource_package
方法用以引用其他的包,具体使用方式请参见使用三方包。
手动上传和使用三方包
以下内容仅作为维护旧项目或者旧环境的参考,新项目建议直接使用pyodps-pack
打包。
部分旧项目可能采用了之前的方式使用三方包,即手动上传所有依赖的Wheel包并在代码中引用,或者使用了不支持二进制包的旧版MaxCompute环境,本节内容为该场景准备。下面以在map中使用python_dateutil为例为您介绍使用三方包的步骤。
在Linux Bash中使用
pip download
命令,下载包及其依赖到某个路径。下载后会出现两个包,即six-1.10.0-py2.py3-none-any.whl
和python_dateutil-2.5.3-py2.py3-none-any.whl
。pip download python-dateutil -d /to/path/
说明您需要下载支持Linux环境的包,建议直接在Linux下调用该命令。
将上述已下载的两个包分别上传至ODPS资源。
方式一:通过代码上传。
# 这里要确保资源名的后缀是正确的文件类型 odps.create_resource('six.whl', 'file', file_obj=open('six-1.10.0-py2.py3-none-any.whl', 'rb')) odps.create_resource('python_dateutil.whl', 'file', file_obj=open('python_dateutil-2.5.3-py2.py3-none-any.whl', 'rb'))
方式二:通过DataWorks界面上传。
您可以参考步骤一:创建或上传资源完成目标资源的上传与提交。
使用三方包。
假定DataFrame只有一个STRING类型的字段,内容如下。
datestr 0 2016-08-26 14:03:29 1 2015-08-26 14:03:29
全局配置使用到的三方库如下:
from odps import options def get_year(t): from dateutil.parser import parse return parse(t).strftime('%Y') options.df.libraries = ['six.whl', 'python_dateutil.whl'] df.datestr.map(get_year).execute()
datestr 0 2016 1 2015
通过立即运行方法的
libraries
参数指定:def get_year(t): from dateutil.parser import parse return parse(t).strftime('%Y') df.datestr.map(get_year).execute(libraries=['six.whl', 'python_dateutil.whl'])
datestr 0 2016 1 2015
PyODPS默认支持执行纯Python且不含文件操作的第三方库。在较新版本的MaxCompute服务下,PyODPS也支持执行带有二进制代码或带有文件操作的Python库。这些库名必须拥有一定的后缀,可根据下表判断:
平台 | Python版本 | 可用的后缀 |
RHEL 5 x86_64 | Python 2.7 | cp27-cp27m-manylinux1_x86_64 |
RHEL 5 x86_64 | Python 3.7 | cp37-cp37m-manylinux1_x86_64 |
RHEL 7 x86_64 | Python 2.7 | cp27-cp27m-manylinux1_x86_64, cp27-cp27m-manylinux2010_x86_64, cp27-cp27m-manylinux2014_x86_64 |
RHEL 7 x86_64 | Python 3.7 | cp37-cp37m-manylinux1_x86_64, cp37-cp37m-manylinux2010_x86_64, cp37-cp37m-manylinux2014_x86_64 |
RHEL 7 Arm64 | Python 3.7 | cp37-cp37m-manylinux2014_aarch64 |
所有的Wheel包都需要以Archive格式上传,whl后缀的包需要重命名为zip后缀。同时,作业需要开启odps.isolation.session.enable
选项,或者在Project级别开启Isolation。以下示例展示了如何上传并使用scipy中的特殊函数:
# 对于含有二进制代码的包,必须使用 Archive 方式上传资源,whl 后缀需要改为 zip
odps.create_resource('scipy.zip', 'archive', file_obj=open('scipy-0.19.0-cp27-cp27m-manylinux1_x86_64.whl', 'rb'))
# 如果 Project 开启了 Isolation,下面的选项不是必需的
options.sql.settings = { 'odps.isolation.session.enable': True }
def my_psi(value):
# 建议在函数内部 import 第三方库,以防止不同操作系统下二进制包结构差异造成执行错误
from scipy.special import psi
return float(psi(value))
df.float_col.map(my_psi).execute(libraries=['scipy.zip'])
对于只提供源码的二进制包,可以在Linux Shell中打包成Wheel再上传,Mac和Windows中生成的Wheel包无法在MaxCompute中使用,Linux Shell中打包命令如下:
python setup.py bdist_wheel