要对一行数据使用自定义函数,可以使用apply方法,axis参数必须为1,表示在行上操作。

对一行数据使用自定义函数

apply的自定义函数接收一个参数,为上一步Collection的一行数据,用户可以通过属性、或者偏移取得一个字段的数据。

 iris.apply(lambda row: row.sepallength + row.sepalwidth, axis=1, reduce=True, types='float').rename('sepaladd').head(3)
   sepaladd
0       8.6
1       7.9
2       7.9

reduce为True时,表示返回结果为Sequence,否则返回结果为Collection。 namestypes参数分别指定返回的Sequence或Collection的字段名和类型。 如果类型不指定,将会默认为string类型。

在apply的自定义函数中,reduce为False时,也可以使用yield关键字来返回多行结果。

iris.count()
150

def handle(row):
     yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth
    yield row.petallength - row.petalwidth, row.petallength + row.petalwidth

 iris.apply(handle, axis=1, names=['iris_add', 'iris_sub'], types=['float', 'float']).count()
300

我们也可以在函数上来注释返回的字段和类型,这样就不需要在函数调用时再指定。

 from odps.df import output

 @output(['iris_add', 'iris_sub'], ['float', 'float'])
 def handle(row):
     yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth
     yield row.petallength - row.petalwidth, row.petallength + row.petalwidth

 iris.apply(handle, axis=1).count()
300

也可以使用map-only的map_reduce,和axis=1的apply操作是等价的。

iris.map_reduce(mapper=handle).count()
300

如果想调用ODPS上已经存在的UDTF,则函数指定为函数名即可。

 iris['name', 'sepallength'].apply('your_func', axis=1, names=['name2', 'sepallength2'], types=['string', 'float'])

使用apply对行操作,且reduce为False时,可以使用自定义函数与已有的行结合,用于后续聚合等操作。

 from odps.df import output

 @output(['iris_add', 'iris_sub'], ['float', 'float'])
 def handle(row):
     yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth
     yield row.petallength - row.petalwidth, row.petallength + row.petalwidth

iris[iris.category, iris.apply(handle, axis=1)]

对所有列调用自定义聚合

调用apply方法,当我们不指定axis,或者axis为0的时候,我们可以通过传入一个自定义聚合类来对所有sequence进行聚合操作。

class Agg(object):

    def buffer(self):
        return [0.0, 0]

    def __call__(self, buffer, val):
        buffer[0] += val
        buffer[1] += 1

    def merge(self, buffer, pbuffer):
        buffer[0] += pbuffer[0]
        buffer[1] += pbuffer[1]

    def getvalue(self, buffer):
        if buffer[1] == 0:
            return 0.0
        return buffer[0] / buffer[1]
 iris.exclude('name').apply(Agg)
   sepallength_aggregation  sepalwidth_aggregation  petallength_aggregation  petalwidth_aggregation
0                 5.843333                   3.054                 3.758667                1.198667
说明 目前,受限于Python UDF,自定义函数无法支持将list/dict类型作为初始输入或最终输出结果。

引用资源

类似于对map方法的resources参数,每个resource可以是ODPS上的资源(表资源或文件资源),或者引用一个collection作为资源。

对于axis为1,也就是在行上操作,我们需要写一个函数闭包或者callable的类。 而对于列上的聚合操作,我们只需在 __init__ 函数里读取资源即可。

 words_df
                     sentence
0                 Hello World
1                Hello Python
2  Life is short I use Python

 import pandas as pd
stop_words = DataFrame(pd.DataFrame({'stops': ['is', 'a', 'I']}))

 @output(['sentence'], ['string'])
 def filter_stops(resources):
    stop_words = set([r[0] for r in resources[0]])
     def h(row):
         return ' '.join(w for w in row[0].split() if w not in stop_words),
     return h

 words_df.apply(filter_stops, axis=1, resources=[stop_words])
                sentence
0            Hello World
1           Hello Python
2  Life short use Python

可以看到这里的stop_words是存放于本地,但在真正执行时会被上传到ODPS作为资源引用。

使用第三方Python库

使用方法类似map中使用第三方Python库

可以在全局指定使用的库:

 from odps import options
 options.df.libraries = ['six.whl', 'python_dateutil.whl']

或者在立即执行的方法中,局部指定:

 df.apply(my_func, axis=1).to_pandas(libraries=['six.whl', 'python_dateutil.whl'])
说明 由于字节码定义的差异,Python 3下使用新语言特性(例如 yield from)时,代码在使用Python 2.7的ODPS Worker上执行时会发生错误。因而建议在Python 3下使用MapReduce API编写生产作业前,先确认相关代码是否能正常 执行。