本文为您介绍DataFrame支持的聚合操作,以及如何实现分组聚合和编写自定义聚合。DataFrame提供对列进行HyperLogLog计数的接口。
from odps.df import DataFrame
iris = DataFrame(o.get_table('pyodps_iris'))
常用聚合操作如下:
使用
describe
函数,查看DataFrame里数字列的数量、最大值、最小值、平均值以及标准差。print(iris.describe())
返回结果如下。
type sepal_length sepal_width petal_length petal_width 0 count 150.000000 150.000000 150.000000 150.000000 1 mean 5.843333 3.054000 3.758667 1.198667 2 std 0.828066 0.433594 1.764420 0.763161 3 min 4.300000 2.000000 1.000000 0.100000 4 max 7.900000 4.400000 6.900000 2.500000
使用单列执行聚合操作。
iris.sepallength.max()
返回结果如下。
7.9
如果在消除重复后的列上进行聚合,可以先调用
unique
函数,再调用相应的聚合函数。iris.name.unique().cat(sep=',')
返回结果如下。
u'Iris-setosa,Iris-versicolor,Iris-virginica'
如果所有列支持同一种聚合操作,可以直接在整个DataFrame上执行聚合操作。
iris.exclude('category').mean()
返回结果如下。
sepal_length sepal_width petal_length petal_width 1 5.843333 3.054000 3.758667 1.198667
使用
count
函数获取DataFrame的总行数。iris.count()
返回结果如下。
150
说明如果需要打印对应数据到日志中,请执行
print(iris.count().execute())
。
PyODPS支持的聚合操作,如下表所示。
聚合操作 | 说明 |
count(或size) | 数量。 |
unique | 不重复值数量。 |
min | 最小值。 |
max | 最大值。 |
sum | 求和。 |
mean | 均值。 |
median | 中位数。 |
quantile(p) | p分位数,仅在整数值下可取得准确值。 |
var | 方差。 |
std | 标准差。 |
moment | n阶中心矩(或n阶矩)。 |
skew | 样本偏度(无偏估计)。 |
kurtosis | 样本峰度(无偏估计)。 |
cat | 按sep做字符串连接操作。 |
tolist | 组合为LIST。 |
不同于Pandas,对于列上的聚合操作,无论是在MaxCompute还是Pandas后端,PyODPS DataFrame都会忽略空值。这一逻辑与SQL类似。
分组聚合
分组聚合操作如下:
DataFrame提供了
groupby
函数执行分组操作,分组后通过调用agg
或者aggregate
方法,执行聚合操作。最终的结果列中会包含分组的列和聚合的列。iris.groupby('name').agg(iris.sepallength.max(), smin=iris.sepallength.min())
返回结果如下。
name sepallength_max smin 0 Iris-setosa 5.8 4.3 1 Iris-versicolor 7.0 4.9 2 Iris-virginica 7.9 4.9
DataFrame提供了
value_counts
函数,按某列分组后,将每个组的个数从大到小进行排列。使用
groupby
函数实现。iris.groupby('name').agg(count=iris.name.count()).sort('count', ascending=False).head(5)
返回结果如下。
name count 0 Iris-virginica 50 1 Iris-versicolor 50 2 Iris-setosa 50
使用
value_counts
函数实现。iris['name'].value_counts().head(5)
返回结果如下。
name count 0 Iris-virginica 50 1 Iris-versicolor 50 2 Iris-setosa 50
对于聚合后的单列操作,您也可以直接取出列名。但此时只能使用聚合函数。
iris.groupby('name').petallength.sum()
返回结果如下。
petallength_sum 0 73.2 1 213.0 2 277.6
iris.groupby('name').agg(iris.petallength.notnull().sum())
返回结果如下。
name petallength_sum 0 Iris-setosa 50 1 Iris-versicolor 50 2 Iris-virginica 50
分组时也支持对常量进行分组,但是需要使用Scalar初始化。
from odps.df import Scalar iris.groupby(Scalar(1)).petallength.sum()
返回结果如下。
petallength_sum 0 563.8
编写自定义聚合
对字段使用agg
或者aggregate
方法调用自定义聚合。自定义聚合需要提供一个类,这个类需要提供以下方法:
buffer()
:返回一个Mutable的Object(例如LIST或DICT),buffer
大小不应随数据量增大而递增。__call__(buffer, *val)
:将值聚合到中间buffer
。merge(buffer, pbuffer)
:将pbuffer
聚合到buffer
中。getvalue(buffer)
:返回最终值。
计算平均值的示例如下。
class Agg(object):
def buffer(self):
return [0.0, 0]
def __call__(self, buffer, val):
buffer[0] += val
buffer[1] += 1
def merge(self, buffer, pbuffer):
buffer[0] += pbuffer[0]
buffer[1] += pbuffer[1]
def getvalue(self, buffer):
if buffer[1] == 0:
return 0.0
return buffer[0] / buffer[1]
iris.sepalwidth.agg(Agg)
返回结果如下。
3.0540000000000007
编写自定义聚合还需要关注如下内容:
如果最终类型和输入类型发生了变化,则需要指定类型。
iris.sepalwidth.agg(Agg, 'float')
自定义聚合也可以用在分组聚合中。
iris.groupby('name').sepalwidth.agg(Agg)
返回结果如下。
petallength_aggregation 0 3.418 1 2.770 2 2.974
对多列可以使用
agg
方法调用自定义聚合。class Agg(object): def buffer(self): return [0.0, 0.0] def __call__(self, buffer, val1, val2): buffer[0] += val1 buffer[1] += val2 def merge(self, buffer, pbuffer): buffer[0] += pbuffer[0] buffer[1] += pbuffer[1] def getvalue(self, buffer): if buffer[1] == 0: return 0.0 return buffer[0] / buffer[1]
from odps.df import agg to_agg = agg([iris.sepalwidth, iris.sepallength], Agg, rtype='float') # 对两列调用自定义聚合。 iris.groupby('name').agg(val=to_agg)
返回结果如下。
name val 0 Iris-setosa 0.682781 1 Iris-versicolor 0.466644 2 Iris-virginica 0.451427
如果您需要调用MaxCompute上已经存在的UDAF,指定函数名即可。
iris.groupby('name').agg(iris.sepalwidth.agg('your_func')) # 对单列聚合。 to_agg = agg([iris.sepalwidth, iris.sepallength], 'your_func', rtype='float') iris.groupby('name').agg(to_agg.rename('val')) # 对多列聚合。
说明目前,因受限于Python UDF,自定义聚合无法支持将LIST或DICT类型作为初始输入或最终输出结果。
HyperLogLog计数
DataFrame提供了对列进行HyperLogLog计数的接口hll_count
,这个接口是近似个数的估计接口。当数据量很大时,它可以较快地估计去重后的数据量。
使用该接口计算海量用户UV时,可以快速得出估计值。
以下示例使用了Pandas包,您可以在本地环境中运行以下示例,如果在DataWorks环境中运行,您需要先通过三方包的方式导入Pandas包。
from odps.df import DataFrame
import pandas as pd
import numpy as np
df = DataFrame(pd.DataFrame({'a': np.random.randint(100000, size=100000)}))
df.a.hll_count()
返回结果如下。
63270
df.a.nunique()
返回结果如下。
63250
splitter
参数会对每个字段进行分隔,再计算去重后的数据量。