This topic explains how to read data from Fluss in stream mode and batch mode.
Read data
Stream mode
Fluss supports multiple consumption modes. For a primary key table, the default consumption mode is full, which first consumes the full data and then the incremental data.
Log on to the Realtime Compute for Apache Flink console.
In the Actions column of the target workspace, click Console.
In the left-side navigation pane, click .
Click + and then click New Stream Job. Enter a File Name, select an Engine Version, and then click Create.
Write the job code.
-- Use the default consumption mode
SELECT * FROM `fluss-catalog`.`my_db`.`my_table`;

-- Use the earliest consumption mode
SELECT * FROM my_table /*+ OPTIONS('scan.startup.mode' = 'earliest') */;
Batch mode
Both primary key tables and log tables support LIMIT reads to quickly return a specified number of results, simplifying data previews.
Both primary key tables and log tables (version 0.9-ali-1.0 and later) support COUNT(*) aggregate queries.
Primary key tables also support point lookups by using exact matches of primary and index keys.
Log on to the Realtime Compute for Apache Flink console.
In the Actions column of the target workspace, click Console.
In the left-side navigation pane, click .
Click + and then click New Batch Job. Enter a File Name, select an Engine Version and SQL Dialect, and then click Create.
Write the job code.
-- LIMIT read
SELECT * FROM `fluss-catalog`.`my_db`.`my_table` LIMIT 10;

-- Point lookup (primary key tables only)
SELECT * FROM `fluss-catalog`.`my_db`.`my_pk_tbl` WHERE shop_id = 10000 AND user_id = 123456;

-- Aggregate query
SELECT COUNT(*) FROM `fluss-catalog`.`my_db`.`my_log_table`;
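The point lookup above filters on shop_id and user_id, which suggests that these two columns form the primary key of my_pk_tbl. A minimal sketch of such a table follows. The schema is hypothetical: only shop_id and user_id appear in the query above, and the remaining columns are assumptions added for illustration.

```sql
-- Hypothetical schema for my_pk_tbl; num_orders and total_amount are
-- illustrative columns, not taken from the original table.
CREATE TABLE `fluss-catalog`.`my_db`.`my_pk_tbl` (
  shop_id BIGINT,
  user_id BIGINT,
  num_orders INT,
  total_amount INT,
  PRIMARY KEY (shop_id, user_id) NOT ENFORCED
);

-- Point lookup: each primary key column is matched with an equality predicate.
SELECT * FROM `fluss-catalog`.`my_db`.`my_pk_tbl`
WHERE shop_id = 10000 AND user_id = 123456;
```

With this layout, a filter on shop_id alone would not be an exact match of the full primary key, so it may not qualify as a point lookup.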
Query optimization
Predicate pushdown
For filter conditions written in a SELECT ... WHERE clause, Flink automatically pushes the corresponding predicates down to the Fluss server. The server then evaluates these predicates against the column statistics of each RecordBatch, which allows it to skip non-matching data blocks and reduce network transfer and deserialization overhead.
Filter operators
Comparison operators: =, <>, >, >=, <, and <=
IN (...)
IS NULL, IS NOT NULL
BETWEEN ... AND ...
LIKE: Supports prefix matching ('abc%'), suffix matching ('%abc'), and contains matching ('%abc%').
Logical operators: AND, OR
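As an illustration of how several of these operators can be combined in one filter, consider the following sketch. The table access_log and all of its columns are hypothetical. Whether the filter is actually pushed down depends on the table type and on column statistics being enabled for the referenced columns.

```sql
-- Hypothetical log table and columns, for illustration only.
SELECT request_id, status_code, url, user_agent
FROM access_log
WHERE status_code IN (500, 502, 503)        -- IN (...)
  AND latency_ms BETWEEN 200 AND 5000       -- BETWEEN ... AND ...
  AND url LIKE '/api/%'                     -- LIKE with prefix matching
  AND user_agent IS NOT NULL;               -- IS NOT NULL
```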
Limitations
Applies only to log tables.
All columns referenced in the filter condition must have column statistics enabled (table.statistics.columns). If any referenced column does not have statistics enabled, the filter condition is not pushed down.
This coarse-grained filtering is based on batch-level statistics and may result in false positives (returning extra data). This does not affect the semantics of the query results.
Examples
Step 1: Create a table and enable column statistics
CREATE TABLE sensor_log (
  sensor_id BIGINT,
  temperature DOUBLE,
  humidity DOUBLE,
  event_time TIMESTAMP(3)
) WITH (
  'table.statistics.columns' = 'sensor_id,temperature'
);
Step 2: Query data by using a WHERE clause
SELECT sensor_id, temperature, event_time
FROM sensor_log
WHERE temperature > 30.0 AND sensor_id = 1001;
Step 3: Verify predicate pushdown by using EXPLAIN
EXPLAIN SELECT sensor_id, temperature, event_time
FROM sensor_log
WHERE temperature > 30.0 AND sensor_id = 1001;
In the EXPLAIN output, the filter=[...] clause of the TableSourceScan node indicates that the filter condition has been pushed down to the Fluss server.
It is expected behavior for the Calc node in the EXPLAIN output to retain the filter condition. The Flink client filters the received data again as a safeguard.
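As a contrast to the steps above, a filter on humidity references a column that is not listed in table.statistics.columns for sensor_log, so under the stated limitations it would be expected not to be pushed down. The following sketch assumes the sensor_log table from Step 1. The exact shape of the EXPLAIN output is not reproduced here.

```sql
-- Assumes the sensor_log table from Step 1, where only sensor_id and
-- temperature have column statistics enabled.
-- humidity has no statistics, so this filter is expected to be evaluated
-- by Flink only, not by the Fluss server.
EXPLAIN SELECT sensor_id, humidity, event_time
FROM sensor_log
WHERE humidity > 80.0;
```

If the TableSourceScan node shows no filter=[...] clause for this query, listing humidity in table.statistics.columns when the table is created is the change that the limitation points to.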
For more details on the principles and configuration of predicate pushdown, see Table Properties.
Data query
Both primary key tables and log tables support LIMIT reads to quickly return a specified number of results, simplifying data previews.
Both primary key tables and log tables (version 0.9-ali-1.0 and later) support COUNT(*) aggregate queries.
Primary key tables also support point lookups by using exact matches of primary and index keys.
Log on to the Realtime Compute for Apache Flink console.
In the Actions column of the target workspace, click Console.
In the left-side navigation pane, click .
SELECT * FROM `fluss-catalog`.`my_db`.`customer` WHERE c_custkey = 1;
Select the code block and click Run to view the results.
Consumption modes
Consumption mode | Primary key table | Log table | Use cases |
full (Default) | First consumes the full data, and then the incremental data. | Consumes data from the earliest offset. | Ideal for scenarios that require processing all data, such as full data analysis. |
earliest | Consumes data starting from the earliest binlog offset. | Consumes data from the earliest offset. | Ideal for scenarios that require processing historical change data, such as initial data loading. |
latest | Consumes data starting from the latest binlog offset. | Consumes data from the latest offset. | Ideal for scenarios where you only need the most recent data, such as real-time monitoring. |
timestamp | Consumes binlog data starting from a specified timestamp. | Consumes data starting from a specified timestamp. | Ideal for data recovery or backtracking based on a specific point in time. |
SQL examples
-- full mode (default)
SELECT * FROM my_table;

-- earliest consumption mode
SELECT * FROM my_table /*+ OPTIONS('scan.startup.mode' = 'earliest') */;

-- latest consumption mode
SELECT * FROM my_table /*+ OPTIONS('scan.startup.mode' = 'latest') */;

-- Use a timestamp
SELECT * FROM my_table
/*+ OPTIONS('scan.startup.mode' = 'timestamp',
            'scan.startup.timestamp' = '1678883047356') */;

-- Use a time string
SELECT * FROM my_table
/*+ OPTIONS('scan.startup.mode' = 'timestamp',
            'scan.startup.timestamp' = '2023-12-09 23:09:12') */;
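The same hints can also be used when a stream job writes its results to a sink instead of returning them to the client. The sketch below reuses the sensor_log table from the predicate pushdown example and assumes it is resolvable in the current catalog and database. The print_sink table is hypothetical and uses Flink's built-in print connector, which is intended for debugging.

```sql
-- Hypothetical debugging sink; 'print' writes each row to the task logs.
CREATE TEMPORARY TABLE print_sink (
  sensor_id BIGINT,
  temperature DOUBLE,
  event_time TIMESTAMP(3)
) WITH (
  'connector' = 'print'
);

-- Read only newly arriving records by starting from the latest offset.
INSERT INTO print_sink
SELECT sensor_id, temperature, event_time
FROM sensor_log /*+ OPTIONS('scan.startup.mode' = 'latest') */;
```

Changing the hint to 'earliest' or 'timestamp' would replay historical data into the sink as well, which may or may not be what a production job wants.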