虚拟表

更新时间:
复制为 MD 格式

本文介绍Fluss 的虚拟表 (virtual table)。

基本概念

Fluss 的虚拟表 (virtual-tables) 是系统自动生成的表,用于访问元数据、变更数据及其他系统信息,无需额外存储数据。通过在基础表名后追加后缀(如 `$changelog`)即可访问。

Fluss 支持以下虚拟表类型:

虚拟表

后缀

描述

支持的表类型

Changelog 表

$changelog

提供带有元数据的原始变更日志流访问

主键表、日志表

Binlog 表

$binlog

提供包含变更前后镜像的 binlog 格式数据

仅主键表

Changelog 表

`$changelog` 虚拟表提供对表的原始变更日志流的只读访问,允许你审计和处理所有数据变更及其关联的元数据。

在表名后追加 `$changelog` 即可访问变更日志:

SELECT * FROM my_table$changelog;

表结构

Changelog 虚拟表在原始表列之前包含三个元数据列:

列名

类型

描述

_change_type

STRING NOT NULL

变更操作的类型(参见变更类型

_log_offset

BIGINT NOT NULL

日志中的偏移位置

_commit_timestamp

TIMESTAMP_LTZ(3) NOT NULL

变更提交的时间戳

变更类型

_change_type列表示数据变更的类型:

主键表

变更类型

描述

insert

新插入了一行

update_before

更新行的旧值(撤回)

update_after

更新行的新值

delete

删除了一行

日志表

对于日志表(仅追加),只使用一种变更类型:

变更类型

描述

insert

新插入了一行到日志中

示例

以一个跟踪用户订单的主键表为例:

-- 创建主键表
CREATE TABLE orders (
    order_id INT NOT NULL,
    customer_name STRING,
    amount DECIMAL(10, 2),
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH ('bucket.num' = '1');

-- 插入一条记录
INSERT INTO orders VALUES (1, 'Rhea', 100.00);

-- 更新记录
INSERT INTO orders VALUES (1, 'Rhea', 150.00);

-- 删除记录
DELETE FROM orders WHERE order_id = 1;

-- 查询变更日志
SELECT * FROM orders$changelog;

输出:

+---------------+-------------+---------------------+----------+---------------+---------+
| _change_type  | _log_offset | _commit_timestamp   | order_id | customer_name | amount  |
+---------------+-------------+---------------------+----------+---------------+---------+
| insert        |           0 | 2024-01-15 10:30:00 |        1 | Rhea          |  100.00 |
| update_before |           1 | 2024-01-15 10:35:00 |        1 | Rhea          |  100.00 |
| update_after  |           2 | 2024-01-15 10:35:00 |        1 | Rhea          |  150.00 |
| delete        |           3 | 2024-01-15 10:40:00 |        1 | Rhea          |  150.00 |
+---------------+-------------+---------------------+----------+---------------+---------+

启动模式

模式

描述

earliest

从日志的起始位置开始读取

latest

从日志的当前末尾开始读取(仅读取新变更)

timestamp

从指定的时间戳开始读取(自 epoch 以来的毫秒数)

Changelog 表支持不同的启动模式来控制读取的起始位置:

-- 从起始位置读取(默认)
SELECT * FROM orders$changelog /*+ OPTIONS('scan.startup.mode' = 'earliest') */;

-- 仅读取从现在开始的新变更
SELECT * FROM orders$changelog /*+ OPTIONS('scan.startup.mode' = 'latest') */;

-- 从指定时间戳读取
SELECT * FROM orders$changelog /*+ OPTIONS('scan.startup.mode' = 'timestamp', 'scan.startup.timestamp' = '1705312200000') */;

限制

  • 目前尚不支持投影下推、分区下推和谓词下推,将在未来版本中解决。

Binlog 表

$binlog 虚拟表提供对变更数据的访问,每条记录包含行的变更前镜像和变更后镜像。

说明

$binlog 虚拟表仅适用于主键表

在主键表名后追加 $binlog 即可访问 binlog:

SELECT * FROM my_pk_table$binlog;

表结构

Binlog 虚拟表包含三个元数据列,后跟嵌套的 beforeafter 行结构:

列名

类型

描述

_change_type

STRING NOT NULL

变更操作的类型:insertupdatedelete

_log_offset

BIGINT NOT NULL

日志中的偏移位置

_commit_timestamp

TIMESTAMP_LTZ(3) NOT NULL

变更提交的时间戳

before

ROW<...>

变更前的行值(插入时为 NULL)

after

ROW<...>

变更后的行值(删除时为 NULL)

beforeafter 列是嵌套的 ROW 类型,包含基础表的所有列。

变更类型

变更类型

描述

before

after

insert

新插入了一行

NULL

包含新行的值

update

更新了一行

包含旧行的值

包含新行的值

delete

删除了一行

包含被删除行的值

NULL

示例

-- 创建主键表
CREATE TABLE users (
    user_id INT NOT NULL,
    name STRING,
    email STRING,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH ('bucket.num' = '1');

-- 插入、更新,然后删除一条记录
INSERT INTO users VALUES (1, 'Alice', 'alice@example.com');
INSERT INTO users VALUES (1, 'Alice Smith', 'alice.smith@example.com');
DELETE FROM users WHERE user_id = 1;

-- 查询 binlog
SELECT * FROM users$binlog;

输出:

+--------------+-------------+---------------------+----------------------------------+--------------------------------------+
| _change_type | _log_offset | _commit_timestamp   | before                           | after                                |
+--------------+-------------+---------------------+----------------------------------+--------------------------------------+
| insert       |           0 | 2024-01-15 10:30:00 | NULL                             | (1, Alice, alice@example.com)        |
| update       |           2 | 2024-01-15 10:35:00 | (1, Alice, alice@example.com)    | (1, Alice Smith, alice.smith@example.com) |
| delete       |           3 | 2024-01-15 10:40:00 | (1, Alice Smith, alice.smith@example.com) | NULL                        |
+--------------+-------------+---------------------+----------------------------------+--------------------------------------+

访问嵌套字段

可以从 beforeafter 结构中访问单个字段:

SELECT
    _change_type,
    _commit_timestamp,
    `before`.name AS old_name,
    `after`.name AS new_name
FROM users$binlog
WHERE _change_type = 'update';

启动模式

模式

描述

earliest

从日志的起始位置开始读取

latest

从日志的当前末尾开始读取(仅读取新变更)

timestamp

从指定的时间戳开始读取(自 epoch 以来的毫秒数)

Binlog 表支持不同的启动模式来控制读取的起始位置:

-- 从起始位置读取(默认)
SELECT * FROM orders$binlog /*+ OPTIONS('scan.startup.mode' = 'earliest') */;

-- 仅读取从现在开始的新变更
SELECT * FROM orders$binlog /*+ OPTIONS('scan.startup.mode' = 'latest') */;

-- 从指定时间戳读取
SELECT * FROM orders$binlog /*+ OPTIONS('scan.startup.mode' = 'timestamp', 'scan.startup.timestamp' = '1705312200000') */;

限制

  • 目前尚不支持投影下推、分区下推和谓词下推,将在未来版本中解决。