Connector 参数

本文将为您介绍Fluss Connector的相关参数。

如何配置参数

创建表时配置参数

一旦表被创建后,这部分定义的参数会被作业表的元数据持久化存储在 Fluss 中。其中存储参数在读写数据的时候会生效。

# 设置table.log.ttl为7天并且在读取时禁止CRC校验
CREATE TABLE `my-catalog`.`my_db`.`my_log_table` (
  order_id BIGINT,
  item_id BIGINT,
  amount INT,
  address STRING
) WITH (
  'table.log.ttl' = '7d',
  'client.scanner.log.check-crc' = 'false'
);

SQL Hints 动态配置参数

Fluss 也支持通过 SQL Hints 的方式动态配置参数。这部分动态的配置参数不会作为元数据的一部分持久化存储在 Fluss 中。这部分配置只会在当前 query 中生效而不会影响其他的 query。

重要
  • 动态配置的参数的优先级要高于表元数据中持久化的参数。

  • 存储参数不支持通过 SQL Hints 动态配置。

-- 读取时设置 SQL hints
SELECT * FROM `my-catalog`.`my_db`.`my_log_table` /*+ OPTIONS('client.scanner.log.check-crc' = 'false') */;

-- 写入时设置 SQL hints
INSERT INTO `my-catalog`.`my_db`.`my_pk_table_2` /*+ OPTIONS('sink.ignore-delete'='true') */ select * from `my-catalog`.`my_db`.`my_pk_table_1`;

存储参数

参数

类型

默认值

说明

bucket.num

int

1

Fluss集群的桶数量。

bucket.key

String

数据将根据bucket.key的哈希值分布到各个桶中。bucket.key必须是主键的一个子集(不包含分区键)。若指定多个字段作为 bucket.key,字段之间使用逗号 , 分隔。

  • 若表定义了主键且未显式指定bucket.key,则默认使用主键(排除分区键)作为bucket.key

  • 若表未定义主键且也未指定bucket.key,则数据将在各桶间随机分布。

table.log.ttl

Duration

7d

日志数据的生存时间。该配置控制日志保留的最大时间,之后将删除旧的日志数据以释放空间。如果设置为 -1,则日志不会被删除。

table.auto-partition.enabled

Boolean

false

启用自动分区后,表的分区将自动创建。

table.auto-partition.time-unit

ENUM

DAY

自动创建分区的时间粒度。有效值为YEARQUARTERMONTHDAYHOUR

例如:值为HOUR,自动创建分区的格式为yyyyMMddHH。值为DAY,自动创建分区的格式为yyyyMMdd

table.auto-partition.num-precreate

Integer

2

每次检查自动分区时预创建的分区数量。例如,如果当前检查时间为 2024-11-11,且值配置为 3,则将预创建分区 20241111、20241112 和 20241113。如果任何一个分区已存在,则跳过创建该分区。默认值为 2,表示将预创建 2 个分区。如果 table.auto-partition.time-unit 为 DAY(默认值),一个预创建的分区为当天,另一个为明天。

table.auto-partition.num-retention

Integer

7

每次检查自动分区时保留的历史分区数量。例如,如果当前检查时间为 2024-11-11,时间单位为 DAY,且值配置为 3,则将保留历史分区 20241108、20241109 和 20241110。早于 20241108 的分区将被删除。默认值为 7,表示将保留 7 个分区。

table.auto-partition.time-zone

String

系统时区

自动分区的时区,默认与系统时区相同。

table.replication.factor

Integer

(无)

日志表的副本数量。如果不设置,Fluss 将使用集群默认的副本数量 default.replication.factor。该值必须是正数,且不大于 Fluss 集群中 tabletServer 的数量。如果设置的值大于 Fluss 集群中的 tabletServer 数量,则在创建新表时会报错。

table.log.format

Enum

ARROW

日志存储中日志存储的格式。默认值为 ARROW。支持的格式为 ARROW 和 INDEXED。

table.log.arrow.compression.type

Enum

ZSTD

如果日志格式设置为 ARROW,则该值时配置日志数据的压缩类型。候选的压缩类型为 NONE、LZ4_FRAME 和 ZSTD。默认值为 ZSTD。

table.log.arrow.compression.zstd.level

Integer

3

如果日志格式设置为 ARROW 且压缩类型设置为 ZSTD,则该值配置压缩的级别。有效范围为 1 到 22。默认值为 3。

table.kv.format

Enum

COMPACTED

kv 存储中 kv 数据的存储格式。默认值为 COMPACTED。支持的格式为 COMPACTED 和 INDEXED。

table.log.tiered.local-segments

Integer

2

启用日志分层存储时,每个表在本地保留的日志段(Log Segment)数量。必须大于 0,默认值为 2。

table.datalake.enabled

Boolean

false

是否启用表的湖存储,默认禁用。启用后,若湖仓分层服务已启动,则表数据将自动分层并压缩至湖存储中。

table.datalake.format

Enum

(无)

指定表所使用的数据湖格式,用于分层湖仓存储。当前仅支持paimon

该参数的核心作用是:

  • 确保 Fluss 与外部湖仓系统在键编码(key encoding) 和分桶策略(bucketing) 上保持一致;

  • 支持 Union Read 功能,实现 Fluss 表与数据湖表的统一读取。

配置行为说明

  • 优先级最高:若在建表时显式设置 table.datalake.format,则以该值为准。

  • 默认值:若未设置,则使用 Fluss 集群级配置的 datalake.format 值。

  • 动态启用支持:可在 table.datalake.enabled = false 时预先配置 table.datalake.format,后续通过启用 table.datalake.enabled 来激活数据湖功能,无需重建表。

table.merge-engine

Enum

(无)

定义主键表的数据更新策略。默认策略为保留最新数据(last_row)。此外,支持以下两种策略: 

  • first_row:保留相同主键的第一条数据。

  • versioned:保留相同主键中版本号最大的数据。

table.merge-engine.versioned.ver-column

String

(无)

如果合并策略设置为 versioned,则必须设置版本列的列名。

读取参数

参数

类型

默认值

说明

scan.startup.mode

Enum

full

读取模式允许您指定数据消费的起始点。Fluss 目前支持以下 scan.startup.mode 选项:full(默认值)、earliest、latest、timestamp。更多详情请参见读取数据

scan.startup.timestamp

Long

(无)

开始读取数据的时间戳。仅当 scan.startup.mode 设置为 timestamp 时,此选项才有效。格式为 'milli-second-since-epoch' 或 'yyyy-MM-dd HH:mm:ss',例如 '1678883047356' 或 '2023-12-09 23:09:12'。

scan.partition.discovery.interval

Duration

10s

在读取分区表时,Fluss Source 可按照设定的时间间隔自动探测新分区。当检测到新分区时,会动态将其加入读取计划;若分区被删除,则自动取消订阅。若将该间隔设为负值,则表示禁用自动分区发现功能。

client.scanner.log.check-crc

Boolean

true

是否对读取到的消息执行 CRC32 校验。该功能用于确保消息在传输或磁盘存储过程中未发生损坏,提升数据可靠性。虽然会引入极小的计算开销,但鉴于其对数据完整性的关键作用,建议在生产环境中保持启用。

client.scanner.log.max-poll-records

Integer

500

LogScanner 的单次 poll() 调用返回的最大记录数。请注意,此配置不会影响底层的获取行为。LogScanner 将缓存每次获取请求的记录,并从每次 poll 中逐步返回它们。

client.scanner.log.fetch.max-bytes

MemorySize

16mb

客户端从服务器获取请求的最大数据量。记录是分批获取的,如果第一个非空桶的第一个记录批次大于此值,该记录批次仍将被返回,以确保获取可以取得进展。因此,这不是一个绝对的最大值。

client.scanner.log.fetch.max-bytes-for-bucket

MemorySize

1mb

客户端从服务器获取请求时,每个表桶返回的最大数据量。记录是分批获取的,最大字节大小由此选项配置。

client.scanner.log.fetch.min-bytes

MemorySize

1b

客户端每次获取日志请求从服务器响应的期望最小字节数。如果字节数不足,将等待最多 client.scanner.log.fetch-wait-max-time 时间再返回。

client.scanner.log.fetch.wait-max-time

Duration

500ms

客户端每次获取日志请求从服务器响应的最大等待时间。

client.scanner.io.tmpdir

String

System.getProperty("java.io.tmpdir") + "/fluss"

客户端用于临时存储数据文件(如 kv 快照、日志段文件)的本地目录,以便读取。

client.scanner.remote-log.prefetch-num

Integer

4

LogScanner 从远程存储下载的远程日志段数量,这些日志段将被保存在本地临时文件中。默认设置为 4。

client.remote-file.download-thread-num

Integer

3

客户端用于下载远程文件的线程数。

写入参数

参数

类型

默认值

说明

sink.ignore-delete

Boolean

false

如果设置为 true,则目标端将忽略 DELETE 和 UPDATE_BEFORE 的变更日志事件。

sink.bucket-shuffle

Boolean

true

是否在写入目标端之前按桶 ID 进行洗牌。将具有相同桶 ID 的数据洗牌到由同一任务处理可以提高客户端处理效率并减少资源消耗。对于日志表,只有在定义了 bucket.key 时,bucket shuffle才会生效。对于主键表,默认启用。

client.writer.buffer.memory-size

MemorySize

64mb

写入器可以用来缓冲内部行的总字节数。

client.writer.batch-size

MemorySize

2mb

写入器将同一桶的记录批量组合在一起。这有助于提高客户端的性能。将尝试将同一桶的记录批量组合在一起。这有助于提高客户端和服务器的性能。

client.writer.buffer.wait-timeout

Duration

2^(63)-1ns

定义写入器在等待段可用时阻塞的时间。

client.writer.batch-timeout

Duration

100ms

写入器将尝试将到达的行批量组合在一起,以便在请求发送之间发送。通常,这仅在负载较高且行到达速度比发送速度快时才会发生。然而,在某些情况下,写入器可能希望即使在中等负载下也减少请求数量。此设置通过添加少量人工延迟来实现这一点,即,而不是立即发送一行,写入器将等待最多指定的延迟时间,以便其他记录到达,以便可以将它们批量发送。这可以类比为 TCP 中的 Nagle 算法。此设置为批量的延迟上限:一旦我们为一个桶积累了 client.writer.batch-size 的行,它将立即发送,无论此设置如何,但如果为该桶积累的字节数少于此值,我们将等待指定的时间,等待更多记录到达。

client.writer.bucket.no-key-assigner

Enum

STICKY

指定无键表的桶分配策略。对于有桶键或主键的表,系统会基于键的哈希值选择桶。对于无键表,可通过此参数选择桶分配器。支持以下两种策略:

  • STICKY:仅当记录批次发生变化时分配新桶 ID,否则保持与前一条记录相同的桶 ID。

  • ROUND_ROBIN:按轮询方式为输入行分配桶 ID。

client.writer.acks

String

all

取值如下:

  • -1(all):Leader必须等待所有同步副本(ISR)确认后才返回成功。只要至少一个副本存活,数据就不会丢失。

  • 1:Leader将记录写入本地日志后立即确认,无需等待副本同步,若Leader在同步前故障,数据可能丢失。

  • 0:写入器不等待服务器的任何确认。无法保证记录已被接收,网络错误或服务器故障可能导致数据丢失。适合高吞吐、可容忍丢数据的场景。

说明

生产环境建议使用all,以确保数据高可用和不丢失。

client.writer.request-max-size

MemorySize

10mb

请求的最大大小(以字节为单位)。此设置将限制写入器在单个请求中发送的记录批次数量,以避免发送过大的请求。请注意,此重试与写入器在收到错误后重新发送行没有区别。

client.writer.retries

Integer

Integer.MAX_VALUE

设置大于零的值,客户端会重发任何因可能的可重试的错误而发送失败的 record。

client.writer.enable-idempotence

Boolean

true

是否启用幂等性保障,默认为开启。启用后,可确保客户端写入的数据具有顺序性和 exactly once 保证。若关闭此功能,当因 Fluss 服务端故障等原因触发客户端重试时,可能导致相同消息被重复写入流中。

client.writer.max-inflight-requests-per-bucket

Integer

5

每个桶的未确认请求的最大数量。此配置只有在 client.writer.enable-idempotence 设置为 true 时才有效。当每个桶的未确认请求数量超过此设置时,写入器将等待未确认请求完成,然后发送新请求。

维表参数

参数

类型

默认值

说明

lookup.async

Boolean

true

是否使用异步Lookup。异步Lookup比同步Lookup具有更好的吞吐量性能。

lookup.cache

Enum

NONE

lookup表的缓存策略,包括NONE、PARTIAL。

lookup.max-retries

Integer

3

Lookup操作失败时允许的最大重试次数。

lookup.partial-cache.expire-after-access

Duration

(None)

访问缓存中的条目后,该条目的过期时间。

lookup.partial-cache.expire-after-write

Duration

(None)

写入缓存中的条目后,该条目的过期时间。

lookup.partial-cache.cache-missing-key

Boolean

true

如果lookup key在表中没有匹配到任何行,是否将空值存储到缓存中。

lookup.partial-cache.max-rows

Long

(None)

缓存中存储的最大行数。

client.lookup.queue-size

Integer

25600

待处理lookup操作的最大数量。

client.lookup.max-batch-size

Integer

128

将多个lookup操作合并为一个lookup请求的最大批次大小。

client.lookup.max-inflight-requests

Integer

128

lookup操作中未确认的lookup请求的最大数量。

client.lookup.batch-timeout

Duration

100ms

等待lookup批次装满的最大时间,如果达到此超时时间,lookup批次将被关闭并发送出去。

其他参数

参数

类型

默认值

说明

bootstrap.servers

List

(无)

指定用于连接 Fluss 集群的初始节点列表,格式为 host1:port1,host2:port2,...

客户端将通过这些节点获取完整的集群拓扑信息,因此列表无需包含所有服务器。建议配置多个节点,以提高连接的高可用性,防止单点故障导致连接失败。

client.id

String

""

设置一个具有业务意义的客户端 ID(如 order-service-prod),以便在服务器日志中清晰识别请求来源。

client.connect-timeout

Duration

120s

Netty 客户端连接超时时间。

client.request-timeout

Duration

30s

请求完成的超时时间。如果用户将写入确认设置为 -1,则此超时时间是延迟写入尝试完成的最大时间。默认设置为 30 秒。

client.filesystem.security.token.renewal.backoff

Duration

1h

在获取文件系统安全令牌失败后,重试获取新安全令牌的时间间隔。

client.filesystem.security.token.renewal.time-ratio

Double

0.75

在文件系统安全令牌到期时间的比率,用于确定何时重新获取访问文件系统的新凭据。

client.metrics.enabled

Boolean

false

启用客户端指标。启用指标后,客户端将收集指标并通过 JMX 指标报告器进行报告。