实时计算Flink版的窗口聚合支持老语法分组窗口聚合(Group Window Aggregation)和新语法窗口表值函数聚合(Window TVF Aggregation)两种形式。本文为您介绍窗口聚合新老语法详情、窗口表值函数和聚合语句无法合并的场景、以及新老语法对更新流的支持情况。
背景信息
-
分组窗口聚合(老语法):对应GroupWindowAggregation算子,支持TUMBLE、HOP、SESSION窗口类型。
-
窗口表值函数聚合(新语法):基于Window TVF新语法的窗口聚合,具有所有性能调优中提到的性能优化措施、支持标准的
GROUPING SETS语法、可以在窗口聚合结果上使用窗口Top等优势。对应WindowAggregate算子,支持TUMBLE、HOP、CUMULATE和SESSION窗口函数。
-
分组窗口聚合已过时,推荐您使用更高效且功能更丰富的窗口表值函数聚合。
-
它们对于更新流的支持情况,请参见新老语法对更新流的支持情况。
分组窗口聚合(老语法)
分组窗口聚合定义在SQL的GROUP BY子句中,和普通的GROUP BY子句一样,包含分组窗口函数的GROUP BY子句的查询会对各组分别计算,各自产生一个结果行。
分组窗口聚合的语法、样例及特性等详情,请参见Group Window Aggregation。
VVR 11.x 分组窗口聚合行为变更
从 VVR 11.x(对应 Flink 1.20)开始,系统不再默认将分组窗口聚合(老语法)自动改写为窗口表值函数聚合(新语法)的执行计划。
在 VVR 8.x 中,系统自动将老语法改写为新语法的执行计划,使其获得 Local-Global 两阶段聚合优化。从 VVR 11.x 起,该自动改写不再是默认行为,老语法保持原有的物理执行计划。
影响:使用老语法的作业不再自动获得两阶段聚合优化,在数据量大或数据倾斜场景下可能影响性能。
推荐将分组窗口聚合迁移到窗口表值函数聚合(新语法)。关于两阶段聚合优化的详细生效条件,请参见窗口聚合 Local-Global 优化。
兼容旧版本行为
如暂时无法迁移,可手动开启以下参数恢复 VVR 8.x 行为:
|
参数 |
说明 |
默认值 |
|
|
开启老语法到新语法的自动改写,以获得两阶段聚合优化。 |
false(VVR 11.x 起) |
配置示例(在作业参数中添加):
table.optimizer.window-rewrite-enabled: true
此参数仅作为升级过渡的兼容手段。老语法已标记为过时,后续版本可能不再维护改写能力,建议尽早迁移到新语法。
窗口表值函数聚合(新语法)
窗口聚合是通过GROUP BY子句定义的,其特征是包含由窗口表值函数产生的window_start和 window_end列。和普通的GROUP BY子句一样,窗口聚合会为每个组计算出一行数据。
和其他连续表上的聚合不同,窗口聚合不产生中间结果,只在窗口结束产生一个总的聚合结果,另外,窗口聚合会清除不需要的中间状态。
窗口表值函数聚合的语法、样例及特性等,请参见Window TVF Aggregation。
SESSION窗口表值函数聚合在不同VVR版本中的区别
VVR 11.x(对应Flink 1.20版本)语法
SESSION(TABLE data [PARTITION BY(keycols, ...)], DESCRIPTOR(timecol), gap)
参数含义如下:
-
data:拥有时间属性列的表。
-
keycols:列描述符,决定会话窗口应该使用哪些列来分区数据。
-
timecol:列描述符,决定数据的哪个时间属性列应该映射到窗口。
-
gap:两个事件被认为属于同一个会话窗口的最大时间间隔。
(建议弃用)VVR 8.x(对应Flink 1.17版本)语法
SESSION(TABLE data, DESCRIPTOR(timecol), gap)
参数含义如下:
-
data:拥有时间属性列的表。
-
timecol:列描述符,决定数据的哪个时间属性列应该映射到窗口。
-
gap:两个事件被认为属于同一个会话窗口的最大时间间隔。
实时计算引擎VVR 8.x与VVR 11.x的SESSION窗口表值函数使用区别如下:
|
特性 |
VVR 11.x |
VVR 8.x(建议弃用) |
差异说明 |
|
语法结构 |
|
|
VVR 8.x缺少 |
|
分区字段指定方式 |
支持显式 |
必须通过聚合语句的 |
VVR 8.x强制要求分区字段为聚合语句中非 |
|
分区字段限制 |
无强制限制,可自由选择分区字段。 |
分区字段必须与聚合语句的 |
VVR 8.x通过聚合逻辑隐式绑定分区字段,VVR 11.x语法更灵活。 |
|
参数完整性 |
完整参数: |
简化参数: |
VVR 8.x移除了 |
|
单独使用支持性 |
支持单独调用 |
不支持单独使用,必须与聚合语句(如 |
VVR 8.x强制要求函数与聚合语句耦合,VVR 11.x支持更灵活的使用场景。 |
|
窗口函数与聚合合并性 |
支持窗口函数与聚合语句合并(如 |
不支持窗口表值函数和聚合语句无法合并的场景(需保持聚合逻辑与窗口函数一致)。 |
VVR 8.x对聚合与窗口的合并使用有严格限制。 |
以下示例中SQL语法是等价的,都将使用item字段作为SESSION窗口函数的分区字段:
-- tables must have time attribute, e.g. `bidtime` in this table
> desc Bid;
+-------------+------------------------+------+-----+--------+---------------------------------+
| name | type | null | key | extras | watermark |
+-------------+------------------------+------+-----+--------+---------------------------------+
| bidtime | TIMESTAMP(3) *ROWTIME* | true | | | `bidtime` - INTERVAL '1' SECOND |
| price | DECIMAL(10, 2) | true | | | |
| item | STRING | true | | | |
+-------------+------------------------+------+-----+--------+---------------------------------+
-- VVR 11.x
> SELECT window_start, window_end, item, SUM(price) AS total_price
FROM TABLE(
SESSION(TABLE Bid PARTITION BY item, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
GROUP BY item, window_start, window_end;
-- VVR 8.x
> SELECT window_start, window_end, item, SUM(price) AS total_price
FROM TABLE(
SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
GROUP BY item, window_start, window_end;
示例对比说明:
|
场景 |
VVR 11.x SQL |
VVR 8.x SQL(建议弃用) |
说明 |
|
SESSION窗口分区 |
|
|
两者均通过 |
|
聚合与窗口合并 |
支持直接合并(如 |
仅当聚合字段与窗口分区字段一致时支持(如 |
VVR 8.x对聚合与窗口的合并使用有隐性约束。 |
窗口表值函数和聚合语句无法合并的场景
以下场景均以SESSION窗口为例,同样适用于其他窗口表值函数。
当窗口表值函数和聚合语句无法合并时,如果使用Processing Time作为窗口列来划分窗口,则会导致窗口聚合(Window Aggregation)节点使用被窗口表值函数(Window TVF)节点物化的Processing Time列作为窗口的时间属性,在聚合计算时会受到来自于源表水印(Watermark)的干扰,从而导致窗口提前输出并可能出现类似事件时间窗口的延迟数据丢弃。请改写您的SQL,尽量避免窗口表值函数和聚合语句无法合并的情况发生。
-
在窗口表值函数和聚合语句之间,包含对window_start、window_end和window_time字段的过滤或计算。例如:
-- 包含对window_start的过滤 > SELECT window_start, window_end, item, SUM(price) AS total_price FROM (SELECT item, price, window_start, window_end FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)) where window_start >= TIMESTAMP '2020-04-15 08:06:00.000') GROUP BY item, window_start, window_end; -- 包含对window_start的计算 > SELECT window_start, window_end, item, SUM(price) AS total_price FROM (SELECT item, price, window_start + (INTERVAL '1' SECOND) as window_start, window_end FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))) GROUP BY item, window_start, window_end; -- 包含对window_start的计算 > SELECT window_start, window_end, item, SUM(price) AS total_price FROM (SELECT item, price, CAST(window_start as varchar) as window_start, window_end FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))) GROUP BY item, window_start, window_end; -
窗口表值函数和UDTF同时使用。例如:
> SELECT window_start, window_end, category, SUM(price) AS total_price FROM (SELECT category, price, window_start, window_end FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)), LATERAL TABLE(category_udtf(item)) as T(category)) GROUP BY category, window_start, window_end; -
聚合语句的GROUP KEY中未同时包含window_start和window_end。例如:
SELECT window_start, item, SUM(price) AS total_price FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)) GROUP BY item, window_start; -
聚合函数使用python UDAF。
-
聚合函数使用GROUPING SETS、CUBE和ROLLUP语法,导致window_start和window_end不在同一组GROUP KEY中。例如:
> SELECT item, SUM(price) AS total_price FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)) GROUP BY GROUPING SETS((item), (window_start), (window_end)); > SELECT item, SUM(price) AS total_price FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)) GROUP BY CUBE (item, window_start, window_end); > SELECT item, SUM(price) AS total_price FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)) GROUP BY ROLLUP (item, window_start, window_end); -
聚合语句中,聚合函数使用窗口列window_start、window_end和window_time进行计算。例如:
> SELECT window_start, window_end, item, SUM(price) AS total_price, max(window_end) AS max_end FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)) GROUP BY item, window_start, window_end;
新老语法对更新流的支持情况
|
窗口函数 |
老语法 (GroupWindowAggregation算子) |
新语法 (WindowAggregate算子) |
|
|
VVR、社区Flink |
VVR |
社区Flink |
|
|
TUMBLE |
支持 |
支持 |
不支持 |
|
HOP |
支持 |
支持 |
不支持 |
|
SESSION |
支持 |
支持 说明
VVR和社区Flink关于Session窗口区别请参见Queries语句 。 |
Flink 1.19支持 |
|
CUMULATE |
N/A |
支持 说明
实时计算引擎VVR 8.0.6及以上版本支持。 |
不支持 |
在对更新流的支持上,老语法窗口聚合(GroupWindowAggregation算子)支持更新流(VVR和社区Flink保持一致),新语法(WindowAggregate算子)社区Flink(1.16~1.18)不支持更新流,而VVR实现了新老语法的内部融合,可以自动根据输入流的情况选择支持的算子,实现社区Flink新语法中不支持更新流的TUMBLE、HOP窗口聚合对更新流的支持。
窗口聚合 Local-Global 优化
窗口聚合支持 Local-Global 两阶段聚合优化。启用后,优化器将单阶段窗口聚合拆分为:
-
Local Aggregate:在数据 Shuffle 之前做局部预聚合,减少网络传输数据量。
-
Global Aggregate:在 Shuffle 之后做最终聚合,输出结果。
该优化需同时满足以下 6 个条件方可生效。
条件 1:聚合阶段策略允许两阶段
table.optimizer.agg-phase-strategy 设置为 AUTO(默认)或 TWO_PHASE。若设为 ONE_PHASE,则强制禁用两阶段优化。
条件 2:窗口基于事件时间
窗口必须基于事件时间(Event Time / Rowtime),不支持处理时间窗口。
条件 3:窗口类型非 Session
支持 TUMBLE、HOP、CUMULATE 窗口类型。Session 窗口不支持两阶段优化。
条件 4:聚合函数支持 Partial Merge
所有聚合函数必须支持合并操作。内置的 SUM、COUNT、MIN、MAX、AVG 均支持。自定义 UDAF 需实现 merge() 方法。
条件 5:输入流为 Insert-Only 且窗口可转换为 TVF 形式
需同时满足以下条件:
-
输入流为 Insert-Only。
-
table.exec.emit.early-fire.enabled为false(默认)。 -
table.exec.emit.late-fire.enabled为false(默认)。 -
HOP 窗口必须对齐(窗口大小可被滑动步长整除)。
条件 6:数据分布未满足分区要求
输入数据的分布尚未满足聚合所需的分区要求。若数据已按分区键分布,优化器判断无需额外预聚合,不会生成 Local Aggregate 节点。
参数速查
|
参数 |
类型 |
默认值 |
生效条件 |
|
|
Enum |
AUTO |
不能为 ONE_PHASE |
|
|
Boolean |
false |
必须为 false |
|
|
Boolean |
false |
必须为 false |