本文介绍流表的使用方法及示例。
概述
流表:流表用于存储随时间发生的事件,仅支持插入数据,不支持更新和删除数据。流表可以被看作为事件的集合,随着时间的推移,事件不断被添加到表中,因此该集合是无边界的。流表支持设置数据过期时间,默认为7天。
流表中的时间属性:表示在某一时间、某个地点、某人发生了某一事件。时间属性包含了摄取时间、事件时间和处理时间。
摄取时间:数据写入流引擎的时间,由系统自动生成。
事件时间:业务逻辑中定义的时间,可以指定某个列作为事件时间。
处理时间:流引擎计算处理数据的时间,由系统自动生成。
重要当使用窗口函数时,系统需要通过时间属性来判断数据属于哪个窗口,此时时间属性必须为事件时间或处理时间。
创建流表
指定摄取时间
您可以在建表语句中通过METADATA FROM
语句,使数据写入时携带摄取时间。
CREATE STREAM MyStream (
`user_id` BIGINT,
`name` STRING,
`ingest_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
);
选择某一列作为事件时间
重要
事件时间必须是TIMESTAMP或者TIMESTAMP_LTZ类型。
以下示例指定user_action_time为事件时间。
CREATE STREAM user_actions (
user_name STRING,
data STRING,
user_action_time TIMESTAMP(3),
-- 声明 user_action_time 是事件时间属性,并且用延迟5秒的策略来生成watermark
WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) ;
定义处理时间
使用PROCTIME()
函数定义处理时间,函数PROCTIME()
返回的数据类型为TIMESTAMP_LTZ。
CREATE STREAM user_actions (
user_name STRING,
data STRING,
user_action_time AS PROCTIME() -- 声明一个额外的列作为处理时间属性
);
读取流表数据
读取全表数据
SELECT * FROM MyStream;
由于流表是无界的,因此该查询会不间断地进行。如果想要停止访问数据,请使用组合键Ctrl+C
终止查询。
从最早的数据开始读取
SELECT /*+ OPTIONS('scan.startup.mode'='earliest-offset')*/ * FROM MyStream;
指定历史时间读取数据
SELECT /*+ OPTIONS('scan.startup.mode'='timestamp', scan.startup.timestamp-millis'='1647360000000')*/ * FROM MyStream;
该文章对您有帮助吗?