本文介绍Fluss 的虚拟表 (virtual table)。
基本概念
Fluss 的虚拟表 (virtual-tables) 是系统自动生成的表,用于访问元数据、变更数据及其他系统信息,无需额外存储数据。通过在基础表名后追加后缀(如 `$changelog`)即可访问。
Fluss 支持以下虚拟表类型:
虚拟表 | 后缀 | 描述 | 支持的表类型 |
| 提供带有元数据的原始变更日志流访问 | 主键表、日志表 | |
| 提供包含变更前后镜像的 binlog 格式数据 | 仅主键表 |
Changelog 表
`$changelog` 虚拟表提供对表的原始变更日志流的只读访问,允许你审计和处理所有数据变更及其关联的元数据。
在表名后追加 `$changelog` 即可访问变更日志:
SELECT * FROM my_table$changelog;表结构
Changelog 虚拟表在原始表列之前包含三个元数据列:
列名 | 类型 | 描述 |
| STRING NOT NULL | 变更操作的类型(参见变更类型) |
| BIGINT NOT NULL | 日志中的偏移位置 |
| TIMESTAMP_LTZ(3) NOT NULL | 变更提交的时间戳 |
变更类型
_change_type列表示数据变更的类型:
主键表
变更类型 | 描述 |
| 新插入了一行 |
| 更新行的旧值(撤回) |
| 更新行的新值 |
| 删除了一行 |
日志表
对于日志表(仅追加),只使用一种变更类型:
变更类型 | 描述 |
| 新插入了一行到日志中 |
示例
以一个跟踪用户订单的主键表为例:
-- 创建主键表
CREATE TABLE orders (
order_id INT NOT NULL,
customer_name STRING,
amount DECIMAL(10, 2),
PRIMARY KEY (order_id) NOT ENFORCED
) WITH ('bucket.num' = '1');
-- 插入一条记录
INSERT INTO orders VALUES (1, 'Rhea', 100.00);
-- 更新记录
INSERT INTO orders VALUES (1, 'Rhea', 150.00);
-- 删除记录
DELETE FROM orders WHERE order_id = 1;
-- 查询变更日志
SELECT * FROM orders$changelog;输出:
+---------------+-------------+---------------------+----------+---------------+---------+
| _change_type | _log_offset | _commit_timestamp | order_id | customer_name | amount |
+---------------+-------------+---------------------+----------+---------------+---------+
| insert | 0 | 2024-01-15 10:30:00 | 1 | Rhea | 100.00 |
| update_before | 1 | 2024-01-15 10:35:00 | 1 | Rhea | 100.00 |
| update_after | 2 | 2024-01-15 10:35:00 | 1 | Rhea | 150.00 |
| delete | 3 | 2024-01-15 10:40:00 | 1 | Rhea | 150.00 |
+---------------+-------------+---------------------+----------+---------------+---------+启动模式
模式 | 描述 |
| 从日志的起始位置开始读取 |
| 从日志的当前末尾开始读取(仅读取新变更) |
| 从指定的时间戳开始读取(自 epoch 以来的毫秒数) |
Changelog 表支持不同的启动模式来控制读取的起始位置:
-- 从起始位置读取(默认)
SELECT * FROM orders$changelog /*+ OPTIONS('scan.startup.mode' = 'earliest') */;
-- 仅读取从现在开始的新变更
SELECT * FROM orders$changelog /*+ OPTIONS('scan.startup.mode' = 'latest') */;
-- 从指定时间戳读取
SELECT * FROM orders$changelog /*+ OPTIONS('scan.startup.mode' = 'timestamp', 'scan.startup.timestamp' = '1705312200000') */;限制
目前尚不支持投影下推、分区下推和谓词下推,将在未来版本中解决。
Binlog 表
$binlog 虚拟表提供对变更数据的访问,每条记录包含行的变更前镜像和变更后镜像。
$binlog 虚拟表仅适用于主键表。
在主键表名后追加 $binlog 即可访问 binlog:
SELECT * FROM my_pk_table$binlog;表结构
Binlog 虚拟表包含三个元数据列,后跟嵌套的 before 和 after 行结构:
列名 | 类型 | 描述 |
| STRING NOT NULL | 变更操作的类型: |
| BIGINT NOT NULL | 日志中的偏移位置 |
| TIMESTAMP_LTZ(3) NOT NULL | 变更提交的时间戳 |
| ROW<...> | 变更前的行值(插入时为 NULL) |
| ROW<...> | 变更后的行值(删除时为 NULL) |
before 和 after 列是嵌套的 ROW 类型,包含基础表的所有列。
变更类型
变更类型 | 描述 |
|
|
| 新插入了一行 | NULL | 包含新行的值 |
| 更新了一行 | 包含旧行的值 | 包含新行的值 |
| 删除了一行 | 包含被删除行的值 | NULL |
示例
-- 创建主键表
CREATE TABLE users (
user_id INT NOT NULL,
name STRING,
email STRING,
PRIMARY KEY (user_id) NOT ENFORCED
) WITH ('bucket.num' = '1');
-- 插入、更新,然后删除一条记录
INSERT INTO users VALUES (1, 'Alice', 'alice@example.com');
INSERT INTO users VALUES (1, 'Alice Smith', 'alice.smith@example.com');
DELETE FROM users WHERE user_id = 1;
-- 查询 binlog
SELECT * FROM users$binlog;输出:
+--------------+-------------+---------------------+----------------------------------+--------------------------------------+
| _change_type | _log_offset | _commit_timestamp | before | after |
+--------------+-------------+---------------------+----------------------------------+--------------------------------------+
| insert | 0 | 2024-01-15 10:30:00 | NULL | (1, Alice, alice@example.com) |
| update | 2 | 2024-01-15 10:35:00 | (1, Alice, alice@example.com) | (1, Alice Smith, alice.smith@example.com) |
| delete | 3 | 2024-01-15 10:40:00 | (1, Alice Smith, alice.smith@example.com) | NULL |
+--------------+-------------+---------------------+----------------------------------+--------------------------------------+访问嵌套字段
可以从 before 和 after 结构中访问单个字段:
SELECT
_change_type,
_commit_timestamp,
`before`.name AS old_name,
`after`.name AS new_name
FROM users$binlog
WHERE _change_type = 'update';启动模式
模式 | 描述 |
| 从日志的起始位置开始读取 |
| 从日志的当前末尾开始读取(仅读取新变更) |
| 从指定的时间戳开始读取(自 epoch 以来的毫秒数) |
Binlog 表支持不同的启动模式来控制读取的起始位置:
-- 从起始位置读取(默认)
SELECT * FROM orders$binlog /*+ OPTIONS('scan.startup.mode' = 'earliest') */;
-- 仅读取从现在开始的新变更
SELECT * FROM orders$binlog /*+ OPTIONS('scan.startup.mode' = 'latest') */;
-- 从指定时间戳读取
SELECT * FROM orders$binlog /*+ OPTIONS('scan.startup.mode' = 'timestamp', 'scan.startup.timestamp' = '1705312200000') */;限制
目前尚不支持投影下推、分区下推和谓词下推,将在未来版本中解决。