本文将为您介绍Delta Join的用法与实现。
Delta Join:基于Fluss的双流Join新方案
在实时数仓场景下,往往需要依赖多张实时数据表来构建统一的宽表。基于开源Flink + Kafka搭建的实时数仓,只能使用多个Kafka的Topic通过Flink Join拼接形成一张大宽表,来实现无论哪个Topic发生更新时,总能对整个宽表完成更新。
由于Kafka本身并非面向分析场景设计,因此只能依托开源Flink的流式Join来实现这个场景,因为需要双边驱动更新,并在缓存全量上游数据,会导致开源Flink状态体积庞大,带来资源成本高、运维复杂、效率低等问题。
Fluss对比Kafka实现了Delta Join的能力。这是一种新型的 Join 方案,能够在保持双流 Join 语义的同时,将数据更新行为下沉到Fluss表中完成,从而显著降低Flink的资源消耗,提升作业的稳定性和执行效率。
Delta Join的优势
无 Join State:省去冗余数据存储。
低成本:仅依赖 Fluss 主键表和二级索引。
更稳定高效 :避免大状态带来的性能瓶颈。
Delta Join使用限制
左右表必须为 Fluss 的主键表(支持分区表)。
Fluss 主键表的分桶键(Bucket Key)需要为主键(Primary Key)前缀。
Delta Join 的 Join Key 必须与 Fluss 主键表定义的分桶键(Bucket Key)完全相同。
说明例如定义:PRIMARY KEY (user_id, order_id, order_data);
合法前缀为包括:
user_id
;user_id, order_id
;user_id, order_id, order_data
。从主键最左边开始连续的字段组合为合法的前缀组合。
例如查询:JOIN users u ON o.user_id = u.user_id;
此时Join Key为user_id,则需设置
'bucket.key' = 'user_id'
,与定义的分桶键相同。
使用示例
创建表
CREATE TABLE `fluss-catalog`.`my_db`.`orders` (
user_id BIGINT,
order_date DATE,
order_id BIGINT,
amount DECIMAL(10,2),
PRIMARY KEY (user_id, order_id, order_date) NOT ENFORCED
) WITH (
'bucket.key' = 'user_id, order_id'
);
CREATE TABLE `fluss-catalog`.`my_db`.`order_enhance` (
user_id BIGINT,
order_id BIGINT,
risk_level TINYINT,
is_fraud BOOLEAN,
update_time TIMESTAMP(3),
PRIMARY KEY (user_id, order_id) NOT ENFORCED
) WITH (
'bucket.key' = 'user_id, order_id'
);
SQL作业
对于 Delta Join,其作业写法和 Inner Join 一致, 对于符合前提要求的SQL作业会自动生成使用Delta Join大的查询计划。
-- join key 为两张表对应的 bucket key
SELECT * FROM `fluss-catalog`.`my_db`.`orders` o
INNER JOIN `fluss-catalog`.`my_db`.`order_enhance` e
ON o.user_id = e.user_id AND o.order_id = e.order_id;
还可以通过 hint options 调整 fluss 的部分参数,提高 prefixLookup 的性能。
参数 | 类型 | 默认值 | 说明 |
client.lookup.queue-size | Integer | 25600 | Pending状态的Lookup 操作数量上限 |
client.lookup.max-batch-size | Integer | 128 | 单个Lookup请求中合并攒批的Lookup操作数上限 |
client.lookup.max-inflight-requests | Integer | 128 | 未响应确认的Lookup请求数上限 |
client.lookup.batch-timeout | Duration | 100ms | Lookup操作攒批的超时时间上限,当攒批数量满或触发超时时间,将结束攒批并发送Lookup请求。 |
使用示例:例如把client.lookup.queue-size
设置为 2560。
-- join key 为两张表对应的 bucket key
SELECT * FROM `fluss-catalog`.`my_db`.`orders` o /*+ OPTIONS('client.lookup.queue-size'='2560') */
INNER JOIN `fluss-catalog`.`my_db`.`order_enhance` e /*+ OPTIONS('client.lookup.queue-size'='2560') */
ON o.user_id = e.user_id AND o.order_id = e.order_id;
作业上线
部署作业后,还需要添加额外的配置才能开启Delta Join。
在作业运维页,单击相应作业。
在
中填写如下参数。# 以下为默认建议值 table.exec.delta-join.cache-enabled: 'true' table.exec.async-lookup.buffer-capacity: '3000' table.optimizer.delta-join.strategy: FORCE table.exec.delta-join.left.cache-size: '100' table.exec.delta-join.right.cache-size: '1000'
参数名
默认值
说明
table.optimizer.delta-join.strategy
AUTO
是否开启 Delta Join 优化。可以设置为 AUTO, FORCE 或者 NONE。
推荐设置为 FORCE,在无法优化为Delta Join的场景显式报错,方便查看原因。
table.exec.async-lookup.buffer-capacity
100
Delta Join 节点最多支持的异步请求数量。
如果Fluss集群压力和Delta Join内存压力都不大,可以设置高一些。
推荐设置到 “千” 及以上级别。
table.exec.delta-join.cache-enabled
false
是否开启本地内存的缓存。开启后可以减少请求 Fluss 的请求数。
在内存压力不大的情况下,推荐设置为true。
table.exec.delta-join.left.cache-size
10000
左表可放入内存的缓存 key 数量,可减少对左Fluss表的请求数量。
仅当table.exec.delta-join.cache-enabled 为 true 时生效。
该参数会占用一部分内存,因此配置该参数时,需要结合每条数据大小、TM内存大小、右表可能的热点数据量来综合考虑。
在内存压力不大的情况下,可以先用默认值试一下。GC 频繁时适量减小。Fluss集群点查压力大时,适量增大。
table.exec.delta-join.right.cache-size
10000
右表可放入内存的缓存 key 数量,可减少对右Fluss表的请求数量。
仅当table.exec.delta-join.cache-enabled 为 true 时生效。
该参数会占用一部分内存,因此配置该参数时,需要结合每条数据大小、TM内存大小、左表可能的热点数据量来综合考虑。
在内存压力不大的情况下,可以先用默认值试一下。GC 频繁时适量减小。Fluss集群点查压力大时,适量增大。
保存参数配置,运行作业后,在状态总览页,看到 Delta Join 节点,则表示 Inner Join 变更为 Delta Join 成功。