This topic introduces virtual tables in Fluss.
Key concepts
Virtual tables in Fluss are system-generated tables that provide access to metadata, change data, and other system information without requiring additional storage. You can access a virtual table by appending a suffix, such as $changelog, to a base table name.
Fluss supports the following virtual table types:
| Virtual table | Suffix | Description | Supported table types |
|---|---|---|---|
| Changelog table | $changelog | Provides access to the raw changelog stream with metadata. | Primary key tables, log tables |
| Binlog table | $binlog | Provides binlog-formatted data, including the before and after images of changes. | Primary key tables only |
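The naming rule can be expressed as a small client-side helper. The following Python sketch is illustrative only. virtual_table_name and its parameters are hypothetical and are not part of any Fluss API. The helper appends the suffix from the table above and rejects $binlog for tables that have no primary key.

```python
# Hypothetical helper, not a Fluss API: builds the name used to query a
# virtual table and enforces the supported table types listed above.
SUFFIXES = {"changelog": "$changelog", "binlog": "$binlog"}


def virtual_table_name(base_table: str, kind: str, has_primary_key: bool) -> str:
    """Return base_table with the suffix for the requested virtual table."""
    if kind not in SUFFIXES:
        raise ValueError(f"unknown virtual table type: {kind!r}")
    if kind == "binlog" and not has_primary_key:
        # $binlog is supported on primary key tables only.
        raise ValueError("$binlog requires a primary key table")
    return base_table + SUFFIXES[kind]


print(virtual_table_name("orders", "changelog", has_primary_key=True))  # orders$changelog
print(virtual_table_name("orders", "binlog", has_primary_key=True))     # orders$binlog
```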
Changelog table
The $changelog virtual table provides read-only access to a table's raw changelog stream, allowing you to audit and process all data changes and their associated metadata.
Append $changelog to a table name to access its changelog:
SELECT * FROM my_table$changelog;
Table schema
The changelog virtual table contains three metadata columns, followed by the columns of the base table:
| Column | Type | Description |
|---|---|---|
| _change_type | STRING NOT NULL | The type of the change operation (see Change types). |
| _log_offset | BIGINT NOT NULL | The offset position in the log. |
| _commit_timestamp | TIMESTAMP_LTZ(3) NOT NULL | The timestamp when the change was committed. |
Change types
The _change_type column indicates the type of data change:
Primary key tables
| Change type | Description |
|---|---|
| insert | A new row was inserted. |
| update_before | The row values before the update (retraction). |
| update_after | The row values after the update. |
| delete | A row was deleted. |
Log tables
For append-only log tables, only one change type is used:
| Change type | Description |
|---|---|
| insert | A new row was appended to the log. |
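The following Python sketch replays changelog records into the current contents of a table, which shows how the four primary key change types combine. It models the semantics described above and is not Fluss code. It assumes that records arrive in log order from a single bucket, as with 'bucket.num' = '1' in the orders example. Each row is represented as a dict keyed by column name, and materialize is a hypothetical name.

```python
from decimal import Decimal
from typing import Any, Dict, Iterable, Tuple

Row = Dict[str, Any]


def materialize(changelog: Iterable[Tuple[str, Row]], key: str) -> Dict[Any, Row]:
    """Replay (_change_type, row) pairs in log order and return the
    resulting rows keyed by primary key. Illustrative model only."""
    state: Dict[Any, Row] = {}
    for change_type, row in changelog:
        if change_type in ("insert", "update_after"):
            state[row[key]] = row          # the new image becomes current
        elif change_type in ("update_before", "delete"):
            state.pop(row[key], None)      # retract the old image
        else:
            raise ValueError(f"unexpected change type: {change_type!r}")
    return state


# The four changelog records produced by the orders example in this topic.
orders_changelog = [
    ("insert", {"order_id": 1, "customer_name": "Rhea", "amount": Decimal("100.00")}),
    ("update_before", {"order_id": 1, "customer_name": "Rhea", "amount": Decimal("100.00")}),
    ("update_after", {"order_id": 1, "customer_name": "Rhea", "amount": Decimal("150.00")}),
    ("delete", {"order_id": 1, "customer_name": "Rhea", "amount": Decimal("150.00")}),
]

print(materialize(orders_changelog[:3], "order_id"))  # order 1 with amount 150.00
print(materialize(orders_changelog, "order_id"))      # {} after the delete
```

Because update_before retracts the old image and update_after restores the new one, the replay ends with the same row whether the update is read as a pair or skipped entirely.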
Example
Consider a primary key table that tracks user orders:
-- Create a primary key table
CREATE TABLE orders (
order_id INT NOT NULL,
customer_name STRING,
amount DECIMAL(10, 2),
PRIMARY KEY (order_id) NOT ENFORCED
) WITH ('bucket.num' = '1');
-- Insert a record
INSERT INTO orders VALUES (1, 'Rhea', 100.00);
-- Update the record (inserting a row with an existing primary key overwrites it)
INSERT INTO orders VALUES (1, 'Rhea', 150.00);
-- Delete the record
DELETE FROM orders WHERE order_id = 1;
-- Query the changelog
SELECT * FROM orders$changelog;
Output:
+---------------+-------------+---------------------+----------+---------------+--------+
| _change_type  | _log_offset | _commit_timestamp   | order_id | customer_name | amount |
+---------------+-------------+---------------------+----------+---------------+--------+
| insert        | 0           | 2024-01-15 10:30:00 | 1        | Rhea          | 100.00 |
| update_before | 1           | 2024-01-15 10:35:00 | 1        | Rhea          | 100.00 |
| update_after  | 2           | 2024-01-15 10:35:00 | 1        | Rhea          | 150.00 |
| delete        | 3           | 2024-01-15 10:40:00 | 1        | Rhea          | 150.00 |
+---------------+-------------+---------------------+----------+---------------+--------+
Startup modes
The changelog table supports several startup modes that control where reading starts. Set the mode with the 'scan.startup.mode' option:
| Mode | Description |
|---|---|
| earliest | Starts reading from the beginning of the log. This is the default. |
| latest | Starts reading from the current end of the log (processes new changes only). |
| timestamp | Starts reading from the time set in 'scan.startup.timestamp' (in milliseconds since the epoch). |
-- Read from the earliest offset (default)
SELECT * FROM orders$changelog /*+ OPTIONS('scan.startup.mode' = 'earliest') */;
-- Read new changes only
SELECT * FROM orders$changelog /*+ OPTIONS('scan.startup.mode' = 'latest') */;
-- Read from a specific timestamp
SELECT * FROM orders$changelog /*+ OPTIONS('scan.startup.mode' = 'timestamp', 'scan.startup.timestamp' = '1705312200000') */;
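The value of 'scan.startup.timestamp' is an epoch timestamp in milliseconds. The following Python sketch uses only the standard library. It converts a timezone-aware datetime to that value and assembles the OPTIONS hint shown above. The helper names epoch_millis and startup_hint are hypothetical. For example, 2024-01-15 09:50:00 UTC corresponds to 1705312200000, the value in the last query.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
MODES = ("earliest", "latest", "timestamp")


def epoch_millis(ts: datetime) -> int:
    """Milliseconds since the Unix epoch for a timezone-aware datetime."""
    if ts.tzinfo is None:
        raise ValueError("pass a timezone-aware datetime to avoid ambiguity")
    # Integer division of timedeltas avoids float rounding errors.
    return (ts - EPOCH) // timedelta(milliseconds=1)


def startup_hint(mode: str, ts: Optional[datetime] = None) -> str:
    """Build a /*+ OPTIONS(...) */ hint string for the given startup mode."""
    if mode not in MODES:
        raise ValueError(f"unsupported startup mode: {mode!r}")
    options = {"scan.startup.mode": mode}
    if mode == "timestamp":
        if ts is None:
            raise ValueError("timestamp mode needs a start time")
        options["scan.startup.timestamp"] = str(epoch_millis(ts))
    body = ", ".join(f"'{k}' = '{v}'" for k, v in options.items())
    return f"/*+ OPTIONS({body}) */"


start = datetime(2024, 1, 15, 9, 50, tzinfo=timezone.utc)
print(epoch_millis(start))  # 1705312200000
print("SELECT * FROM orders$changelog " + startup_hint("timestamp", start) + ";")
```

The helper requires a timezone-aware datetime. The same wall-clock time in UTC and in UTC+8 maps to different epoch values, so a naive datetime would be ambiguous.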
Limitations
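- Projection pushdown, partition pushdown, and predicate pushdown are not currently supported. As a result, selecting a subset of columns or filtering with a WHERE clause does not reduce the amount of data read from the log. Support is planned for a future release.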
Binlog table
The $binlog virtual table provides access to change data, where each record contains the before image and the after image of a row.
The $binlog virtual table applies only to primary key tables.
Binlog tables require Realtime Compute for Apache Flink with engine version VVR 11.6 or later.
Append $binlog to the primary key table name to access the binlog:
SELECT * FROM my_pk_table$binlog;
Table schema
The binlog virtual table contains three metadata columns, followed by two nested ROW columns, before and after:
| Column | Type | Description |
|---|---|---|
| _change_type | STRING NOT NULL | The type of the change operation: insert, update, or delete (see Change types). |
| _log_offset | BIGINT NOT NULL | The offset position in the log. |
| _commit_timestamp | TIMESTAMP_LTZ(3) NOT NULL | The timestamp when the change was committed. |
| before | ROW<...> | The row values before the change (NULL for inserts). |
| after | ROW<...> | The row values after the change (NULL for deletes). |
The before and after columns are of the nested ROW type and contain all columns of the base table.
Change types
| Change type | Description | before | after |
|---|---|---|---|
| insert | A new row was inserted. | NULL | Contains the values of the new row. |
| update | A row was updated. | Contains the values of the old row. | Contains the values of the new row. |
| delete | A row was deleted. | Contains the values of the deleted row. | NULL |
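The relationship between the two virtual tables can be modeled in a few lines. The following Python sketch illustrates it and is not the Fluss implementation. It folds changelog records into binlog records. Inserts and deletes map one to one, and each update_before is paired with the update_after that follows it.

The sketch assumes that records come from a single bucket in log order and that each update_before is immediately followed by its update_after. It reports each update at the update_after offset, which is consistent with the offsets in the example output. The function to_binlog, the tuple layouts, and the users changelog records are hypothetical.

```python
from typing import Any, Dict, Iterable, Iterator, Optional, Tuple

Row = Dict[str, Any]
# (_change_type, _log_offset, row), mirroring the $changelog columns
ChangelogRecord = Tuple[str, int, Row]
# (_change_type, _log_offset, before, after), mirroring the $binlog columns
BinlogRecord = Tuple[str, int, Optional[Row], Optional[Row]]


def to_binlog(changelog: Iterable[ChangelogRecord]) -> Iterator[BinlogRecord]:
    """Fold changelog records into binlog records (illustrative model)."""
    pending_before: Optional[Row] = None
    for change_type, offset, row in changelog:
        if change_type == "insert":
            yield ("insert", offset, None, row)
        elif change_type == "update_before":
            pending_before = row  # hold until the matching update_after
        elif change_type == "update_after":
            if pending_before is None:
                raise ValueError(f"update_after at offset {offset} has no update_before")
            yield ("update", offset, pending_before, row)
            pending_before = None
        elif change_type == "delete":
            yield ("delete", offset, row, None)
        else:
            raise ValueError(f"unexpected change type: {change_type!r}")


# Assumed changelog for the users example: insert, upsert, then delete.
alice = {"user_id": 1, "name": "Alice", "email": "alice@example.com"}
alice_smith = {"user_id": 1, "name": "Alice Smith", "email": "alice.smith@example.com"}
users_changelog = [
    ("insert", 0, alice),
    ("update_before", 1, alice),
    ("update_after", 2, alice_smith),
    ("delete", 3, alice_smith),
]

for record in to_binlog(users_changelog):
    print(record)
```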
Example
-- Create a primary key table
CREATE TABLE users (
user_id INT NOT NULL,
name STRING,
email STRING,
PRIMARY KEY (user_id) NOT ENFORCED
) WITH ('bucket.num' = '1');
-- Insert, update, and then delete a record
INSERT INTO users VALUES (1, 'Alice', 'alice@example.com');
INSERT INTO users VALUES (1, 'Alice Smith', 'alice.smith@example.com');
DELETE FROM users WHERE user_id = 1;
-- Query the binlog
SELECT * FROM users$binlog;
Output:
+--------------+-------------+---------------------+-------------------------------------------+-------------------------------------------+
| _change_type | _log_offset | _commit_timestamp   | before                                    | after                                     |
+--------------+-------------+---------------------+-------------------------------------------+-------------------------------------------+
| insert       | 0           | 2024-01-15 10:30:00 | NULL                                      | (1, Alice, alice@example.com)             |
| update       | 2           | 2024-01-15 10:35:00 | (1, Alice, alice@example.com)             | (1, Alice Smith, alice.smith@example.com) |
| delete       | 3           | 2024-01-15 10:40:00 | (1, Alice Smith, alice.smith@example.com) | NULL                                      |
+--------------+-------------+---------------------+-------------------------------------------+-------------------------------------------+
Accessing nested fields
You can access individual fields from the before and after structures:
SELECT
_change_type,
_commit_timestamp,
`before`.name AS old_name,
`after`.name AS new_name
FROM users$binlog
WHERE _change_type = 'update';
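The query above reads a single nested field. A consumer that needs every column an update changed can compare the two images instead. The following Python sketch assumes that the before and after values have already been deserialized into dicts keyed by column name. This is a hypothetical client-side representation, and changed_columns is an illustrative name.

```python
from typing import Any, Dict, Optional, Tuple

Row = Dict[str, Any]


def changed_columns(before: Optional[Row], after: Optional[Row]) -> Dict[str, Tuple[Any, Any]]:
    """Return {column: (old_value, new_value)} for every column whose value
    differs between the two images. A NULL image (None) is treated as empty,
    so an insert or delete reports every non-NULL column."""
    old = before or {}
    new = after or {}
    return {
        col: (old.get(col), new.get(col))
        for col in sorted(old.keys() | new.keys())
        if old.get(col) != new.get(col)
    }


before = {"user_id": 1, "name": "Alice", "email": "alice@example.com"}
after = {"user_id": 1, "name": "Alice Smith", "email": "alice.smith@example.com"}
print(changed_columns(before, after))
# {'email': ('alice@example.com', 'alice.smith@example.com'), 'name': ('Alice', 'Alice Smith')}
```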
Startup modes
The binlog table supports several startup modes that control where reading starts. Set the mode with the 'scan.startup.mode' option:
| Mode | Description |
|---|---|
| earliest | Starts reading from the beginning of the log. This is the default. |
| latest | Starts reading from the current end of the log (processes new changes only). |
| timestamp | Starts reading from the time set in 'scan.startup.timestamp' (in milliseconds since the epoch). |
-- Read from the earliest offset (default)
SELECT * FROM orders$binlog /*+ OPTIONS('scan.startup.mode' = 'earliest') */;
-- Read new changes only
SELECT * FROM orders$binlog /*+ OPTIONS('scan.startup.mode' = 'latest') */;
-- Read from a specific timestamp
SELECT * FROM orders$binlog /*+ OPTIONS('scan.startup.mode' = 'timestamp', 'scan.startup.timestamp' = '1705312200000') */;
Limitations
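- Projection pushdown, partition pushdown, and predicate pushdown are not currently supported. As a result, selecting a subset of columns or filtering with a WHERE clause does not reduce the amount of data read from the log. Support is planned for a future release.
- Binlog tables support streaming reads only. Batch reads are not supported.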