WATERMARK语句在流式查询中用来处理数据乱序问题。本文介绍WATERMARK语法及相关的示例。

语法

SELECT watermark(projectItem, durationSpec) as watermarkItem, projectItem [, projectItem ]*
FROM tableExpression

WATERMARK主要是为了解决数据流场景中常见的数据延迟问题。Spark在Aggregate和Join计算过程中,计算引擎会维护中间State数据。引入WATERMARK特性可以在数据出现延迟时,正确地丢弃延迟超时的数据,并且自动地从内存中丢弃过期的中间State数据,这个对流式查询的稳定运行至关重要。

更多WATERMARK的详细信息请参见如下信息:

示例

stream_source表需要有一个TIMESTAMP类型的表示Event time的字段,例如数据延迟最大为10秒。Aggregate+Watermark和Join+Watermark示例如下:
  • Aggregate+Watermark
    SELECT watermark(ts, interval 10 seconds), count(*) as col1, col2
    FROM stream_source
    GROUP BY ts, col2
  • Join+Watermark
    SELECT
       watermark(stream_source_1.ts, interval 20 second) as ts1,
       watermark(stream_source_2.ts, interval 10 second) as ts2,
       stream_source_1.col1 as col1,
       stream_source_2.col2 as col2
     FROM
     stream_source_1
     INNER JOIN
     stream_source_2
     ON stream_source_1.col1=stream_source_2.col1
     AND ts1 >= ts2
     AND ts1 <= ts2 + interval 10 seconds