This topic introduces primary key tables in Fluss.
Key concepts
A primary key table in Fluss enforces uniqueness on a specified primary key and supports INSERT, UPDATE, and DELETE operations.
For example, the following Flink SQL statement creates a primary key table that defines shop_id, user_id, and ds as the primary key and distributes data into four buckets.
CREATE TABLE pk_table (
  shop_id BIGINT,
  user_id BIGINT,
  num_orders INT,
  total_amount INT,
  ds STRING,
  PRIMARY KEY (shop_id, user_id, ds) NOT ENFORCED
) WITH (
  'bucket.num' = '4'
);
When you write multiple records with the same primary key, the system retains the last record by default, based on processing time. For more write strategies, see Merge engines.
For a partitioned table, the primary key must include all partition key fields. For example, if the partition key is ds, then ds must be part of the primary key. Because data is partitioned by the partition key, Fluss maintains uniqueness within each partition. This prevents records with the same primary key from existing in different partitions.
Data bucketing
For a primary key table, Fluss assigns records to a bucket based on the hash value of their bucket key.
The bucket key must be a subset of the primary key, excluding any partition keys.
For example, if the primary key is (shop_id, user_id, ds) and the partition key is ds, the bucket key can be 'bucket.key' = 'user_id,shop_id', 'bucket.key' = 'user_id', or 'bucket.key' = 'shop_id'.
If you do not specify a bucket key, Fluss uses the primary key columns (excluding any partition keys) as the default bucket key.
For example, if the primary key is (shop_id, user_id, ds) and the partition key is ds, and no bucket key is specified, the default bucket key is 'bucket.key' = 'user_id,shop_id'.
Records with the same hash value are assigned to the same bucket, which facilitates parallel processing and load balancing.
For more details, see Data bucketing.
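The bucket assignment rule can be illustrated with a small Python model. This is a sketch under stated assumptions, not Fluss code: the function bucket_for is hypothetical, and zlib.crc32 only stands in for the hash function that Fluss actually uses, which this topic does not specify.

```python
import zlib

def bucket_for(row, bucket_key, num_buckets):
    """Return a bucket index for a row, hashing only the bucket key columns."""
    # Serialize the bucket key values in a fixed column order.
    encoded = "|".join(str(row[col]) for col in bucket_key).encode("utf-8")
    # crc32 is a stand-in hash (assumption); the real hash function may differ.
    return zlib.crc32(encoded) % num_buckets

NUM_BUCKETS = 4
BUCKET_KEY = ("shop_id", "user_id")  # primary key columns minus the partition key ds

r1 = {"shop_id": 1, "user_id": 42, "ds": "2024-01-01", "num_orders": 3}
r2 = {"shop_id": 1, "user_id": 42, "ds": "2024-01-02", "num_orders": 5}

# Equal bucket key values give the same bucket index,
# whatever the partition key or the other columns contain.
same_bucket = bucket_for(r1, BUCKET_KEY, NUM_BUCKETS) == bucket_for(r2, BUCKET_KEY, NUM_BUCKETS)
print(same_bucket)  # True
```

Because only the bucket key columns are hashed, choosing a narrower bucket key such as user_id alone would send every row for one user to the same bucket index.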
Partial update
Fluss supports partial updates for primary key tables. You can write a subset of columns to incrementally update a record. These partial updates are eventually merged to form a complete row.
CREATE TABLE T (
  k INT,
  v1 DOUBLE,
  v2 STRING,
  PRIMARY KEY (k) NOT ENFORCED
);
You can write partial columns in separate operations, but each write must include the primary key column, as shown in the following figure.
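The merge of partial writes into one complete row can also be sketched in Python. This is an illustrative model only: partial_upsert is a hypothetical helper, and the sketch assumes that columns that have not been written yet hold a null value (None here).

```python
def partial_upsert(table, columns, key_col, partial_row):
    """Merge a partial row into the stored row that has the same primary key."""
    if key_col not in partial_row:
        raise ValueError("each partial write must include the primary key column")
    key = partial_row[key_col]
    # Start from the stored row, or from an all-None row on the first write (assumption).
    merged = dict(table.get(key, {c: None for c in columns}))
    merged.update(partial_row)  # only the written columns are overwritten
    table[key] = merged
    return merged

columns = ("k", "v1", "v2")
T = {}
partial_upsert(T, columns, "k", {"k": 1, "v1": 1.0})   # writes k and v1
partial_upsert(T, columns, "k", {"k": 1, "v2": "t1"})  # writes k and v2
print(T[1])  # {'k': 1, 'v1': 1.0, 'v2': 't1'}
```

The second write leaves v1 untouched because it only carries k and v2, which is the incremental behavior described for partial updates.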
Merge engines
The Fluss Merge Engine is a core component for efficiently processing data updates in primary key tables. It allows you to define how new records are merged with existing ones that share the same primary key. You can also specify different merge engines to customize merge behavior based on your use case.
The following merge engines are supported:
Default merge engine (LastRow): When a new record has the same primary key as an existing record, this engine keeps the latest record (based on processing time, i.e., the last one written).
FirstRow merge engine: Unlike the default strategy, this engine keeps the earliest record (based on processing time, i.e., the first one written). This is useful for scenarios where you need to preserve the initial state of the data.
Versioned merge engine: Merges records based on a version number. You can decide which version of a record to keep based on a version field. This is ideal for scenarios that require data versioning or incremental updates.
Aggregation merge engine: Groups records by the same primary key and applies a specified aggregate function to each non-primary key field. This engine covers many common aggregate functions, simplifying data aggregation scenarios.
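To compare the four behaviors side by side, the following Python sketch models each one as a function that combines the stored row with a newly written row. All names here are hypothetical and none of this is Fluss API. The sketch also assumes that, for the version-based engine, a new row with an equal version replaces the stored row.

```python
def last_row(old, new):
    return new  # default behavior: the latest write wins

def first_row(old, new):
    return old  # the earliest write is kept

def versioned(old, new, ver_col="ver"):
    # Keep the row with the higher version; ties go to the new row (assumption).
    return new if new[ver_col] >= old[ver_col] else old

def aggregation(old, new, agg_funcs, key_cols=("k",)):
    # Apply one aggregate function to each non-primary-key field.
    return {c: (new[c] if c in key_cols else agg_funcs[c](old[c], new[c])) for c in new}

def write(table, row, merge, key_col="k"):
    key = row[key_col]
    table[key] = merge(table[key], row) if key in table else row

# Two writes to the same primary key; the second carries an older version.
rows = [{"k": 1, "ver": 2, "amount": 10}, {"k": 1, "ver": 1, "amount": 5}]
engines = {
    "last_row": last_row,
    "first_row": first_row,
    "versioned": versioned,
    "aggregation": lambda o, n: aggregation(o, n, {"ver": max, "amount": lambda a, b: a + b}),
}
results = {}
for name, merge in engines.items():
    table = {}
    for row in rows:
        write(table, row, merge)
    results[name] = table[1]

print(results["last_row"])     # {'k': 1, 'ver': 1, 'amount': 5}
print(results["first_row"])    # {'k': 1, 'ver': 2, 'amount': 10}
print(results["versioned"])    # {'k': 1, 'ver': 2, 'amount': 10}
print(results["aggregation"])  # {'k': 1, 'ver': 2, 'amount': 15}
```

In this input the first-write and version-based results coincide only because the first write happens to carry the higher version; reversing the write order would separate them.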
Changelog generation
Fluss records data insertions, deletions, and updates in a primary key table as a changelog. Downstream consumers can directly read this changelog to track table changes.
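The way writes turn into changelog records can be modeled in a few lines of Python. This is a sketch that assumes the default behavior of keeping the latest record; the helpers apply_write and apply_delete are hypothetical and not part of any Fluss API.

```python
def apply_write(state, changelog, key, row):
    """Upsert a row and record +I for a new key, or a -U/+U pair for an update."""
    if key in state:
        changelog.append(("-U", state[key]))  # row before the update
        changelog.append(("+U", row))         # row after the update
    else:
        changelog.append(("+I", row))
    state[key] = row

def apply_delete(state, changelog, key):
    """Delete a key and record -D with the last stored row."""
    if key in state:
        changelog.append(("-D", state.pop(key)))

state, changelog = {}, []
apply_write(state, changelog, 1, (1, 2.0, "apple"))
apply_write(state, changelog, 1, (1, 4.0, "banana"))
apply_delete(state, changelog, 1)
for kind, row in changelog:
    print(kind, row)
```

The loop prints one +I record, a -U/+U pair for the overwrite, and a -D record for the delete, in that order.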
For example, if you first insert two records (1, 2.0, 'apple') and (1, 4.0, 'banana') into a Fluss primary key table, and then delete the record (1, 4.0, 'banana'), the following changelog data is generated:
+I(1, 2.0, 'apple')
-U(1, 2.0, 'apple')
+U(1, 4.0, 'banana')
-D(1, 4.0, 'banana')
Local storage and partition lifecycle
The storage structure of a primary key table consists of LogTablet for the retraction stream and KvTablet for state lookups. Because these components use different mechanisms, relying on a standard TTL alone cannot fully reclaim local disk space.
LogTablet management: The table.log.ttl parameter controls only the retention period of the changelog and does not affect the storage size of KvTablet.
KvTablet management (Recommended): To completely remove expired data and reclaim the local disk space occupied by KvTablet, you must enable Partition TTL.
Enable auto-partition: Set table.auto-partition.enabled = true.
Set retention policy: Configure table.auto-partition.num-retention to specify the number of partitions to retain.
Execution logic: The system periodically checks for and physically deletes old partitions that exceed the retention count, effectively reclaiming local storage space.
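The retention check can be pictured with the following Python sketch. It is a simplified model, not the actual cleanup routine: expire_partitions is a hypothetical function, partition names are assumed to sort chronologically, and the sketch simply keeps the newest num_retention partitions.

```python
def expire_partitions(partitions, num_retention):
    """Split partition names into (kept, dropped), keeping the newest num_retention."""
    if num_retention < 1:
        raise ValueError("this sketch expects num_retention >= 1")
    ordered = sorted(partitions)  # assumes names such as '20240101' sort by date
    return ordered[-num_retention:], ordered[:-num_retention]

kept, dropped = expire_partitions(["20240103", "20240101", "20240104", "20240102"], 2)
print(kept)     # ['20240103', '20240104']
print(dropped)  # ['20240101', '20240102']
```

In this model, dropping a partition removes both its changelog and its key-value state, which is why partition expiry reclaims space that a changelog TTL alone leaves behind.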
Auto-increment column
An auto-increment column automatically generates a unique, increasing numeric value for each row, so you do not have to specify one manually. Typical use cases include:
Accelerate distinct counting: Map STRING type identifiers to auto-incrementing integers. Performing a distinct count on integer values can improve query speed severalfold.
RoaringBitmap aggregation: Use the built-in rbm32 (32-bit) and rbm64 (64-bit) functions in the Aggregation Merge Engine to aggregate auto-incrementing integer IDs into a RoaringBitmap for highly efficient distinct counting.
CREATE TABLE uid_mapping (
  user_id STRING,
  uid BIGINT,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'auto-increment.fields' = 'uid',
  'bucket.num' = '2'
);
Basic features
Uniqueness: Values generated by an auto-increment column are unique across the entire table.
Monotonicity: Each table bucket caches a batch of auto-incrementing IDs on the TabletServer. As a result, values are not guaranteed to be strictly monotonic globally, only approximately increasing over time.
The cache size is controlled by the table.auto-increment.cache-size table property, with a default value of 100,000. A larger cache improves allocation performance but reduces monotonicity. This property cannot be changed after the table is created.
Example
table.auto-increment.cache-size = 100000 (default)
The table has 2 buckets. When the system starts, it pre-allocates an ID range to each bucket:
Bucket 0 → [1, 100000]
Bucket 1 → [100001, 200000]
If data is written to bucket 1, the uid starts from 100001. After bucket 0 uses up its 100,000 IDs, it requests the next batch, [200001, 300000], and so on. There is no limit to the total number of IDs that each bucket can allocate. 100000 is not an upper limit; it is the cache batch size.
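The batch allocation in this example can be reproduced with a small Python model. This is a sketch rather than the server-side implementation: the class IdAllocator and its methods are hypothetical, and a single in-process counter stands in for whatever coordination the TabletServers actually use.

```python
class IdAllocator:
    """Hands out disjoint ID batches to buckets from one table-wide counter."""

    def __init__(self, cache_size=100_000):
        self.cache_size = cache_size
        self.next_start = 1  # next unallocated ID for the whole table
        self.ranges = {}     # bucket -> [next_id, last_id_in_batch]

    def fetch_batch(self, bucket):
        start = self.next_start
        self.next_start += self.cache_size
        self.ranges[bucket] = [start, start + self.cache_size - 1]

    def next_id(self, bucket):
        cached = self.ranges.get(bucket)
        if cached is None or cached[0] > cached[1]:
            self.fetch_batch(bucket)  # batch exhausted: request the next one
        value = self.ranges[bucket][0]
        self.ranges[bucket][0] += 1
        return value

alloc = IdAllocator()
alloc.fetch_batch(0)  # startup: bucket 0 caches [1, 100000]
alloc.fetch_batch(1)  # startup: bucket 1 caches [100001, 200000]

first_b1 = alloc.next_id(1)
ids_b0 = [alloc.next_id(0) for _ in range(100_001)]  # 100000 cached IDs plus one more
print(first_b1, ids_b0[-2], ids_b0[-1])  # 100001 100000 200001
```

The jump from 100000 to 200001 within bucket 0 shows why values are unique across the table but only approximately increasing over time.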
Use cases
You can use an auto-increment column with the lookup.insert-if-not-exists option in a Flink Lookup Join to automatically build a dictionary table during stream processing. When a lookup key is not found, Fluss automatically inserts a new row and assigns a unique, auto-incrementing ID.
A typical use case is to map high-cardinality string identifiers to compact integer IDs so that downstream distinct-count aggregation with RoaringBitmap is efficient. For more details, see dimension table join.
Limitations
Only supported for primary key tables.
You cannot explicitly specify a value for an auto-increment column; it must be implicitly assigned by the system.
A table can have only one auto-increment column.
The data type of an auto-increment column must be INT or BIGINT.
You cannot specify a starting value or step for an auto-increment column.
Build a dictionary table with a Lookup Join
By combining an auto-increment column with the lookup.insert-if-not-exists option of a Flink Lookup Join, you can automatically build a dictionary table during stream processing. When a lookup key is not found, Fluss automatically inserts a new row and assigns an auto-incrementing ID. This is particularly useful for mapping high-cardinality string identifiers to compact integer IDs for efficient deduplication and aggregation using RoaringBitmap. For more information and examples, see dimension table join.
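The insert-on-miss behavior can be modeled in Python as follows. This is a conceptual sketch, not the Flink connector: DictionaryTable and its lookup method are hypothetical, and a single counter replaces the per-bucket ID batches for brevity.

```python
class DictionaryTable:
    """Maps string identifiers to integer IDs, inserting on a lookup miss."""

    def __init__(self):
        self.mapping = {}
        self.next_uid = 1  # single counter for brevity (assumption)

    def lookup(self, user_id, insert_if_not_exists=True):
        if user_id not in self.mapping:
            if not insert_if_not_exists:
                return None  # plain lookup: a miss returns nothing
            self.mapping[user_id] = self.next_uid
            self.next_uid += 1
        return self.mapping[user_id]

dim = DictionaryTable()
events = ["alice", "bob", "alice", "carol"]
uids = [dim.lookup(user) for user in events]
print(uids)            # [1, 2, 1, 3]
print(len(set(uids)))  # 3 distinct users, counted on integers instead of strings
```

Repeated keys always resolve to the ID assigned on first sight, so the stream itself builds the dictionary without a separate loading step.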
Reading and querying primary key tables
Data cleanup: The Fluss service only manages its own storage resources and does not perform data cleanup or garbage collection operations at the Paimon storage layer.
Real-time query limitations: Fluss provides query services only for data within its configured lifecycle. Once data exceeds its retention period, the system removes it from the Fluss layer, and users cannot retrieve it through the Fluss interface.
Historical data access: In the future, you will be able to query expired partition data through Paimon's Union Read mechanism to access the complete historical dataset.
Reading
For primary key tables, the default read mode consists of a full snapshot followed by incremental data:
First, consume the table's snapshot data.
Then, consume the table's changelog data.
You can also choose to consume only the changelog data. For more details, see Reading Data.
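The two read modes can be sketched as a Python generator. This is an illustrative model under assumptions: read_table is a hypothetical function, and snapshot rows are assumed to be emitted as +I records before the changelog records that follow the snapshot.

```python
def read_table(snapshot_rows, changelog, changelog_only=False):
    """Yield snapshot rows as inserts, then the changelog records after the snapshot."""
    if not changelog_only:
        for row in snapshot_rows:
            yield ("+I", row)  # assumption: snapshot rows surface as inserts
    yield from changelog

snapshot = [(1, 2.0, "apple")]
changes = [("-U", (1, 2.0, "apple")), ("+U", (1, 4.0, "banana"))]

full = list(read_table(snapshot, changes))
incremental = list(read_table(snapshot, changes, changelog_only=True))
print(len(full), len(incremental))  # 3 2
```

A consumer that starts in the default mode sees the current table state first and then keeps up with later changes, whereas a changelog-only consumer sees only the changes.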
Primary key lookup
Fluss primary key tables support lookups by primary key. If the specified primary key exists in Fluss, the lookup returns a single, unique row. This lookup method is typically used in Flink Lookup Join scenarios.
Prefix lookup
Fluss primary key tables also support prefix lookups based on a prefix of the primary key. Unlike a primary key lookup, a prefix lookup scans data based on the key prefix and may return multiple rows. This lookup method is typically used in Flink Prefix Lookup Join scenarios.
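The difference between the two lookup styles can be shown with a short Python sketch. It models a table as a dictionary keyed by the full primary key (shop_id, user_id, ds); the functions lookup and prefix_lookup are hypothetical, and the linear scan is for illustration only and says nothing about how Fluss locates rows internally.

```python
rows = {
    (1, 10, "2024-01-01"): {"num_orders": 3},
    (1, 10, "2024-01-02"): {"num_orders": 5},
    (2, 20, "2024-01-01"): {"num_orders": 1},
}

def lookup(table, key):
    """Primary key lookup: at most one row for a full key."""
    return table.get(key)

def prefix_lookup(table, prefix):
    """Prefix lookup: every row whose key starts with the given prefix."""
    n = len(prefix)
    return [(key, value) for key, value in table.items() if key[:n] == prefix]

print(lookup(rows, (1, 10, "2024-01-02")))  # {'num_orders': 5}
print(len(prefix_lookup(rows, (1, 10))))    # 2
```

A full key identifies a single row, while the prefix (1, 10) matches both rows for that shop and user across different ds values.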