全部产品
弹性计算 会员服务 网络 安全 移动云 数加·大数据分析及展现 数加·大数据应用 管理与监控 云通信 阿里云办公 培训与认证 更多
存储与CDN 数据库 域名与网站(万网) 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网 智能硬件
流计算

窗口函数介绍

更新时间:2018-03-28 15:43:33

什么是窗口函数

StreamComputerSQL支持基于无限大窗口的aggregate(这个时候我们不需要显式的在SQL query中加任何的窗口)。除了对无限数据的aggreagte,StreamComputerSQL还支持对一个特定的窗口的聚合。例如有用户想统计在过去的1分钟内有多少用户点击了某个的网页。在这种情况下,我们可以定义一个窗口,用来收集最近一分钟内的数据,并对这个窗口内的数据进行计算。

StreamComputerSQL支持的窗口聚合主要是两种:window aggregate,和over aggregate。他们最核心的区别是over aggregate从语义上保障了对每个输入都有一个输出,因此over agregate常被用于ranking,moving average等场景。关于over aggregate的细节可以参考后面的over aggregate的session章。本session下来主要介绍window aggregate。

Window aggregate支持两种时间类型做窗口:Event Time和Processing Time。每种类型下,又分别支持三种窗口类型:滚动窗口(TUMBLE),滑动窗口(HOP),和会话窗口(SESSION)。

时间类型

BlinkSQL支持两种时间:

  1. Event Time:用户提供的事件时间(通常是数据的最原始的创建时间),event time一定是用户提供在表的schema里的数据
  2. Processing Time:表示系统对事件进行处理的本地系统时间

下图中是不同时间属性在流上的概念:

![时间属性

从上面的定义可以看出,ingestion time和 processing time是系统为流记录增加的时间属性,用户并不能控制。EventTime则是流记录本身携带的时间属性,但由于数据本身有乱序,加之网络抖动或其它原因,eventTime为t1时刻的纪录,有可能会晚于t2(t2 > t1)时刻的被blink处理。

基于processing time的Aggregate:

processing time是系统产生的,不在用户的原始数据中,我们需要显式的定义一个processing time列。

  1. filedName as PROCTIME()

这个定义需要在source的DDL中显式指明,示例如下:

  1. CREATE TABLE tt_stream (
  2. a varchar,
  3. b varchar,
  4. c BIGINT,
  5. d AS PROCTIME()
  6. ) with (
  7. type = 'SLS',
  8. topic = 'blink_tt2tt_test',
  9. accessId = '06221748XXXX',
  10. accessKey = 'a62cfe86-ba5a-4eeXXXXXXX7b'
  11. );
  12. CREATE TABLE rds_output (
  13. id varchar,
  14. c TIMESTAMP,
  15. f TIMESTAMP,
  16. cnt BIGINT
  17. ) with (
  18. type = 'rds',
  19. url = 'jdbc:mysql://XXXXXXXXX:3306/test',
  20. tableName = 'datahub2rds',
  21. userName = 'tXXXX',
  22. password = '1XXXXX6'
  23. );
  24. INSERT INTO rds_output
  25. SELECT a AS id,
  26. SESSION_START(d, INTERVAL '1' SECOND) AS c,
  27. SESSION_END(d, INTERVAL '1' SECOND) AS f,
  28. COUNT(a) AS cnt
  29. FROM tt_stream
  30. GROUP BY SESSION(d, INTERVAL '1' SECOND), a

基于Event Time的Aggregate:

event time是用户的原始数据,我们不需要显式重新定义一个event time列, 但是我们要求用户必须指定watermark的计算方法。这是因为用户的数据往往是乱序的,如果不配置一个watermark来合理的delay用户数据,那样数据聚合的结果往往都有很大的偏差。

Watermark

Watermark是一种衡量Event Time进展的机制,它是数据本身的一个隐藏属性,watermark的定义是source表DDL定义的一部分。Blink提供了如下语法定义 watermark:

  1. WATERMARK [watermarkName] FOR <rowtime_field> AS withOffset(<rowtime_field>, offset)
  • watermarkName 标识了这个 watermark 的名字,可选。
  • <rowtime_field> 必须是表中已定义的一列(当前仅支持为Timestamp类型),含义是基于该列生成 watermark,并且标识该列为 Event Time 列,可以在后续 query 中用来定义窗口。
  • withOffset 是目前提供的watermark的生成策略,是根据<rowtime_field> - offset生成watermark的值。withOffset的第一个参数必须是<rowtime_field>
  • offset 单位为毫秒,含义为watermark值与event time值的偏移量。

通常一条记录中的某个字段就代表了该记录的发生时间,比如表中有个 rowtime 字段,类型为Timestamp,其中某一值为1501750584000(2017-08-03 08:56:24.000)。那么如果用户需要定义一个基于该rowtime列的watermark,watermark策略为偏移4秒,需要如下定义。

  1. WATERMARK FOR rowtime AS withOffset(rowtime, 4000)

那么,这条数据的watermark时间就是 1501750584000 - 4000 = 1501750580000(2017-08-03 08:56:20.000)。这条数据的watermark时间是什么含义呢?即:timestamp小于1501750580000(2017-08-03 08:56:20.000)的数据,都已经到达了。

注意:

在使用Event Time watermark的时候的rowtime必须是TIMESTAMP的类型,现在支持毫秒级别的、在Unix时间戳里是13位的,如果是其他类型或者是在Unix时间戳不是13位的建议使用计算列来做转换。

Event Time和Processing Time的声明只能在源表上做声明!

总结:

  1. WaterMark的含义是所有时间t’< t的事件都已经发生。假如watermark t已经生效,那么后续eventTime小于t的记录都会被丢掉(暂时blink的做法是丢弃这些来的更晚的数据,以后会给用户配置,让更晚的数据也能继续update)。
  2. 针对乱序的的流,WaterMark至关重要,这样即使一些事件延迟到达,也不至于过于影响window窗口的计算的正确性。
  3. 并行数据流中,当Operator有多个输入流时,Operator的event time以最小流event time为准。

image

最后,给出一个使用event time做aggregate的例子:

  1. CREATE TABLE tt_stream(
  2. a varchar,
  3. b varchar,
  4. c timeStamp,
  5. WATERMARK wk1 FOR c as withOffset(c, 1000)
  6. ) with (
  7. type = 'SLS',
  8. topic = 'blink_tt2tt_test',
  9. accessId = '0622174XXXXXXTS',
  10. accessKey = 'a62cfe86-bXXXXXXXb9fad2618e7b'
  11. );
  12. CREATE TABLE rds_output(
  13. id varchar,
  14. c TIMESTAMP,
  15. f TIMESTAMP,
  16. cnt BIGINT
  17. ) with (
  18. type = 'rds',
  19. url = 'jdbc:mysql://XXXXXXXX3306/test',
  20. tableName = 'datahub2rds',
  21. userName = 'XXXXXt',
  22. password = '1XXXXX'
  23. );
  24. INSERT INTO rds_output
  25. SELECT a AS id,
  26. SESSION_START(c, INTERVAL '1' SECOND) AS c,
  27. CAST(SESSION_END(c, INTERVAL '1' SECOND) AS TIMESTAMP) AS f,
  28. COUNT(a) AS cnt
  29. FROM tt_stream
  30. GROUP BY SESSION(c, INTERVAL '1' SECOND), a
本文导读目录