您可以通过Local-Global优化解决Aggregate数据倾斜问题。
背景信息
Local-Global优化即将原先的Aggregate分成Local和Global两阶段聚合,即MapReduce模型中Combine+Reduce处理模式。第一阶段在上游节点本地攒一批数据进行聚合(localAgg),并输出这次微批的增量值(Accumulator)。第二阶段再将收到的Accumulator merge起来,得到最终的结果(globalAgg)。
Local-Global本质上能够靠localAgg聚合掉倾斜的数据,从而降低globalAgg热点,从而提升性能。Local-Global用于提升SUM、COUNT、MAX、MIN和AVG等普通Aggregate性能,以及解决这些场景下的数据热点问题。

调优方式
UDAF必须实现merge方法才可以触发Local-Global优化。实现merge方法请参见示例代码。
说明 Blink 2.0及以后版本默认开启Local-Global。如果您需要关闭Local-Global,您可以在作业参数中,设置blink.localAgg.enabled=false。
示例代码
import org.apache.flink.table.functions.AggregateFunction;
public class CountUdaf extends AggregateFunction<Long, CountUdaf.CountAccum> {
//定义存放count udaf的状态的accumulator数据结构
public static class CountAccum {
public long total;
}
//初始化count udaf的accumulator
public CountAccum createAccumulator() {
CountAccum acc = new CountAccum();
acc.total = 0;
return acc;
}
//getValue提供了如何通过存放状态的accumulator计算count UDAF的结果的方法
public Long getValue(CountAccum accumulator) {
return accumulator.total;
}
//accumulate提供了如何根据输入的数据更新count UDAF存放状态的accumulator
public void accumulate(CountAccum accumulator, Object iValue) {
accumulator.total++;
}
public void merge(CountAccum accumulator, Iterable<CountAccum> its) {
for (CountAccum other : its) {
accumulator.total += other.total;
}
}
}
在文档使用中是否遇到以下问题
更多建议
匿名提交