Table features

This topic describes the following Fluss table features: remote storage, column pruning, predicate pushdown, and log compression.

Overview

  • Remote storage: To ensure low latency, the most recent data is retained on local disks. Older data is asynchronously migrated to remote storage to reduce local storage costs.

  • Column pruning: During a query, only the required columns are read and irrelevant ones are skipped, which reduces the amount of data read.

  • Predicate pushdown: The server uses column statistics to skip data batches that cannot match the query filters. This reduces network I/O and deserialization overhead.

  • Log compression: This feature uses the native Arrow compression mechanism to significantly reduce network transfer and storage costs.

Remote storage

Remote storage refers to an external storage system that is more cost-effective and reliable than local disks, such as Amazon S3, Hadoop Distributed File System (HDFS), or Alibaba Cloud OSS. By default, Fluss uses Alibaba Cloud OSS as its remote storage.

  • Fluss uses remote storage for data tiering and fault tolerance.

  • For log tables, remote storage stores tiered log segments.

  • For primary key tables, remote storage stores snapshots and change log data segments.

Remote log mechanism

Fluss is a stream storage system where data is typically consumed by reading from the tail. To ensure low latency, the most recent data is kept on local disks. Fluss asynchronously migrates older data to remote storage to reduce local storage costs.

Table-level configuration

After log segments are copied to remote storage, Fluss usually deletes the local copies to save space. However, to improve read performance, you can choose to retain a specific number of recent log segments locally.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| table.log.tiered.local-segments | Integer | 2 | The number of log segments to retain locally. |
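Because this is a table-level option, it can presumably be set in the WITH clause in the same way as the other table options in this topic. The following sketch assumes that placement. The table name and the value 4 are illustrative and not taken from the product documentation.

```sql
-- Illustrative only: retain the 4 most recent log segments on local disk
-- instead of the default 2. The table name is hypothetical.
CREATE TABLE log_table_local4 (
  order_id BIGINT,
  item_id BIGINT,
  amount INT,
  address STRING
) WITH (
  'table.log.tiered.local-segments' = '4'
);
```

A larger value keeps more recent data readable from local disk at the cost of more local storage.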

Cluster-level configuration

The following configuration options control the log tiering behavior:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| remote.log.task-interval-duration | Duration | 1 minute | The execution interval for remote log management tasks, which include copy, cleanup, and delete operations. Setting this to 0 seconds disables the remote log feature. |
| remote.log.index-file-cache-size | Memory size | 1 GB | The total size of the local cache for storing remote index files. |
| remote.log-manager.thread-pool-size | Integer | 4 | The size of the thread pool for scheduling remote log tasks. |
| remote.log.data-transfer-thread-num | Integer | 4 | The number of threads for uploading and downloading remote log files, including data, index, and metadata files. |
| log.segment.file-size | Memory size | 1 GB | The size of each log segment file. Log retention and cleanup operate at the file level, so larger files result in fewer total files. Together with the table-level table.log.tiered.local-segments option, this option determines the total local disk space usage. |
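As a rough worked estimate of how log.segment.file-size and table.log.tiered.local-segments combine, the tiered log data retained locally for one bucket is approximately the number of retained segments multiplied by the segment size. This is an assumption derived from the descriptions above. It ignores the active segment, segments that have not yet been copied to remote storage, and index files.

```latex
\text{local log size per bucket} \approx
  \texttt{table.log.tiered.local-segments} \times \texttt{log.segment.file-size}
  = 2 \times 1\,\text{GB} = 2\,\text{GB}
```

With the default values, this comes to roughly 2 GB per bucket. Multiply by the number of buckets hosted on a node for a node-level estimate.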

Remote snapshots for primary key tables

  • Fluss distributes data in a primary key table across buckets. Each bucket maintains only one local replica and has no follower replica.

  • To prevent data loss from local disk failures, Fluss periodically creates snapshots and uploads them to remote storage.

  • A snapshot contains a copy of the data and a log offset. The offset marks the position in the change log from which to continue reading after the snapshot.

  • If a node fails, Fluss can download the snapshot from remote storage and replay the subsequent change log to recover the data on another node.

  • The consistency between the snapshot and the log offset allows clients to seamlessly switch from a full read (from a snapshot) to an incremental subscription (from the change log), preventing data duplication or loss.

Cluster-level snapshot configuration

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| kv.snapshot.interval | Duration | 10 minutes | The snapshot creation interval for primary key tables. |
| kv.snapshot.scheduler-thread-num | Integer | 1 | The number of threads for scheduling all snapshot tasks on the local node. |
| kv.snapshot.transfer-thread-num | Integer | 4 | The number of threads for uploading and downloading snapshot files. |
| kv.snapshot.num-retained | Integer | 1 | The maximum number of completed snapshots to retain. We recommend that you set this to a larger value to prevent a snapshot from being deleted while a client is reading it. |

Column pruning

Column pruning is an optimization that reduces the amount of data read by reading only the required columns and skipping irrelevant ones during a query. Fluss supports column pruning for stream reads, making it one of the few systems in the industry with this capability for real-time stream processing scenarios.

  • Fluss supports column pruning for both log tables and the change logs of primary key tables. This feature reduces storage I/O, decreases network traffic, and lessens the parsing load on the client, significantly improving query performance.

  • In both batch processing and stream processing, only the fields that a query needs are read and transferred, which improves resource utilization.

When a compute engine, such as Flink, executes a query, it analyzes the required columns and instructs Fluss to return only those columns.

SELECT id, name FROM log_table WHERE `timestamp` > '2023-01-01';

This query reads only the id, name, and timestamp columns. Fluss does not read other columns in the table, such as address and status, from storage, which avoids unnecessary reads.
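To make the effect concrete, the following sketch contrasts a pruned query with an unpruned one. The schema is hypothetical and assumed only for this illustration, based on the column names mentioned above. The column name `timestamp` is quoted with backticks because TIMESTAMP is a reserved keyword in Flink SQL.

```sql
-- Hypothetical schema, assumed only for this illustration.
CREATE TABLE log_table (
  id BIGINT,
  name STRING,
  `timestamp` TIMESTAMP(3),
  address STRING,
  status STRING
);

-- Needs id, name, and `timestamp`; address and status can be pruned.
SELECT id, name
FROM log_table
WHERE `timestamp` > TIMESTAMP '2023-01-01 00:00:00';

-- Projects every column, so there is nothing to prune.
SELECT *
FROM log_table
WHERE `timestamp` > TIMESTAMP '2023-01-01 00:00:00';
```

Listing columns explicitly instead of using SELECT * is what gives the engine the opportunity to request fewer columns from Fluss.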

Predicate pushdown

Predicate pushdown is a server-side optimization that applies only to log tables. The server evaluates filter predicates against the column statistics (min, max, and nullCount) of each RecordBatch. It then skips entire batches that cannot contain matching rows, which reduces network I/O and deserialization overhead.

Column statistics

Fluss collects column statistics at the RecordBatch level when data is written. These statistics include the minimum (min), maximum (max), and null count (nullCount) for each column. Fluss stores these statistics within the Arrow RecordBatch, allowing the server to evaluate filter predicates during reads.

Configuration

Use the table.statistics.columns parameter to specify the columns for statistics collection. We recommend that you specify column names explicitly. Alternatively, you can set the value to * to collect statistics for all supported column types.

Enable when creating a table

CREATE TABLE log_table (
  order_id BIGINT,
  item_id BIGINT,
  amount INT,
  address STRING
) WITH (
  'table.statistics.columns' = 'order_id,amount'
);

Enable for an existing table

ALTER TABLE log_table SET ('table.statistics.columns' = 'order_id,amount');

Important

Statistics are collected only for data written after you enable statistics collection. Historical data is not affected, so batches written earlier carry no statistics.

Supported types

Column statistics are supported for the following data types: BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, DECIMAL, STRING, CHAR, DATE, TIME, TIMESTAMP, and TIMESTAMP_LTZ.

The following data types do not support statistics and are automatically excluded from the configuration: BYTES, BINARY, ARRAY, MAP, and ROW.

Prerequisites

  • The log format must be Arrow (table.log.format = 'arrow'). Arrow is the default log format.

  • You must explicitly enable column statistics by using the table.statistics.columns option.

  • This feature applies only to log tables.

How it works

When reading data, the server evaluates the column statistics and filter predicates for each RecordBatch. If the statistics indicate that a batch cannot contain any matching rows, the server skips that batch and does not send it to the client. The Flink client still re-applies the filters to the received data as a safeguard, ensuring that the query semantics remain unchanged.
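The following queries sketch which filters can benefit. They assume the log_table created earlier in this section with 'table.statistics.columns' = 'order_id,amount'. The filter values are illustrative, and whether a particular predicate form is actually pushed down depends on the engine and connector version, which this topic does not specify.

```sql
-- amount has statistics: a RecordBatch whose max(amount) is <= 100
-- cannot contain a matching row, so the server may skip it.
SELECT order_id, amount
FROM log_table
WHERE amount > 100;

-- address has no statistics configured: the server has nothing to
-- compare the filter against, so no batch can be skipped based on
-- statistics and the filter is evaluated on the client side.
SELECT order_id, address
FROM log_table
WHERE address = 'Hangzhou';
```

In both cases the query result is the same as without pushdown. Only the amount of data sent to the client differs.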

Log compression

Fluss uses the native Arrow compression mechanism, so the compressed data remains compliant with the Arrow format standard. Because Fluss compresses each column independently, column pruning continues to work on compressed data.

Compression process

  • For log tables, the writer on the client side compresses data and writes it to storage in a compressed format. When reading the data, the log scanner decompresses it.

  • For the change logs of primary key tables, the server generates and compresses the log.

Benefits of compression

Compression significantly reduces network transfer and storage costs. Benchmark tests show that ZSTD at compression level 3 can achieve a compression ratio of approximately 5:1 (for example, compressing 5 GB of data down to 1 GB). The reduction in network overhead also increases read and write throughput.

Configuration

| Parameter | Default | Description |
| --- | --- | --- |
| table.log.arrow.compression.type | ZSTD | The compression algorithm. Valid values: NONE, LZ4_FRAME, and ZSTD. |
| table.log.arrow.compression.zstd.level | 3 | The ZSTD compression level. Valid values range from 1 to 22. |

Configuration example

-- Set the compression algorithm to ZSTD and the compression level to 2
CREATE TABLE log_table (
  order_id BIGINT,
  item_id BIGINT,
  amount INT,
  address STRING
) WITH (
  'table.log.arrow.compression.type' = 'ZSTD',
  'table.log.arrow.compression.zstd.level' = '2'
);

Important

  • Compression settings apply only to tables where table.log.format = 'arrow'.

  • If table.log.format is set to 'indexed', Fluss ignores the compression settings.
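As a complementary sketch, compression can be turned off by setting the algorithm to NONE, one of the valid values listed above. The table name and the reduced column list are illustrative.

```sql
-- Illustrative table name; disables Arrow compression for this log table.
CREATE TABLE log_table_uncompressed (
  order_id BIGINT,
  amount INT
) WITH (
  'table.log.arrow.compression.type' = 'NONE'
);
```

This trades higher network transfer and storage costs for less compression work on the writer.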