Connector parameters

This topic describes the parameters for the Fluss connector.

How to configure parameters

Configure parameters during table creation

Fluss persists parameters defined during table creation as table metadata. These parameters apply to all read and write operations.

-- Set table.log.ttl to 7 days and disable CRC checks during reads
CREATE TABLE `my-catalog`.`my_db`.`my_log_table` (
  order_id BIGINT,
  item_id BIGINT,
  amount INT,
  address STRING
) WITH (
  'table.log.ttl' = '7d',
  'client.scanner.log.check-crc' = 'false'
);

Dynamic configuration with SQL hints

Fluss also lets you dynamically configure parameters using SQL Hints. These parameters are not persisted in the table metadata and affect only the current query.

Important
  • Dynamically configured parameters take precedence over parameters persisted in table metadata.

  • Storage parameters do not support dynamic configuration using SQL Hints.

-- Set SQL hints for a read operation
SELECT * FROM `my-catalog`.`my_db`.`my_log_table` /*+ OPTIONS('client.scanner.log.check-crc' = 'false') */;

-- Set SQL hints for a write operation
INSERT INTO `my-catalog`.`my_db`.`my_pk_table_2` /*+ OPTIONS('sink.ignore-delete' = 'true') */ SELECT * FROM `my-catalog`.`my_db`.`my_pk_table_1`;

Storage parameters

Parameter

Type

Default

Description

bucket.num

Integer

1

The number of buckets for the table.

bucket.key

String

(None)

Fluss distributes data among the buckets based on the hash value of bucket.key. For a primary key table, the bucket.key must be a subset of the primary key and must not include the partition key. To specify multiple fields as the bucket.key, separate them with commas (,).

  • If a table has a primary key and the bucket.key is not explicitly specified, the primary key (excluding the partition key) is used as the bucket.key by default.

  • If a primary key is not defined for the table and bucket.key is not specified, the data will be randomly distributed among the buckets.
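
The bucket.key rules above can be illustrated with a small sketch. The table name, columns, and the bucket count of 4 are hypothetical and chosen only for illustration; the bucket.key (shop_id) is a subset of the primary key and leaves out the partition key (dt).

```sql
-- Hypothetical primary key table partitioned by dt.
-- bucket.key = shop_id: part of the primary key, not the partition key.
CREATE TABLE `my-catalog`.`my_db`.`my_orders_pk` (
  shop_id BIGINT,
  order_id BIGINT,
  amount INT,
  dt STRING,
  PRIMARY KEY (shop_id, order_id, dt) NOT ENFORCED
) PARTITIONED BY (dt) WITH (
  'bucket.num' = '4',
  'bucket.key' = 'shop_id'
);
```

If bucket.key were omitted here, the primary key without the partition key (shop_id, order_id) would be used instead, as described above.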

table.log.ttl

Duration

7d

The time-to-live (TTL) for log data, which controls the maximum retention period. After this period, Fluss deletes old log data to free up space. A value of -1 disables expiration, so logs are never deleted.

table.auto-partition.enabled

Boolean

false

If enabled, Fluss automatically creates partitions for the table.

table.auto-partition.time-unit

Enum

DAY

The time granularity for automatic partition creation. Valid values are YEAR, QUARTER, MONTH, DAY, and HOUR.

For example, if the value is HOUR, automatically created partitions use the format yyyyMMddHH. If the value is DAY, the format is yyyyMMdd.

table.auto-partition.num-precreate

Integer

2

The number of future partitions to pre-create. For example, if the current date is 2024-11-11 and this is set to 3, Fluss pre-creates partitions for 20241111, 20241112, and 20241113, skipping any that already exist. The default is 2. With a time-unit of DAY, this default creates partitions for the current and next day.

table.auto-partition.num-retention

Integer

7

The number of historical partitions to retain. For example, if the current date is 2024-11-11, the time unit is DAY, and this value is 3, Fluss retains partitions for 20241108, 20241109, and 20241110. Fluss deletes partitions older than 20241108. The default is 7, meaning Fluss retains the 7 most recent partitions.

table.auto-partition.time-zone

String

System time zone

The time zone for auto-partitioning. By default, it uses the system time zone.
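
As a minimal sketch of how the auto-partition parameters combine, the following DDL creates a day-partitioned log table. The table name, columns, and the time zone value 'Asia/Shanghai' are assumptions for illustration; the sketch also assumes the time zone is given as a standard zone ID.

```sql
-- Hypothetical log table that Fluss partitions automatically by day.
CREATE TABLE `my-catalog`.`my_db`.`my_events_by_day` (
  event_id BIGINT,
  payload STRING,
  dt STRING  -- partition column; partition names follow yyyyMMdd for DAY
) PARTITIONED BY (dt) WITH (
  'table.auto-partition.enabled' = 'true',
  'table.auto-partition.time-unit' = 'DAY',
  'table.auto-partition.num-precreate' = '3',
  'table.auto-partition.num-retention' = '3',
  'table.auto-partition.time-zone' = 'Asia/Shanghai'
);
```

Following the descriptions above, on 2024-11-11 this configuration would pre-create partitions 20241111 through 20241113 and retain 20241108 through 20241110.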

table.replication.factor

Integer

(None)

The replication factor for log tables. If not set, Fluss uses the cluster's default replication factor, default.replication.factor. The value must be a positive integer and cannot be greater than the number of TabletServers in the Fluss cluster. If the value exceeds the number of TabletServers, table creation fails.

table.log.format

Enum

ARROW

The storage format for logs. Supported formats are ARROW (default) and INDEXED.

table.log.arrow.compression.type

Enum

ZSTD

If the log format is set to ARROW, this parameter specifies the compression type for log data. Supported types are NONE, LZ4_FRAME, and ZSTD. The default value is ZSTD.

table.log.arrow.compression.zstd.level

Integer

3

If the log format is set to ARROW and the compression type is ZSTD, this parameter specifies the compression level. The valid range is from 1 to 22. The default value is 3.

table.statistics.columns

String

(None)

Specifies a comma-separated list of column names for which to collect statistics, such as 'temperature,location'. Set this parameter to * to collect statistics for all supported column types. Column statistics are used for predicate pushdown optimization and are applicable only to log tables that use the Arrow log format. Supported data types include: BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, DECIMAL, STRING, CHAR, DATE, TIME, TIMESTAMP, and TIMESTAMP_LTZ.

Unsupported types (BYTES, BINARY, ARRAY, MAP, ROW) are automatically excluded. For more information, see Predicate Pushdown.

table.kv.format

Enum

COMPACTED

The storage format for KV data. Supported formats are COMPACTED (default) and INDEXED.

table.log.tiered.local-segments

Integer

2

When tiered log storage is enabled, this parameter specifies the number of log segments to retain locally for each table. The value must be greater than 0. The default is 2.

table.datalake.enabled

Boolean

false

Specifies whether to enable lakehouse storage for the table. This is disabled by default. If enabled and the lakehouse tiering service is active, Fluss automatically tiers and compacts table data into lakehouse storage.

table.datalake.format

Enum

(None)

Specifies the data lake format that the table uses for tiered lakehouse storage. Currently, only paimon is supported.

This parameter does the following:

  • Ensures consistency in key encoding and bucketing strategy between Fluss and the external lakehouse system.

  • Enables unified queries (Union Read) across Fluss and data lake tables.

Configuration Behavior

  • Highest priority: If table.datalake.format is explicitly set during table creation, that value is used.

  • Default value: If not set, the cluster-level datalake.format configuration in Fluss is used.

  • Dynamic activation: You can pre-configure table.datalake.format while table.datalake.enabled is false. You can then activate the data lake functionality by enabling table.datalake.enabled without recreating the table.
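
The dynamic activation behavior can be sketched in two steps. The table name is hypothetical, and the second statement assumes that the catalog accepts ALTER TABLE ... SET for table.datalake.enabled; check your environment before relying on it.

```sql
-- Step 1: create the table with the lake format fixed but tiering disabled.
CREATE TABLE `my-catalog`.`my_db`.`my_lake_ready_table` (
  order_id BIGINT,
  amount INT,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'table.datalake.format' = 'paimon',
  'table.datalake.enabled' = 'false'
);

-- Step 2 (later): enable lakehouse storage without recreating the table.
-- Assumption: ALTER TABLE ... SET is supported for this option.
ALTER TABLE `my-catalog`.`my_db`.`my_lake_ready_table`
  SET ('table.datalake.enabled' = 'true');
```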

table.merge-engine

Enum

(None)

Defines the merge engine for primary key tables. If this parameter is not set, the table uses last_row, which keeps the most recent row for a given primary key. Other supported strategies are:

  • first_row: Keeps the first row for a given primary key.

  • versioned: Keeps the row with the highest version number for a given primary key.

table.merge-engine.versioned.ver-column

String

(None)

If the merge engine is set to versioned, this parameter is required and must specify the name of the version column.
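
A minimal sketch of a table that uses the versioned merge engine follows. The table and column names are hypothetical; update_version plays the role of the version column.

```sql
-- Hypothetical primary key table that keeps, per user_id,
-- the row with the highest update_version.
CREATE TABLE `my-catalog`.`my_db`.`my_versioned_table` (
  user_id BIGINT,
  profile STRING,
  update_version BIGINT,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'table.merge-engine' = 'versioned',
  'table.merge-engine.versioned.ver-column' = 'update_version'
);
```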

Read parameters

Parameter

Type

Default

Description

scan.startup.mode

Enum

full

The connector's startup mode, which specifies where to start consuming data. Supported modes are full (default), earliest, latest, and timestamp. For details, see Read Data.

scan.startup.timestamp

Long

(None)

The timestamp to start reading from. This option applies only when scan.startup.mode is timestamp. Specify the value as either milliseconds since the epoch (e.g., '1678883047356') or a 'yyyy-MM-dd HH:mm:ss' string (e.g., '2023-12-09 23:09:12').
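
The startup options can be set per query with SQL hints. The sketch below reuses the my_log_table example from earlier in this topic and the two timestamp formats from the description; treat it as an illustration rather than a recommended configuration.

```sql
-- Read only records that arrive after the job starts.
SELECT * FROM `my-catalog`.`my_db`.`my_log_table`
/*+ OPTIONS('scan.startup.mode' = 'latest') */;

-- Start from a point in time given as milliseconds since the epoch.
SELECT * FROM `my-catalog`.`my_db`.`my_log_table`
/*+ OPTIONS('scan.startup.mode' = 'timestamp',
            'scan.startup.timestamp' = '1678883047356') */;

-- Start from a point in time given as a 'yyyy-MM-dd HH:mm:ss' string.
SELECT * FROM `my-catalog`.`my_db`.`my_log_table`
/*+ OPTIONS('scan.startup.mode' = 'timestamp',
            'scan.startup.timestamp' = '2023-12-09 23:09:12') */;
```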

scan.partition.discovery.interval

Duration

10s

The interval for the Fluss source to check for new partitions. The source dynamically adds new partitions to the read plan and unsubscribes from deleted ones. A negative value disables automatic partition discovery.

client.scanner.log.check-crc

Boolean

true

Specifies whether to perform a CRC32 check on messages that are read. This check verifies that messages are not corrupted in transit or on disk, improving data reliability. Although this adds minor computational overhead, we recommend enabling it in production to ensure data integrity.

client.scanner.log.max-poll-records

Integer

500

The maximum number of records returned by a single poll() call from the LogScanner. Note that this setting does not affect the underlying fetch behavior. The LogScanner caches records from each fetch request and returns them gradually with each poll.

client.scanner.log.fetch.max-bytes

MemorySize

16mb

The maximum data size per fetch request. If the first record batch in the first non-empty bucket exceeds this limit, the client still returns it to ensure progress. This value is therefore a soft limit, not an absolute one.

client.scanner.log.fetch.max-bytes-for-bucket

MemorySize

1mb

The maximum amount of data returned per table bucket in a single fetch request. Records are fetched in batches, and the maximum byte size is configured by this option.

client.scanner.log.fetch.min-bytes

MemorySize

1b

The minimum amount of data in bytes the client expects from the server in response to each log fetch request. If the data size is insufficient, the client waits for a maximum of client.scanner.log.fetch.wait-max-time before returning.

client.scanner.log.fetch.wait-max-time

Duration

500ms

The maximum time the client waits for a response from the server for each log fetch request.
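
As a rough illustration of how client.scanner.log.fetch.min-bytes and client.scanner.log.fetch.wait-max-time interact, the following hint asks each fetch to wait until about 64 KB is available or 1 second has passed, whichever comes first. The values are illustrative assumptions, not tuning recommendations.

```sql
-- Trade a little latency for fewer, larger fetch responses.
SELECT * FROM `my-catalog`.`my_db`.`my_log_table`
/*+ OPTIONS(
  'client.scanner.log.fetch.min-bytes' = '64kb',
  'client.scanner.log.fetch.wait-max-time' = '1s'
) */;
```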

client.scanner.io.tmpdir

String

System.getProperty("java.io.tmpdir") + "/fluss"

The local directory where the client temporarily stores data files, such as KV snapshots and log segment files, for reading.

client.scanner.remote-log.prefetch-num

Integer

4

The number of remote log segments that the LogScanner downloads from remote storage and keeps in local temporary files. The default value is 4.

client.remote-file.download-thread-num

Integer

3

The number of threads the client uses to download remote files.

Write parameters

Parameter

Type

Default

Description

sink.ignore-delete

Boolean

false

When true, the sink ignores DELETE and UPDATE_BEFORE changelog events.

sink.bucket-shuffle

Boolean

true

Specifies whether to shuffle data by bucket ID before writing to the sink. Shuffling routes records with the same bucket ID to the same task, which improves client-side efficiency and reduces resource usage. For log tables, this requires a bucket.key. This option is enabled by default for primary key tables.

client.writer.buffer.memory-size

MemorySize

64mb

The total number of bytes that the writer can use to buffer internal rows.

client.writer.batch-size

MemorySize

2mb

The size of a batch for a single bucket. The writer groups records for the same bucket into batches, which improves both client and server performance.

client.writer.buffer.wait-timeout

Duration

2^63 - 1 ns (Long.MAX_VALUE nanoseconds)

Defines how long the writer blocks when waiting for a segment to become available.

client.writer.batch-timeout

Duration

100ms

To improve throughput, the writer can add a small artificial delay to wait for more records to arrive before sending a batch. This is similar to Nagle's algorithm in TCP and sets an upper bound on send latency. A batch is sent immediately when it reaches client.writer.batch-size, but if it's smaller, the writer waits up to this timeout for more records.
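
The relationship between client.writer.batch-size and client.writer.batch-timeout can be sketched with a write hint. The source table my_source_table is hypothetical, and the values are illustrative assumptions: with them, a batch for a bucket is sent once it reaches 4 MB, or after at most 200 ms if it is still smaller.

```sql
-- Favor throughput: larger batches, slightly longer wait for records.
INSERT INTO `my-catalog`.`my_db`.`my_log_table`
/*+ OPTIONS(
  'client.writer.batch-size' = '4mb',
  'client.writer.batch-timeout' = '200ms'
) */
SELECT order_id, item_id, amount, address
FROM `my-catalog`.`my_db`.`my_source_table`;
```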

client.writer.bucket.no-key-assigner

Enum

STICKY

Specifies the bucket assignment strategy for keyless tables. For tables with a bucket key or primary key, the system selects a bucket based on the key's hash value. For keyless tables, you can select the bucket assigner with this parameter. The following strategies are supported:

  • STICKY: Assigns a new bucket ID only when the record batch changes. Otherwise, it uses the same bucket ID as the previous record.

  • ROUND_ROBIN: Assigns bucket IDs to input rows in a round-robin fashion.

client.writer.acks

String

all

The number of acknowledgments the writer requires before it considers a request complete. Valid values:

  • -1 (or all): The leader waits for all in-sync replicas (ISRs) to acknowledge the record. This guarantees no data loss as long as at least one in-sync replica remains alive.

  • 1: The leader acknowledges immediately after writing the record to the local log, without waiting for replicas. If the leader fails before replication completes, data may be lost.

  • 0: The writer does not wait for any acknowledgment from the server. The writer cannot guarantee the server received the record. Network errors or server failures may cause data loss. This is suitable for high-throughput scenarios where some data loss is acceptable.

Note

In production environments, use all to ensure high availability and prevent data loss.

client.writer.request-max-size

MemorySize

10mb

The maximum size of a request in bytes. This setting limits the number of record batches the writer can send in a single request to avoid sending overly large requests.

client.writer.retries

Integer

Integer.MAX_VALUE

Setting a value greater than zero causes the client to resend any record that fails with a potentially transient error.

client.writer.enable-idempotence

Boolean

true

Specifies whether to enable idempotence, which is enabled by default. When enabled, the writer ensures that exactly one copy of each record is written, which prevents duplicates and preserves ordering. If disabled, client retries due to server failures can result in duplicate messages.

client.writer.max-inflight-requests-per-bucket

Integer

5

The maximum number of unacknowledged requests per bucket. This setting applies only when client.writer.enable-idempotence is set to true. If the number of in-flight requests for a bucket reaches this limit, the writer blocks until a pending request is acknowledged before sending another.
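
The delivery-related write parameters can also be pinned explicitly in a job, which documents the intent even though the values below match the defaults listed in this table. This is a sketch only; it reuses the table names from the write hint example earlier in this topic.

```sql
-- Make the durability settings explicit for a production write job.
INSERT INTO `my-catalog`.`my_db`.`my_pk_table_2`
/*+ OPTIONS(
  'client.writer.acks' = 'all',
  'client.writer.enable-idempotence' = 'true',
  'client.writer.max-inflight-requests-per-bucket' = '5'
) */
SELECT * FROM `my-catalog`.`my_db`.`my_pk_table_1`;
```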

Dimension table parameters

Parameter

Type

Default

Description

lookup.async

Boolean

true

Specifies whether to use asynchronous lookup. Asynchronous lookup provides better throughput than synchronous lookup.

lookup.insert-if-not-exists

Boolean

false

When true, automatically inserts a new row if a lookup key is not found. Fluss inserts a new row using the lookup key as the primary key and then returns that row. This feature applies only to primary key lookups and is not supported for prefix lookups.

lookup.cache

Enum

NONE

The caching policy for this lookup table. Supported values are NONE and PARTIAL.

lookup.max-retries

Integer

3

The maximum number of retries allowed when a lookup operation fails.

lookup.partial-cache.expire-after-access

Duration

(None)

The expiration time for a cache entry after it has been accessed.

lookup.partial-cache.expire-after-write

Duration

(None)

The expiration time for a cache entry after it has been written.

lookup.partial-cache.cache-missing-key

Boolean

true

If true, caches the fact that a key is missing (by storing a null value) to avoid repeated lookups for non-existent keys.

lookup.partial-cache.max-rows

Long

(None)

The maximum number of rows to store in the cache.
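
A lookup join with partial caching might look like the following sketch. The tables my_orders_stream and my_customer_dim, their columns, and the processing-time attribute proc_time are hypothetical; the cache sizes and expiration time are illustrative assumptions.

```sql
-- Enrich a stream with a Fluss primary key table used as a dimension table.
-- Cache up to 10000 rows for 10 minutes and do not cache missing keys.
SELECT o.order_id, o.amount, c.customer_name
FROM `my-catalog`.`my_db`.`my_orders_stream` AS o
JOIN `my-catalog`.`my_db`.`my_customer_dim`
  /*+ OPTIONS(
    'lookup.cache' = 'PARTIAL',
    'lookup.partial-cache.max-rows' = '10000',
    'lookup.partial-cache.expire-after-write' = '10min',
    'lookup.partial-cache.cache-missing-key' = 'false'
  ) */
  FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.customer_id;
```

Disabling lookup.partial-cache.cache-missing-key is one possible choice when keys that are missing now are expected to appear shortly afterward.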

client.lookup.queue-size

Integer

25600

The maximum number of pending lookup operations.

client.lookup.max-batch-size

Integer

128

The maximum number of lookups to batch into a single request.

client.lookup.max-inflight-requests

Integer

128

The maximum number of unacknowledged lookup requests for a lookup operation.

client.lookup.batch-timeout

Duration

100ms

The maximum time to wait for a lookup batch to fill. If this timeout is reached, the lookup batch is closed and sent.

Other parameters

Parameter

Type

Default

Description

bootstrap.servers

List

(None)

The initial list of nodes used to connect to the Fluss cluster, in the format host1:port1,host2:port2.

The client uses this list to discover the full cluster topology, so it does not need to include all servers. We recommend configuring multiple nodes for high availability to avoid a single point of failure.
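
As a sketch, bootstrap.servers is typically supplied when the catalog is registered. The statement below assumes the catalog type identifier 'fluss' used by the open-source Fluss Flink connector; the host names and port are placeholders, and a managed environment may register catalogs differently.

```sql
-- Register a Fluss catalog with two bootstrap nodes (placeholder addresses).
CREATE CATALOG `my-catalog` WITH (
  'type' = 'fluss',
  'bootstrap.servers' = 'host1:9123,host2:9123'
);
```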

client.id

String

""

An ID string to pass to the server with each request. Using a meaningful ID (e.g., order-service-prod) helps identify the request source in server-side logs.

client.connect-timeout

Duration

120s

The connection timeout for the Netty client.

client.request-timeout

Duration

30s

The timeout for completing a request. When client.writer.acks is set to all (-1), this specifies the maximum time the client waits for a write to be fully acknowledged. Default: 30s.

client.filesystem.security.token.renewal.backoff

Duration

1h

The time interval to wait before retrying to fetch a new security token for the file system after a fetch failure.

client.filesystem.security.token.renewal.time-ratio

Double

0.75

A ratio of the token's lifetime that determines when to renew it. For example, a ratio of 0.75 means renewal is attempted after 75% of the token's validity period has passed.

client.metrics.enabled

Boolean

false

Enables client metrics. When enabled, the client collects and reports metrics through a JMX metrics reporter.