本文介绍Fluss 的虚拟表 (virtual table)。
基本概念
Fluss 的虚拟表 (virtual-tables) 是系统自动生成的表,用于访问元数据、变更数据及其他系统信息,无需额外存储数据。通过在基础表名后追加后缀(如 `$changelog`)即可访问。
Fluss 支持以下虚拟表类型:
|
虚拟表 |
后缀 |
描述 |
支持的表类型 |
|
|
提供带有元数据的原始变更日志流访问 |
主键表、日志表 |
|
|
|
提供包含变更前后镜像的 binlog 格式数据 |
仅主键表 |
Changelog 表
`$changelog` 虚拟表提供对表的原始变更日志流的只读访问,允许你审计和处理所有数据变更及其关联的元数据。
在表名后追加 `$changelog` 即可访问变更日志:
SELECT * FROM my_table$changelog;
表结构
Changelog 虚拟表在原始表列之前包含三个元数据列:
|
列名 |
类型 |
描述 |
|
|
STRING NOT NULL |
变更操作的类型(参见变更类型) |
|
|
BIGINT NOT NULL |
日志中的偏移位置 |
|
|
TIMESTAMP_LTZ(3) NOT NULL |
变更提交的时间戳 |
变更类型
_change_type列表示数据变更的类型:
主键表
|
变更类型 |
描述 |
|
|
新插入了一行 |
|
|
更新行的旧值(撤回) |
|
|
更新行的新值 |
|
|
删除了一行 |
日志表
对于日志表(仅追加),只使用一种变更类型:
|
变更类型 |
描述 |
|
|
新插入了一行到日志中 |
示例
以一个跟踪用户订单的主键表为例:
-- 创建主键表
CREATE TABLE orders (
order_id INT NOT NULL,
customer_name STRING,
amount DECIMAL(10, 2),
PRIMARY KEY (order_id) NOT ENFORCED
) WITH ('bucket.num' = '1');
-- 插入一条记录
INSERT INTO orders VALUES (1, 'Rhea', 100.00);
-- 更新记录
INSERT INTO orders VALUES (1, 'Rhea', 150.00);
-- 删除记录
DELETE FROM orders WHERE order_id = 1;
-- 查询变更日志
SELECT * FROM orders$changelog;
输出:
+---------------+-------------+---------------------+----------+---------------+---------+
| _change_type | _log_offset | _commit_timestamp | order_id | customer_name | amount |
+---------------+-------------+---------------------+----------+---------------+---------+
| insert | 0 | 2024-01-15 10:30:00 | 1 | Rhea | 100.00 |
| update_before | 1 | 2024-01-15 10:35:00 | 1 | Rhea | 100.00 |
| update_after | 2 | 2024-01-15 10:35:00 | 1 | Rhea | 150.00 |
| delete | 3 | 2024-01-15 10:40:00 | 1 | Rhea | 150.00 |
+---------------+-------------+---------------------+----------+---------------+---------+
启动模式
|
模式 |
描述 |
|
|
从日志的起始位置开始读取 |
|
|
从日志的当前末尾开始读取(仅读取新变更) |
|
|
从指定的时间戳开始读取(自 epoch 以来的毫秒数) |
Changelog 表支持不同的启动模式来控制读取的起始位置:
-- 从起始位置读取(默认)
SELECT * FROM orders$changelog /*+ OPTIONS('scan.startup.mode' = 'earliest') */;
-- 仅读取从现在开始的新变更
SELECT * FROM orders$changelog /*+ OPTIONS('scan.startup.mode' = 'latest') */;
-- 从指定时间戳读取
SELECT * FROM orders$changelog /*+ OPTIONS('scan.startup.mode' = 'timestamp', 'scan.startup.timestamp' = '1705312200000') */;
限制
-
目前尚不支持投影下推、分区下推和谓词下推,将在未来版本中解决。
Binlog 表
$binlog 虚拟表提供对变更数据的访问,每条记录包含行的变更前镜像和变更后镜像。
$binlog 虚拟表仅适用于主键表。
Binlog 表功能需要 Flink 计算引擎 VVR 11.6 及以上版本。
在主键表名后追加 $binlog 即可访问 binlog:
SELECT * FROM my_pk_table$binlog;
表结构
Binlog 虚拟表包含三个元数据列,后跟嵌套的 before 和 after 行结构:
|
列名 |
类型 |
描述 |
|
|
STRING NOT NULL |
变更操作的类型: |
|
|
BIGINT NOT NULL |
日志中的偏移位置 |
|
|
TIMESTAMP_LTZ(3) NOT NULL |
变更提交的时间戳 |
|
|
ROW<...> |
变更前的行值(插入时为 NULL) |
|
|
ROW<...> |
变更后的行值(删除时为 NULL) |
before 和 after 列是嵌套的 ROW 类型,包含基础表的所有列。
变更类型
|
变更类型 |
描述 |
|
|
|
|
新插入了一行 |
NULL |
包含新行的值 |
|
|
更新了一行 |
包含旧行的值 |
包含新行的值 |
|
|
删除了一行 |
包含被删除行的值 |
NULL |
示例
-- 创建主键表
CREATE TABLE users (
user_id INT NOT NULL,
name STRING,
email STRING,
PRIMARY KEY (user_id) NOT ENFORCED
) WITH ('bucket.num' = '1');
-- 插入、更新,然后删除一条记录
INSERT INTO users VALUES (1, 'Alice', 'alice@example.com');
INSERT INTO users VALUES (1, 'Alice Smith', 'alice.smith@example.com');
DELETE FROM users WHERE user_id = 1;
-- 查询 binlog
SELECT * FROM users$binlog;
输出:
+--------------+-------------+---------------------+----------------------------------+--------------------------------------+
| _change_type | _log_offset | _commit_timestamp | before | after |
+--------------+-------------+---------------------+----------------------------------+--------------------------------------+
| insert | 0 | 2024-01-15 10:30:00 | NULL | (1, Alice, alice@example.com) |
| update | 2 | 2024-01-15 10:35:00 | (1, Alice, alice@example.com) | (1, Alice Smith, alice.smith@example.com) |
| delete | 3 | 2024-01-15 10:40:00 | (1, Alice Smith, alice.smith@example.com) | NULL |
+--------------+-------------+---------------------+----------------------------------+--------------------------------------+
访问嵌套字段
可以从 before 和 after 结构中访问单个字段:
SELECT
_change_type,
_commit_timestamp,
`before`.name AS old_name,
`after`.name AS new_name
FROM users$binlog
WHERE _change_type = 'update';
启动模式
|
模式 |
描述 |
|
|
从日志的起始位置开始读取 |
|
|
从日志的当前末尾开始读取(仅读取新变更) |
|
|
从指定的时间戳开始读取(自 epoch 以来的毫秒数) |
Binlog 表支持不同的启动模式来控制读取的起始位置:
-- 从起始位置读取(默认)
SELECT * FROM orders$binlog /*+ OPTIONS('scan.startup.mode' = 'earliest') */;
-- 仅读取从现在开始的新变更
SELECT * FROM orders$binlog /*+ OPTIONS('scan.startup.mode' = 'latest') */;
-- 从指定时间戳读取
SELECT * FROM orders$binlog /*+ OPTIONS('scan.startup.mode' = 'timestamp', 'scan.startup.timestamp' = '1705312200000') */;
限制
-
目前尚不支持投影下推、分区下推和谓词下推,将在未来版本中解决。
-
Binlog 表仅支持流模式读取,不支持批模式。