本文为您介绍Flink SQL支持的Event Time和Processing Time时间类型。

Flink支持3种与流数据处理相关的时间概念:Processing Time、Event Time和Ingestion Time。

Flink SQL仅支持其中的2种时间类型Event Time和Processing Time:
  • Event Time:您提供的事件时间(通常是数据的最原始的创建时间)。Event Time必须是您提供在数据储存里的数据。
  • Processing Time:系统对事件进行处理的本地系统时间,单位为毫秒。

Event Time

Event Time也称为Row Time。EventTime时间属性必须在源表DDL中声明,可以将源表中的某一字段声明成Event Time。目前只支持将TIMESTAMP类型(将来会支持LONG类型)声明成Row Time字段。如果源表中需要声明为Event Time的列不是TIMESTAMP类型,需要借助计算列,基于现有列构造出一个TIMESTAMP类型的列。

由于数据本身的乱序、网络的抖动(网络堵塞导致的数据传输延迟的变化)或者其它原因,导致了数据到达的顺序和被处理的顺序,可能是不一致的(乱序)。因此定义一个Row Time字段,需要明文定义一个Watermark计算方法。

窗口函数基于Event Time聚合的示例如下。
CREATE TABLE tt_stream(
  a VARCHAR,
  b VARCHAR,
  c TIMESTAMP,
  WATERMARK wk1 FOR c as withOffset(c, 1000)  --Watermark计算方法。
) WITH (
  type = 'sls',
  topic = '<yourTopicName>',
  accessId = '<yourAccessId>',
  accessKey = '<yourAccessSecret>'
);

CREATE TABLE rds_output(
  id VARCHAR,
  c TIMESTAMP, 
  f TIMESTAMP,
  cnt BIGINT
) WITH (
  type = 'rds',
  url = 'jdbc:mysql://****3306/test',
  tableName = '<yourTableName>',
  userName = '<yourUserName>',
  password = '<yourPassword>'
);

INSERT INTO rds_output
SELECT a AS id, 
     SESSION_START(c, INTERVAL '1' SECOND) AS c, 
     CAST(SESSION_END(c, INTERVAL '1' SECOND) AS TIMESTAMP) AS f, 
     COUNT(a) AS cnt
FROM tt_stream
GROUP BY SESSION(c, INTERVAL '1' SECOND), a

Processing Time

Processing Time是系统产生的,不在您的原始数据中,您需要在数据源表的声明中明文定义一个Processing Time列。
filedName as PROCTIME()
窗口函数基于Processing Time聚合的示例如下。
CREATE TABLE mq_stream (
  a VARCHAR,
  b VARCHAR,
  c BIGINT,
  d AS PROCTIME()   --在数据源表的声明中明文定义一个Processing Time列。
) WITH (
  type = 'mq',
  topic = '<yourTopic>',
  accessId = '<yourAccessId>',
  accessKey = '<yourAccessSecret>'
);

CREATE TABLE rds_output (
  id VARCHAR,
  c TIMESTAMP, 
  f TIMESTAMP,
  cnt BIGINT
) with (
  type = 'rds',
  url = '<yourDatebaseURL>',
  tableName = '<yourDatabasTableName>',
  userName = '<yourUserName>',
  password = '<yourPassword>'
);


INSERT INTO rds_output
SELECT a AS id, 
     SESSION_START(d, INTERVAL '1' SECOND) AS c, 
     SESSION_END(d, INTERVAL '1' SECOND) AS f, 
     COUNT(a) AS cnt
FROM mq_stream
GROUP BY SESSION(d, INTERVAL '1' SECOND), a         

时间属性字段传递

时间属性字段经过如下4种操作后会失去时间属性特性 : 如果经过以上4种操作后,继续使用该时间属性字段进行窗口函数运算,会出现类似org.apache.flink.table.api.ValidationException: Window can only be defined over a time attribute column.的报错。