窗口聚合

重要

本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。

实时计算Flink版的窗口聚合支持老语法分组窗口聚合(Group Window Aggregation)和新语法窗口表值函数聚合(Window TVF Aggregation)两种形式。本文为您介绍窗口聚合新老语法详情、窗口表值函数和聚合语句无法合并的场景、以及新老语法对更新流的支持情况

背景信息

  • 分组窗口聚合(老语法):对应GroupWindowAggregation算子,支持TUMBLE、HOP、SESSION窗口类型。

  • 窗口表值函数聚合(新语法):基于Window TVF新语法的窗口聚合,具有所有性能调优中提到的性能优化措施、支持标准的GROUPING SETS语法、可以在窗口聚合结果上使用窗口Top等优势。对应WindowAggregate算子,支持TUMBLEHOPCUMULATESESSION窗口函数。

说明

分组窗口聚合(老语法)

分组窗口聚合定义在SQLGROUP BY子句中,和普通的GROUP BY子句一样,包含分组窗口函数的GROUP BY子句的查询会对各组分别计算,各自产生一个结果行。

分组窗口聚合的语法、样例及特性等详情,请参见Group Window Aggregation

窗口表值函数聚合(新语法)

窗口聚合是通过GROUP BY子句定义的,其特征是包含由窗口表值函数产生的window_start和 window_end列。和普通的GROUP BY子句一样,窗口聚合会为每个组计算出一行数据。

和其他连续表上的聚合不同,窗口聚合不产生中间结果,只在窗口结束产生一个总的聚合结果,另外,窗口聚合会清除不需要的中间状态。

窗口表值函数聚合的语法、样例及特性等,请参见Window TVF Aggregation

SESSION窗口表值函数聚合在不同VVR版本中的区别

VVR 11.x(对应Flink 1.20版本)语法

SESSION(TABLE data [PARTITION BY(keycols, ...)], DESCRIPTOR(timecol), gap)

参数含义如下:

  • data:拥有时间属性列的表。

  • keycols:列描述符,决定会话窗口应该使用哪些列来分区数据。

  • timecol:列描述符,决定数据的哪个时间属性列应该映射到窗口。

  • gap:两个事件被认为属于同一个会话窗口的最大时间间隔。

(建议弃用)VVR 8.x(对应Flink 1.17版本)语法

SESSION(TABLE data, DESCRIPTOR(timecol), gap)

参数含义如下:

  • data:拥有时间属性列的表。

  • timecol:列描述符,决定数据的哪个时间属性列应该映射到窗口。

  • gap:两个事件被认为属于同一个会话窗口的最大时间间隔。

实时计算引擎VVR 8.xVVR 11.xSESSION窗口表值函数使用区别如下:

特性

VVR 11.x

VVR 8.x(建议弃用)

差异说明

语法结构

SESSION(TABLE data [PARTITION BY(keycols, ...)], DESCRIPTOR(timecol), gap)

SESSION(TABLE data, DESCRIPTOR(timecol), gap)

VVR 8.x缺少PARTITION BY 语法,需通过聚合语句隐式指定分区字段。

分区字段指定方式

支持显式 PARTITION BY(keycols) 指定分区字段。

必须通过聚合语句的 GROUP BY 子句隐式指定分区字段。

VVR 8.x强制要求分区字段为聚合语句中非 window_start、window_end、window_time 的GROUP KEY。

分区字段限制

无强制限制,可自由选择分区字段。

分区字段必须与聚合语句的 GROUP BY 字段一致(且排除窗口时间字段)。

VVR 8.x通过聚合逻辑隐式绑定分区字段,VVR 11.x语法更灵活。

参数完整性

完整参数:dataPARTITION BYDESCRIPTOR(timecol)gap

简化参数:dataDESCRIPTOR(timecol)gap

VVR 8.x移除了 PARTITION BY 参数,依赖聚合上下文推断分区逻辑。

单独使用支持性

支持单独调用 SESSION() 函数(不依赖聚合)。

不支持单独使用,必须与聚合语句(如 GROUP BY)绑定。

VVR 8.x强制要求函数与聚合语句耦合,VVR 11.x支持更灵活的使用场景。

窗口函数与聚合合并性

支持窗口函数与聚合语句合并(如 SUM(price))。

不支持窗口表值函数和聚合语句无法合并的场景(需保持聚合逻辑与窗口函数一致)。

VVR 8.x对聚合与窗口的合并使用有严格限制。

以下示例中SQL语法是等价的,都将使用item字段作为SESSION窗口函数的分区字段:

-- tables must have time attribute, e.g. `bidtime` in this table
> desc Bid;
+-------------+------------------------+------+-----+--------+---------------------------------+
|        name |                   type | null | key | extras |                       watermark |
+-------------+------------------------+------+-----+--------+---------------------------------+
|     bidtime | TIMESTAMP(3) *ROWTIME* | true |     |        | `bidtime` - INTERVAL '1' SECOND |
|       price |         DECIMAL(10, 2) | true |     |        |                                 |
|        item |                 STRING | true |     |        |                                 |
+-------------+------------------------+------+-----+--------+---------------------------------+

-- VVR 11.x
> SELECT window_start, window_end, item, SUM(price) AS total_price
  FROM TABLE(
      SESSION(TABLE Bid PARTITION BY item, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
  GROUP BY item, window_start, window_end;
  
-- VVR 8.x
> SELECT window_start, window_end, item, SUM(price) AS total_price
  FROM TABLE(
      SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
  GROUP BY item, window_start, window_end;

示例对比说明:

场景

VVR 11.x SQL

VVR 8.x SQL(建议弃用)

说明

SESSION窗口分区

SESSION(TABLE Bid PARTITION BY item, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)

SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)

两者均通过 item 字段分区,但VVR 8.x需依赖 GROUP BY item 隐式绑定分区逻辑。

聚合与窗口合并

支持直接合并(如 SUM(price) 在窗口内计算)。

仅当聚合字段与窗口分区字段一致时支持(如 GROUP BY item)。

VVR 8.x对聚合与窗口的合并使用有隐性约束。

窗口表值函数和聚合语句无法合并的场景

以下场景均以SESSION窗口为例,同样适用于其他窗口表值函数。

警告

当窗口表值函数和聚合语句无法合并时,如果使用Processing Time作为窗口列来划分窗口,则会导致窗口聚合(Window Aggregation)节点使用被窗口表值函数(Window TVF)节点物化的Processing Time列作为窗口的时间属性,在聚合计算时会受到来自于源表水印(Watermark)的干扰,从而导致窗口提前输出并可能出现类似事件时间窗口的延迟数据丢弃。请改写您的SQL,尽量避免窗口表值函数和聚合语句无法合并的情况发生。

  • 在窗口表值函数和聚合语句之间,包含对window_start、window_endwindow_time字段的过滤或计算。例如:

    -- 包含对window_start的过滤
    > SELECT window_start, window_end, item, SUM(price) AS total_price
        FROM
        (SELECT item, price, window_start, window_end FROM
        TABLE(
        SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
        where window_start >= TIMESTAMP '2020-04-15 08:06:00.000')
        GROUP BY item, window_start, window_end;
      
    -- 包含对window_start的计算
    > SELECT window_start, window_end, item, SUM(price) AS total_price
        FROM
        (SELECT item, price, window_start + (INTERVAL '1' SECOND) as window_start, window_end FROM
        TABLE(
        SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)))
        GROUP BY item, window_start, window_end;
    
    -- 包含对window_start的计算
    > SELECT window_start, window_end, item, SUM(price) AS total_price
        FROM
        (SELECT item, price, CAST(window_start as varchar) as window_start, window_end FROM
        TABLE(
        SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)))
        GROUP BY item, window_start, window_end;
    
  • 窗口表值函数和UDTF同时使用。例如:

    > SELECT window_start, window_end, category, SUM(price) AS total_price
        FROM
        (SELECT category, price, window_start, window_end FROM
        TABLE(
        SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)),
        LATERAL TABLE(category_udtf(item)) as T(category))
        GROUP BY category, window_start, window_end;
  • 聚合语句的GROUP KEY中未同时包含window_startwindow_end。例如:

     SELECT window_start, item, SUM(price) AS total_price
      FROM TABLE(
          SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
      GROUP BY item, window_start;
  • 聚合函数使用python UDAF。

  • 聚合函数使用GROUPING SETS、CUBEROLLUP语法,导致window_startwindow_end不在同一组GROUP KEY中。例如:

    > SELECT item, SUM(price) AS total_price
      FROM TABLE(
          SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
      GROUP BY GROUPING SETS((item), (window_start), (window_end));
      
    > SELECT item, SUM(price) AS total_price
      FROM TABLE(
          SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
      GROUP BY CUBE (item, window_start, window_end);
      
    > SELECT item, SUM(price) AS total_price
      FROM TABLE(
          SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
      GROUP BY ROLLUP (item, window_start, window_end);
  • 聚合语句中,聚合函数使用窗口列window_start、window_endwindow_time进行计算。例如:

    > SELECT window_start, window_end, item, SUM(price) AS total_price, max(window_end) AS max_end
      FROM TABLE(
          SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
      GROUP BY item, window_start, window_end;
      

新老语法对更新流的支持情况

窗口函数

老语法

(GroupWindowAggregation算子)

新语法

(WindowAggregate算子)

VVR、社区Flink

VVR

社区Flink

TUMBLE

支持

支持

不支持

HOP

支持

支持

不支持

SESSION

支持

支持

说明

VVR和社区Flink关于Session窗口区别请参见Queries语句

Flink 1.19支持

CUMULATE

N/A

支持

说明

实时计算引擎VVR 8.0.6及以上版本支持。

不支持

在对更新流的支持上,老语法窗口聚合(GroupWindowAggregation算子)支持更新流(VVR和社区Flink保持一致),新语法(WindowAggregate算子)社区Flink(1.16~1.18)不支持更新流,而VVR实现了新老语法的内部融合,可以自动根据输入流的情况选择支持的算子,实现社区Flink新语法中不支持更新流的TUMBLE、HOP窗口聚合对更新流的支持。