Delta Join(多流Join新范式)

本文将为您介绍Delta Join的用法与实现。

Delta Join:基于Fluss的双流Join新方案

在实时数仓场景下,往往需要依赖多张实时数据表来构建统一的宽表。基于开源Flink + Kafka搭建的实时数仓,只能使用多个KafkaTopic通过Flink Join拼接形成一张大宽表,来实现无论哪个Topic发生更新时,总能对整个宽表完成更新。

由于Kafka本身并非面向分析场景设计,因此只能依托开源Flink的流式Join来实现这个场景,因为需要双边驱动更新,并在缓存全量上游数据,会导致开源Flink状态体积庞大,带来资源成本高、运维复杂、效率低等问题。

Fluss对比Kafka实现了Delta Join的能力。这是一种新型的 Join 方案,能够在保持双流 Join 语义的同时,将数据更新行为下沉到Fluss表中完成,从而显著降低Flink的资源消耗,提升作业的稳定性和执行效率。

Delta Join的优势

  • 无 Join State:省去冗余数据存储。

  • 低成本:仅依赖 Fluss 主键表和二级索引。

  • 更稳定高效 :避免大状态带来的性能瓶颈。

image

Delta Join使用限制

  • 左右表必须为 Fluss 的主键表(支持分区表)。

  • Fluss 主键表的分桶键(Bucket Key)需要为主键(Primary Key)前缀。

  • Delta Join 的 Join Key 必须与 Fluss 主键表定义的分桶键(Bucket Key)完全相同。

    说明
    • 例如定义:PRIMARY KEY (user_id, order_id, order_data);

      合法前缀为包括:user_iduser_id, order_iduser_id, order_id, order_data

      从主键最左边开始连续的字段组合为合法的前缀组合。

    • 例如查询:JOIN users u ON o.user_id = u.user_id;

      此时Join Keyuser_id,则需设置'bucket.key' = 'user_id',与定义的分桶键相同。

使用示例

创建表

CREATE TABLE `fluss-catalog`.`my_db`.`orders` (
  user_id BIGINT,
  order_date DATE,
  order_id BIGINT,
  amount DECIMAL(10,2),
  PRIMARY KEY (user_id, order_id, order_date) NOT ENFORCED
) WITH (
  'bucket.key' = 'user_id, order_id' 
);

CREATE TABLE `fluss-catalog`.`my_db`.`order_enhance` (
  user_id BIGINT,
  order_id BIGINT,
  risk_level TINYINT,
  is_fraud BOOLEAN,
  update_time TIMESTAMP(3),
  PRIMARY KEY (user_id, order_id) NOT ENFORCED
) WITH (
  'bucket.key' = 'user_id, order_id'
);

SQL作业

对于 Delta Join,其作业写法和 Inner Join 一致, 对于符合前提要求的SQL作业会自动生成使用Delta Join大的查询计划。

-- join key 为两张表对应的 bucket key
SELECT * FROM `fluss-catalog`.`my_db`.`orders` o 
INNER JOIN `fluss-catalog`.`my_db`.`order_enhance` e 
ON o.user_id = e.user_id AND o.order_id = e.order_id;

还可以通过 hint options 调整 fluss 的部分参数,提高 prefixLookup 的性能。

参数

类型

默认值

说明

client.lookup.queue-size

Integer

25600

Pending状态的Lookup 操作数量上限

client.lookup.max-batch-size

Integer

128

单个Lookup请求中合并攒批的Lookup操作数上限

client.lookup.max-inflight-requests

Integer

128

未响应确认的Lookup请求数上限

client.lookup.batch-timeout

Duration

100ms

Lookup操作攒批的超时时间上限,当攒批数量满或触发超时时间,将结束攒批并发送Lookup请求。

使用示例:例如把client.lookup.queue-size设置为 2560。

-- join key 为两张表对应的 bucket key
SELECT * FROM `fluss-catalog`.`my_db`.`orders` o  /*+ OPTIONS('client.lookup.queue-size'='2560') */  
INNER JOIN `fluss-catalog`.`my_db`.`order_enhance` e /*+ OPTIONS('client.lookup.queue-size'='2560') */ 
ON o.user_id = e.user_id AND o.order_id = e.order_id;

作业上线

部署作业后,还需要添加额外的配置才能开启Delta Join。

  1. 作业运维页,单击相应作业。

  2. 部署详情 > 运行参数配置 > 其他配置中填写如下参数。

    # 以下为默认建议值  
    table.exec.delta-join.cache-enabled: 'true' 
    table.exec.async-lookup.buffer-capacity: '3000' 
    table.optimizer.delta-join.strategy: FORCE 
    table.exec.delta-join.left.cache-size: '100' 
    table.exec.delta-join.right.cache-size: '1000'

    参数名

    默认值

    说明

    table.optimizer.delta-join.strategy

    AUTO

    是否开启 Delta Join 优化。可以设置为 AUTO, FORCE 或者 NONE。

    推荐设置为 FORCE,在无法优化为Delta Join的场景显式报错,方便查看原因。

    table.exec.async-lookup.buffer-capacity

    100

    Delta Join 节点最多支持的异步请求数量。

    如果Fluss集群压力和Delta Join内存压力都不大,可以设置高一些。

    推荐设置到 “千” 及以上级别。

    table.exec.delta-join.cache-enabled

    false

    是否开启本地内存的缓存。开启后可以减少请求 Fluss 的请求数。

    在内存压力不大的情况下,推荐设置为true。

    table.exec.delta-join.left.cache-size

    10000

    左表可放入内存的缓存 key 数量,可减少对左Fluss表的请求数量。

    仅当table.exec.delta-join.cache-enabled 为 true 时生效。

    该参数会占用一部分内存,因此配置该参数时,需要结合每条数据大小、TM内存大小、右表可能的热点数据量来综合考虑。

    在内存压力不大的情况下,可以先用默认值试一下。GC 频繁时适量减小。Fluss集群点查压力大时,适量增大。

    table.exec.delta-join.right.cache-size

    10000

    右表可放入内存的缓存 key 数量,可减少对右Fluss表的请求数量。

    仅当table.exec.delta-join.cache-enabled 为 true 时生效。

    该参数会占用一部分内存,因此配置该参数时,需要结合每条数据大小、TM内存大小、左表可能的热点数据量来综合考虑。

    在内存压力不大的情况下,可以先用默认值试一下。GC 频繁时适量减小。Fluss集群点查压力大时,适量增大。

  3. 保存参数配置,运行作业后,在状态总览页,看到 Delta Join 节点,则表示 Inner Join 变更为 Delta Join 成功。

    image