Read data

This topic explains how to read data from Fluss in stream mode and batch mode.

Read data

Stream mode

Fluss supports multiple consumption modes. For a primary key table, the default consumption mode is full, which first consumes the full data and then the incremental data.

  1. Log on to the Realtime Compute for Apache Flink console.

  2. In the Actions column of the target workspace, click Console.

  3. In the left-side navigation pane, click Data Development > ETL.

  4. Click + and then click New Stream Job. Enter a File Name, select an Engine Version, and then click Create.

  5. Write the job code.

    -- Default (full) consumption mode
    SELECT * FROM `fluss-catalog`.`my_db`.`my_table`;

    -- Use the earliest consumption mode
    SELECT * FROM `fluss-catalog`.`my_db`.`my_table` /*+ OPTIONS('scan.startup.mode' = 'earliest') */;

Batch mode

  • Both primary key tables and log tables support LIMIT reads to quickly return a specified number of results, simplifying data previews.

  • Both primary key tables and log tables (version 0.9-ali-1.0 and later) support COUNT(*) aggregate queries.

  • Primary key tables also support point lookups by using exact matches of primary and index keys.

  1. Log on to the Realtime Compute for Apache Flink console.

  2. In the Actions column of the target workspace, click Console.

  3. In the left-side navigation pane, click Data Development > ETL.

  4. Click + and then click New Batch Job. Enter a File Name, select an Engine Version and SQL Dialect, and then click Create.

  5. Write the job code.

    -- LIMIT read
    SELECT * FROM `fluss-catalog`.`my_db`.`my_table` LIMIT 10;

    -- Point lookup (primary key tables only)
    SELECT * FROM `fluss-catalog`.`my_db`.`my_pk_tbl` WHERE shop_id = 10000 AND user_id = 123456;

    -- Aggregate query
    SELECT COUNT(*) FROM `fluss-catalog`.`my_db`.`my_log_table`;

Query optimization

Predicate pushdown

For filter conditions written in a SELECT ... WHERE clause, Flink automatically pushes the corresponding predicates down to the Fluss server. The server then evaluates these predicates against the column statistics of each RecordBatch, which allows it to skip non-matching data blocks and reduce network transfer and deserialization overhead.
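The batch-skipping idea can be illustrated with a minimal Python sketch of min/max pruning. This is not Fluss code. The `BatchStats` class, the `may_match_*` helpers, and the assumption that the per-batch statistics are minimum and maximum values are all illustrative; this topic only states that predicates are evaluated against the column statistics of each RecordBatch.

```python
from dataclasses import dataclass


@dataclass
class BatchStats:
    """Hypothetical per-batch statistics for one column (assumed to be min/max)."""
    min_value: float
    max_value: float


def may_match_greater_than(stats: BatchStats, threshold: float) -> bool:
    # A batch can hold a value > threshold only if its maximum exceeds the threshold.
    return stats.max_value > threshold


def may_match_equals(stats: BatchStats, target: float) -> bool:
    # A batch can hold a value equal to target only if target lies within [min, max].
    return stats.min_value <= target <= stats.max_value


# Three hypothetical batches, described only by the statistics of a temperature column.
batches = [BatchStats(10.0, 25.0), BatchStats(20.0, 35.0), BatchStats(31.0, 40.0)]

# Predicate: temperature > 30.0. Batches that cannot match are skipped unread.
kept = [i for i, stats in enumerate(batches) if may_match_greater_than(stats, 30.0)]
print(kept)
```

In this sketch the first batch is skipped because its maximum (25.0) cannot satisfy the predicate, so its rows would never be transferred or deserialized.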

Filter operators

  • Comparison operators: =, <>, >, >=, <, and <=

  • IN (...)

  • IS NULL, IS NOT NULL

  • BETWEEN ... AND ...

  • LIKE: Supports prefix matching ('abc%'), suffix matching ('%abc'), and contains matching ('%abc%').

  • Logical operators: AND, OR

Limitations

  • Applies only to log tables.

  • All columns referenced in the filter condition must have column statistics enabled (table.statistics.columns).

  • If even one referenced column does not have statistics enabled, the filter condition is not pushed down.

  • The pushed-down filter is coarse-grained. Because it relies on batch-level statistics, the server may return batches that contain non-matching rows (false positives). This does not affect the semantics of the query results.
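The column-statistics requirement in the list above can be expressed as a simple subset check. The following Python sketch is illustrative only: `parse_statistics_columns` and `can_push_down` are hypothetical helpers, and the comma-separated format of `table.statistics.columns` is taken from the example in this topic.

```python
def parse_statistics_columns(option_value: str) -> set:
    # Split a comma-separated option value such as 'sensor_id,temperature'.
    return {name.strip() for name in option_value.split(",") if name.strip()}


def can_push_down(filter_columns: set, statistics_columns: set) -> bool:
    # Pushdown requires every column referenced by the filter to have statistics.
    return filter_columns <= statistics_columns


stats_columns = parse_statistics_columns("sensor_id,temperature")

# Both referenced columns have statistics, so the filter is eligible.
print(can_push_down({"temperature", "sensor_id"}, stats_columns))

# humidity has no statistics, so the filter is not eligible.
print(can_push_down({"temperature", "humidity"}, stats_columns))
```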

Examples

Step 1: Create a table and enable column statistics

CREATE TABLE sensor_log (
  sensor_id BIGINT,
  temperature DOUBLE,
  humidity DOUBLE,
  event_time TIMESTAMP(3)
) WITH (
  'table.statistics.columns' = 'sensor_id,temperature'
);

Step 2: Query data by using a WHERE clause

SELECT sensor_id, temperature, event_time
FROM sensor_log
WHERE temperature > 30.0 AND sensor_id = 1001;

Step 3: Verify predicate pushdown by using EXPLAIN

EXPLAIN SELECT sensor_id, temperature, event_time
FROM sensor_log
WHERE temperature > 30.0 AND sensor_id = 1001;

In the EXPLAIN output, the filter=[...] clause of the TableSourceScan node indicates that the filter condition has been pushed down to the Fluss server.

Note

It is expected behavior for the Calc node in the EXPLAIN output to retain the filter condition. The Flink client filters the received data again as a safeguard.

For more details on the principles and configuration of predicate pushdown, see Table Properties.
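The client-side re-filtering described in the note can be pictured as follows. This Python sketch uses made-up rows and a hypothetical `row_matches` helper; it only illustrates why a batch that passes the coarse server-side check still yields exact results after the Calc node applies the row-level predicate.

```python
# Rows of one hypothetical batch that passed server-side pruning because its
# temperature range (28.0 to 35.0) overlaps the predicate temperature > 30.0.
surviving_batch = [
    {"sensor_id": 1001, "temperature": 28.0},
    {"sensor_id": 1001, "temperature": 33.5},
    {"sensor_id": 1002, "temperature": 35.0},
]


def row_matches(row: dict) -> bool:
    # Exact row-level predicate: temperature > 30.0 AND sensor_id = 1001.
    return row["temperature"] > 30.0 and row["sensor_id"] == 1001


# The client evaluates the predicate per row and drops the false positives.
exact_rows = [row for row in surviving_batch if row_matches(row)]
print(exact_rows)
```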

Data query

Note

  • Both primary key tables and log tables support LIMIT reads to quickly return a specified number of results, simplifying data previews.

  • Both primary key tables and log tables (version 0.9-ali-1.0 and later) support COUNT(*) aggregate queries.

  • Primary key tables also support point lookups by using exact matches of primary and index keys.

  1. Log on to the Realtime Compute for Apache Flink console.

  2. In the Actions column of the target workspace, click Console.

  3. In the left-side navigation pane, click Data Query > New Query Script, and then enter a query statement.

    SELECT * FROM `fluss-catalog`.`my_db`.`customer`
    WHERE c_custkey = 1;

  4. Select the code block and click Run to view the results.

Consumption modes

| Consumption mode | Primary key table | Log table | Use cases |
| --- | --- | --- | --- |
| full (default) | First consumes the full data, and then the incremental data. | Consumes data from the earliest offset. | Ideal for scenarios that require processing all data, such as full data analysis. |
| earliest | Consumes data starting from the earliest binlog offset. | Consumes data from the earliest offset. | Ideal for scenarios that require processing historical change data, such as initial data loading. |
| latest | Consumes data starting from the latest binlog offset. | Consumes data from the latest offset. | Ideal for scenarios where you only need the most recent data, such as real-time monitoring. |
| timestamp | Consumes binlog data starting from a specific time specified by the scan.startup.timestamp option. | Consumes data starting from a specific time specified by the scan.startup.timestamp option. | Ideal for data recovery or backtracking based on a specific point in time. |
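If queries are generated programmatically, the consumption mode can be turned into an OPTIONS hint with a small helper. The Python sketch below is an assumption-laden illustration: `scan_hint` and `build_query` are hypothetical names, and the sketch emits no hint for full mode because this topic describes full as the default.

```python
from typing import Optional

# Mode names as listed in this topic.
VALID_MODES = {"full", "earliest", "latest", "timestamp"}


def scan_hint(mode: str = "full", timestamp: Optional[str] = None) -> str:
    """Build an OPTIONS hint for a consumption mode (hypothetical helper)."""
    if mode not in VALID_MODES:
        raise ValueError(f"unknown consumption mode: {mode}")
    if mode == "full":
        return ""  # Default mode, so no hint is emitted.
    options = {"scan.startup.mode": mode}
    if mode == "timestamp":
        if timestamp is None:
            raise ValueError("timestamp mode requires scan.startup.timestamp")
        options["scan.startup.timestamp"] = timestamp
    body = ", ".join(f"'{key}' = '{value}'" for key, value in options.items())
    return f"/*+ OPTIONS({body}) */"


def build_query(table: str, mode: str = "full", timestamp: Optional[str] = None) -> str:
    # Append the hint (if any) to a simple SELECT statement.
    return f"SELECT * FROM {table} {scan_hint(mode, timestamp)}".rstrip() + ";"


print(build_query("my_table"))
print(build_query("my_table", "earliest"))
print(build_query("my_table", "timestamp", "1678883047356"))
```

Validating the mode up front keeps a typo from silently producing a hint with an unsupported value.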

SQL examples

-- full consumption mode (default)
SELECT * FROM my_table;

-- earliest consumption mode
SELECT * FROM my_table /*+ OPTIONS('scan.startup.mode' = 'earliest') */;

-- latest consumption mode
SELECT * FROM my_table /*+ OPTIONS('scan.startup.mode' = 'latest') */;

-- timestamp consumption mode, using a numeric timestamp
SELECT * FROM my_table
/*+ OPTIONS('scan.startup.mode' = 'timestamp',
'scan.startup.timestamp' = '1678883047356') */;

-- timestamp consumption mode, using a time string
SELECT * FROM my_table
/*+ OPTIONS('scan.startup.mode' = 'timestamp',
'scan.startup.timestamp' = '2023-12-09 23:09:12') */;
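The two timestamp forms above can be converted into each other with a few lines of Python. The sketch assumes that the numeric form is an epoch value in milliseconds (the 13-digit example reads naturally that way) and, purely for illustration, that time strings are interpreted in UTC. This topic does not state which time zone applies to time strings, so check that before relying on a converted value. The helper names are hypothetical.

```python
from datetime import datetime, timedelta, timezone


def to_epoch_millis(time_string: str, tz: timezone = timezone.utc) -> int:
    # Parse 'YYYY-MM-DD HH:MM:SS' and attach the ASSUMED time zone (UTC by default).
    parsed = datetime.strptime(time_string, "%Y-%m-%d %H:%M:%S").replace(tzinfo=tz)
    return int(parsed.timestamp()) * 1000


def from_epoch_millis(millis: int, tz: timezone = timezone.utc) -> datetime:
    # Use integer arithmetic so the millisecond part is not lost to float rounding.
    seconds, remainder = divmod(millis, 1000)
    return datetime.fromtimestamp(seconds, tz=tz) + timedelta(milliseconds=remainder)


print(to_epoch_millis("2023-12-09 23:09:12"))  # 1702163352000 under the UTC assumption
print(from_epoch_millis(1678883047356).isoformat())
```

Under the UTC assumption, the numeric example value 1678883047356 corresponds to March 15, 2023, 12:24:07.356.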