本文将为您介绍Fluss Connector的相关参数。
如何配置参数
创建表时配置参数
一旦表被创建后,这部分定义的参数会被作业表的元数据持久化存储在 Fluss 中。其中存储参数在读写数据的时候会生效。
# 设置table.log.ttl为7天并且在读取时禁止CRC校验
CREATE TABLE `my-catalog`.`my_db`.`my_log_table` (
order_id BIGINT,
item_id BIGINT,
amount INT,
address STRING
) WITH (
'table.log.ttl' = '7d',
'client.scanner.log.check-crc' = 'false'
);
SQL Hints 动态配置参数
Fluss 也支持通过 SQL Hints 的方式动态配置参数。这部分动态的配置参数不会作为元数据的一部分持久化存储在 Fluss 中。这部分配置只会在当前 query 中生效而不会影响其他的 query。
动态配置的参数的优先级要高于表元数据中持久化的参数。
存储参数不支持通过 SQL Hints 动态配置。
-- 读取时设置 SQL hints
SELECT * FROM `my-catalog`.`my_db`.`my_log_table` /*+ OPTIONS('client.scanner.log.check-crc' = 'false') */;
-- 写入时设置 SQL hints
INSERT INTO `my-catalog`.`my_db`.`my_pk_table_2` /*+ OPTIONS('sink.ignore-delete'='true') */ select * from `my-catalog`.`my_db`.`my_pk_table_1`;
存储参数
参数 | 类型 | 默认值 | 说明 |
bucket.num | int | 1 | Fluss集群的桶数量。 |
bucket.key | String | 无 | 数据将根据
|
table.log.ttl | Duration | 7d | 日志数据的生存时间。该配置控制日志保留的最大时间,之后将删除旧的日志数据以释放空间。如果设置为 -1,则日志不会被删除。 |
table.auto-partition.enabled | Boolean | false | 启用自动分区后,表的分区将自动创建。 |
table.auto-partition.time-unit | ENUM | DAY | 自动创建分区的时间粒度。有效值为 例如:值为 |
table.auto-partition.num-precreate | Integer | 2 | 每次检查自动分区时预创建的分区数量。例如,如果当前检查时间为 2024-11-11,且值配置为 3,则将预创建分区 20241111、20241112 和 20241113。如果任何一个分区已存在,则跳过创建该分区。默认值为 2,表示将预创建 2 个分区。如果 table.auto-partition.time-unit 为 DAY(默认值),一个预创建的分区为当天,另一个为明天。 |
table.auto-partition.num-retention | Integer | 7 | 每次检查自动分区时保留的历史分区数量。例如,如果当前检查时间为 2024-11-11,时间单位为 DAY,且值配置为 3,则将保留历史分区 20241108、20241109 和 20241110。早于 20241108 的分区将被删除。默认值为 7,表示将保留 7 个分区。 |
table.auto-partition.time-zone | String | 系统时区 | 自动分区的时区,默认与系统时区相同。 |
table.replication.factor | Integer | (无) | 日志表的副本数量。如果不设置,Fluss 将使用集群默认的副本数量 default.replication.factor。该值必须是正数,且不大于 Fluss 集群中 tabletServer 的数量。如果设置的值大于 Fluss 集群中的 tabletServer 数量,则在创建新表时会报错。 |
table.log.format | Enum | ARROW | 日志存储中日志存储的格式。默认值为 ARROW。支持的格式为 ARROW 和 INDEXED。 |
table.log.arrow.compression.type | Enum | ZSTD | 如果日志格式设置为 ARROW,则该值时配置日志数据的压缩类型。候选的压缩类型为 NONE、LZ4_FRAME 和 ZSTD。默认值为 ZSTD。 |
table.log.arrow.compression.zstd.level | Integer | 3 | 如果日志格式设置为 ARROW 且压缩类型设置为 ZSTD,则该值配置压缩的级别。有效范围为 1 到 22。默认值为 3。 |
table.kv.format | Enum | COMPACTED | kv 存储中 kv 数据的存储格式。默认值为 COMPACTED。支持的格式为 COMPACTED 和 INDEXED。 |
table.log.tiered.local-segments | Integer | 2 | 启用日志分层存储时,每个表在本地保留的日志段(Log Segment)数量。必须大于 0,默认值为 2。 |
table.datalake.enabled | Boolean | false | 是否启用表的湖存储,默认禁用。启用后,若湖仓分层服务已启动,则表数据将自动分层并压缩至湖存储中。 |
table.datalake.format | Enum | (无) | 指定表所使用的数据湖格式,用于分层湖仓存储。当前仅支持 该参数的核心作用是:
配置行为说明
|
table.merge-engine | Enum | (无) | 定义主键表的数据更新策略。默认策略为保留最新数据(last_row)。此外,支持以下两种策略:
|
table.merge-engine.versioned.ver-column | String | (无) | 如果合并策略设置为 versioned,则必须设置版本列的列名。 |
读取参数
参数 | 类型 | 默认值 | 说明 |
scan.startup.mode | Enum | full | 读取模式允许您指定数据消费的起始点。Fluss 目前支持以下 scan.startup.mode 选项:full(默认值)、earliest、latest、timestamp。更多详情请参见读取数据。 |
scan.startup.timestamp | Long | (无) | 开始读取数据的时间戳。仅当 scan.startup.mode 设置为 timestamp 时,此选项才有效。格式为 'milli-second-since-epoch' 或 'yyyy-MM-dd HH:mm:ss',例如 '1678883047356' 或 '2023-12-09 23:09:12'。 |
scan.partition.discovery.interval | Duration | 10s | 在读取分区表时,Fluss Source 可按照设定的时间间隔自动探测新分区。当检测到新分区时,会动态将其加入读取计划;若分区被删除,则自动取消订阅。若将该间隔设为负值,则表示禁用自动分区发现功能。 |
client.scanner.log.check-crc | Boolean | true | 是否对读取到的消息执行 CRC32 校验。该功能用于确保消息在传输或磁盘存储过程中未发生损坏,提升数据可靠性。虽然会引入极小的计算开销,但鉴于其对数据完整性的关键作用,建议在生产环境中保持启用。 |
client.scanner.log.max-poll-records | Integer | 500 | LogScanner 的单次 poll() 调用返回的最大记录数。请注意,此配置不会影响底层的获取行为。LogScanner 将缓存每次获取请求的记录,并从每次 poll 中逐步返回它们。 |
client.scanner.log.fetch.max-bytes | MemorySize | 16mb | 客户端从服务器获取请求的最大数据量。记录是分批获取的,如果第一个非空桶的第一个记录批次大于此值,该记录批次仍将被返回,以确保获取可以取得进展。因此,这不是一个绝对的最大值。 |
client.scanner.log.fetch.max-bytes-for-bucket | MemorySize | 1mb | 客户端从服务器获取请求时,每个表桶返回的最大数据量。记录是分批获取的,最大字节大小由此选项配置。 |
client.scanner.log.fetch.min-bytes | MemorySize | 1b | 客户端每次获取日志请求从服务器响应的期望最小字节数。如果字节数不足,将等待最多 client.scanner.log.fetch-wait-max-time 时间再返回。 |
client.scanner.log.fetch.wait-max-time | Duration | 500ms | 客户端每次获取日志请求从服务器响应的最大等待时间。 |
client.scanner.io.tmpdir | String | System.getProperty("java.io.tmpdir") + "/fluss" | 客户端用于临时存储数据文件(如 kv 快照、日志段文件)的本地目录,以便读取。 |
client.scanner.remote-log.prefetch-num | Integer | 4 | LogScanner 从远程存储下载的远程日志段数量,这些日志段将被保存在本地临时文件中。默认设置为 4。 |
client.remote-file.download-thread-num | Integer | 3 | 客户端用于下载远程文件的线程数。 |
写入参数
参数 | 类型 | 默认值 | 说明 |
sink.ignore-delete | Boolean | false | 如果设置为 true,则目标端将忽略 DELETE 和 UPDATE_BEFORE 的变更日志事件。 |
sink.bucket-shuffle | Boolean | true | 是否在写入目标端之前按桶 ID 进行洗牌。将具有相同桶 ID 的数据洗牌到由同一任务处理可以提高客户端处理效率并减少资源消耗。对于日志表,只有在定义了 bucket.key 时,bucket shuffle才会生效。对于主键表,默认启用。 |
client.writer.buffer.memory-size | MemorySize | 64mb | 写入器可以用来缓冲内部行的总字节数。 |
client.writer.batch-size | MemorySize | 2mb | 写入器将同一桶的记录批量组合在一起。这有助于提高客户端的性能。将尝试将同一桶的记录批量组合在一起。这有助于提高客户端和服务器的性能。 |
client.writer.buffer.wait-timeout | Duration | 2^(63)-1ns | 定义写入器在等待段可用时阻塞的时间。 |
client.writer.batch-timeout | Duration | 100ms | 写入器将尝试将到达的行批量组合在一起,以便在请求发送之间发送。通常,这仅在负载较高且行到达速度比发送速度快时才会发生。然而,在某些情况下,写入器可能希望即使在中等负载下也减少请求数量。此设置通过添加少量人工延迟来实现这一点,即,而不是立即发送一行,写入器将等待最多指定的延迟时间,以便其他记录到达,以便可以将它们批量发送。这可以类比为 TCP 中的 Nagle 算法。此设置为批量的延迟上限:一旦我们为一个桶积累了 client.writer.batch-size 的行,它将立即发送,无论此设置如何,但如果为该桶积累的字节数少于此值,我们将等待指定的时间,等待更多记录到达。 |
client.writer.bucket.no-key-assigner | Enum | STICKY | 指定无键表的桶分配策略。对于有桶键或主键的表,系统会基于键的哈希值选择桶。对于无键表,可通过此参数选择桶分配器。支持以下两种策略:
|
client.writer.acks | String | all | 取值如下:
说明 生产环境建议使用all,以确保数据高可用和不丢失。 |
client.writer.request-max-size | MemorySize | 10mb | 请求的最大大小(以字节为单位)。此设置将限制写入器在单个请求中发送的记录批次数量,以避免发送过大的请求。请注意,此重试与写入器在收到错误后重新发送行没有区别。 |
client.writer.retries | Integer | Integer.MAX_VALUE | 设置大于零的值,客户端会重发任何因可能的可重试的错误而发送失败的 record。 |
client.writer.enable-idempotence | Boolean | true | 是否启用幂等性保障,默认为开启。启用后,可确保客户端写入的数据具有顺序性和 exactly once 保证。若关闭此功能,当因 Fluss 服务端故障等原因触发客户端重试时,可能导致相同消息被重复写入流中。 |
client.writer.max-inflight-requests-per-bucket | Integer | 5 | 每个桶的未确认请求的最大数量。此配置只有在 client.writer.enable-idempotence 设置为 true 时才有效。当每个桶的未确认请求数量超过此设置时,写入器将等待未确认请求完成,然后发送新请求。 |
维表参数
参数 | 类型 | 默认值 | 说明 |
lookup.async | Boolean | true | 是否使用异步Lookup。异步Lookup比同步Lookup具有更好的吞吐量性能。 |
lookup.cache | Enum | NONE | 此lookup表的缓存策略,包括NONE、PARTIAL。 |
lookup.max-retries | Integer | 3 | Lookup操作失败时允许的最大重试次数。 |
lookup.partial-cache.expire-after-access | Duration | (None) | 访问缓存中的条目后,该条目的过期时间。 |
lookup.partial-cache.expire-after-write | Duration | (None) | 写入缓存中的条目后,该条目的过期时间。 |
lookup.partial-cache.cache-missing-key | Boolean | true | 如果lookup key在表中没有匹配到任何行,是否将空值存储到缓存中。 |
lookup.partial-cache.max-rows | Long | (None) | 缓存中存储的最大行数。 |
client.lookup.queue-size | Integer | 25600 | 待处理lookup操作的最大数量。 |
client.lookup.max-batch-size | Integer | 128 | 将多个lookup操作合并为一个lookup请求的最大批次大小。 |
client.lookup.max-inflight-requests | Integer | 128 | lookup操作中未确认的lookup请求的最大数量。 |
client.lookup.batch-timeout | Duration | 100ms | 等待lookup批次装满的最大时间,如果达到此超时时间,lookup批次将被关闭并发送出去。 |
其他参数
参数 | 类型 | 默认值 | 说明 |
bootstrap.servers | List | (无) | 指定用于连接 Fluss 集群的初始节点列表,格式为 客户端将通过这些节点获取完整的集群拓扑信息,因此列表无需包含所有服务器。建议配置多个节点,以提高连接的高可用性,防止单点故障导致连接失败。 |
client.id | String | "" | 设置一个具有业务意义的客户端 ID(如 order-service-prod),以便在服务器日志中清晰识别请求来源。 |
client.connect-timeout | Duration | 120s | Netty 客户端连接超时时间。 |
client.request-timeout | Duration | 30s | 请求完成的超时时间。如果用户将写入确认设置为 -1,则此超时时间是延迟写入尝试完成的最大时间。默认设置为 30 秒。 |
client.filesystem.security.token.renewal.backoff | Duration | 1h | 在获取文件系统安全令牌失败后,重试获取新安全令牌的时间间隔。 |
client.filesystem.security.token.renewal.time-ratio | Double | 0.75 | 在文件系统安全令牌到期时间的比率,用于确定何时重新获取访问文件系统的新凭据。 |
client.metrics.enabled | Boolean | false | 启用客户端指标。启用指标后,客户端将收集指标并通过 JMX 指标报告器进行报告。 |