使用介绍
内置UDAF列表
HA3目前内置了几种常见UDAF:
sum:聚合后求和
avg:聚合后求均值
max:聚合后求最大值
min:聚合后求最小值
count:聚合后统计条目数
ARBITRARY:聚合后选择某一个值(一般用于从“值全部相同的字段”中返回数据),其它SQL引擎中可能称为IDENTITY函数
GATHER:将多个单值聚合为一个多值
MULTIGATHER:将多个多值聚合为一个多值
MAXLABEL:聚合后求最大值对应的Label值
使用示例
测试数据
后续演示将使用测试环境的 phone
表进行,表中主要记录了主流品牌的手机信息,表的内容如下:
nid | title | price | brand | size | color |
1 | Huawei Mate 9 麒麟960芯片 徕卡双镜头 | 3599 | Huawei | 5.9 | 红 |
2 | Huawei P10 Plus全网通手机 | 4388 | Huawei | 5.5 | 蓝 |
3 | Xiaomi/小米 红米手机4X 32G全网通4G智能手机 | 899 | 小米 | 5.0 | 黑 |
4 | OPPO R11 全网通前后2000万指纹识别拍照手机r11r9s | 2999 | OPPO | 5.5 | 红 |
5 | Meizu/魅族 魅蓝E2 全网通正面指纹快充4G智能手机 | 1299 | Meizu | 5.5 | 银白 |
6 | Nokia/诺基亚 105移动大声老人机直板按键学生老年小手机超长待机 | 169 | Nokia | 1.4 | 蓝 |
7 | Apple/苹果 iPhone 6s 32G 原封国行现货速发 | 3599 | Apple | 4.7 | 银白 |
8 | Apple/苹果 iPhone 7 Plus 128G 全网通4G手机 | 5998 | Apple | 5.5 | 亮黑 |
9 | Apple/苹果 iPhone 7 32G 全网通4G智能手机 | 4298 | Apple | 4.7 | 黑 |
10 | Samsung/三星 GALAXY S8 SM-G9500 全网通 4G手机 | 5688 | Samsung | 5.6 | 雾屿蓝 |
检索示例
检索全表内容
SELECT * FROM phone ORDER BY nid LIMIT 1000
USE_TIME: 0.036, ROW_COUNT: 10
------------------------------- TABLE INFO ---------------------------
nid | title | price | brand | size | color |
1 | null | 3599 | Huawei | 5.9 | null |
2 | null | 4388 | Huawei | 5.5 | null |
3 | null | 899 | Xiaomi | 5 | null |
4 | null | 2999 | OPPO | 5.5 | null |
5 | null | 1299 | Meizu | 5.5 | null |
6 | null | 169 | Nokia | 1.4 | null |
7 | null | 3599 | Apple | 4.7 | null |
8 | null | 5998 | Apple | 5.5 | null |
9 | null | 4298 | Apple | 4.7 | null |
10 | null | 5688 | Samsung | 5.6 | null |
注:title和color是Summary字段,所以在本阶段查询中显示为null
使用sum函数统计每个品牌商品的总价
SELECT brand, sum(price) FROM phone GROUP BY (brand) ORDER BY brand LIMIT 1000
USE_TIME: 0.152, ROW_COUNT: 7
------------------------------- TABLE INFO ---------------------------
brand | SUM(price) |
Apple | 13895 |
Huawei | 7987 |
Meizu | 1299 |
Nokia | 169 |
OPPO | 2999 |
Samsung | 5688 |
Xiaomi | 899 |
使用max函数统计每个品牌最贵手机的价格,并按照最高价格降序排列
SELECT brand, max(price) AS price FROM phone GROUP BY (brand) ORDER BY price DESC LIMIT 1000
USE_TIME: 0.053, ROW_COUNT: 7
------------------------------- TABLE INFO ---------------------------
brand | price |
Apple | 5998 |
Samsung | 5688 |
Huawei | 4388 |
OPPO | 2999 |
Meizu | 1299 |
Xiaomi | 899 |
Nokia | 169 |
使用MAXLABEL函数统计每个品牌最贵手机的屏幕尺寸
SELECT brand, MAXLABEL(size, price) AS size FROM phone GROUP BY brand
执行机制
实现原理
每个UDAF的实现在单一的类中,该类需要继承自基类AggFunc并实现相关接口。主要接口如下(分为local和global两个部分):
// 以下四个函数供collect阶段调用
virtual bool initCollectInput(const TablePtr &inputTable);
virtual bool initAccumulatorOutput(const TablePtr &outputTable) = 0;
virtual bool collect(Row inputRow, Accumulator *acc) = 0;
virtual bool outputAccumulator(Accumulator *acc, Row outputRow) const = 0;
// 以下四个函数供merge阶段使用
virtual bool initMergeInput(const TablePtr &inputTable);
virtual bool initResultOutput(const TablePtr &outputTable);
virtual bool merge(Row inputRow, Accumulator *acc);
virtual bool outputResult(Accumulator *acc, Row outputRow) const;
UDAF在运行时主要分为两个阶段:1.collect阶段 2.merge阶段。这两个阶段分别运行在Searcher和Qrs上:执行引擎先会在Searcher环境下调用上述collect阶段的四个函数,在数据集上完成初步统计;待所有Searcher上的collect阶段执行完毕后,会将中间结果汇总到Qrs上,然后在Qrs环境下调用上述merge阶段的四个函数,对collect阶段的输出进一步加工,并输出最终结果。
下面以内置函数avg为例,结合HA3的运行流程,介绍该UDAF在这两个阶段的实现。
collect阶段
collect阶段会运行在Searcher上,引擎将根据本Searcher上的聚合结果,调用AggFunc的 collect
方法进行初步统计,并将结果保存在相应的Accumulator对象中。其中,每个分组会有自己对应的Accumulator,用于存放该分组统计过程中的状态信息。
// 初始化一些`ColumnData`对象,用于访问输入表中数据
virtual bool initCollectInput(const TablePtr &inputTable);
// 初始化一些`ColumnData`对象,用于将Accumulator序列化到输出表
virtual bool initAccumulatorOutput(const TablePtr &outputTable) = 0;
// collect阶段主要过程,将多行输入数据的统计结果存储在对应Accumulator对象上
virtual bool collect(Row inputRow, Accumulator *acc) = 0;
// 将Accumulator序列化到输出表上(方便网络传输),之后会由Searcher汇总到Qrs上
virtual bool outputAccumulator(Accumulator *acc, Row outputRow) const = 0;
引擎对每行数据调用collect方法前,会先计算出该行数据的GroupKey作为分组依据,取出该分组对应的Accumulator一同传入。用户在收到当前数据后,根据数据内容修改Accumulator的状态。
对Avg函数来讲,collect阶段要做的事情就是记录当前Group下的数据条目数以及数值总和,为未来均值的计算作准备。
template<typename InputType, typename AccumulatorType>
bool AvgAggFunc<InputType, AccumulatorType>::collect(Row inputRow, Accumulator *acc) {
AvgAccumulator<AccumulatorType> *avgAcc = static_cast<AvgAccumulator<AccumulatorType> *>(acc);
avgAcc->count++;
avgAcc->sum += _inputColumn->get(inputRow);
return true;
}
引擎通过访问 AvgAggFuncCreator::createLocalFunction
,获得当前阶段的AggFunc。注意,这一阶段的输出是 AvgAccumulator
上的所有属性。
AggFunc *AvgAggFuncCreator::createLocalFunction(
const vector<ValueType> &inputTypes,
const vector<string> &inputFields,
const vector<string> &outputFields);
merge阶段
本阶段在Qrs上执行,用于将各个Searcher上返回的初步统计结果进行处理,输出最终的结果。主要的处理过程在AggFunc的 merge
函数内
// 初始化一些`ColumnData`对象,用于访问输入表中数据,包括Accumulator数据
virtual bool initMergeInput(const TablePtr &inputTable);
// 初始化一些`ColumnData`对象,用于输出统计结果输出结果
virtual bool initResultOutput(const TablePtr &outputTable);
// merge阶段主要过程,将来自多个Searcher的Accumulator信息整合
virtual bool merge(Row inputRow, Accumulator *acc);
// 计算最终结果并输出
virtual bool outputResult(Accumulator *acc, Row outputRow) const;
例如单次查询中,多个Searcher中都有GroupKey=Apple的Accumulator上报,则在merge阶段引擎会为GroupKey=Apple新生成一个Accumulator,用于将各个Searcher传入的相关Accumulator信息聚合。
可以注意到Avg函数在merge阶段,需要将所有collect阶段的Accumulator.count累加,才能得到该Group在所有Searcher上的数据条目数。这与collect阶段统计count的实现并不相同,这也是为什么要针对两个阶段分别设置处理函数的原因。
template<typename InputType, typename AccumulatorType>
bool AvgAggFunc<InputType, AccumulatorType>::merge(Row inputRow, Accumulator *acc) {
AvgAccumulator<AccumulatorType> *avgAcc = static_cast<AvgAccumulator<AccumulatorType> *>(acc);
avgAcc->count += _countColumn->get(inputRow);
avgAcc->sum += _sumColumn->get(inputRow);
return true;
}
当所有Group的Accumulator通过 merge
函数统计完毕后,引擎会依次将每个Group的Accumulator并传入`outputResult`函数获得最终输出。Avg函数这个阶段只需要执行 avg = sum/count
即可获得该Group的均值
template<typename InputType, typename AccumulatorType>
bool AvgAggFunc<InputType, AccumulatorType>::outputResult(Accumulator *acc, Row outputRow) const {
AvgAccumulator<AccumulatorType> *avgAcc = static_cast<AvgAccumulator<AccumulatorType> *>(acc);
assert(avgAcc->count > 0);
double avg = (double)avgAcc->sum / avgAcc->count;
_avgColumn->set(outputRow, avg);
return true;
}
引擎通过访问 AvgAggFuncCreator::createGlobalFunction
,获得当前阶段的AggFunc。注意,这一阶段的输入是Accumulator字段。
AggFunc *AvgAggFuncCreator::createGlobalFunction(
const vector<ValueType> &inputTypes,
const vector<string> &inputFields,
const vector<string> &outputFields);
特殊情况
为提升性能,当执行引擎发现 GROUP BY
的字段即为Searchers数据分列的依据时(即各个Searcher之间不会有数据的GroupKey相同),会将UDAF的merge阶段也下沉到Searcher上进行。即在单次Query中,每个Searcher会依次执行UDAF的collect阶段、merge两个阶段(但会跳过其中无用的Accumulator序列化和反序列化),Qrs仅仅将各个Searcher返回的聚合统计结果进行简单粘贴,不再执行merge阶段。
此优化可以有效降低某些场景下Qrs的负载,提升对Query的处理性能。该优化对UDAF的编写者是无感知的,无需为此调整UDAF的设计。
- 本页导读 (1)
- 使用介绍
- 内置UDAF列表
- 使用示例
- 执行机制
- 实现原理