本文为您介绍如何使用流存储Fluss连接器。
背景信息
阿里云流存储Fluss版采用“湖流一体”架构,实现与Apache Paimon等数据湖格式的无缝对接,在确保数据高时效性的基础上,显著降低实时数仓的构建与运维成本。通过统一的数据流与湖仓协同机制,Fluss助力企业打破数据孤岛,加速湖上数据价值的深度挖掘与高效共享。
类别 | 详情 |
支持类型 | 源表、维表、结果表,数据摄入目标端 |
运行模式 | 流模式和批模式 |
数据格式 | 暂不支持 |
API种类 | SQL和数据摄入YAML |
是否支持更新或删除结果表数据 | 是 |
SQL
Fluss连接器可以在SQL作业中使用,作为源表,维表或者结果表。详情请参见:引擎对接 - Flink
前提条件
当前用户对Fluss表对目标表有读写权限,参考授权访问Fluss集群。
连接阿里云流存储Fluss集群时,Flink工作空间和Fluss集群需要在同一VPC内。
已在 Flink 控制台创建Fluss Catalog。
特色功能
Fluss Connector 提供以下特色功能:
功能 | 介绍 |
支持全量、增量及全增量一体化消费。 | |
Fluss 支持在 Flink SQL 的批模式下对表数据进行快速探查,这对于数据开发调试、数据验证和问题排查非常实用。 | |
Fluss 主键表天然支持 CDC(Change Data Capture)日志订阅,下游 Flink 作业可以实时消费到完整的变更事件流(INSERT、UPDATE_BEFORE、UPDATE_AFTER、DELETE)。 | |
Fluss 支持高效的 KV 点查和二级索引查找,适合低延迟的维表关联场景。 | |
只更新修改部分列的数据,而非整行更新。 | |
新型的 Join 方案,能够在保持双流 Join 语义的同时,将数据更新行为下沉到Fluss表中完成,从而显著降低Flink的资源消耗,提升作业的稳定性和执行效率。 | |
Fluss 主键表支持配置不同的 Merge Engine,用于控制相同主键的多条记录如何合并。 | |
将实时流式存储(Fluss)与基于 Paimon 的数据湖(由 DLF 统一管理元数据)进行深度集成,实现“一份数据,两种视图”,即在同一个逻辑表中,同时提供低延迟的实时数据访问和高吞吐的历史数据分析能力。 | |
支持分区表创建、读写数据,并可自动创建和清理分区。 | |
支持动态新增列,同时不影响运行中的读写作业。 |
使用限制
日志表(Log Table)只能接收 INSERT 消息,无法处理 UPDATE 和 DELETE 操作。
主键表(PrimaryKey Table)可以处理所有类型的消息(INSERT、UPDATE、DELETE)。
维表 Lookup Join 仅支持主键表,Join 条件需包含全部主键字段。
Delta Join 要求左右表均为 Fluss 主键表,且 Join Key 必须包含分桶键(Bucket Key)。
注意事项
Fluss 表有两种类型:日志表(无主键,类似 Kafka Topic)和主键表(有主键,支持 Upsert 语义)。请根据业务场景选择合适的表类型。
生产环境应合理配置
bucket.num,以保证数据的并行读写性能和数据分布均衡。对于分区表,建议合理规划分区策略:根据数据量和查询模式选择合适的分区时间粒度(如按天、按月),并通过
table.auto-partition.num-retention控制历史分区保留数量,避免分区数过多导致元数据膨胀。同时,对于主键表,如果数据本身具有时间或业务维度的天然分区特征,建议优先使用分区表而非将所有数据堆积在单个非分区表中。分区表不仅便于数据生命周期管理,还能在流式和批式查询中利用分区下推(分区裁剪)大幅减少不必要的数据扫描,显著提升查询性能。使用 Delta Join 前,需在 Flink 作业的运行参数中配置相应参数以启用优化。
特色功能详解
全增量一体化消费
功能说明
Fluss 主键表支持全增量一体化消费:先读取表中的历史全量快照数据,全量数据读取完毕后自动无缝切换到增量流式消费,保证数据的完整性和一致性。
消费模式
通过 scan.startup.mode 参数控制消费起始位点:
模式 | 说明 |
| 默认值。先读取全量快照,再增量消费。适合需要完整历史数据的场景。 |
| 从最早的日志位点开始消费,不读取全量快照。 |
| 从最新位点开始消费,仅消费启动后产生的新数据。 |
| 从指定时间戳位点开始消费,需配合 |
SQL 示例
-- 全增量一体化消费(默认模式)
SELECT * FROM `fluss_catalog`.`fluss_db`.orders;
-- 从指定时间戳开始消费,通过 Hint 指定参数
SELECT * FROM `fluss_catalog`.`fluss_db`.orders
/*+ OPTIONS('scan.startup.mode' = 'timestamp', 'scan.startup.timestamp' = '2025-01-01 00:00:00') */;
-- 从最新位点开始消费
SELECT * FROM `fluss_catalog`.`fluss_db`.orders
/*+ OPTIONS('scan.startup.mode' = 'latest') */;CDC 日志订阅与虚拟表
日志订阅
Fluss 主键表天然支持 CDC(Change Data Capture)日志订阅,下游 Flink 作业可以实时消费到完整的变更事件流(INSERT、UPDATE_BEFORE、UPDATE_AFTER、DELETE),无需额外开启 Binlog 配置。
这意味着:
主键表的所有变更都会自动产生 CDC 日志。
下游可直接订阅 Fluss 主键表获取完整的变更流,用于实时数据同步或流式 ETL。
SQL 示例
-- 直接读取主键表即可获取 CDC 变更流
SELECT * FROM `fluss_catalog`.`fluss_db`.orders;
-- 输出包含完整的 INSERT / UPDATE / DELETE 事件虚拟表(Virtual Table)
除了直接订阅 CDC 流之外,Fluss 还提供了虚拟表机制,通过在基础表名后追加特定后缀即可访问带有元数据的变更日志,无需额外存储数据。虚拟表是系统自动生成的,不需要用户手动创建。
Changelog 表($changelog)
适用于主键表和日志表,提供带有元数据的原始变更日志流。
表结构(在原始表字段之外,自动附加以下元数据列):
元数据列 | 类型 | 说明 |
| STRING | 变更类型。主键表可能的值: |
| BIGINT | 该条变更在日志中的偏移量。 |
| TIMESTAMP_LTZ | 变更的提交时间戳。 |
SQL 示例:
-- 读取 orders 表的 changelog 虚拟表
SELECT * FROM `fluss_catalog`.`fluss_db`.orders$changelog;
-- 通过 Hint 指定从最新位点开始消费
SELECT * FROM `fluss_catalog`.`fluss_db`.orders$changelog
/*+ OPTIONS('scan.startup.mode' = 'latest') */;
-- 过滤出删除事件
SELECT * FROM `fluss_catalog`.`fluss_db`.orders$changelog
WHERE _change_type = 'delete';Binlog 表($binlog)
仅适用于主键表,提供包含变更前后镜像(before/after image)的 Binlog 格式数据,便于精确获取每次变更的前后状态。
表结构(在元数据列之外,以嵌套 ROW 类型呈现变更前后的完整行数据):
列 | 类型 | 说明 |
| STRING | 变更类型: |
| BIGINT | 日志偏移量。 |
| TIMESTAMP_LTZ | 提交时间戳。 |
| ROW | 变更前的行数据。 |
| ROW | 变更后的行数据。 |
变更事件对照:
变更类型 |
|
|
| NULL | 新值 |
| 旧值 | 新值 |
| 旧值 | NULL |
SQL 示例:
-- 读取 users 表的 binlog 虚拟表
SELECT * FROM `fluss_catalog`.`fluss_db`.users$binlog;
-- 获取更新事件中字段变更前后的值
SELECT
`before`.user_name AS old_name,
`after`.user_name AS new_name,
_commit_timestamp AS change_time
FROM `fluss_catalog`.`fluss_db`.users$binlog
WHERE _change_type = 'update';使用限制
Changelog 表和 Binlog 表目前不支持投影下推、分区下推和谓词下推。
Binlog 表仅适用于主键表,日志表不支持。
虚拟表的启动模式支持
earliest(默认)、latest和timestamp,不支持full。
部分列更新(Partial Update)
功能说明
Fluss 支持部分列更新(Partial Update),允许多个数据流独立地向同一张主键宽表写入各自关心的字段,Fluss 引擎会自动按主键进行字段级别的合并。这种方式无需在 Flink 中维护复杂的多流 Join 状态,避免了大状态问题。
使用场景
典型场景为实时用户画像宽表构建:不同的数据源(如推荐系统、曝光日志、点击日志、购物车数据)分别更新用户画像表中各自对应的字段,最终形成一张完整的用户 360 度视图。
使用限制
目标表必须为主键表。
每个数据流的写入字段必须包含完整的主键字段。
计算拓扑中不包含多流 Join 节点,直接由 Fluss 在存储层完成 Merge。
SQL 示例
-- 1. 创建宽表
CREATE TABLE IF NOT EXISTS `fluss_catalog`.fluss_db.user_rec_wide (
user_id STRING,
item_id STRING,
rec_score DOUBLE,
imp_cnt INT,
click_cnt INT,
PRIMARY KEY (user_id, item_id) NOT ENFORCED
) WITH ('bucket.num' = '3');
-- 2. 多流分别写入各自字段
-- 数据流 1:写入推荐分数
INSERT INTO `fluss_catalog`.fluss_db.user_rec_wide (user_id, item_id, rec_score)
SELECT user_id, item_id, rec_score
FROM `fluss_catalog`.fluss_db.recommendations;
-- 数据流 2:写入曝光次数
INSERT INTO `fluss_catalog`.fluss_db.user_rec_wide (user_id, item_id, imp_cnt)
SELECT user_id, item_id, imp_cnt
FROM `fluss_catalog`.fluss_db.impressions;
-- 数据流 3:写入点击次数
INSERT INTO `fluss_catalog`.fluss_db.user_rec_wide (user_id, item_id, click_cnt)
SELECT user_id, item_id, click_cnt
FROM `fluss_catalog`.fluss_db.clicks;上述三条 INSERT 语句可作为独立的 Flink 作业运行,Fluss 存储层会自动将相同主键的字段合并成完整的行。
Delta Join
功能说明
Delta Join 是 Fluss 提供的新一代双流 Join 范式,将数据更新行为下沉到 Fluss 存储层完成,利用主键表和二级索引实现高效查找,无需在 Flink 中维护 Join State。
核心优势:
无 Join State:省去冗余的状态存储,避免大状态导致的 GC 问题。
低资源消耗:实测 Flink 内存与 CPU 消耗下降超 86%。
更稳定高效:避免状态膨胀带来的性能瓶颈。
与传统 Join 对比
特性 | 传统 Flink Join(Kafka) | Fluss Delta Join |
状态存储 | 需在 Flink 缓存全量上游数据 | 无 Join State,依赖 Fluss 索引 |
资源消耗 | 高(大状态导致 GC 频繁) | 低 |
稳定性 | 易受状态膨胀影响 | 更稳定 |
实现机制 | 双边驱动更新 | 更新下沉至 Fluss 存储层 |
适用场景 | 通用流式 Join | Fluss 主键表间的高效 Join |
使用限制
左右表必须为 Fluss 的主键表(支持分区表)。
分桶键(Bucket Key)需为主键前缀。
Join Key 必须包含分桶键;若为分区表,还需包含分区键。
SQL 示例
1. 创建表
CREATE TABLE orders (
user_id BIGINT,
order_date DATE,
order_id BIGINT,
amount DECIMAL(10, 2),
PRIMARY KEY (user_id, order_id, order_date) NOT ENFORCED
) WITH ('bucket.key' = 'user_id, order_id');
CREATE TABLE order_enhance (
user_id BIGINT,
order_id BIGINT,
risk_level TINYINT,
is_fraud BOOLEAN,
update_time TIMESTAMP(3),
PRIMARY KEY (user_id, order_id) NOT ENFORCED
) WITH ('bucket.key' = 'user_id, order_id');2. 执行 Delta Join
SELECT * FROM orders o
INNER JOIN order_enhance e
ON o.user_id = e.user_id AND o.order_id = e.order_id;3. 通过 Hint 优化 Lookup 性能
SELECT * FROM orders o /*+ OPTIONS('client.lookup.queue-size'='2560') */
INNER JOIN order_enhance e /*+ OPTIONS('client.lookup.queue-size'='2560') */
ON o.user_id = e.user_id AND o.order_id = e.order_id;作业运行参数配置
使用 Delta Join 时,需在 Flink 作业的运行参数中添加以下配置:
table.exec.delta-join.cache-enabled: 'true'
table.exec.async-lookup.buffer-capacity: '3000'
table.optimizer.delta-join.strategy: FORCE
table.exec.delta-join.left.cache-size: '100'
table.exec.delta-join.right.cache-size: '1000'参数 | 说明 |
| 设为 |
| 异步请求缓冲容量,建议设置为千级别。 |
| 开启本地缓存以减少对 Fluss 的请求。 |
| 左表缓存 Key 数量,需根据内存和热点数据调整。 |
| 右表缓存 Key 数量,需根据内存和热点数据调整。 |
成功启用后,可在 Flink 作业状态总览中看到 Delta Join 节点,表明优化已生效。
Lookup Join(维表关联)
功能说明
Fluss 主键表支持作为 Flink SQL 的维表使用,通过 Lookup Join 可以在流处理中实时关联维度数据。Fluss 支持高效的 KV 点查和二级索引查找,适合低延迟的维表关联场景。
使用限制
仅支持主键表作为维表。
Join 条件必须包含维表的全部主键字段。
同步与异步模式
模式 | 参数配置 | 说明 |
异步(默认) |
| 高吞吐,推荐在大多数场景下使用。 |
同步 |
| 逐条查询,适合调试或低并发场景。 |
SQL 示例
-- 通过 Hint 配置 Lookup 缓存,进行维表关联
SELECT
o.order_id,
o.amount,
u.user_name,
p.product_name
FROM `fluss_catalog`.`fluss_db`.orders AS o
LEFT JOIN `fluss_catalog`.`fluss_db`.user_dim
/*+ OPTIONS('lookup.cache' = 'PARTIAL', 'lookup.partial-cache.expire-after-write' = '60s', 'lookup.partial-cache.max-rows' = '10000') */
FOR SYSTEM_TIME AS OF o.proctime AS u
ON o.user_id = u.user_id
LEFT JOIN `fluss_catalog`.`fluss_db`.product_dim
FOR SYSTEM_TIME AS OF o.proctime AS p
ON o.product_id = p.product_id;
-- 使用同步 Lookup 模式(适合调试场景)
SELECT o.order_id, o.amount, d.user_name
FROM `fluss_catalog`.`fluss_db`.orders AS o
LEFT JOIN `fluss_catalog`.`fluss_db`.user_dim
/*+ OPTIONS('lookup.async' = 'false') */
FOR SYSTEM_TIME AS OF o.proctime AS d
ON o.user_id = d.user_id;Merge Engine
功能说明
Fluss 主键表支持配置不同的 Merge Engine,用于控制相同主键的多条记录如何合并。通过 table.merge-engine 参数设置,支持以下策略:
Merge Engine | 说明 |
| 保留最后一条写入的记录,后写入的数据覆盖先写入的数据。适用于大多数 Upsert 场景。 |
| 保留第一条写入的记录,后续相同主键的数据被忽略。适用于去重场景,如日志去重。 |
| 根据指定的版本列保留版本号最大的记录。需配合 |
SQL 示例
-- 使用 first_row 实现去重
CREATE TABLE IF NOT EXISTS `fluss_catalog`.fluss_db.event_dedup (
event_id STRING,
event_type STRING,
event_time TIMESTAMP(3),
payload STRING,
PRIMARY KEY (event_id) NOT ENFORCED
) WITH (
'table.merge-engine' = 'first_row'
);
-- 使用 versioned 按版本列更新
CREATE TABLE IF NOT EXISTS `fluss_catalog`.fluss_db.user_profile (
user_id BIGINT,
user_name STRING,
age INT,
update_version BIGINT,
PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
'table.merge-engine' = 'versioned',
'table.merge-engine.versioned.ver-column' = 'update_version'
);分区表
功能说明
Fluss 支持分区表,通过 PARTITIONED BY 将数据按分区键划分到不同的物理分区中,适用于按时间或业务维度管理数据生命周期和提升查询性能的场景。分区表支持单字段分区和多字段分区,并提供手动管理、自动管理和动态创建三种正交的分区管理策略。
基本规则
分区键的数据类型需要是 STRING。
主键表的分区键需要是主键的子集。
自动分区规则仅在建表时配置,建表后不可修改。
分区管理策略
策略 | 说明 |
手动管理 | 用户通过 |
自动管理 | 基于时间规则自动创建新分区并清理过期历史分区,适合按时间维度持续写入的场景。 |
动态创建 | 写入数据时,若目标分区不存在则由客户端自动创建,适合分区值不可预知的场景。 |
三种策略是正交的,可以组合使用。例如,可以同时开启自动管理(按时间自动创建和清理)和动态创建(兜底处理未预创建的分区)。
SQL 示例
1. 创建单字段分区表(自动分区管理)
CREATE TABLE `fluss_catalog`.fluss_db.site_access (
site_id BIGINT,
user_name STRING,
pv BIGINT,
event_day STRING,
PRIMARY KEY (site_id, event_day) NOT ENFORCED
) PARTITIONED BY (event_day) WITH (
'table.auto-partition.enabled' = 'true',
'table.auto-partition.time-unit' = 'DAY',
'table.auto-partition.num-precreate' = '5',
'table.auto-partition.num-retention' = '30',
'table.auto-partition.time-zone' = 'Asia/Shanghai'
);2. 创建多字段分区表
CREATE TABLE `fluss_catalog`.fluss_db.region_access (
access_id BIGINT,
user_name STRING,
event_date STRING,
region STRING,
PRIMARY KEY (access_id, event_date, region) NOT ENFORCED
) PARTITIONED BY (event_date, region) WITH (
'table.auto-partition.enabled' = 'true',
'table.auto-partition.key' = 'event_date',
'table.auto-partition.time-unit' = 'DAY',
'table.auto-partition.num-retention' = '7'
);说明:多字段分区时,需通过table.auto-partition.key指定用于自动分区的时间键。多字段分区仅支持自动过期清理,不支持自动预创建(num-precreate强制为 0)。
3. 手动管理分区
-- 查看所有分区
SHOW PARTITIONS `fluss_catalog`.fluss_db.site_access;
-- 手动添加分区
ALTER TABLE `fluss_catalog`.fluss_db.site_access
ADD PARTITION (event_day = '20250315');
-- 手动删除分区
ALTER TABLE `fluss_catalog`.fluss_db.site_access
DROP PARTITION (event_day = '20250101');4. 动态创建分区
写入数据时,如果目标分区不存在,客户端会自动创建对应分区(默认开启):
-- 写入数据,若分区 '20250701' 不存在则自动创建
INSERT INTO `fluss_catalog`.fluss_db.site_access
VALUES (1, 'hello', 100, '20250701');动态创建由客户端参数 client.writer.dynamic-create-partition.enabled(默认 true)控制,受集群级别的 max.partition.num(默认 1000)和 max.bucket.num(默认 128000)限制。
分区下推
流式或批式查询分区表时,Fluss 支持分区下推(分区裁剪),通过 WHERE 条件中的分区键过滤,仅读取匹配的分区数据,减少不必要的 I/O 开销:
-- 仅读取指定日期的分区数据
SELECT * FROM `fluss_catalog`.fluss_db.site_access
WHERE event_day = '20250315';
-- 多字段分区表按 region 过滤
SELECT * FROM `fluss_catalog`.fluss_db.region_access
WHERE region = 'US';自动分区参数说明
参数 | 类型 | 默认值 | 说明 |
| Boolean | false | 是否启用自动分区管理。 |
| String | 无 | 多分区键时,指定用于自动分区的时间键。单分区键时无需配置。 |
| Enum | DAY | 自动分区的时间粒度,可选 |
| Integer | 2 | 预创建的未来分区数量。多字段分区时不支持,强制为 0。 |
| Integer | 7 | 保留的历史分区数量,超出的过期分区将被自动删除。 |
| String | 系统时区 | 自动分区使用的时区。 |
最佳实践
选择合适的分区粒度:根据数据量和查询模式选择时间粒度。数据量大、查询时间范围小的场景建议按
DAY或HOUR;数据量较小的场景可按MONTH或QUARTER。合理设置保留策略:通过
num-retention控制历史分区数量,避免分区数过多导致元数据膨胀和集群管理负担增大。组合使用自动管理与动态创建:自动管理负责常规的时间分区创建和清理,动态创建作为兜底机制处理异常或延迟数据。
注意集群限制:单表分区数受
max.partition.num限制(默认 1000),总 Bucket 数受max.bucket.num限制(默认 128000),分区数 xbucket.num不应超过此上限。
流式查询下推
Fluss 采用 Apache Arrow 列式存储格式进行日志传输,支持将过滤和投影操作下推到存储层执行,从而减少数据传输量,查询性能可提升高达 10 倍。
这一特性在以下场景中尤为有效:
只需读取宽表中部分列(投影下推)。
需要根据条件过滤大量数据(谓词下推)。
日志存储格式通过 table.log.format 参数控制,默认为 ARROW。
湖流一体
功能说明
Fluss 支持湖流一体架构,可将 Fluss 中的实时数据自动同步到数据湖(如 Apache Paimon),实现冷热数据自动分层存储,兼顾实时查询的低延迟与历史数据的低存储成本。
关键特性:
自动将 Fluss 中的流式数据写入 Paimon 湖存储。
冷热数据自动分层,热数据保留在 Fluss 流存储中,冷数据沉降到 Paimon。
通过统一的 Flink SQL 查询接口同时访问流存储和湖存储中的数据。
配置参数
参数 | 类型 | 默认值 | 说明 |
| Boolean | false | 开启湖存储分层。 |
| Enum | 集群默认值 | 数据湖格式,当前仅支持 |
| Integer | 2 | 开启分层存储后,本地保留的日志段数量。 |
数据探查
功能说明
Fluss 支持在 Flink SQL 的批模式下对表数据进行快速探查,包括 LIMIT 查询、主键点查和 COUNT(*) 统计,无需启动持续运行的流式作业即可快速查看表中的数据。这对于数据开发调试、数据验证和问题排查非常实用。
LIMIT 查询
支持对主键表和日志表执行 LIMIT 查询,快速获取表中的少量样本数据。
-- 从主键表中查看前 10 条数据
SELECT * FROM `fluss_catalog`.`fluss_db`.orders LIMIT 10;
-- 从日志表中查看前 5 条数据
SELECT * FROM `fluss_catalog`.`fluss_db`.events LIMIT 5;
-- 结合投影,仅查看部分列
SELECT order_id, amount, order_time
FROM `fluss_catalog`.`fluss_db`.orders LIMIT 20;主键点查
对于主键表,支持通过 WHERE 条件按主键进行精确点查,利用 Fluss 的 KV 存储引擎实现毫秒级响应。
-- 按主键精确查询单条记录
SELECT * FROM `fluss_catalog`.`fluss_db`.orders
WHERE order_id = 1001;
-- 复合主键的点查
SELECT * FROM `fluss_catalog`.`fluss_db`.user_orders
WHERE user_id = 12345 AND order_id = 6789;COUNT(*) 统计
支持对主键表执行 COUNT(*) 查询,快速获取表中的总记录数。
-- 统计主键表中的总行数
SELECT COUNT(*) FROM `fluss_catalog`.`fluss_db`.orders;WITH 参数
存储参数
以下参数会持久化到表的元数据中。
参数 | 类型 | 默认值 | 是否必填 | 说明 |
| Integer | 无 | 否 | Fluss 表的 Bucket 数量。影响数据并行读写的并发度。默认值采用集群的default.bucket.number |
| String | 无 | 否 | 用于数据哈希分布的字段,必须是主键的子集,多个字段用逗号分隔。 |
| Duration | 7d | 否 | 日志数据的保留时间。 |
| Integer | 集群默认值 | 否 | 日志表副本数,不能超过 tabletServer 数量。 |
| Enum | ARROW | 否 | 日志存储格式,可选 |
| Enum | ZSTD | 否 | ARROW 格式的压缩类型,可选 |
| Integer | 3 | 否 | ZSTD 压缩级别(1-22)。 |
| Enum | COMPACTED | 否 | KV 存储格式,可选 |
| Enum | last_row | 否 | 主键表的合并策略,可选 |
| String | 无 | 条件必填 |
|
| Boolean | false | 否 | 是否开启湖存储分层。 |
| Enum | 集群默认值 | 否 | 数据湖格式,当前仅支持 |
| Integer | 2 | 否 | 分层存储开启后,本地保留的日志段数量。 |
自动分区参数
参数 | 类型 | 默认值 | 是否必填 | 说明 |
| Boolean | false | 否 | 是否启用自动分区创建。 |
| Enum | DAY | 否 | 自动分区的时间粒度,可选 |
| Integer | 2 | 否 | 检查时预创建的未来分区数量。 |
| Integer | 7 | 否 | 保留的历史分区数量,超出的分区将被自动删除。 |
| String | 系统时区 | 否 | 自动分区使用的时区。 |
读取参数(源表)
以下参数可通过 WITH 子句或 SQL Hint 配置。
参数 | 类型 | 默认值 | 是否必填 | 说明 |
| Enum | full | 否 | 消费起始模式,可选 |
| Long/String | 无 | 条件必填 | 起始时间戳(毫秒值或 |
| Duration | 1分钟 | 否 | 自动发现新分区的间隔时间。设置为负值可禁用自动发现。 |
| Boolean | true | 否 | 是否对消息进行 CRC32 校验以验证数据完整性。 |
| Integer | 500 | 否 | 每次 |
| MemorySize | 16mb | 否 | 每次请求从服务端拉取的最大字节数。 |
| MemorySize | 1mb | 否 | 每个 Bucket 每次请求拉取的最大字节数。 |
| MemorySize | 1b | 否 | 请求响应前期望的最小字节数。 |
| Duration | 500ms | 否 | 当未满足最小字节数时,服务端最大等待时间。 |
| String | 系统临时目录 | 否 | 用于存放快照和日志段等临时文件的本地目录。 |
| Integer | 4 | 否 | 远程日志段预拉取数量。 |
| Integer | 3 | 否 | 下载远程文件的线程数。 |
写入参数(结果表)
以下参数可通过 WITH 子句或 SQL Hint 配置。
参数 | 类型 | 默认值 | 是否必填 | 说明 |
| Boolean | false | 否 | 是否忽略 |
| Boolean | true | 否 | 写入前是否按 Bucket ID 进行数据 Shuffle,以提升写入效率。 |
| MemorySize | 64mb | 否 | 内部缓存行数据的总内存大小。 |
| MemorySize | 2mb | 否 | 同一 Bucket 内记录的目标批次大小。 |
| Duration | Long.MAX | 否 | Writer 等待可用 Segment 的最大阻塞时间。 |
| Duration | 100ms | 否 | 积攒记录后发送批次的最大等待时间。 |
| Enum | STICKY | 否 | 无主键表的 Bucket 分配策略,可选 |
| String | all | 否 | 写入确认级别。 |
| MemorySize | 10mb | 否 | 单个请求的最大字节数。 |
| Integer | MAX_VALUE | 否 | 因可重试错误导致发送失败时的重试次数。 |
| Boolean | true | 否 | 是否开启幂等写入,保证 Exactly-Once 语义和顺序性。 |
| Integer | 5 | 否 | 每个 Bucket 的最大未确认请求数(幂等写入开启时生效)。 |
维表参数
参数 | 类型 | 默认值 | 是否必填 | 说明 |
| Boolean | true | 否 | 是否启用异步 Lookup,提升维表关联吞吐量。 |
| Enum | NONE | 否 | 缓存策略,可选 |
| Integer | 3 | 否 | Lookup 失败时的最大重试次数。 |
| Duration | 无 | 否 | 访问后的缓存过期时间。 |
| Duration | 无 | 否 | 写入后的缓存过期时间。 |
| Boolean | true | 否 | 是否缓存未命中的 Key(缓存 null 值)。 |
| Long | 无 | 否 | 缓存的最大行数。 |
| Integer | 25600 | 否 | 待处理的 Lookup 操作队列大小。 |
| Integer | 128 | 否 | 合并多个 Lookup 操作的最大批次大小。 |
| Integer | 128 | 否 | 最大未确认 Lookup 请求数。 |
| Duration | 100ms | 否 | 填满 Lookup 批次前的最大等待时间。 |
数据摄入
Fluss连接器作为数据下游可以在数据摄入YAML作业中使用。
语法结构
source:
type: xxx
sink:
type: fluss
name: Fluss Sink
bootstrap.servers: localhost:9123
properties.client.security.protocol: sasl
properties.client.security.sasl.mechanism: PLAIN
properties.client.security.sasl.username: developer
properties.client.security.sasl.password: developer-pass
pipeline:
schema.change.behavior: IGNORE表结构变更
Fluss Server支持追加新增列,不支持其他列类型操作(比如删除列、重命名列和变更列类型)。因此,建议搭配Flink CDC默认的schema.change.behavior = LENIENT使用:
列重命名时,改为发送更改列类型和新增列两个事件。原有的列不删除,更改列类型为nullable,同时新增一个列名为新名称、数据类型改为nullable的列。
删除列时,改为发送更改列类型事件,将对应字段类型变为nullable。
新增列时仍发送新增列事件,但字段类型会变为nullable。
配置项
参数 | 说明 | 是否必填 | 数据类型 | 默认值 | 备注 |
type | sink类型。 | 是 | STRING | 无 | 固定值为fluss。 |
name | sink名称。 | 否 | STRING | 无 | 无。 |
bootstrap.servers | Fluss Server地址 | 是 | STRING | 无 | 格式为host:port,host:port,host:port,以英文逗号(,)分隔。 |
bucket.key | 分桶键 | 否 | String | 无 | 指定每个 Fluss 表的数据分布策略。表之间用 格式: 数据将根据分桶键的哈希值分配到各个桶中(分桶键必须是主键的子集,且不包含主键表的分区键)。 若表有主键但未指定分桶键,则分桶键默认为主键(不含分区键);若表无主键且未指定分桶键,则数据将随机分配到各个桶中。 bucket.key和bucket.num值在创建表后就不再改变。 |
bucket.num | 配置 fluss 表的桶数量 | 否 | String | 无 | 每个 Fluss 表的桶数量。表之间用 格式: 如果一张表未配置桶数量,默认会采用Server端的配置项 bucket.key和bucket.num值在创建表后就不再改变。 |
properties.table.* | fluss 表属性 | 否 | String | 无 | 将 Fluss table 支持的参数传递给 pipeline,详情请参见Fluss table options。 |
properties.client.* | fluss client参数 | 否 | String | 无 | 将 Fluss client 支持的参数传递给 pipeline,详情请参见Fluss client options。 |
类型映射
数据摄入类型映射如下表所示。
CDC字段类型 | Fluss 字段类型 |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL(p, s) | DECIMAL(p, s) |
BOOLEAN | BOOLEAN |
DATE | DATE |
TIME | TIME |
TIMESTAMP | TIMESTAMP |
TIMESTAMP_LTZ | TIMESTAMP_LTZ |
CHAR(n) | CHAR(n) |
VARCHAR(n) | STRING |
ARRAY | ARRAY |
MAP | MAP |
ROW | ROW |
BINARY(n) | 暂不支持 |
VARBINARY(N) |