文档

流表

更新时间:

本文介绍流表的使用方法及示例。

概述

  • 流表:流表用于存储随时间发生的事件,仅支持插入数据,不支持更新和删除数据。流表可以被看作为事件的集合,随着时间的推移,事件不断被添加到表中,因此该集合是无边界的。流表支持设置数据过期时间,默认为7天。

  • 流表中的时间属性:表示在某一时间、某个地点、某人发生了某一事件。时间属性包含了摄取时间、事件时间和处理时间。

    • 摄取时间:数据写入流引擎的时间,由系统自动生成。

    • 事件时间:业务逻辑中定义的时间,可以指定某个列作为事件时间。

    • 处理时间:流引擎计算处理数据的时间,由系统自动生成。

    重要

    当使用窗口函数时,系统需要通过时间属性来判断数据属于哪个窗口,此时时间属性必须为事件时间或处理时间。

创建流表

指定摄取时间

您可以在建表语句中通过METADATA FROM语句,使数据写入时携带摄取时间。

CREATE STREAM MyStream (
  `user_id` BIGINT,
  `name` STRING,
  `ingest_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
);

选择某一列作为事件时间

重要

事件时间必须是TIMESTAMP或者TIMESTAMP_LTZ类型。

以下示例指定user_action_time为事件时间。

CREATE STREAM user_actions (
  user_name STRING,
  data STRING,
  user_action_time TIMESTAMP(3),
  -- 声明 user_action_time 是事件时间属性,并且用延迟5秒的策略来生成watermark
  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) ;

定义处理时间

使用PROCTIME()函数定义处理时间,函数PROCTIME()返回的数据类型为TIMESTAMP_LTZ。

CREATE STREAM user_actions (
  user_name STRING,
  data STRING,
  user_action_time AS PROCTIME() -- 声明一个额外的列作为处理时间属性
);

读取流表数据

读取全表数据

SELECT * FROM MyStream;

由于流表是无界的,因此该查询会不间断地进行。如果想要停止访问数据,请使用组合键Ctrl+C终止查询。

从最早的数据开始读取

SELECT /*+ OPTIONS('scan.startup.mode'='earliest-offset')*/ * FROM MyStream;

指定历史时间读取数据

SELECT /*+ OPTIONS('scan.startup.mode'='timestamp', scan.startup.timestamp-millis'='1647360000000')*/ * FROM MyStream;
  • 本页导读 (1)
文档反馈