This topic describes the parameters for the Fluss connector.
How to configure parameters
Configure parameters during table creation
Fluss persists parameters defined during table creation as table metadata. These parameters apply to all read and write operations.
-- Set table.log.ttl to 7 days and disable CRC checks during reads
CREATE TABLE `my-catalog`.`my_db`.`my_log_table` (
  order_id BIGINT,
  item_id BIGINT,
  amount INT,
  address STRING
) WITH (
  'table.log.ttl' = '7d',
  'client.scanner.log.check-crc' = 'false'
);
Dynamic configuration with SQL hints
Fluss also lets you configure parameters dynamically with SQL hints. These parameters are not persisted in the table metadata and affect only the current query.
Dynamically configured parameters take precedence over parameters persisted in the table metadata.
Storage parameters cannot be configured dynamically with SQL hints.
-- Set SQL hints for a read operation
SELECT * FROM `my-catalog`.`my_db`.`my_log_table` /*+ OPTIONS('client.scanner.log.check-crc' = 'false') */;
-- Set SQL hints for a write operation
INSERT INTO `my-catalog`.`my_db`.`my_pk_table_2` /*+ OPTIONS('sink.ignore-delete'='true') */ SELECT * FROM `my-catalog`.`my_db`.`my_pk_table_1`;
Storage parameters
Parameter | Type | Default | Description |
bucket.num | Integer | 1 | The number of buckets for the Fluss table. |
bucket.key | String | None | Data will be distributed among the buckets based on the hash value of
|
table.log.ttl | Duration | 7d | The time-to-live (TTL) for log data, which controls the maximum retention period. After this period, Fluss deletes old log data to free up space. A value of -1 disables expiration, so logs are never deleted. |
table.auto-partition.enabled | Boolean | false | If enabled, Fluss automatically creates partitions for the table. |
table.auto-partition.time-unit | ENUM | DAY | The time granularity for automatic partition creation. Valid values are For example: If the value is |
table.auto-partition.num-precreate | Integer | 2 | The number of future partitions to pre-create. For example, if the current date is 2024-11-11 and this is set to 3, Fluss pre-creates partitions for |
table.auto-partition.num-retention | Integer | 7 | The number of historical partitions to retain. For example, if the current date is 2024-11-11, the time unit is |
table.auto-partition.time-zone | String | System time zone | The time zone for auto-partitioning. By default, it uses the system time zone. |
table.replication.factor | Integer | (None) | The replication factor for log tables. If not set, Fluss uses the cluster's default replication factor, |
table.log.format | Enum | ARROW | The storage format for logs. Supported formats are |
table.log.arrow.compression.type | Enum | ZSTD | If the log format is set to ARROW, this parameter specifies the compression type for log data. Supported types are NONE, LZ4_FRAME, and ZSTD. The default value is ZSTD. |
table.log.arrow.compression.zstd.level | Integer | 3 | If the log format is set to ARROW and the compression type is ZSTD, this parameter specifies the compression level. The valid range is from 1 to 22. The default value is 3. |
table.statistics.columns | String | (None) | Specifies a comma-separated list of column names for which to collect statistics, such as Unsupported types (BYTES, BINARY, ARRAY, MAP, ROW) are automatically excluded. For more information, see Predicate Pushdown. |
table.kv.format | Enum | COMPACTED | The storage format for KV data. Supported formats are |
table.log.tiered.local-segments | Integer | 2 | When tiered log storage is enabled, this parameter specifies the number of log segments to retain locally for each table. The value must be greater than 0. The default is 2. |
table.datalake.enabled | Boolean | false | Specifies whether to enable lakehouse storage for the table. This is disabled by default. If enabled and the lakehouse tiering service is active, Fluss automatically tiers and compresses table data into lakehouse storage. |
table.datalake.format | Enum | (None) | Specifies the data lake format that the table uses for layered lakehouse storage. Currently, only The core functions of this parameter are:
Configuration Behavior
|
table.merge-engine | Enum | (None) | Defines the merge engine for primary key tables. The default strategy is
|
table.merge-engine.versioned.ver-column | String | (None) | If the merge engine is set to |
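Because storage parameters can be set only at table creation, a combined example may help. The following is a minimal sketch, not a recommended configuration: the table name `my_partitioned_log_table`, the `dt` partition column, and all values are hypothetical, and it assumes a log table partitioned by a single STRING column.

```sql
-- Hypothetical partitioned log table; the catalog and database names follow the earlier examples.
CREATE TABLE `my-catalog`.`my_db`.`my_partitioned_log_table` (
  order_id BIGINT,
  item_id BIGINT,
  amount INT,
  dt STRING
) PARTITIONED BY (dt) WITH (
  -- Distribute rows across 4 buckets by the hash of order_id
  'bucket.num' = '4',
  'bucket.key' = 'order_id',
  -- Create daily partitions automatically, 3 ahead, and keep 7 historical ones
  'table.auto-partition.enabled' = 'true',
  'table.auto-partition.time-unit' = 'DAY',
  'table.auto-partition.num-precreate' = '3',
  'table.auto-partition.num-retention' = '7',
  -- Explicitly state the default Arrow compression settings
  'table.log.arrow.compression.type' = 'ZSTD',
  'table.log.arrow.compression.zstd.level' = '3'
);
```

These options are persisted as table metadata, so later queries against the table cannot override them with SQL hints.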
Read parameters
Parameter | Type | Default | Description |
scan.startup.mode | Enum | full | The connector's startup mode, which specifies where to start consuming data. Supported modes are |
scan.startup.timestamp | Long | (None) | The timestamp to start reading from. This option applies only when |
scan.partition.discovery.interval | Duration | 10s | The interval for the Fluss source to check for new partitions. The source dynamically adds new partitions to the read plan and unsubscribes from deleted ones. A negative value disables automatic partition discovery. |
client.scanner.log.check-crc | Boolean | true | Specifies whether to perform a CRC32 check on messages that are read. This check verifies that messages are not corrupted in transit or on disk, improving data reliability. Although this adds minor computational overhead, we recommend enabling it in production to ensure data integrity. |
client.scanner.log.max-poll-records | Integer | 500 | The maximum number of records returned by a single |
client.scanner.log.fetch.max-bytes | MemorySize | 16mb | The maximum data size per fetch request. If the first record batch in the first non-empty bucket exceeds this limit, the client still returns it to ensure progress. This value is therefore a soft limit, not an absolute one. |
client.scanner.log.fetch.max-bytes-for-bucket | MemorySize | 1mb | The maximum amount of data returned per table bucket in a single fetch request. Records are fetched in batches, and the maximum byte size is configured by this option. |
client.scanner.log.fetch.min-bytes | MemorySize | 1b | The minimum amount of data in bytes the client expects from the server in response to each log fetch request. If the data size is insufficient, the client waits for a maximum of |
client.scanner.log.fetch.wait-max-time | Duration | 500ms | The maximum time the client waits for a response from the server for each log fetch request. |
client.scanner.io.tmpdir | String | System.getProperty("java.io.tmpdir") + "/fluss" | The local directory where the client temporarily stores data files, such as KV snapshots and log segment files, for reading. |
client.scanner.remote-log.prefetch-num | Integer | 4 | The number of remote log segments that the LogScanner prefetches from remote storage. The downloaded segments are saved in local temporary files. |
client.remote-file.download-thread-num | Integer | 3 | The number of threads the client uses to download remote files. |
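Read parameters that start with `client.scanner.` can be supplied per query through SQL hints. The following sketch reuses the `my_log_table` table from the earlier examples; the values are illustrative assumptions, not tuning recommendations.

```sql
-- Allow larger fetches and longer waits for this query only
SELECT * FROM `my-catalog`.`my_db`.`my_log_table`
/*+ OPTIONS(
  'client.scanner.log.max-poll-records' = '1000',
  'client.scanner.log.fetch.max-bytes' = '32mb',
  'client.scanner.log.fetch.wait-max-time' = '1s'
) */;
```

Because hints affect only the current query, other jobs that read the same table keep the defaults or the values persisted in the table metadata.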
Write parameters
Parameter | Type | Default | Description |
sink.ignore-delete | Boolean | false | When |
sink.bucket-shuffle | Boolean | true | Specifies whether to shuffle data by bucket ID before writing to the sink. Shuffling to group records with the same bucket ID into the same task improves client-side efficiency and reduces resource usage. For log tables, this requires a |
client.writer.buffer.memory-size | MemorySize | 64mb | The total number of bytes that the writer can use to buffer internal rows. |
client.writer.batch-size | MemorySize | 2mb | The size of a batch for a single bucket. The writer groups records for the same bucket into batches, which improves both client and server performance. |
client.writer.buffer.wait-timeout | Duration | 2^63 - 1 ns | Defines how long the writer blocks when waiting for a segment to become available. |
client.writer.batch-timeout | Duration | 100ms | To improve throughput, the writer can add a small artificial delay to wait for more records to arrive before sending a batch. This is similar to Nagle's algorithm in TCP and sets an upper bound on send latency. A batch is sent immediately when it reaches |
client.writer.bucket.no-key-assigner | Enum | STICKY | Specifies the bucket assignment strategy for keyless tables. For tables with a bucket key or primary key, the system selects a bucket based on the key's hash value. For keyless tables, you can select the bucket assigner with this parameter. The following strategies are supported:
|
client.writer.acks | String | all | Valid values:
Note In production environments, use |
client.writer.request-max-size | MemorySize | 10mb | The maximum size of a request in bytes. This setting limits the number of record batches the writer can send in a single request to avoid sending overly large requests. |
client.writer.retries | Integer | Integer.MAX_VALUE | Setting a value greater than zero causes the client to resend any record that fails with a potentially transient error. |
client.writer.enable-idempotence | Boolean | true | Specifies whether to enable idempotence, which is enabled by default. When enabled, the writer ensures exactly-once semantics, which prevents duplicates and preserves ordering. If disabled, client retries due to server failures can result in duplicate messages. |
client.writer.max-inflight-requests-per-bucket | Integer | 5 | The maximum number of unacknowledged requests per bucket. This setting applies only when |
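Writer batching parameters can likewise be adjusted for a single INSERT statement. The following sketch reuses the primary key tables from the earlier hint example; the values are illustrative assumptions that trade a little latency for larger batches.

```sql
-- Use larger batches and a longer batching delay for this write only
INSERT INTO `my-catalog`.`my_db`.`my_pk_table_2`
/*+ OPTIONS(
  'client.writer.batch-size' = '4mb',
  'client.writer.batch-timeout' = '200ms',
  'client.writer.buffer.memory-size' = '128mb'
) */
SELECT * FROM `my-catalog`.`my_db`.`my_pk_table_1`;
```

A longer `client.writer.batch-timeout` raises the upper bound on send latency, so it is usually worth increasing only when throughput matters more than freshness.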
Dimension table parameters
Parameter | Type | Default | Description |
lookup.async | Boolean | true | Specifies whether to use asynchronous lookup. Asynchronous lookup provides better throughput than synchronous lookup. |
| Boolean | false | When |
lookup.cache | Enum | NONE | The caching policy for this lookup table. Supported values are |
lookup.max-retries | Integer | 3 | The maximum number of retries allowed when a lookup operation fails. |
lookup.partial-cache.expire-after-access | Duration | (None) | The expiration time for a cache entry after it has been accessed. |
lookup.partial-cache.expire-after-write | Duration | (None) | The expiration time for a cache entry after it has been written. |
lookup.partial-cache.cache-missing-key | Boolean | true | If |
lookup.partial-cache.max-rows | Long | (None) | The maximum number of rows to store in the cache. |
client.lookup.queue-size | Integer | 25600 | The maximum number of pending lookup operations. |
client.lookup.max-batch-size | Integer | 128 | The maximum number of lookups to batch into a single request. |
client.lookup.max-inflight-requests | Integer | 128 | The maximum number of unacknowledged lookup requests for a lookup operation. |
client.lookup.batch-timeout | Duration | 100ms | The maximum time to wait for a lookup batch to fill. If this timeout is reached, the lookup batch is closed and sent. |
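Dimension table parameters apply when a Fluss table is the lookup side of a join. The following is a minimal sketch using the standard Flink SQL lookup join syntax: the tables `orders_with_proctime` and `item_dim`, their columns, and the option values are all hypothetical, and `orders_with_proctime` is assumed to expose a processing-time attribute named `proc_time`.

```sql
-- Hypothetical lookup join; item_dim is assumed to be a Fluss primary key table keyed by item_id.
SELECT o.order_id, o.amount, d.item_name
FROM orders_with_proctime AS o
JOIN `my-catalog`.`my_db`.`item_dim`
  /*+ OPTIONS(
    'lookup.async' = 'true',
    'lookup.max-retries' = '5',
    'client.lookup.max-batch-size' = '256'
  ) */
  FOR SYSTEM_TIME AS OF o.proc_time AS d
ON o.item_id = d.item_id;
```

The hint is attached to the dimension table, so the options affect only lookups issued by this join.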
Other parameters
Parameter | Type | Default | Description |
bootstrap.servers | List | (None) | Specify the initial list of nodes used to connect to the Fluss cluster, in the format The client uses this list to discover the full cluster topology, so it does not need to include all servers. We recommend configuring multiple nodes for high availability to avoid a single point of failure. |
client.id | String | "" | An ID string to pass to the server with each request. Using a meaningful ID (e.g., |
client.connect-timeout | Duration | 120s | The connection timeout for the Netty client. |
client.request-timeout | Duration | 30s | The timeout for completing a request. When |
client.filesystem.security.token.renewal.backoff | Duration | 1h | The time interval to wait before retrying to fetch a new security token for the file system after a fetch failure. |
client.filesystem.security.token.renewal.time-ratio | Double | 0.75 | A ratio of the token's lifetime that determines when to renew it. For example, a ratio of 0.75 means renewal is attempted after 75% of the token's validity period has passed. |
client.metrics.enabled | Boolean | false | Enables client metrics. When enabled, the client collects and reports metrics through a JMX metrics reporter. |