UDAF

更新时间:2025-03-03 07:41:37

使用介绍

内置UDAF列表

HA3目前内置了几种常见UDAF:

  • sum:聚合后求和

  • avg:聚合后求均值

  • max:聚合后求最大值

  • min:聚合后求最小值

  • count:聚合后统计条目数

  • ARBITRARY:聚合后选择某一个值(一般用于从“值全部相同的字段”中返回数据),其它SQL引擎中可能称为IDENTITY函数

  • GATHER:将多个单值聚合为一个多值

  • MULTIGATHER:将多个多值聚合为一个多值

  • MAXLABEL:聚合后求最大值对应的Label

使用示例

测试数据

后续演示将使用测试环境的 phone 表进行,表中主要记录了主流品牌的手机信息,表的内容如下:

nid

title

price

brand

size

color

1

Huawei Mate 9 麒麟960芯片 徕卡双镜头

3599

Huawei

5.9

2

Huawei P10 Plus全网通手机

4388

Huawei

5.5

3

Xiaomi/小米 红米手机4X 32G全网通4G智能手机

899

小米

5.0

4

OPPO R11 全网通前后2000万指纹识别拍照手机r11r9s

2999

OPPO

5.5

5

Meizu/魅族 魅蓝E2 全网通正面指纹快充4G智能手机

1299

Meizu

5.5

银白

6

Nokia/诺基亚 105移动大声老人机直板按键学生老年小手机超长待机

169

Nokia

1.4

7

Apple/苹果 iPhone 6s 32G 原封国行现货速发

3599

Apple

4.7

银白

8

Apple/苹果 iPhone 7 Plus 128G 全网通4G手机

5998

Apple

5.5

亮黑

9

Apple/苹果 iPhone 7 32G 全网通4G智能手机

4298

Apple

4.7

10

Samsung/三星 GALAXY S8 SM-G9500 全网通 4G手机

5688

Samsung

5.6

雾屿蓝

检索示例

  • 检索全表内容

SELECT * FROM phone ORDER BY nid LIMIT 1000 

USE_TIME: 0.036, ROW_COUNT: 10

------------------------------- TABLE INFO ---------------------------
                 nid |               title |               price |               brand |                size |               color |
                   1 |                null |                3599 |              Huawei |                 5.9 |                null |
                   2 |                null |                4388 |              Huawei |                 5.5 |                null |
                   3 |                null |                 899 |              Xiaomi |                   5 |                null |
                   4 |                null |                2999 |                OPPO |                 5.5 |                null |
                   5 |                null |                1299 |               Meizu |                 5.5 |                null |
                   6 |                null |                 169 |               Nokia |                 1.4 |                null |
                   7 |                null |                3599 |               Apple |                 4.7 |                null |
                   8 |                null |                5998 |               Apple |                 5.5 |                null |
                   9 |                null |                4298 |               Apple |                 4.7 |                null |
                  10 |                null |                5688 |             Samsung |                 5.6 |                null |

注:titlecolorSummary字段,所以在本阶段查询中显示为null

  • 使用sum函数统计每个品牌商品的总价

SELECT brand, sum(price) FROM phone GROUP BY (brand) ORDER BY brand LIMIT 1000

USE_TIME: 0.152, ROW_COUNT: 7

------------------------------- TABLE INFO ---------------------------
               brand |          SUM(price) |
               Apple |               13895 |
              Huawei |                7987 |
               Meizu |                1299 |
               Nokia |                 169 |
                OPPO |                2999 |
             Samsung |                5688 |
              Xiaomi |                 899 |

  • 使用max函数统计每个品牌最贵手机的价格,并按照最高价格降序排列

SELECT brand, max(price) AS price FROM phone GROUP BY (brand) ORDER BY price DESC LIMIT 1000

USE_TIME: 0.053, ROW_COUNT: 7

------------------------------- TABLE INFO ---------------------------
               brand |               price |
               Apple |                5998 |
             Samsung |                5688 |
              Huawei |                4388 |
                OPPO |                2999 |
               Meizu |                1299 |
              Xiaomi |                 899 |
               Nokia |                 169 |

  • 使用MAXLABEL函数统计每个品牌最贵手机的屏幕尺寸

SELECT brand, MAXLABEL(size, price) AS size FROM phone GROUP BY brand

执行机制

实现原理

每个UDAF的实现在单一的类中,该类需要继承自基类AggFunc并实现相关接口。主要接口如下(分为localglobal两个部分):

// 以下四个函数供collect阶段调用
virtual bool initCollectInput(const TablePtr &inputTable);
virtual bool initAccumulatorOutput(const TablePtr &outputTable) = 0;
virtual bool collect(Row inputRow, Accumulator *acc) = 0;
virtual bool outputAccumulator(Accumulator *acc, Row outputRow) const = 0;

// 以下四个函数供merge阶段使用
virtual bool initMergeInput(const TablePtr &inputTable);
virtual bool initResultOutput(const TablePtr &outputTable);
virtual bool merge(Row inputRow, Accumulator *acc);
virtual bool outputResult(Accumulator *acc, Row outputRow) const;

UDAF在运行时主要分为两个阶段:1.collect阶段 2.merge阶段。这两个阶段分别运行在SearcherQrs上:执行引擎先会在Searcher环境下调用上述collect阶段的四个函数,在数据集上完成初步统计;待所有Searcher上的collect阶段执行完毕后,会将中间结果汇总到Qrs上,然后在Qrs环境下调用上述merge阶段的四个函数,对collect阶段的输出进一步加工,并输出最终结果。

下面以内置函数avg为例,结合HA3的运行流程,介绍该UDAF在这两个阶段的实现。

collect阶段

collect阶段会运行在Searcher上,引擎将根据本Searcher上的聚合结果,调用AggFunccollect 方法进行初步统计,并将结果保存在相应的Accumulator对象中。其中,每个分组会有自己对应的Accumulator,用于存放该分组统计过程中的状态信息。

// 初始化一些`ColumnData`对象,用于访问输入表中数据
virtual bool initCollectInput(const TablePtr &inputTable);
// 初始化一些`ColumnData`对象,用于将Accumulator序列化到输出表
virtual bool initAccumulatorOutput(const TablePtr &outputTable) = 0;
// collect阶段主要过程,将多行输入数据的统计结果存储在对应Accumulator对象上
virtual bool collect(Row inputRow, Accumulator *acc) = 0;
// 将Accumulator序列化到输出表上(方便网络传输),之后会由Searcher汇总到Qrs上
virtual bool outputAccumulator(Accumulator *acc, Row outputRow) const = 0;

引擎对每行数据调用collect方法前,会先计算出该行数据的GroupKey作为分组依据,取出该分组对应的Accumulator一同传入。用户在收到当前数据后,根据数据内容修改Accumulator的状态。

Avg函数来讲,collect阶段要做的事情就是记录当前Group下的数据条目数以及数值总和,为未来均值的计算作准备。

template<typename InputType, typename AccumulatorType>
bool AvgAggFunc<InputType, AccumulatorType>::collect(Row inputRow, Accumulator *acc) {
    AvgAccumulator<AccumulatorType> *avgAcc = static_cast<AvgAccumulator<AccumulatorType> *>(acc);
    avgAcc->count++;
    avgAcc->sum += _inputColumn->get(inputRow);
    return true;
}

引擎通过访问 AvgAggFuncCreator::createLocalFunction ,获得当前阶段的AggFunc。注意,这一阶段的输出AvgAccumulator 上的所有属性。

AggFunc *AvgAggFuncCreator::createLocalFunction(
        const vector<ValueType> &inputTypes,
        const vector<string> &inputFields,
        const vector<string> &outputFields);

merge阶段

本阶段在Qrs上执行,用于将各个Searcher上返回的初步统计结果进行处理,输出最终的结果。主要的处理过程在AggFuncmerge 函数内

// 初始化一些`ColumnData`对象,用于访问输入表中数据,包括Accumulator数据
virtual bool initMergeInput(const TablePtr &inputTable);
// 初始化一些`ColumnData`对象,用于输出统计结果输出结果
virtual bool initResultOutput(const TablePtr &outputTable);
// merge阶段主要过程,将来自多个Searcher的Accumulator信息整合
virtual bool merge(Row inputRow, Accumulator *acc);
// 计算最终结果并输出
virtual bool outputResult(Accumulator *acc, Row outputRow) const;

例如单次查询中,多个Searcher中都有GroupKey=AppleAccumulator上报,则在merge阶段引擎会为GroupKey=Apple新生成一个Accumulator,用于将各个Searcher传入的相关Accumulator信息聚合。

可以注意到Avg函数在merge阶段,需要将所有collect阶段的Accumulator.count累加,才能得到该Group在所有Searcher上的数据条目数。这与collect阶段统计count的实现并不相同,这也是为什么要针对两个阶段分别设置处理函数的原因。

template<typename InputType, typename AccumulatorType>
bool AvgAggFunc<InputType, AccumulatorType>::merge(Row inputRow, Accumulator *acc) {
    AvgAccumulator<AccumulatorType> *avgAcc = static_cast<AvgAccumulator<AccumulatorType> *>(acc);
    avgAcc->count += _countColumn->get(inputRow);
    avgAcc->sum += _sumColumn->get(inputRow);
    return true;
}

当所有GroupAccumulator通过 merge 函数统计完毕后,引擎会依次将每个GroupAccumulator并传入`outputResult`函数获得最终输出。Avg函数这个阶段只需要执行 avg = sum/count 即可获得该Group的均值

template<typename InputType, typename AccumulatorType>
bool AvgAggFunc<InputType, AccumulatorType>::outputResult(Accumulator *acc, Row outputRow) const {
    AvgAccumulator<AccumulatorType> *avgAcc = static_cast<AvgAccumulator<AccumulatorType> *>(acc);
    assert(avgAcc->count > 0);
    double avg = (double)avgAcc->sum / avgAcc->count;
    _avgColumn->set(outputRow, avg);
    return true;
}

引擎通过访问 AvgAggFuncCreator::createGlobalFunction ,获得当前阶段的AggFunc。注意,这一阶段的输入Accumulator字段。

AggFunc *AvgAggFuncCreator::createGlobalFunction(
        const vector<ValueType> &inputTypes,
        const vector<string> &inputFields,
        const vector<string> &outputFields);

特殊情况

为提升性能,当执行引擎发现 GROUP BY 的字段即为Searchers数据分列的依据时(即各个Searcher之间不会有数据的GroupKey相同),会将UDAFmerge阶段也下沉到Searcher上进行。即在单次Query中,每个Searcher会依次执行UDAFcollect阶段、merge两个阶段(但会跳过其中无用的Accumulator序列化和反序列化),Qrs仅仅将各个Searcher返回的聚合统计结果进行简单粘贴,不再执行merge阶段。

此优化可以有效降低某些场景下Qrs的负载,提升对Query的处理性能。该优化对UDAF的编写者是无感知的,无需为此调整UDAF的设计。

  • 本页导读 (1)
  • 使用介绍
  • 内置UDAF列表
  • 使用示例
  • 执行机制
  • 实现原理