滑动窗口

本文为您介绍如何使用Flink滑动窗口函数。

定义

滑动窗口(HOP),也被称作Sliding Window。不同于滚动窗口,滑动窗口的窗口可以重叠。

函数语法

HOP函数用在GROUP BY子句中,用来定义滑动窗口。

HOP(<time-attr>, <slide-interval>,<size-interval>)          

入参

参数

说明

示例

time-attr

参数必须是流中的一个合法的时间属性字段,指定为Processing Time或Event Time。详情请参见时间属性

-

slide-interval

滑动窗口移动的间隔,定义了连续窗口之间的时间差。格式为INTERVAL 'num' timeUnit

INTERVAL '10' SECOND设置窗口间隔10秒。

size-interval

滑动窗口的大小或持续时间,定义了每个窗口覆盖的时间范围。格式为INTERVAL 'num' timeUnit

INTERVAL '5' MINUTE设置窗口包含5分钟的数据。

滑动窗口根据slide-interval和size-interval配置大小不同,分为以下三种情况:

  • slide-interval < size-interval,则窗口会重叠,每个元素会被分配到多个窗口。

  • slide-interval = size-interval,则等同于滚动窗口(TUMBLE)。

  • slide-interval > size-interval,则为跳跃窗口,窗口之间不重叠且有间隙。

通常,大部分元素符合多个窗口情景,窗口是重叠的。因此,滑动窗口在计算移动平均数(moving averages)时很适用。例如,计算过去5分钟数据的平均值,每10秒钟更新一次,可以设置slide-interval10秒,size-interval为5分钟。

标识函数

使用滑动窗口标识函数选出窗口的起始时间或者结束时间,窗口的时间属性用于下级Window的聚合。

窗口标识函数

返回类型

描述

HOP_START(<time-attr>, <slide-interval>, <size-interval>)

TIMESTAMP

返回窗口的起始时间(包含边界)。例如[00:10, 00:15] 窗口,返回00:10

HOP_END(<time-attr>, <slide-interval>, <size-interval>)

TIMESTAMP

返回窗口的结束时间(包含边界)。例如[00:00, 00:15]窗口,返回00:15

HOP_ROWTIME(<time-attr>, <slide-interval>, <size-interval>)

TIMESTAMP(rowtime-attr)

返回窗口的结束时间(不包含边界)。例如(00:00, 00:15) 窗口,返回00:14:59.999。返回值是一个rowtime attribute,即可以基于该字段做时间类型的操作,例如级联窗口,只能用在基于event time的window上。

HOP_PROCTIME(<time-attr>, <slide-interval>, <size-interval>)

TIMESTAMP(rowtime-attr)

返回窗口的结束时间(不包含边界)。例如(00:00, 00:15) 窗口,返回00:14:59.999 。返回值是一个proctime attribute,即可以基于该字段做时间类型的操作,例如级联窗口,只能用在基于processing time的window上。

示例

统计每个用户过去1分钟的单击次数,每30秒更新1次,即1分钟的窗口,30秒滑动1次。

  • 测试表user_clicks数据

    username(VARCHAR)

    click_url(VARCHAR)

    eventtime(VARCHAR)

    Jark

    http://taobao.com/xxx

    2024-10-10 10:00:00.0

    Jark

    http://taobao.com/xxx

    2024-10-10 10:00:10.0

    Jark

    http://taobao.com/xxx

    2024-10-10 10:00:49.0

    Jark

    http://taobao.com/xxx

    2024-10-10 10:01:05.0

    Jark

    http://taobao.com/xxx

    2024-10-10 10:01:58.0

    Timo

    http://taobao.com/xxx

    2024-10-10 10:02:10.0

  • 测试语句

    CREATE TEMPORARY TABLE user_clicks (
      username VARCHAR,
      click_url VARCHAR,
      eventtime VARCHAR,                            
      ts AS TO_TIMESTAMP(eventtime),
      WATERMARK FOR ts AS ts - INTERVAL '2' SECOND  --为Rowtime定义Watermark。
    ) WITH (
      'connector' = 'kafka',
      'topic' = '<yourTopic>',
      'properties.bootstrap.servers' = '<brokers>',
      'scan.startup.mode' = 'earliest-offset',
      'format' = 'csv'
    );
    
    CREATE TEMPORARY TABLE hop_output (
      window_start TIMESTAMP,
      window_end TIMESTAMP,
      username VARCHAR,
      clicks BIGINT
    ) WITH (
      'connector'='print',
      'logger'='true'
    );
    
    INSERT INTO
        hop_output
    SELECT
        HOP_START (ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE),
        HOP_END (ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE),
        username,
        COUNT (click_url)
    FROM
        user_clicks
    GROUP BY
        HOP (ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE),username;             
  • 测试结果

    window_start (TIMESTAMP)

    window_end (TIMESTAMP)

    username (VARCHAR)

    clicks (BIGINT)

    2024-10-10 09:59:30.0

    2024-10-10 10:00:30.0

    Jark

    2

    2024-10-10 10:00:00.0

    2024-10-10 10:01:00.0

    Jark

    3

    2024-10-10 10:00:30.0

    2024-10-10 10:01:30.0

    Jark

    2

    2024-10-10 10:01:00.0

    2024-10-10 10:02:00.0

    Jark

    2

    2024-10-10 10:01:30.0

    2024-10-10 10:02:30.0

    Jark

    1

    2024-10-10 10:01:30.0

    2024-10-10 10:02:30.0

    Timo

    1

    2017-10-10 10:02:30.0

    2017-10-10 10:03:30.0

    Timo

    1

    HOP窗口无法读取数据进入的时间,第一个窗口的开启时间会前移。前移时长=窗口时长-滑动步长,示例如下表。

    窗口时长(秒)

    滑动步长(秒)

    Event Time

    第一个窗口StartTime

    第一个窗口EndTime

    120

    30

    2024-07-31 10:00:00.0

    2024-07-31 09:58:30.0

    2024-07-31 10:00:30.0

    60

    10

    2024-07-31 10:00:00.0

    2024-07-31 09:59:10.0

    2024-07-31 10:00:10.0