本文为您介绍DataFrame支持的聚合操作以及如何实现分组聚合、编写自定义聚合。DataFrame提供对列进行HyperLogLog计数的接口。

from odps.df import DataFrame
iris = DataFrame(o.get_table('pyodps_iris'))
使用describe函数,查看DataFrame里数字列的数量、最大值、最小值、平均值以及标准差。
>>> print(iris.describe())
    type  sepal_length  sepal_width  petal_length  petal_width
0  count    150.000000   150.000000    150.000000   150.000000
1   mean      5.843333     3.054000      3.758667     1.198667
2    std      0.828066     0.433594      1.764420     0.763161
3    min      4.300000     2.000000      1.000000     0.100000
4    max      7.900000     4.400000      6.900000     2.500000
使用单列执行聚合操作。
>>> iris.sepallength.max()
7.9
如果要在消除重复后的列上进行聚合,可以先调用unique方法,再调用相应的聚合函数。
>>> iris.name.unique().cat(sep=',')
u'Iris-setosa,Iris-versicolor,Iris-virginica'
如果所有列支持同一种聚合操作,也可以直接在整个DataFrame上执行聚合操作。
>>> iris.exclude('category').mean()
   sepal_length  sepal_width  petal_length  petal_width
1      5.843333     3.054000      3.758667     1.198667
在DataFrame上执行count获取的是DataFrame的总行数。
>>> iris.count()
150
PyODPS支持的聚合操作,如下表所示。
聚合操作 说明
count(或size) 数量。
nunique 不重复值数量。
min 最小值。
max 最大值。
sum 求和。
mean 均值。
median 中位数。
quantile(p) p分位数,仅在整数值下可取得准确值。
var 方差。
std 标准差。
moment n阶中心矩(或n阶矩)。
skew 样本偏度(无偏估计)。
kurtosis 样本峰度(无偏估计)。
cat 按sep做字符串连接操作。
tolist 组合为LIST。
说明 不同于Pandas,对于列上的聚合操作,无论是在ODPS还是Pandas后端,PyODPS DataFrame都会忽略空值。这一逻辑与SQL类似。

分组聚合

DataFrame API提供了groupby执行分组操作,分组后的一个主要操作是通过调用agg或者aggregate方法,执行聚合操作。 最终的结果列中会包含分组的列,以及聚合的列。
>>> iris.groupby('name').agg(iris.sepallength.max(), smin=iris.sepallength.min())
              name  sepallength_max  smin
0      Iris-setosa              5.8   4.3
1  Iris-versicolor              7.0   4.9
2   Iris-virginica              7.9   4.9
DataFrame提供了value_counts操作,用于返回按某列分组后,将每个组的个数从大到小进行排列。
  • 使用groupby实现。
    >>> iris.groupby('name').agg(count=iris.name.count()).sort('count', ascending=False).head(5)
                  name  count
    0   Iris-virginica     50
    1  Iris-versicolor     50
    2      Iris-setosa     50
  • 使用value_counts实现。
    >>> iris['name'].value_counts().head(5)
                  name  count
    0   Iris-virginica     50
    1  Iris-versicolor     50
    2      Iris-setosa     50
对于聚合后的单列操作,您也可以直接取出列名。但此时只能使用聚合函数。
>>> iris.groupby('name').petallength.sum()
   petallength_sum
0             73.2
1            213.0
2            277.6
>>> iris.groupby('name').agg(iris.petallength.notnull().sum())
              name  petallength_sum
0      Iris-setosa               50
1  Iris-versicolor               50
2   Iris-virginica               50

分组时也支持对常量进行分组,但是需要使用Scalar初始化。

>>> from odps.df import Scalar
>>> iris.groupby(Scalar(1)).petallength.sum()
   petallength_sum
0            563.8

编写自定义聚合

对字段调用agg或者aggregate方法调用自定义聚合。自定义聚合需要提供一个类,这个类需要提供以下方法:

  • buffer():返回一个Mutable的Object(例如LIST、DICT),buffer大小不应随数据而递增。
  • __call__(buffer, *val):将值聚合到中间buffer
  • merge(buffer, pbuffer):将pbuffer聚合到buffer中。
  • getvalue(buffer):返回最终值。
计算平均值的示例如下。
class Agg(object):

    def buffer(self):
        return [0.0, 0]

    def __call__(self, buffer, val):
        buffer[0] += val
        buffer[1] += 1

    def merge(self, buffer, pbuffer):
        buffer[0] += pbuffer[0]
        buffer[1] += pbuffer[1]

    def getvalue(self, buffer):
        if buffer[1] == 0:
            return 0.0
        return buffer[0] / buffer[1]
执行结果如下。
>>> iris.sepalwidth.agg(Agg)
3.0540000000000007
如果最终类型和输入类型发生了变化,则需指定类型。
>>> iris.sepalwidth.agg(Agg, 'float')
自定义聚合也可以用在分组聚合中。
>>> iris.groupby('name').sepalwidth.agg(Agg)
   petallength_aggregation
0                    3.418
1                    2.770
2                    2.974
当对多列调用自定义聚合,可以使用agg方法。
class Agg(object):

    def buffer(self):
        return [0.0, 0.0]

    def __call__(self, buffer, val1, val2):
        buffer[0] += val1
        buffer[1] += val2

    def merge(self, buffer, pbuffer):
        buffer[0] += pbuffer[0]
        buffer[1] += pbuffer[1]

    def getvalue(self, buffer):
        if buffer[1] == 0:
            return 0.0
        return buffer[0] / buffer[1]r[0] / buffer[1]
执行结果如下。
>>> from odps.df import agg
>>> to_agg = agg([iris.sepalwidth, iris.sepallength], Agg, rtype='float')  # 对两列调用自定义聚合。
>>> iris.groupby('name').agg(val=to_agg)
              name       val
0      Iris-setosa  0.682781
1  Iris-versicolor  0.466644
2   Iris-virginica  0.451427
如果您需要调用ODPS上已经存在的UDAF,指定函数名即可。
>>> iris.groupby('name').agg(iris.sepalwidth.agg('your_func'))  # 对单列聚合。
>>> to_agg = agg([iris.sepalwidth, iris.sepallength], 'your_func', rtype='float')
>>> iris.groupby('name').agg(to_agg.rename('val'))  # 对多列聚合。
说明 目前,因受限于Python UDF,自定义聚合无法支持将LIST/DICT类型作为初始输入或最终输出结果。

HyperLogLog计数

DataFrame提供了对列进行HyperLogLog计数的接口hll_count,这个接口是个近似的估计接口。当数据量很大时,它可以较快地对数据的唯一个数进行估计。

使用该接口计算海量用户UV时,可以快速得出估计值。
>>> df = DataFrame(pd.DataFrame({'a': np.random.randint(100000, size=100000)}))
>>> df.a.hll_count()
63270
>>> df.a.nunique()
63250
说明 提供的splitter参数会对每个字段进行分隔,再计算唯一数。