本文为您介绍如何使用Flink滑动窗口函数。
定义
滑动窗口(HOP),也被称作Sliding Window。不同于滚动窗口,滑动窗口的窗口可以重叠。
函数语法
HOP函数用在GROUP BY子句中,用来定义滑动窗口。
HOP(<time-attr>, <slide-interval>,<size-interval>)
入参
参数 | 说明 | 示例 |
time-attr | 参数必须是流中的一个合法的时间属性字段,指定为Processing Time或Event Time。详情请参见时间属性。 | - |
slide-interval | 滑动窗口移动的间隔,定义了连续窗口之间的时间差。格式为 |
|
size-interval | 滑动窗口的大小或持续时间,定义了每个窗口覆盖的时间范围。格式为 |
|
滑动窗口根据slide-interval和size-interval配置大小不同,分为以下三种情况:
slide-interval < size-interval,则窗口会重叠,每个元素会被分配到多个窗口。
slide-interval = size-interval,则等同于滚动窗口(TUMBLE)。
slide-interval > size-interval,则为跳跃窗口,窗口之间不重叠且有间隙。
通常,大部分元素符合多个窗口情景,窗口是重叠的。因此,滑动窗口在计算移动平均数(moving averages)时很适用。例如,计算过去5分钟数据的平均值,每10秒钟更新一次,可以设置slide-interval为10秒,size-interval为5分钟。
标识函数
使用滑动窗口标识函数选出窗口的起始时间或者结束时间,窗口的时间属性用于下级Window的聚合。
窗口标识函数 | 返回类型 | 描述 |
| TIMESTAMP | 返回窗口的起始时间(包含边界)。例如 |
| TIMESTAMP | 返回窗口的结束时间(包含边界)。例如 |
| TIMESTAMP(rowtime-attr) | 返回窗口的结束时间(不包含边界)。例如 |
| TIMESTAMP(rowtime-attr) | 返回窗口的结束时间(不包含边界)。例如 |
示例
统计每个用户过去1分钟的单击次数,每30秒更新1次,即1分钟的窗口,30秒滑动1次。
测试表user_clicks数据
username(VARCHAR)
click_url(VARCHAR)
eventtime(VARCHAR)
Jark
http://taobao.com/xxx
2024-10-10 10:00:00.0
Jark
http://taobao.com/xxx
2024-10-10 10:00:10.0
Jark
http://taobao.com/xxx
2024-10-10 10:00:49.0
Jark
http://taobao.com/xxx
2024-10-10 10:01:05.0
Jark
http://taobao.com/xxx
2024-10-10 10:01:58.0
Timo
http://taobao.com/xxx
2024-10-10 10:02:10.0
测试语句
CREATE TEMPORARY TABLE user_clicks ( username VARCHAR, click_url VARCHAR, eventtime VARCHAR, ts AS TO_TIMESTAMP(eventtime), WATERMARK FOR ts AS ts - INTERVAL '2' SECOND --为Rowtime定义Watermark。 ) WITH ( 'connector' = 'kafka', 'topic' = '<yourTopic>', 'properties.bootstrap.servers' = '<brokers>', 'scan.startup.mode' = 'earliest-offset', 'format' = 'csv' ); CREATE TEMPORARY TABLE hop_output ( window_start TIMESTAMP, window_end TIMESTAMP, username VARCHAR, clicks BIGINT ) WITH ( 'connector'='print', 'logger'='true' ); INSERT INTO hop_output SELECT HOP_START (ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE), HOP_END (ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE), username, COUNT (click_url) FROM user_clicks GROUP BY HOP (ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE),username;
测试结果
window_start (TIMESTAMP)
window_end (TIMESTAMP)
username (VARCHAR)
clicks (BIGINT)
2024-10-10 09:59:30.0
2024-10-10 10:00:30.0
Jark
2
2024-10-10 10:00:00.0
2024-10-10 10:01:00.0
Jark
3
2024-10-10 10:00:30.0
2024-10-10 10:01:30.0
Jark
2
2024-10-10 10:01:00.0
2024-10-10 10:02:00.0
Jark
2
2024-10-10 10:01:30.0
2024-10-10 10:02:30.0
Jark
1
2024-10-10 10:01:30.0
2024-10-10 10:02:30.0
Timo
1
2017-10-10 10:02:30.0
2017-10-10 10:03:30.0
Timo
1
HOP窗口无法读取数据进入的时间,第一个窗口的开启时间会前移。前移时长=窗口时长-滑动步长,示例如下表。
窗口时长(秒)
滑动步长(秒)
Event Time
第一个窗口StartTime
第一个窗口EndTime
120
30
2024-07-31 10:00:00.0
2024-07-31 09:58:30.0
2024-07-31 10:00:30.0
60
10
2024-07-31 10:00:00.0
2024-07-31 09:59:10.0
2024-07-31 10:00:10.0