流存储Fluss(公测中)

更新时间:
复制为 MD 格式

本文为您介绍如何使用流存储Fluss连接器。

背景信息

阿里云流存储Fluss采用“湖流一体”架构,实现与Apache Paimon等数据湖格式的无缝对接,在确保数据高时效性的基础上,显著降低实时数仓的构建与运维成本。通过统一的数据流与湖仓协同机制,Fluss助力企业打破数据孤岛,加速湖上数据价值的深度挖掘与高效共享。

类别

详情

支持类型

源表、维表、结果表,数据摄入目标端

运行模式

流模式和批模式

数据格式

暂不支持

API种类

SQL和数据摄入YAML

是否支持更新或删除结果表数据

SQL

Fluss连接器可以在SQL作业中使用,作为源表,维表或者结果表。详情请参见:引擎对接 - Flink

前提条件

  • 当前用户对Fluss表对目标表有读写权限,参考授权访问Fluss集群

  • 连接阿里云流存储Fluss集群时,Flink工作空间和Fluss集群需要在同一VPC内。

  • 已在 Flink 控制台创建Fluss Catalog

特色功能

Fluss Connector 提供以下特色功能:

功能

介绍

全增量一体化消费

支持全量、增量及全增量一体化消费。

数据探查

Fluss 支持在 Flink SQL 的批模式下对表数据进行快速探查,这对于数据开发调试、数据验证和问题排查非常实用。

CDC 日志订阅与虚拟表

Fluss 主键表天然支持 CDC(Change Data Capture)日志订阅,下游 Flink 作业可以实时消费到完整的变更事件流(INSERT、UPDATE_BEFORE、UPDATE_AFTER、DELETE)。

Lookup Join(维表关联)

Fluss 支持高效的 KV 点查和二级索引查找,适合低延迟的维表关联场景。

部分列更新(Partial Update)

只更新修改部分列的数据,而非整行更新。

Delta Join

新型的 Join 方案,能够在保持双流 Join 语义的同时,将数据更新行为下沉到Fluss表中完成,从而显著降低Flink的资源消耗,提升作业的稳定性和执行效率。

Merge Engine

Fluss 主键表支持配置不同的 Merge Engine,用于控制相同主键的多条记录如何合并。

湖流一体

将实时流式存储(Fluss)与基于 Paimon 的数据湖(由 DLF 统一管理元数据)进行深度集成,实现“一份数据,两种视图”,即在同一个逻辑表中,同时提供低延迟的实时数据访问和高吞吐的历史数据分析能力。

分区表

支持分区表创建、读写数据,并可自动创建和清理分区。

新增列

支持动态新增列,同时不影响运行中的读写作业。

使用限制

  • 日志表(Log Table)只能接收 INSERT 消息,无法处理 UPDATE 和 DELETE 操作。

  • 主键表(PrimaryKey Table)可以处理所有类型的消息(INSERT、UPDATE、DELETE)。

  • 维表 Lookup Join 仅支持主键表,Join 条件需包含全部主键字段。

  • Delta Join 要求左右表均为 Fluss 主键表,且 Join Key 必须包含分桶键(Bucket Key)。

注意事项

  • Fluss 表有两种类型:日志表(无主键,类似 Kafka Topic)和主键表(有主键,支持 Upsert 语义)。请根据业务场景选择合适的表类型。

  • 生产环境应合理配置 bucket.num,以保证数据的并行读写性能和数据分布均衡。

  • 对于分区表,建议合理规划分区策略:根据数据量和查询模式选择合适的分区时间粒度(如按天、按月),并通过 table.auto-partition.num-retention 控制历史分区保留数量,避免分区数过多导致元数据膨胀。同时,对于主键表,如果数据本身具有时间或业务维度的天然分区特征,建议优先使用分区表而非将所有数据堆积在单个非分区表中。分区表不仅便于数据生命周期管理,还能在流式和批式查询中利用分区下推(分区裁剪)大幅减少不必要的数据扫描,显著提升查询性能。

  • 使用 Delta Join 前,需在 Flink 作业的运行参数中配置相应参数以启用优化。

特色功能详解

全增量一体化消费

功能说明

Fluss 主键表支持全增量一体化消费:先读取表中的历史全量快照数据,全量数据读取完毕后自动无缝切换到增量流式消费,保证数据的完整性和一致性。

消费模式

通过 scan.startup.mode 参数控制消费起始位点:

模式

说明

full

默认值。先读取全量快照,再增量消费。适合需要完整历史数据的场景。

earliest

从最早的日志位点开始消费,不读取全量快照。

latest

从最新位点开始消费,仅消费启动后产生的新数据。

timestamp

从指定时间戳位点开始消费,需配合 scan.startup.timestamp 参数使用。

SQL 示例
-- 全增量一体化消费(默认模式)
SELECT * FROM `fluss_catalog`.`fluss_db`.orders;

-- 从指定时间戳开始消费,通过 Hint 指定参数
SELECT * FROM `fluss_catalog`.`fluss_db`.orders
  /*+ OPTIONS('scan.startup.mode' = 'timestamp', 'scan.startup.timestamp' = '2025-01-01 00:00:00') */;

-- 从最新位点开始消费
SELECT * FROM `fluss_catalog`.`fluss_db`.orders
  /*+ OPTIONS('scan.startup.mode' = 'latest') */;

CDC 日志订阅与虚拟表

日志订阅

Fluss 主键表天然支持 CDC(Change Data Capture)日志订阅,下游 Flink 作业可以实时消费到完整的变更事件流(INSERT、UPDATE_BEFORE、UPDATE_AFTER、DELETE),无需额外开启 Binlog 配置。

这意味着:

  • 主键表的所有变更都会自动产生 CDC 日志。

  • 下游可直接订阅 Fluss 主键表获取完整的变更流,用于实时数据同步或流式 ETL。

SQL 示例
-- 直接读取主键表即可获取 CDC 变更流
SELECT * FROM `fluss_catalog`.`fluss_db`.orders;
-- 输出包含完整的 INSERT / UPDATE / DELETE 事件
虚拟表(Virtual Table)

除了直接订阅 CDC 流之外,Fluss 还提供了虚拟表机制,通过在基础表名后追加特定后缀即可访问带有元数据的变更日志,无需额外存储数据。虚拟表是系统自动生成的,不需要用户手动创建。

Changelog 表($changelog

适用于主键表和日志表,提供带有元数据的原始变更日志流。

表结构(在原始表字段之外,自动附加以下元数据列):

元数据列

类型

说明

_change_type

STRING

变更类型。主键表可能的值:insertupdate_beforeupdate_afterdelete;日志表仅有 insert

_log_offset

BIGINT

该条变更在日志中的偏移量。

_commit_timestamp

TIMESTAMP_LTZ

变更的提交时间戳。

SQL 示例

-- 读取 orders 表的 changelog 虚拟表
SELECT * FROM `fluss_catalog`.`fluss_db`.orders$changelog;

-- 通过 Hint 指定从最新位点开始消费
SELECT * FROM `fluss_catalog`.`fluss_db`.orders$changelog
  /*+ OPTIONS('scan.startup.mode' = 'latest') */;

-- 过滤出删除事件
SELECT * FROM `fluss_catalog`.`fluss_db`.orders$changelog
WHERE _change_type = 'delete';
Binlog 表($binlog

仅适用于主键表,提供包含变更前后镜像(before/after image)的 Binlog 格式数据,便于精确获取每次变更的前后状态。

表结构(在元数据列之外,以嵌套 ROW 类型呈现变更前后的完整行数据):

类型

说明

_change_type

STRING

变更类型:insertupdatedelete

_log_offset

BIGINT

日志偏移量。

_commit_timestamp

TIMESTAMP_LTZ

提交时间戳。

before

ROW

变更前的行数据。insert 时为 NULL。

after

ROW

变更后的行数据。delete 时为 NULL。

变更事件对照

变更类型

before

after

insert

NULL

新值

update

旧值

新值

delete

旧值

NULL

SQL 示例

-- 读取 users 表的 binlog 虚拟表
SELECT * FROM `fluss_catalog`.`fluss_db`.users$binlog;

-- 获取更新事件中字段变更前后的值
SELECT
  `before`.user_name AS old_name,
  `after`.user_name  AS new_name,
  _commit_timestamp  AS change_time
FROM `fluss_catalog`.`fluss_db`.users$binlog
WHERE _change_type = 'update';
使用限制
  • Changelog 表和 Binlog 表目前不支持投影下推、分区下推和谓词下推

  • Binlog 表仅适用于主键表,日志表不支持。

  • 虚拟表的启动模式支持 earliest(默认)、latesttimestamp,不支持 full


部分列更新(Partial Update)

功能说明

Fluss 支持部分列更新(Partial Update),允许多个数据流独立地向同一张主键宽表写入各自关心的字段,Fluss 引擎会自动按主键进行字段级别的合并。这种方式无需在 Flink 中维护复杂的多流 Join 状态,避免了大状态问题。

使用场景

典型场景为实时用户画像宽表构建:不同的数据源(如推荐系统、曝光日志、点击日志、购物车数据)分别更新用户画像表中各自对应的字段,最终形成一张完整的用户 360 度视图。

使用限制
  • 目标表必须为主键表

  • 每个数据流的写入字段必须包含完整的主键字段

  • 计算拓扑中不包含多流 Join 节点,直接由 Fluss 在存储层完成 Merge。

SQL 示例
-- 1. 创建宽表
CREATE TABLE IF NOT EXISTS `fluss_catalog`.fluss_db.user_rec_wide (
    user_id   STRING,
    item_id   STRING,
    rec_score DOUBLE,
    imp_cnt   INT,
    click_cnt INT,
    PRIMARY KEY (user_id, item_id) NOT ENFORCED
) WITH ('bucket.num' = '3');

-- 2. 多流分别写入各自字段
-- 数据流 1:写入推荐分数
INSERT INTO `fluss_catalog`.fluss_db.user_rec_wide (user_id, item_id, rec_score)
SELECT user_id, item_id, rec_score 
FROM `fluss_catalog`.fluss_db.recommendations;

-- 数据流 2:写入曝光次数
INSERT INTO `fluss_catalog`.fluss_db.user_rec_wide (user_id, item_id, imp_cnt)
SELECT user_id, item_id, imp_cnt 
FROM `fluss_catalog`.fluss_db.impressions;

-- 数据流 3:写入点击次数
INSERT INTO `fluss_catalog`.fluss_db.user_rec_wide (user_id, item_id, click_cnt)
SELECT user_id, item_id, click_cnt 
FROM `fluss_catalog`.fluss_db.clicks;

上述三条 INSERT 语句可作为独立的 Flink 作业运行,Fluss 存储层会自动将相同主键的字段合并成完整的行。


Delta Join

功能说明

Delta Join 是 Fluss 提供的新一代双流 Join 范式,将数据更新行为下沉到 Fluss 存储层完成,利用主键表和二级索引实现高效查找,无需在 Flink 中维护 Join State

核心优势:

  • 无 Join State:省去冗余的状态存储,避免大状态导致的 GC 问题。

  • 低资源消耗:实测 Flink 内存与 CPU 消耗下降超 86%。

  • 更稳定高效:避免状态膨胀带来的性能瓶颈。

与传统 Join 对比

特性

传统 Flink Join(Kafka)

Fluss Delta Join

状态存储

需在 Flink 缓存全量上游数据

无 Join State,依赖 Fluss 索引

资源消耗

高(大状态导致 GC 频繁)

稳定性

易受状态膨胀影响

更稳定

实现机制

双边驱动更新

更新下沉至 Fluss 存储层

适用场景

通用流式 Join

Fluss 主键表间的高效 Join

使用限制
  • 左右表必须为 Fluss 的主键表(支持分区表)。

  • 分桶键(Bucket Key)需为主键前缀。

  • Join Key 必须包含分桶键;若为分区表,还需包含分区键。

SQL 示例

1. 创建表

CREATE TABLE orders (
  user_id BIGINT,
  order_date DATE,
  order_id BIGINT,
  amount DECIMAL(10, 2),
  PRIMARY KEY (user_id, order_id, order_date) NOT ENFORCED
) WITH ('bucket.key' = 'user_id, order_id');

CREATE TABLE order_enhance (
  user_id BIGINT,
  order_id BIGINT,
  risk_level TINYINT,
  is_fraud BOOLEAN,
  update_time TIMESTAMP(3),
  PRIMARY KEY (user_id, order_id) NOT ENFORCED
) WITH ('bucket.key' = 'user_id, order_id');

2. 执行 Delta Join

SELECT * FROM orders o
INNER JOIN order_enhance e
ON o.user_id = e.user_id AND o.order_id = e.order_id;

3. 通过 Hint 优化 Lookup 性能

SELECT * FROM orders o /*+ OPTIONS('client.lookup.queue-size'='2560') */
INNER JOIN order_enhance e /*+ OPTIONS('client.lookup.queue-size'='2560') */
ON o.user_id = e.user_id AND o.order_id = e.order_id;
作业运行参数配置

使用 Delta Join 时,需在 Flink 作业的运行参数中添加以下配置:

table.exec.delta-join.cache-enabled: 'true'
table.exec.async-lookup.buffer-capacity: '3000'
table.optimizer.delta-join.strategy: FORCE
table.exec.delta-join.left.cache-size: '100'
table.exec.delta-join.right.cache-size: '1000'

参数

说明

table.optimizer.delta-join.strategy

设为 FORCE 强制启用 Delta Join,无法满足条件时报错。

table.exec.async-lookup.buffer-capacity

异步请求缓冲容量,建议设置为千级别。

table.exec.delta-join.cache-enabled

开启本地缓存以减少对 Fluss 的请求。

table.exec.delta-join.left.cache-size

左表缓存 Key 数量,需根据内存和热点数据调整。

table.exec.delta-join.right.cache-size

右表缓存 Key 数量,需根据内存和热点数据调整。

成功启用后,可在 Flink 作业状态总览中看到 Delta Join 节点,表明优化已生效。


Lookup Join(维表关联)

功能说明

Fluss 主键表支持作为 Flink SQL 的维表使用,通过 Lookup Join 可以在流处理中实时关联维度数据。Fluss 支持高效的 KV 点查和二级索引查找,适合低延迟的维表关联场景。

使用限制
  • 仅支持主键表作为维表。

  • Join 条件必须包含维表的全部主键字段

同步与异步模式

模式

参数配置

说明

异步(默认)

'lookup.async' = 'true'

高吞吐,推荐在大多数场景下使用。

同步

'lookup.async' = 'false'

逐条查询,适合调试或低并发场景。

SQL 示例
-- 通过 Hint 配置 Lookup 缓存,进行维表关联
SELECT
  o.order_id,
  o.amount,
  u.user_name,
  p.product_name
FROM `fluss_catalog`.`fluss_db`.orders AS o
LEFT JOIN `fluss_catalog`.`fluss_db`.user_dim
  /*+ OPTIONS('lookup.cache' = 'PARTIAL', 'lookup.partial-cache.expire-after-write' = '60s', 'lookup.partial-cache.max-rows' = '10000') */
  FOR SYSTEM_TIME AS OF o.proctime AS u
  ON o.user_id = u.user_id
LEFT JOIN `fluss_catalog`.`fluss_db`.product_dim
  FOR SYSTEM_TIME AS OF o.proctime AS p
  ON o.product_id = p.product_id;

-- 使用同步 Lookup 模式(适合调试场景)
SELECT o.order_id, o.amount, d.user_name
FROM `fluss_catalog`.`fluss_db`.orders AS o
LEFT JOIN `fluss_catalog`.`fluss_db`.user_dim
  /*+ OPTIONS('lookup.async' = 'false') */
  FOR SYSTEM_TIME AS OF o.proctime AS d
  ON o.user_id = d.user_id;

Merge Engine

功能说明

Fluss 主键表支持配置不同的 Merge Engine,用于控制相同主键的多条记录如何合并。通过 table.merge-engine 参数设置,支持以下策略:

Merge Engine

说明

last_row(默认)

保留最后一条写入的记录,后写入的数据覆盖先写入的数据。适用于大多数 Upsert 场景。

first_row

保留第一条写入的记录,后续相同主键的数据被忽略。适用于去重场景,如日志去重。

versioned

根据指定的版本列保留版本号最大的记录。需配合 table.merge-engine.versioned.ver-column 参数指定版本列。适用于需要按版本控制数据更新顺序的场景。

SQL 示例
-- 使用 first_row 实现去重
CREATE TABLE IF NOT EXISTS `fluss_catalog`.fluss_db.event_dedup (
    event_id STRING,
    event_type STRING,
    event_time TIMESTAMP(3),
    payload STRING,
    PRIMARY KEY (event_id) NOT ENFORCED
) WITH (
  'table.merge-engine' = 'first_row'
);

-- 使用 versioned 按版本列更新
CREATE TABLE IF NOT EXISTS `fluss_catalog`.fluss_db.user_profile (
    user_id BIGINT,
    user_name STRING,
    age INT,
    update_version BIGINT,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'table.merge-engine' = 'versioned',
  'table.merge-engine.versioned.ver-column' = 'update_version'
);

分区表

功能说明

Fluss 支持分区表,通过 PARTITIONED BY 将数据按分区键划分到不同的物理分区中,适用于按时间或业务维度管理数据生命周期和提升查询性能的场景。分区表支持单字段分区多字段分区,并提供手动管理、自动管理和动态创建三种正交的分区管理策略。

基本规则
  • 分区键的数据类型需要是 STRING

  • 主键表的分区键需要是主键的子集

  • 自动分区规则仅在建表时配置,建表后不可修改

分区管理策略

策略

说明

手动管理

用户通过 ALTER TABLE 手动创建或删除分区,适合分区数量有限且可控的场景。

自动管理

基于时间规则自动创建新分区并清理过期历史分区,适合按时间维度持续写入的场景。

动态创建

写入数据时,若目标分区不存在则由客户端自动创建,适合分区值不可预知的场景。

三种策略是正交的,可以组合使用。例如,可以同时开启自动管理(按时间自动创建和清理)和动态创建(兜底处理未预创建的分区)。

SQL 示例

1. 创建单字段分区表(自动分区管理)

CREATE TABLE `fluss_catalog`.fluss_db.site_access (
  site_id BIGINT,
  user_name STRING,
  pv BIGINT,
  event_day STRING,
  PRIMARY KEY (site_id, event_day) NOT ENFORCED
) PARTITIONED BY (event_day) WITH (
  'table.auto-partition.enabled' = 'true',
  'table.auto-partition.time-unit' = 'DAY',
  'table.auto-partition.num-precreate' = '5',
  'table.auto-partition.num-retention' = '30',
  'table.auto-partition.time-zone' = 'Asia/Shanghai'
);

2. 创建多字段分区表

CREATE TABLE `fluss_catalog`.fluss_db.region_access (
  access_id BIGINT,
  user_name STRING,
  event_date STRING,
  region STRING,
  PRIMARY KEY (access_id, event_date, region) NOT ENFORCED
) PARTITIONED BY (event_date, region) WITH (
  'table.auto-partition.enabled' = 'true',
  'table.auto-partition.key' = 'event_date',
  'table.auto-partition.time-unit' = 'DAY',
  'table.auto-partition.num-retention' = '7'
);
说明:多字段分区时,需通过 table.auto-partition.key 指定用于自动分区的时间键。多字段分区仅支持自动过期清理,不支持自动预创建(num-precreate 强制为 0)。

3. 手动管理分区

-- 查看所有分区
SHOW PARTITIONS `fluss_catalog`.fluss_db.site_access;

-- 手动添加分区
ALTER TABLE `fluss_catalog`.fluss_db.site_access
  ADD PARTITION (event_day = '20250315');

-- 手动删除分区
ALTER TABLE `fluss_catalog`.fluss_db.site_access
  DROP PARTITION (event_day = '20250101');

4. 动态创建分区

写入数据时,如果目标分区不存在,客户端会自动创建对应分区(默认开启):

-- 写入数据,若分区 '20250701' 不存在则自动创建
INSERT INTO `fluss_catalog`.fluss_db.site_access
VALUES (1, 'hello', 100, '20250701');

动态创建由客户端参数 client.writer.dynamic-create-partition.enabled(默认 true)控制,受集群级别的 max.partition.num(默认 1000)和 max.bucket.num(默认 128000)限制。

分区下推

流式或批式查询分区表时,Fluss 支持分区下推(分区裁剪),通过 WHERE 条件中的分区键过滤,仅读取匹配的分区数据,减少不必要的 I/O 开销:

-- 仅读取指定日期的分区数据
SELECT * FROM `fluss_catalog`.fluss_db.site_access
WHERE event_day = '20250315';

-- 多字段分区表按 region 过滤
SELECT * FROM `fluss_catalog`.fluss_db.region_access
WHERE region = 'US';
自动分区参数说明

参数

类型

默认值

说明

table.auto-partition.enabled

Boolean

false

是否启用自动分区管理。

table.auto-partition.key

String

多分区键时,指定用于自动分区的时间键。单分区键时无需配置。

table.auto-partition.time-unit

Enum

DAY

自动分区的时间粒度,可选 HOURDAYMONTHQUARTERYEAR

table.auto-partition.num-precreate

Integer

2

预创建的未来分区数量。多字段分区时不支持,强制为 0。

table.auto-partition.num-retention

Integer

7

保留的历史分区数量,超出的过期分区将被自动删除。

table.auto-partition.time-zone

String

系统时区

自动分区使用的时区。

最佳实践
  • 选择合适的分区粒度:根据数据量和查询模式选择时间粒度。数据量大、查询时间范围小的场景建议按 DAYHOUR;数据量较小的场景可按 MONTHQUARTER

  • 合理设置保留策略:通过 num-retention 控制历史分区数量,避免分区数过多导致元数据膨胀和集群管理负担增大。

  • 组合使用自动管理与动态创建:自动管理负责常规的时间分区创建和清理,动态创建作为兜底机制处理异常或延迟数据。

  • 注意集群限制:单表分区数受 max.partition.num 限制(默认 1000),总 Bucket 数受 max.bucket.num 限制(默认 128000),分区数 x bucket.num 不应超过此上限。


流式查询下推

Fluss 采用 Apache Arrow 列式存储格式进行日志传输,支持将过滤和投影操作下推到存储层执行,从而减少数据传输量,查询性能可提升高达 10 倍

这一特性在以下场景中尤为有效:

  • 只需读取宽表中部分列(投影下推)。

  • 需要根据条件过滤大量数据(谓词下推)。

日志存储格式通过 table.log.format 参数控制,默认为 ARROW


湖流一体

功能说明

Fluss 支持湖流一体架构,可将 Fluss 中的实时数据自动同步到数据湖(如 Apache Paimon),实现冷热数据自动分层存储,兼顾实时查询的低延迟与历史数据的低存储成本。

关键特性:

  • 自动将 Fluss 中的流式数据写入 Paimon 湖存储。

  • 冷热数据自动分层,热数据保留在 Fluss 流存储中,冷数据沉降到 Paimon。

  • 通过统一的 Flink SQL 查询接口同时访问流存储和湖存储中的数据。

配置参数

参数

类型

默认值

说明

table.datalake.enabled

Boolean

false

开启湖存储分层。

table.datalake.format

Enum

集群默认值

数据湖格式,当前仅支持 paimon

table.log.tiered.local-segments

Integer

2

开启分层存储后,本地保留的日志段数量。


数据探查

功能说明

Fluss 支持在 Flink SQL 的批模式下对表数据进行快速探查,包括 LIMIT 查询主键点查COUNT(*) 统计,无需启动持续运行的流式作业即可快速查看表中的数据。这对于数据开发调试、数据验证和问题排查非常实用。

LIMIT 查询

支持对主键表和日志表执行 LIMIT 查询,快速获取表中的少量样本数据。

-- 从主键表中查看前 10 条数据
SELECT * FROM `fluss_catalog`.`fluss_db`.orders LIMIT 10;

-- 从日志表中查看前 5 条数据
SELECT * FROM `fluss_catalog`.`fluss_db`.events LIMIT 5;

-- 结合投影,仅查看部分列
SELECT order_id, amount, order_time 
FROM `fluss_catalog`.`fluss_db`.orders LIMIT 20;
主键点查

对于主键表,支持通过 WHERE 条件按主键进行精确点查,利用 Fluss 的 KV 存储引擎实现毫秒级响应。

-- 按主键精确查询单条记录
SELECT * FROM `fluss_catalog`.`fluss_db`.orders
WHERE order_id = 1001;

-- 复合主键的点查
SELECT * FROM `fluss_catalog`.`fluss_db`.user_orders
WHERE user_id = 12345 AND order_id = 6789;
COUNT(*) 统计

支持对主键表执行 COUNT(*) 查询,快速获取表中的总记录数。

-- 统计主键表中的总行数
SELECT COUNT(*) FROM `fluss_catalog`.`fluss_db`.orders;

WITH 参数

存储参数

以下参数会持久化到表的元数据中。

参数

类型

默认值

是否必填

说明

bucket.num

Integer

Fluss 表的 Bucket 数量。影响数据并行读写的并发度。默认值采用集群的default.bucket.number

bucket.key

String

用于数据哈希分布的字段,必须是主键的子集,多个字段用逗号分隔。

table.log.ttl

Duration

7d

日志数据的保留时间。

table.replication.factor

Integer

集群默认值

日志表副本数,不能超过 tabletServer 数量。

table.log.format

Enum

ARROW

日志存储格式,可选 ARROW(列式,支持查询下推)或 INDEXED

table.log.arrow.compression.type

Enum

ZSTD

ARROW 格式的压缩类型,可选 NONELZ4_FRAMEZSTD

table.log.arrow.compression.zstd.level

Integer

3

ZSTD 压缩级别(1-22)。

table.kv.format

Enum

COMPACTED

KV 存储格式,可选 COMPACTEDINDEXED

table.merge-engine

Enum

last_row

主键表的合并策略,可选 first_rowversionedlast_row

table.merge-engine.versioned.ver-column

String

条件必填

merge-engineversioned 时必须指定版本列名称。

table.datalake.enabled

Boolean

false

是否开启湖存储分层。

table.datalake.format

Enum

集群默认值

数据湖格式,当前仅支持 paimon

table.log.tiered.local-segments

Integer

2

分层存储开启后,本地保留的日志段数量。

自动分区参数

参数

类型

默认值

是否必填

说明

table.auto-partition.enabled

Boolean

false

是否启用自动分区创建。

table.auto-partition.time-unit

Enum

DAY

自动分区的时间粒度,可选 YEARQUARTERMONTHDAYHOUR

table.auto-partition.num-precreate

Integer

2

检查时预创建的未来分区数量。

table.auto-partition.num-retention

Integer

7

保留的历史分区数量,超出的分区将被自动删除。

table.auto-partition.time-zone

String

系统时区

自动分区使用的时区。

读取参数(源表)

以下参数可通过 WITH 子句或 SQL Hint 配置。

参数

类型

默认值

是否必填

说明

scan.startup.mode

Enum

full

消费起始模式,可选 fullearliestlatesttimestamp

scan.startup.timestamp

Long/String

条件必填

起始时间戳(毫秒值或 yyyy-MM-dd HH:mm:ss 格式),仅在 scan.startup.modetimestamp 时生效。

scan.partition.discovery.interval

Duration

1分钟

自动发现新分区的间隔时间。设置为负值可禁用自动发现。

client.scanner.log.check-crc

Boolean

true

是否对消息进行 CRC32 校验以验证数据完整性。

client.scanner.log.max-poll-records

Integer

500

每次 poll() 调用返回的最大记录数。

client.scanner.log.fetch.max-bytes

MemorySize

16mb

每次请求从服务端拉取的最大字节数。

client.scanner.log.fetch.max-bytes-for-bucket

MemorySize

1mb

每个 Bucket 每次请求拉取的最大字节数。

client.scanner.log.fetch.min-bytes

MemorySize

1b

请求响应前期望的最小字节数。

client.scanner.log.fetch.wait-max-time

Duration

500ms

当未满足最小字节数时,服务端最大等待时间。

client.scanner.io.tmpdir

String

系统临时目录

用于存放快照和日志段等临时文件的本地目录。

client.scanner.remote-log.prefetch-num

Integer

4

远程日志段预拉取数量。

client.remote-file.download-thread-num

Integer

3

下载远程文件的线程数。

写入参数(结果表)

以下参数可通过 WITH 子句或 SQL Hint 配置。

参数

类型

默认值

是否必填

说明

sink.ignore-delete

Boolean

false

是否忽略 DELETEUPDATE_BEFORE 消息。

sink.bucket-shuffle

Boolean

true

写入前是否按 Bucket ID 进行数据 Shuffle,以提升写入效率。

client.writer.buffer.memory-size

MemorySize

64mb

内部缓存行数据的总内存大小。

client.writer.batch-size

MemorySize

2mb

同一 Bucket 内记录的目标批次大小。

client.writer.buffer.wait-timeout

Duration

Long.MAX

Writer 等待可用 Segment 的最大阻塞时间。

client.writer.batch-timeout

Duration

100ms

积攒记录后发送批次的最大等待时间。

client.writer.bucket.no-key-assigner

Enum

STICKY

无主键表的 Bucket 分配策略,可选 STICKYROUND_ROBIN

client.writer.acks

String

all

写入确认级别。all(或 -1):等待所有副本确认;1:Leader 确认即返回;0:不等待确认。推荐使用 all

client.writer.request-max-size

MemorySize

10mb

单个请求的最大字节数。

client.writer.retries

Integer

MAX_VALUE

因可重试错误导致发送失败时的重试次数。

client.writer.enable-idempotence

Boolean

true

是否开启幂等写入,保证 Exactly-Once 语义和顺序性。

client.writer.max-inflight-requests-per-bucket

Integer

5

每个 Bucket 的最大未确认请求数(幂等写入开启时生效)。

维表参数

参数

类型

默认值

是否必填

说明

lookup.async

Boolean

true

是否启用异步 Lookup,提升维表关联吞吐量。

lookup.cache

Enum

NONE

缓存策略,可选 NONE(不缓存)或 PARTIAL(部分缓存)。

lookup.max-retries

Integer

3

Lookup 失败时的最大重试次数。

lookup.partial-cache.expire-after-access

Duration

访问后的缓存过期时间。

lookup.partial-cache.expire-after-write

Duration

写入后的缓存过期时间。

lookup.partial-cache.cache-missing-key

Boolean

true

是否缓存未命中的 Key(缓存 null 值)。

lookup.partial-cache.max-rows

Long

缓存的最大行数。

client.lookup.queue-size

Integer

25600

待处理的 Lookup 操作队列大小。

client.lookup.max-batch-size

Integer

128

合并多个 Lookup 操作的最大批次大小。

client.lookup.max-inflight-requests

Integer

128

最大未确认 Lookup 请求数。

client.lookup.batch-timeout

Duration

100ms

填满 Lookup 批次前的最大等待时间。

数据摄入

Fluss连接器作为数据下游可以在数据摄入YAML作业中使用。

语法结构

source:
   type: xxx

sink:
  type: fluss
  name: Fluss Sink
  bootstrap.servers: localhost:9123
  properties.client.security.protocol: sasl
  properties.client.security.sasl.mechanism: PLAIN
  properties.client.security.sasl.username: developer
  properties.client.security.sasl.password: developer-pass
  

pipeline:
  schema.change.behavior: IGNORE

表结构变更

Fluss Server支持追加新增列,不支持其他列类型操作(比如删除列、重命名列和变更列类型)。因此,建议搭配Flink CDC默认的schema.change.behavior = LENIENT使用:

  • 列重命名时,改为发送更改列类型和新增列两个事件。原有的列不删除,更改列类型为nullable,同时新增一个列名为新名称、数据类型改为nullable的列。

  • 删除列时,改为发送更改列类型事件,将对应字段类型变为nullable。

  • 新增列时仍发送新增列事件,但字段类型会变为nullable。

配置项

参数

说明

是否必填

数据类型

默认值

备注

type

sink类型。

STRING

固定值为fluss。

name

sink名称。

STRING

无。

bootstrap.servers

Fluss Server地址

STRING

格式为host:port,host:port,host:port,以英文逗号(,)分隔。

bucket.key

分桶键

String

指定每个 Fluss 表的数据分布策略。表之间用';'分隔,分桶键之间用','分隔。

格式:database1.table1:key1,key2;database1.table2:key3

数据将根据分桶键的哈希值分配到各个桶中(分桶键必须是主键的子集,且不包含主键表的分区键)。 若表有主键但未指定分桶键,则分桶键默认为主键(不含分区键);若表无主键且未指定分桶键,则数据将随机分配到各个桶中。

bucket.keybucket.num值在创建表后就不再改变。

bucket.num

配置 fluss 表的桶数量

String

每个 Fluss 表的桶数量。表之间用';'分隔。

格式:database1.table1:4;database1.table2:8

如果一张表未配置桶数量,默认会采用Server端的配置项default.bucket.number

bucket.keybucket.num值在创建表后就不再改变。

properties.table.*

fluss 表属性

String

将 Fluss table 支持的参数传递给 pipeline,详情请参见Fluss table options

properties.client.*

fluss client参数

String

将 Fluss client 支持的参数传递给 pipeline,详情请参见Fluss client options

类型映射

数据摄入类型映射如下表所示。

CDC字段类型

Fluss 字段类型

TINYINT

TINYINT

SMALLINT

SMALLINT

INT

INT

BIGINT

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

DECIMAL(p, s)

DECIMAL(p, s)

BOOLEAN

BOOLEAN

DATE

DATE

TIME

TIME

TIMESTAMP

TIMESTAMP

TIMESTAMP_LTZ

TIMESTAMP_LTZ

CHAR(n)

CHAR(n)

VARCHAR(n)

STRING

ARRAY

ARRAY

MAP

MAP

ROW

ROW

BINARY(n)

暂不支持

VARBINARY(N)