本文为您介绍如何使用自定义函数及Python第三方库。
使用自定义函数
DataFrame函数支持对Sequence使用map
,它会对它的每个元素调用自定义函数。
>>> iris.sepallength.map(lambda x: x + 1).head(5)
sepallength
0 6.1
1 5.9
2 5.7
3 5.6
4 6.0
目前,自定义函数无法支持将List/Dict类型作为输入或输出。
如果map
前后,Sequence的类型发生了变化,则需要显式指定map
后的类型。
>>> iris.sepallength.map(lambda x: 't'+str(x), 'string').head(5)
sepallength
0 t5.1
1 t4.9
2 t4.7
3 t4.6
4 t5.0
如果在函数中包含闭包,则函数外闭包变量值的变化会引起函数内该变量值的变化。
>>> dfs = []
>>> for i in range(10):
>>> dfs.append(df.sepal_length.map(lambda x: x + i))
结果为dfs
中每个SequenceExpr
均为df.sepal_length+9
。为解决此问题,可以将函数作为另一函数的返回值,或者使用partial
。两个示例如下。
>>> dfs = []
>>> def get_mapper(i):
>>> return lambda x: x + i
>>> for i in range(10):
>>> dfs.append(df.sepal_length.map(get_mapper(i)))
>>> import functools
>>> dfs = []
>>> for i in range(10):
>>> dfs.append(df.sepal_length.map(functools.partial(lambda v, x: x + v, i)))
map
也支持使用现有的UDF函数,传入的参数是str类型(函数名)或者Function对象,详情请参见函数。
map
传入Python函数的实现使用了MaxCompute Python UDF。因此,如果您所在的Project不支持Python UDF,则map
函数无法使用。除此以外,所有Python UDF的限制在此都适用。
目前,默认可使用的第三方库(包含C)只有NumPy,第三方库使用详情请参见使用第三方Python库。
除了调用自定义函数,DataFrame还提供了很多内置函数,这些函数中部分使用了map
函数来实现。因此,如果您所在Project未开通Python UDF,则无法使用这些函数(注意:阿里云公共服务暂不提供对Python UDF的支持)。
由于字节码定义的差异,Python 3下使用新语言特性(例如yield from
)时,代码在使用Python 2.7的MaxCompute Worker上执行时会发生错误。因此,建议您在Python 3下使用MapReduce API编写生产作业前,先确认相关代码是否能正常执行。
示例程序:使用计数器
from odps.udf import get_execution_context
def h(x):
ctx = get_execution_context()
counters = ctx.get_counters()
counters.get_counter('df', 'add_one').increment(1)
return x + 1
df.field.map(h)
Logview的JSONSummary中即可找到计数器值。
对一行数据使用自定义函数
如果您需要对一行数据使用自定义函数,可以使用apply
方法。参数axis
的值必须设为1,表示对行进行操作。apply
的自定义函数接收一个参数,参数为上一步Collection的一行数据。您可以通过属性或者偏移获得一个字段的数据。
reduce
为True时,表示返回结果为Sequence,否则返回结果为Collection。names
和types
参数分别指定返回的Sequence或Collection的字段名和类型。 如果未指定类型,则会默认为STRING类型。>>> iris.apply(lambda row: row.sepallength + row.sepalwidth, axis=1, reduce=True, types='float').rename('sepaladd').head(3) sepaladd 0 8.6 1 7.9 2 7.9
在
apply
的自定义函数中,reduce
为False时,您可以使用yield
关键字返回多行结果。>>> iris.count() 150 >>> >>> def handle(row): >>> yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth >>> yield row.petallength - row.petalwidth, row.petallength + row.petalwidth >>> >>> iris.apply(handle, axis=1, names=['iris_add', 'iris_sub'], types=['float', 'float']).count() 300
您也可以在函数中注释返回的字段和类型,无需在函数调用时再指定它们。
>>> from odps.df import output >>> >>> @output(['iris_add', 'iris_sub'], ['float', 'float']) >>> def handle(row): >>> yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth >>> yield row.petallength - row.petalwidth, row.petallength + row.petalwidth >>> >>> iris.apply(handle, axis=1).count() 300
您也可以使用
map-only
的map_reduce
,该操作与axis=1
的apply
操作是等价的。>>> iris.map_reduce(mapper=handle).count() 300
如果您想调用MaxCompute上已经存在的UDTF,函数指定为函数名即可。
>>> iris['name', 'sepallength'].apply('your_func', axis=1, names=['name2', 'sepallength2'], types=['string', 'float'])
使用
apply
对行操作,且reduce
为False时,您可以使用并列多行输出与已有的行结合,用于后续聚合等操作。>>> from odps.df import output >>> >>> @output(['iris_add', 'iris_sub'], ['float', 'float']) >>> def handle(row): >>> yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth >>> yield row.petallength - row.petalwidth, row.petallength + row.petalwidth >>> >>> iris[iris.category, iris.apply(handle, axis=1)]
对所有列调用自定义聚合
调用apply
方法,当不指定axis
,或者axis
值为0时,您可以通过传入一个自定义聚合类对所有Sequence进行聚合操作。
class Agg(object):
def buffer(self):
return [0.0, 0]
def __call__(self, buffer, val):
buffer[0] += val
buffer[1] += 1
def merge(self, buffer, pbuffer):
buffer[0] += pbuffer[0]
buffer[1] += pbuffer[1]
def getvalue(self, buffer):
if buffer[1] == 0:
return 0.0
return buffer[0] / buffer[1]
>>> iris.exclude('name').apply(Agg)
sepallength_aggregation sepalwidth_aggregation petallength_aggregation petalwidth_aggregation
0 5.843333 3.054 3.758667 1.198667
目前,自定义函数无法支持将LIST/DICT类型作为初始输入或最终输出结果。
引用资源
自定义函数也能读取MaxCompute上的资源(表资源或文件资源),或者引用一个Collection作为资源。此时,自定义函数需要写成函数闭包或Callable的类。两个示例如下。
>>> file_resource = o.create_resource('pyodps_iris_file', 'file', file_obj='Iris-setosa')
>>>
>>> iris_names_collection = iris.distinct('name')[:2]
>>> iris_names_collection
sepallength
0 Iris-setosa
1 Iris-versicolor
>>> def myfunc(resources): # resources按调用顺序传入。
>>> names = set()
>>> fileobj = resources[0] # 文件资源是一个file-like的object。
>>> for l in fileobj:
>>> names.add(l)
>>> collection = resources[1]
>>> for r in collection:
>>> names.add(r.name) # 这里可以通过字段名或者偏移来取。
>>> def h(x):
>>> if x in names:
>>> return True
>>> else:
>>> return False
>>> return h
>>>
>>> df = iris.distinct('name')
>>> df = df[df.name,
>>> df.name.map(myfunc, resources=[file_resource, iris_names_collection], rtype='boolean').rename('isin')]
>>>
>>> df
name isin
0 Iris-setosa True
1 Iris-versicolor True
2 Iris-virginica False
分区表资源在读取时不包含分区字段。
当axis
值为1,即在行上操作时,您需要写一个函数闭包或者Callable的类。 而对于列上的聚合操作,您只需在 __init__
函数里读取资源即可。
>>> words_df
sentence
0 Hello World
1 Hello Python
2 Life is short I use Python
>>>
>>> import pandas as pd
>>> stop_words = DataFrame(pd.DataFrame({'stops': ['is', 'a', 'I']}))
>>>
>>> @output(['sentence'], ['string'])
>>> def filter_stops(resources):
>>> stop_words = set([r[0] for r in resources[0]])
>>> def h(row):
>>> return ' '.join(w for w in row[0].split() if w not in stop_words),
>>> return h
>>>
>>> words_df.apply(filter_stops, axis=1, resources=[stop_words])
sentence
0 Hello World
1 Hello Python
2 Life short use Python
这里的stop_words
存放于本地,但在真正执行时会被上传到MaxCompute作为资源引用。
使用第三方Python库
您可以把第三方Python包作为资源上传到MaxCompute,支持的格式有whl、egg、zip以及tar.gz。在全局或者在立即执行的方法时,指定需要使用的包文件,即可在自定义函数中使用第三方库。第三方库的依赖库,也必须指定,否则依然会导致导入错误。
您可以通过PyODPS的资源上传接口create_resource来完成资源的上传。
下面以python-dateutil包进行举例:
使用
pip download
命令,下载包以及其依赖到某个路径。下载后会出现两个包:six-1.10.0-py2.py3-none-any.whl和python_dateutil-2.5.3-py2.py3-none-any.whl(注意:这里需要下载支持Linux环境的包)。$ pip download python-dateutil -d /to/path/
分别把两个文件上传到MaxCompute资源。
>>> # 这里要确保资源名的后缀是正确的文件类型。 >>> odps.create_resource('six.whl', 'file', file_obj=open('six-1.10.0-py2.py3-none-any.whl', 'rb')) >>> odps.create_resource('python_dateutil.whl', 'file', file_obj=open('python_dateutil-2.5.3-py2.py3-none-any.whl', 'rb'))
现在有个DataFrame,只有一个STRING类型字段。
>>> df datestr 0 2016-08-26 14:03:29 1 2015-08-26 14:03:29
全局配置使用到的三方库如下。
>>> from odps import options >>> >>> def get_year(t): >>> from dateutil.parser import parse >>> return parse(t).strftime('%Y') >>> >>> options.df.libraries = ['six.whl', 'python_dateutil.whl'] >>> df.datestr.map(get_year) datestr 0 2016 1 2015
或者通过运行方法的
libraries
参数指定使用到的第三方库。>>> def get_year(t): >>> from dateutil.parser import parse >>> return parse(t).strftime('%Y') >>> >>> df.datestr.map(get_year).execute(libraries=['six.whl', 'python_dateutil.whl']) datestr 0 2016 1 2015
PyODPS默认支持执行仅包含Python且不含文件操作的第三方库。在较新版本的MaxCompute服务下,PyODPS也支持执行带有二进制代码或带有文件操作的Python库。这些库的后缀必须是cp27-cp27m-manylinux1_x86_64,以
archive
方式上传,whl后缀的包需要重命名为zip。同时,作业需要开启odps.isolation.session.enable
选项,或者在Project级别开启isolation
。以下示例为您展示如何上传并使用scipy
中的特殊函数。>>> # 对于含有二进制代码的包,必须使用archive方式上传资源,whl后缀需要改为zip。 >>> odps.create_resource('scipy.zip', 'archive', file_obj=open('scipy-0.19.0-cp27-cp27m-manylinux1_x86_64.whl', 'rb')) >>> >>> # 如果Project开启了isolation,下面的选项不是必需的。 >>> options.sql.settings = { 'odps.isolation.session.enable': True } >>> >>> def psi(value): >>> # 建议在函数内部import第三方库,以防止不同操作系统下二进制包结构差异造成执行错误。 >>> from scipy.special import psi >>> return float(psi(value)) >>> >>> df.float_col.map(psi).execute(libraries=['scipy.zip'])
对于只提供源码的二进制包,可以在Linux Shell中打包成Wheel再上传,Mac和Windows中生成的Wheel包无法在MaxCompute中使用。
python setup.py bdist_wheel
您也可以通过MaxCompute Console上传资源。
现在主流的Python包都提供了whl包,提供了各平台包含二进制文件的包,因此找到可以在MaxCompute上运行的包是第一步。
其次,要想在MaxCompute上运行,需要包含所有的依赖包,这个是比较繁琐的。各个包的依赖情况如下表所示。
包名
依赖
pandas
numpy、python-dateutil、pytz、six
scipy
numpy
scikit-learn
numpy、scipy
说明其中numpy已包含,您只需上传python-dateutil、pytz、pandas、scipy、sklearn、six包,pandas、scipy和scikit-learn即可使用。
您可进入python-dateutil找到python-dateutil-2.6.0.zip进行下载。
重命名为python-dateutil.zip,通过MaxCompute Console上传资源。
add archive python-dateutil.zip;
说明pytz和six的上传方式同上,分别找到 pytz-2017.2.zip和six-1.11.0.tar.gz进行下载和上传资源操作。
对于Pandas这种包含c的包,需要找到名字中包含cp27-cp27m-manylinux1_x86_64的whl包,这样才能在MaxCompute上正确执行。因此,您需要找到pandas-0.20.2-cp27-cp27m-manylinux1_x86_64.whl进行下载,然后把后缀改成zip,在MaxCompute Console中执行
add archive pandas.zip;
进行上传。scipy和scikit-learn包的操作同上。
所有包需要下载的资源如下表所示。
包名
文件名
上传资源名
python-dateutil
python-dateutil.zip
pytz
pytz.zip
six
six.tar.gz
pandas
pandas.zip
scipy
scipy.zip
scikit-learn
sklearn.zip
指定第三方Python库
在全局指定使用的库。
>>> from odps import options >>> options.df.libraries = ['six.whl', 'python_dateutil.whl']
在立即执行的方法中,局部指定使用的库。
>>> df.apply(my_func, axis=1).to_pandas(libraries=['six.whl', 'python_dateutil.whl'])