本文将为您介绍表的功能特性,远程存储、列裁剪以及日志压缩。
表特性概览
远程存储
远程存储指成本更低、可靠性更高的外部存储系统,例如 S3、HDFS 或阿里云 OSS。阿里云实时流存储 Fluss 默认使用 OSS 作为远程存储。
Fluss 利用远程存储实现数据分层与容错。
对于日志表,远程存储保存分层的日志数据段。
对于主键表,远程存储保存快照和变更日志的数据段。
远程日志机制
Fluss 是流式存储系统,数据通常通过尾部读取被消费。为保障低延迟,最新数据保留在本地磁盘。较旧数据则异步迁移至远程存储,以降低本地存储成本。
表级配置
日志段复制到远程后,本地副本通常会被删除,以节省空间。但为提升读取性能,可选择在本地保留最近的若干日志段。
配置项 | 数据类型 | 默认值 | 说明 |
table.log.tiered.local-segments | 整数 | 2 | 可指定在本地保留的日志段数量。 |
集群级配置
以下配置项控制日志分层行为:
配置项 | 数据类型 | 默认值 | 说明 |
remote.log.task-interval-duration | 时间间隔 | 1分钟 | 远程日志管理任务的执行周期。包括复制、清理和删除操作。设为 0 秒则禁用远程日志功能。 |
remote.log.index-file-cache-size | 内存大小 | 1GB | 本地缓存中用于存放远程索引文件的总空间。 |
remote.log-manager.thread-pool-size | 整数 | 4 | 调度远程日志任务的线程池大小。 |
remote.log.data-transfer-thread-num | 整数 | 4 | 上传和下载远程日志文件(数据、索引、元数据)所用的线程数量。 |
log.segment.file-size | 内存大小 | 1GB | 用于控制日志文件的大小。日志的保留和清理始终以单个文件为单位进行,因此较大的文件意味着更少的文件数量。 与表级别配置 |
主键表的远程快照
主键表的数据按分桶(bucket)分布。每个分桶在本地仅保留一个副本,无从副本。
为防止本地磁盘故障导致数据丢失,Fluss 定期生成快照并上传至远程存储。
快照包含数据副本和一个日志偏移量(log offset),表示快照生成后下一个待读取的变更位置。
当节点故障时,Fluss 可从远程下载快照,并重放后续变更日志,在其他节点上恢复数据。
快照与日志偏移量保证一致性,使客户端能无缝从全量读取(快照)切换到增量订阅(变更日志),避免数据重复或丢失。
集群级快照配置
配置项 | 数据类型 | 默认值 | 说明 |
kv.snapshot.interval | 时间间隔 | 10分钟 | 主键表快照的生成周期。 |
kv.snapshot.scheduler-thread-num | 整数 | 1 | 负责调度本机所有快照任务的线程数量。 |
kv.snapshot.transfer-thread-num | 整数 | 4 | 上传和下载快照文件所用的线程数量。 |
kv.snapshot.num-retained | 整数 | 1 | 保留已完成快照的最大数量。建议设为较大值,防止客户端读取过程中快照被删除。 |
列裁剪
列裁剪是一种优化技术,查询时只读取所需列,跳过无关列,减少数据读取量。Fluss 支持流式读取中的列裁剪 ,是业内少数能在实时流场景下实现该能力的系统。
Fluss 在日志表和主键表的变更日志中均支持列裁剪。它降低存储 I/O、减少网络传输、减轻客户端解析负担,显著提升查询性能。
无论批处理还是流处理,系统始终只处理必要字段,最大化资源利用率。
计算引擎(如 Flink)在执行查询时会分析所需列,并通知 Fluss 仅返回这些列。
SELECT id, name FROM log_table WHERE timestamp > '2023-01-01'
该查询仅读取id
、name
和 timestamp
列。表中的其他列(如 address、status)不会从存储加载,彻底避免无效读取。
日志压缩
Fluss 使用 Arrow 原生压缩机制,压缩后数据仍符合 Arrow 格式标准。压缩按列独立执行,保留列裁剪能力,不影响查询性能。
压缩流程
日志表 :数据在客户端由写入器压缩,以压缩格式写入存储;读取时由日志扫描器解压。
主键表的变更日志 :日志在服务端生成,压缩也在服务端完成。
压缩效果
压缩显著降低网络传输与存储成本。基准测试显示,ZSTD 压缩级别 3 可实现约 5:1 的压缩比 (例如,5GB 数据压缩至 1GB)。网络开销减少,读写吞吐量随之提升。
配置方式
参数 | 默认配置 | 说明 |
table.log.arrow.compression.type | ZSTD | 压缩算法:
|
table.log.arrow.compression.zstd.level | 3 | 压缩级别,取值为: 1 ~ 22。 |
配置示例
-- 设置压缩算法为 ZSTD,压缩级别为 2
CREATE TABLE log_table (
order_id BIGINT,
item_id BIGINT,
amount INT,
address STRING
) WITH (
'table.log.arrow.compression.type' = 'ZSTD',
'table.log.arrow.compression.zstd.level' = '2'
);
压缩配置仅适用于
table.log.format='arrow'
的表。若设置
table.log.format='indexed'
,压缩配置将被忽略。