会话窗口

本文为您介绍如何使用Flink会话窗口函数。

定义

会话窗口(SESSION)通过SESSION活动来对元素进行分组。会话窗口与滚动窗口和滑动窗口相比,没有窗口重叠,没有固定窗口大小。相反,当它在一个固定的时间周期内不再收到元素,即会话断开时,该窗口就会关闭。

函数语法

SESSION函数用于在GROUP BY子句中定义会话窗口。

SESSION(<time-attr>, <gap-interval>)

入参

参数

说明

示例

time-attr

参数必须是流中的一个合法的时间属性字段,指定为Processing Time或Event Time。详情请参见时间属性

-

gap-interval

会话的超时时间或不活动间隔。如果在一个会话的最后一个元素到达后的 <gap-interval> 时间内没有新的元素到达,则该会话将关闭。任何后续到达的元素都将被分配到一个新的会话中。格式为INTERVAL 'num' timeUnit

INTERVAL '10' SECOND设置会话的超时时间为10秒。

标识函数

使用标识函数选出窗口的起始时间或者结束时间,窗口的时间属性用于下级Window的聚合。

窗口标识函数

返回类型

描述

SESSION_START(<time-attr>, <gap-interval>)

Timestamp

返回窗口的起始时间(包含边界)。例如[00:10,00:15]的窗口,返回00:10,即为此会话窗口内第一条记录的时间。

SESSION_END(<time-attr>, <gap-interval>)

Timestamp

返回窗口的结束时间(包含边界)。例如[00:00,00:15]的窗口,返回 00:15,即为此会话窗口内最后一条记录的时间+<gap-interval>

SESSION_ROWTIME(<time-attr>, <gap-interval>)

Timestamp(rowtime-attr)

返回窗口的结束时间(不包含边界)。例如(00:00,00:15)的窗口,返回00:14:59.999 。返回值是一个rowtime attribute,也就是可以基于该字段进行时间类型的操作,例如级联窗口。该参数只能用于基于Event Time的Window。

SESSION_PROCTIME(<time-attr>, <gap-interval>)

Timestamp(rowtime-attr)

返回窗口的结束时间(不包含边界)。例如(00:00,00:15)的窗口,返回 00:14:59.999 。返回值是一个Proctime Attribute,也就是可以基于该字段进行时间类型的操作,例如级联窗口。该参数只能用于基于Processing Time的Window。

示例

统计每个用户在每个活跃会话期间的点击次数,会话超时时长为30秒。

  • 测试表user_clicks数据

    username (VARCHAR)

    click_url (VARCHAR)

    eventtime (VARCHAR)

    Jark

    http://taobao.com/xxx

    2024-10-10 10:00:00.0

    Jark

    http://taobao.com/xxx

    2024-10-10 10:00:10.0

    Jark

    http://taobao.com/xxx

    2024-10-10 10:00:49.0

    Jark

    http://taobao.com/xxx

    2024-10-10 10:01:05.0

    Jark

    http://taobao.com/xxx

    2024-10-10 10:01:58.0

    Timo

    http://taobao.com/xxx

    2024-10-10 10:02:10.0

  • 测试语句

    CREATE TEMPORARY TABLE user_clicks(
      username varchar,
      click_url varchar,
      eventtime varchar,                            
      ts AS TO_TIMESTAMP(eventtime),
      WATERMARK FOR ts AS ts - INTERVAL '2' SECOND  --为Rowtime定义Watermark。
    ) WITH (
      'connector' = 'kafka',
      'topic' = '<yourTopic>',
      'properties.bootstrap.servers' = '<brokers>',
      'scan.startup.mode' = 'earliest-offset',
      'format' = 'csv'
    );
    
    CREATE TEMPORARY TABLE session_output(
      window_start TIMESTAMP,
      window_end TIMESTAMP,
      username VARCHAR,
      clicks BIGINT
    ) WITH (
      'connector'='print',
      'logger'='true'
    );
    
    INSERT INTO session_output
    SELECT
    SESSION_START(ts, INTERVAL '30' SECOND),
    SESSION_END(ts, INTERVAL '30' SECOND),
    username,
    COUNT(click_url)
    FROM user_clicks
    GROUP BY SESSION(ts, INTERVAL '30' SECOND), username;
  • 测试结果

    window_start (TIMESTAMP)

    window_end (TIMESTAMP)

    username (VARCHAR)

    clicks (BIGINT)

    2024-10-10 10:00:00.0

    2024-10-10 10:00:40.0

    Jark

    2

    2024-10-10 10:00:49.0

    2024-10-10 10:01:35.0

    Jark

    2

    2024-10-10 10:01:58.0

    2024-10-10 10:02:28.0

    Jark

    1

    2024-10-10 10:02:10.0

    2024-10-10 10:02:40.0

    Timo

    1