WATERMARK语句在流式查询中用来处理数据乱序问题。本文介绍WATERMARK语法及相关的示例。
语法
SELECT watermark(projectItem, durationSpec) as watermarkItem, projectItem [, projectItem ]*
FROM tableExpression
WATERMARK主要是为了解决数据流场景中常见的数据延迟问题。Spark在Aggregate和Join计算过程中,计算引擎会维护中间State数据。引入WATERMARK特性可以在数据出现延迟时,正确地丢弃延迟超时的数据,并且自动地从内存中丢弃过期的中间State数据,这个对流式查询的稳定运行至关重要。
更多WATERMARK的详细信息请参见如下信息:
示例
stream_source表需要有一个TIMESTAMP类型的表示Event time的字段,例如数据延迟最大为10秒。Aggregate+Watermark和Join+Watermark示例如下:
- Aggregate+Watermark
SELECT watermark(ts, interval 10 seconds), count(*) as col1, col2 FROM stream_source GROUP BY ts, col2
-
Join+Watermark
SELECT watermark(stream_source_1.ts, interval 20 second) as ts1, watermark(stream_source_2.ts, interval 10 second) as ts2, stream_source_1.col1 as col1, stream_source_2.col2 as col2 FROM stream_source_1 INNER JOIN stream_source_2 ON stream_source_1.col1=stream_source_2.col1 AND ts1 >= ts2 AND ts1 <= ts2 + interval 10 seconds