Virtual tables

This topic introduces virtual tables in Fluss.

Key concepts

Virtual tables in Fluss are system-generated tables that provide access to metadata, change data, and other system information without requiring additional storage. You can access a virtual table by appending a suffix, such as $changelog, to a base table name.

Fluss supports the following virtual table types:

| Virtual table | Suffix | Description | Supported table types |
|---|---|---|---|
| Changelog table | $changelog | Provides access to the raw changelog stream with metadata. | Primary key tables, log tables |
| Binlog table | $binlog | Provides binlog-formatted data, including the before and after images of changes. | Primary key tables only |

Changelog table

The $changelog virtual table provides read-only access to a table's raw changelog stream, allowing you to audit and process all data changes and their associated metadata.

Append $changelog to a table name to access its changelog:

SELECT * FROM my_table$changelog;

Table schema

The changelog virtual table contains three metadata columns, followed by the columns of the base table:

| Column | Type | Description |
|---|---|---|
| _change_type | STRING NOT NULL | The type of the change operation (see Change types). |
| _log_offset | BIGINT NOT NULL | The offset position in the log. |
| _commit_timestamp | TIMESTAMP_LTZ(3) NOT NULL | The timestamp when the change was committed. |

Change types

The _change_type column indicates the type of data change:

Primary key tables

| Change type | Description |
|---|---|
| insert | A new row was inserted. |
| update_before | The row values before the update (retraction). |
| update_after | The row values after the update. |
| delete | A row was deleted. |

Log tables

For append-only log tables, only one change type is used:

| Change type | Description |
|---|---|
| insert | A new row was appended to the log. |
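Because _change_type is an ordinary column in the query result, it can be grouped and counted like any other field. The following is a minimal sketch that reuses the placeholder table name my_table from the earlier example; the alias change_count is illustrative only.

```sql
-- Count how many changes of each type have been recorded so far.
-- my_table is a placeholder name; change_count is an illustrative alias.
SELECT
    _change_type,
    COUNT(*) AS change_count
FROM my_table$changelog
GROUP BY _change_type;
```

In a streaming job this result keeps updating as new changes arrive. For a log table, only the insert group would be expected to appear.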

Example

Consider a primary key table that tracks user orders:

-- Create a primary key table
CREATE TABLE orders (
    order_id INT NOT NULL,
    customer_name STRING,
    amount DECIMAL(10, 2),
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH ('bucket.num' = '1');

-- Insert a record
INSERT INTO orders VALUES (1, 'Rhea', 100.00);

-- Update the record (on a primary key table, an INSERT with an existing key overwrites the row)
INSERT INTO orders VALUES (1, 'Rhea', 150.00);

-- Delete the record
DELETE FROM orders WHERE order_id = 1;

-- Query the changelog
SELECT * FROM orders$changelog;

Output:

+---------------+-------------+---------------------+----------+---------------+---------+
| _change_type  | _log_offset | _commit_timestamp   | order_id | customer_name | amount  |
+---------------+-------------+---------------------+----------+---------------+---------+
| insert        |           0 | 2024-01-15 10:30:00 |        1 | Rhea          |  100.00 |
| update_before |           1 | 2024-01-15 10:35:00 |        1 | Rhea          |  100.00 |
| update_after  |           2 | 2024-01-15 10:35:00 |        1 | Rhea          |  150.00 |
| delete        |           3 | 2024-01-15 10:40:00 |        1 | Rhea          |  150.00 |
+---------------+-------------+---------------------+----------+---------------+---------+
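The metadata columns can also be used in a WHERE clause, for example to audit a single kind of change. The sketch below lists only the deletes recorded for the orders table from this example. Given the limitations described in this topic, the filter is presumably evaluated after the changelog is read rather than being pushed down to storage.

```sql
-- Show only the delete events for the orders table.
-- Each row carries the values the record held when it was deleted.
SELECT
    _log_offset,
    _commit_timestamp,
    order_id,
    customer_name,
    amount
FROM orders$changelog
WHERE _change_type = 'delete';
```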

Startup modes

| Mode | Description |
|---|---|
| earliest | Starts reading from the beginning of the log. |
| latest | Starts reading from the current end of the log (processes new changes only). |
| timestamp | Starts reading from a specific timestamp (in milliseconds since the epoch). |

To control where reading starts, set the scan.startup.mode option in a SQL hint. For the timestamp mode, also set scan.startup.timestamp:

-- Read from the earliest offset (default)
SELECT * FROM orders$changelog /*+ OPTIONS('scan.startup.mode' = 'earliest') */;

-- Read new changes only
SELECT * FROM orders$changelog /*+ OPTIONS('scan.startup.mode' = 'latest') */;

-- Read from a specific timestamp
SELECT * FROM orders$changelog /*+ OPTIONS('scan.startup.mode' = 'timestamp', 'scan.startup.timestamp' = '1705312200000') */;
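The scan.startup.timestamp option takes epoch milliseconds, which are awkward to write by hand. One possible way to derive such a value is Flink's built-in UNIX_TIMESTAMP function, which returns seconds and interprets the string in the session time zone. The date-time literal and the alias startup_millis below are illustrative assumptions, not values taken from this topic.

```sql
-- Convert a wall-clock time to epoch milliseconds.
-- UNIX_TIMESTAMP returns seconds, so multiply by 1000.
-- The result depends on the session time zone.
SELECT UNIX_TIMESTAMP('2024-01-15 10:30:00') * 1000 AS startup_millis;
```

The resulting number can then be copied into the 'scan.startup.timestamp' hint as a string.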

Limitations

  • Projection pushdown, partition pushdown, and predicate pushdown are not currently supported, so column selection and filters are applied only after the changelog has been read. Support is planned for a future release.

Binlog table

The $binlog virtual table exposes each change as a single record that contains both the before image and the after image of the row.

Note

The $binlog virtual table only applies to primary key tables.

Note

Binlog tables require Realtime Compute for Apache Flink engine VVR 11.6 or later.

Append $binlog to the primary key table name to access the binlog:

SELECT * FROM my_pk_table$binlog;

Table schema

The binlog virtual table contains three metadata columns, followed by two nested ROW columns, before and after:

| Column | Type | Description |
|---|---|---|
| _change_type | STRING NOT NULL | Type of change operation: insert, update, or delete. |
| _log_offset | BIGINT NOT NULL | The offset position in the log. |
| _commit_timestamp | TIMESTAMP_LTZ(3) NOT NULL | The timestamp when the change was committed. |
| before | ROW<...> | The row values before the change (NULL for inserts). |
| after | ROW<...> | The row values after the change (NULL for deletes). |

The before and after columns are of the nested ROW type and contain all columns of the base table.

Change types

| Change type | Description | before | after |
|---|---|---|---|
| insert | A new row was inserted. | NULL | Contains the values of the new row. |
| update | A row was updated. | Contains the values of the old row. | Contains the values of the new row. |
| delete | A row was deleted. | Contains the values of the deleted row. | NULL |

Example

-- Create a primary key table
CREATE TABLE users (
    user_id INT NOT NULL,
    name STRING,
    email STRING,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH ('bucket.num' = '1');

-- Insert, update, and then delete a record
INSERT INTO users VALUES (1, 'Alice', 'alice@example.com');
INSERT INTO users VALUES (1, 'Alice Smith', 'alice.smith@example.com');
DELETE FROM users WHERE user_id = 1;

-- Query the binlog
SELECT * FROM users$binlog;

Output:

+--------------+-------------+---------------------+-------------------------------------------+-------------------------------------------+
| _change_type | _log_offset | _commit_timestamp   | before                                    | after                                     |
+--------------+-------------+---------------------+-------------------------------------------+-------------------------------------------+
| insert       |           0 | 2024-01-15 10:30:00 | NULL                                      | (1, Alice, alice@example.com)             |
| update       |           2 | 2024-01-15 10:35:00 | (1, Alice, alice@example.com)             | (1, Alice Smith, alice.smith@example.com) |
| delete       |           3 | 2024-01-15 10:40:00 | (1, Alice Smith, alice.smith@example.com) | NULL                                      |
+--------------+-------------+---------------------+-------------------------------------------+-------------------------------------------+
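Because before is NULL for inserts and after is NULL for deletes, a query that needs the primary key of every change has to look at both images. A minimal sketch, assuming that reading a field of a NULL row yields NULL (the usual Flink SQL behavior), could use COALESCE:

```sql
-- Resolve the affected key for every change type.
-- inserts and updates: taken from the after image
-- deletes: falls back to the before image
SELECT
    _change_type,
    COALESCE(`after`.user_id, `before`.user_id) AS user_id,
    _commit_timestamp
FROM users$binlog;
```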

Accessing nested fields

You can access individual fields from the before and after structures:

SELECT
    _change_type,
    _commit_timestamp,
    `before`.name AS old_name,
    `after`.name AS new_name
FROM users$binlog
WHERE _change_type = 'update';
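Having both images in one record also makes it possible to compare them directly. The following sketch returns only the updates in which the email column changed. The aliases old_email and new_email are illustrative. IS DISTINCT FROM is used instead of <> so that a change to or from NULL is also detected.

```sql
-- Find updates that changed a user's email address.
SELECT
    `after`.user_id AS user_id,
    `before`.email AS old_email,
    `after`.email AS new_email,
    _commit_timestamp
FROM users$binlog
WHERE _change_type = 'update'
  AND `before`.email IS DISTINCT FROM `after`.email;
```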

Startup modes

| Mode | Description |
|---|---|
| earliest | Starts reading from the beginning of the log. |
| latest | Starts reading from the current end of the log (processes new changes only). |
| timestamp | Starts reading from a specific timestamp (in milliseconds since the epoch). |

To control where reading starts, set the scan.startup.mode option in a SQL hint. For the timestamp mode, also set scan.startup.timestamp:

-- Read from the earliest offset (default)
SELECT * FROM users$binlog /*+ OPTIONS('scan.startup.mode' = 'earliest') */;

-- Read new changes only
SELECT * FROM users$binlog /*+ OPTIONS('scan.startup.mode' = 'latest') */;

-- Read from a specific timestamp
SELECT * FROM users$binlog /*+ OPTIONS('scan.startup.mode' = 'timestamp', 'scan.startup.timestamp' = '1705312200000') */;

Limitations

  • Projection pushdown, partition pushdown, and predicate pushdown are not currently supported, so column selection and filters are applied only after the binlog has been read. Support is planned for a future release.

  • Binlog tables support streaming reads only, not batch reads.
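Since binlog tables can be read only in streaming mode, the session that issues the query must run as a streaming job. In the open-source Flink SQL client, the runtime mode is set with the execution.runtime-mode option, as sketched below. Whether this statement is needed, or available, in your deployment environment is an assumption to verify, because managed platforms may choose the mode when the job is created.

```sql
-- Make sure the session runs in streaming mode before reading the binlog.
-- (Assumes the open-source Flink SQL client; managed platforms may differ.)
SET 'execution.runtime-mode' = 'streaming';

SELECT * FROM users$binlog;
```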