对于每条流式数据,可以关联一个外部维表数据源,为实时计算Flink版提供数据关联查询。
维表 JOIN 概述
维表 JOIN(Lookup Join)通过对每条流数据在处理时间查询外部维表,将维表字段补全到主流中。常用于事件流补充字典、维度信息等场景。
维表 JOIN 的核心环节:
语法:使用
FOR SYSTEM_TIME AS OF PROCTIME()表示对每条流数据查询维表的当前数据,而非维表快照。Cache 策略:在连接器层缓存维表数据,降低外部访问压力。
Lookup 行为:通过 LOOKUP hint 配置同步/异步、缓冲容量、重试策略等。
Join 物理策略:通过 SHUFFLE_HASH 等 hint 控制 Shuffle 行为,优化数据倾斜。
Job 级配置:通过 SET 命令调整
table.exec.*系列全局参数。
维表 JOIN 语法
SELECT column-names
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF PROCTIME() [AS <alias2>]
ON table1.column-name1 = table2.key-name1;必须加上
FOR SYSTEM_TIME AS OF PROCTIME(),表示 JOIN 维表当前时刻所看到的每条数据。ON 条件中必须包含维表实际能支持随机查找的字段的等值条件。
ON 条件中支持对源表字段使用 CAST 等类型转换函数。如源表与维表字段类型不一致,可在源表字段上进行类型转换以匹配维表字段类型。
使用限制与注意事项
维表 JOIN 仅支持对当前时刻维表快照的关联。
维表支持 INNER JOIN 和 LEFT JOIN,不支持 RIGHT JOIN 或 FULL JOIN。
如有一对一 JOIN 需求,请确保连接条件中包含了维表中具有唯一性字段的等值连接条件。
对每条流式数据,只会关联当时维表的最新版本数据,即JOIN行为只发生在处理时间(Processing Time)。如果JOIN行为发生后,维表中的数据发生了变化(新增、更新或删除),则已关联的维表数据不会被同步变化。具体的维表的行为请参见对应连接器行为。
维表 Cache 策略
大部分连接器的维表 JOIN 支持 Cache 策略,不同连接器的支持情况略有差异,请查阅对应连接器文档确认。通用 Cache 策略如下:
策略 | 行为 |
None(默认) | 无缓存。 |
LRU | 缓存维表里的部分数据。源表的每条数据都会触发系统先在 Cache 中查找数据,未命中时再查物理维表。 |
ALL | 缓存维表里的所有数据。Job 运行前,系统会将维表中所有数据加载到 Cache 中,之后所有查找都通过 Cache 完成;未命中视为 KEY 不存在。可通过连接器参数配置周期性或定时重新加载维表数据。适用于维表数据量较小、对实时性要求不高且需要极致吞吐的场景。 |
需根据业务需求在实时性和性能之间权衡。如对实时性要求极高,可不使用 Cache,直接从维表读取。
使用 Cache 时,可配合 LRU 和 TTL 维持较新的缓存数据。TTL 可设置较短(例如几秒至几十秒),定期从源表加载数据。
使用 ALL 缓存策略时,注意节点内存大小,防止 OOM。
ALL 缓存策略下系统异步加载维表数据,需为维表 JOIN 节点增加内存,建议增加的内存大小为远程表数据量的两倍。
维表 JOIN 调优
调优参数分类与传递方式
维表 JOIN 调优涉及三类参数,每类参数的传递方式不同,用错方式 hint 会被静默忽略。
参数类别 | 典型参数 | 传递方式 | 作用域 |
连接器 WITH 选项 |
| 建表语句的 | 单张表 |
LOOKUP hint 选项 |
|
| 单个 Join 操作 |
Flink Job 级 TableConfig | 以 | 作业开头使用 | 整个作业 |
OPTIONS()hint 仅用于覆盖建表语句的 WITH 选项,不接受table.exec.*类 TableConfig;写错会被静默忽略。LOOKUP()hint 只识别上表列出的固定 key,不接受 Flink 配置项全名;例如调整 async 缓冲,必须写'capacity'='100',写'table.exec.async-lookup.buffer-capacity'='100'不生效。如需调整 hint 范围外的全局参数,请使用
SET 'xxx' = 'yyy';。
通过 OPTIONS hint 覆盖连接器选项
OPTIONS() hint 用于在 SQL 中临时覆盖某张表的 WITH 选项,无需修改建表语句。常用于调整 Cache 行为。
SELECT t.id, t.name, w.phoneNumber
FROM kafka_input AS t
LEFT JOIN phoneNumber /*+ OPTIONS(
'lookup.cache' = 'PARTIAL',
'lookup.partial-cache.max-rows' = '1000'
) */ FOR SYSTEM_TIME AS OF PROCTIME() AS w
ON t.name = w.name;OPTIONS 中传入的 key 必须为连接器本身支持的 WITH 选项。
上例中的 lookup.cache、lookup.partial-cache.max-rows 是基于 Flink 通用 LookupCache 接口(FLIP-221)的连接器选项,仅适用于实现了该接口的新版连接器(如 Fluss、JDBC 等)。前文「维表 Cache 策略」中描述的 None/LRU/ALL 策略对应历史接口的连接器,两者通过不同的 WITH 选项配置。请查阅对应连接器文档确认其支持的 Cache 选项与策略,避免使用不匹配的参数导致配置不生效。
通过 LOOKUP hint 配置 Lookup 行为
LOOKUP hint 功能与社区保持一致,用于在单个 Join 操作上配置维表的同步、异步、重试及 shuffle 策略。详情参见 Apache Flink Lookup Hint。
仅 VVR 8.0 及以上版本支持 LOOKUP hint。
仅 VVR 8.0.8 及以上版本支持通过
'shuffle' = 'true'配置 shuffle 策略。VVR 8.0 以上支持使用别名;如果维表定义了别名,hint 中必须使用别名。
支持的选项
选项 | 含义 | 取值 |
| 指定 hint 作用的维表名或别名 | 表名/别名字符串 |
| 是否启用异步 lookup |
|
| 异步 lookup 的输出顺序 |
|
| 异步 lookup 缓冲队列容量 | 整数 |
| 异步 lookup 超时时间 | 时长(如 |
| 触发重试的条件 | 当前仅支持 |
| 重试策略 | 当前仅支持 |
| 固定重试间隔 | 时长(如 |
| 最大重试次数 | 整数 |
| 是否在维表 Join 前做 shuffle |
|
shuffle 选项行为
shuffle 选项影响维表 Join 的 shuffle 策略,不同场景表现如下。
场景 | 联接策略 |
不配置 | 使用引擎默认的 shuffle 策略。 |
不配置 | 使用引擎默认的 shuffle 策略。 |
配置 | 默认使用 SHUFFLE_HASH 策略,含义请参见《SHUFFLE_HASH》。 |
配置 | 使用表连接器的自定义 shuffle 策略。 |
目前仅流式数据湖仓Paimon会提供自定义shuffle策略,具体会在Join字段包含全部分桶字段的情况下基于bucket进行shuffle。
代码示例
-- 只对维表 dim1 配置维表联接 shuffle 策略
SELECT /*+ LOOKUP('table'='dim1', 'shuffle' = 'true') */ ...
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() ON T.b = dim2.b;
-- 同时对维表 dim1、dim2 配置维表联接 shuffle 策略
SELECT /*+ LOOKUP('table'='dim1', 'shuffle' = 'true'), LOOKUP('table'='dim2', 'shuffle' = 'true') */ ...
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() ON T.b = dim2.b;
-- 维表 dim1 使用别名 D1 时,hint 中必须使用别名
SELECT /*+ LOOKUP('table'='D1', 'shuffle' = 'true') */ ...
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.b;
-- 同时对维表 dim1、dim2 通过别名配置维表联接 shuffle 策略
SELECT /*+ LOOKUP('table'='D1', 'shuffle' = 'true'), LOOKUP('table'='D2', 'shuffle' = 'true') */ ...
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.b;
通过 SET 配置 Job 级 TableConfig
table.exec.* 等 TableConfig 属于 Job 级全局配置,必须通过 SET 命令传递。LOOKUP hint 和 OPTIONS hint 均不接受这类参数。
常用维表 JOIN 相关 TableConfig:
配置项 | 含义 | 默认值 |
| 异步 lookup 的缓冲队列容量(Job 级默认值) | 100 |
| 异步 lookup 超时时间(Job 级默认值) | 3min |
| 异步 lookup 的输出顺序(Job 级默认值) |
|
-- 在作业开头设置 Job 级默认值
SET 'table.exec.async-lookup.buffer-capacity' = '3072';
SET 'table.exec.async-lookup.timeout' = '180s';
INSERT INTO sink_table
SELECT ...
FROM src_table
LEFT JOIN dim_table FOR SYSTEM_TIME AS OF PROCTIME() AS d
ON src_table.key = d.key;当 SET 与 LOOKUP hint 同时设置同名参数时,单个 Join 操作上的 LOOKUP hint 优先级更高。
通过 Join 策略 Hints 控制 Shuffle
维表 Join 策略 Hints 用于控制 Shuffle 行为,包括 SHUFFLE_HASH、REPLICATED_SHUFFLE_HASH 和 SKEW。维表 Cache 策略和联接策略的适用场景如下。
Cache 策略 | SHUFFLE_HASH | REPLICATED_SHUFFLE_HASH(和 SKEW 等价) |
None | 不建议使用该联接策略提示,主流会引入额外的网络开销。 | 不建议使用该联接策略提示,主流会引入额外的网络开销。 |
LRU | 在维表查找IO成为瓶颈时,建议考虑使用该联接策略提示。当主流数据在Join Key上有时间局部性时,可以提高Cache命中率,减少IO请求数,从而提升总吞吐。 重要 主流会引入额外的网络开销,当主流数据在Join Key上有倾斜,遇到性能瓶颈时,建议考虑REPLICATED_SHUFFLE_HASH。 | 在维表查找IO成为瓶颈且主流数据在Join Key上有倾斜时,建议考虑该联接策略提示。当主流数据在Join Key上有时间局部性时,可以提高Cache命中率,减少IO请求数,从而提升总吞吐。 |
ALL | 在维表内存使用量成为瓶颈时,建议使用该联接策略提示。内存使用率可降低为1/并发度。 重要 主流会引入额外的网络开销,当主流数据在Join Key上有倾斜,遇到性能瓶颈时,建议考虑REPLICATED_SHUFFLE_HASH。 | 在维表内存使用量成为瓶颈且主流数据在Join Key上有倾斜时,建议使用该联接策略提示。内存使用率降低为分桶数/并发度。 |
当前 LOOKUP hint 的
shuffle选项已能覆盖 SHUFFLE_HASH hint 功能,两者同时使用时,会优先采纳 LOOKUP hint 的shuffle选项。当前 LOOKUP hint 的
shuffle选项还未支持解决数据倾斜的功能,当和 REPLICATED_SHUFFLE_HASH、SKEW 同时使用时,会优先采纳 REPLICATED_SHUFFLE_HASH、SKEW 对应的 shuffle 策略。
SHUFFLE_HASH
使用效果:在维表 Join 中使用 Shuffle Hash 策略,可将主流数据在 Join 之前根据 Join Key 做一次 shuffle。在使用 LRU Cache 策略时可提高 Cache 命中率、减少 IO 请求数;在使用 ALL Cache 策略时可减少内存使用量。每个 SHUFFLE_HASH 联接提示可指定多张维表。
使用限制:SHUFFLE_HASH 可减少内存开销,但上游数据需按 Join Key 做一次 shuffle,引入额外网络开销,因此以下两种场景不适合使用:
主流数据在 Join Key 上存在严重的数据倾斜。这种场景下使用 SHUFFLE_HASH,会因数据倾斜导致 Join 节点成为性能瓶颈,造成流作业严重反压或批场景严重长尾,此时建议使用 REPLICATED_SHUFFLE_HASH。
维表数据较小,ALL Cache 策略加载无内存瓶颈时。这种场景下使用 SHUFFLE_HASH,节约的内存开销和额外引入的网络开销相比并不划算。
代码示例:
-- 只对维表 dim1 开启 SHUFFLE_HASH 联接
SELECT /*+ SHUFFLE_HASH(dim1) */ ...
-- 同时对维表 dim1、dim2 均开启 SHUFFLE_HASH 联接
SELECT /*+ SHUFFLE_HASH(dim1, dim2) */ ...
-- 维表 dim1 使用别名 D1 时,hint 中必须使用别名
SELECT /*+ SHUFFLE_HASH(D1) */ ...REPLICATED_SHUFFLE_HASH
使用效果:在维表 Join 中使用 Replicated Shuffle Hash 策略,效果基本与 SHUFFLE_HASH 一致,区别在于会将主流具有相同 key 的数据随机打散到指定的 N 个并发上,可解决数据倾斜导致的性能瓶颈。每个 REPLICATED_SHUFFLE_HASH 联接提示中可指定多张维表。
使用限制:
需配置倾斜数据分桶数量参数
table.exec.skew-join.replicate-num,默认值 16,取值不能大于维表联接节点的并发。配置方法请参见《作业级别 SQL 调优》。当前不支持更新流,当主流是更新流时使用会报错。
代码示例:
SELECT /*+ REPLICATED_SHUFFLE_HASH(dim1) */ ...SKEW
使用效果:当指定表存在数据倾斜时,优化器会在维表 Join 中使用 Replicated Shuffle Hash 策略(SKEW 是语法糖,底层用 Replicated Shuffle Hash 实现)。
使用限制:
每个 SKEW 提示只能指定 1 张表。
表名需为存在数据倾斜的主表名称,而非维表名称。
当前不支持更新流,当主流是更新流时使用会报错。
代码示例:
SELECT /*+ SKEW(src) */ ...使用示例
示例一:基础维表 JOIN
最基础的写法,使用 MySQL 维表补全 Kafka 流的字段。无任何调优 hint。
CREATE TEMPORARY TABLE kafka_input (
id BIGINT,
name VARCHAR,
age BIGINT
) WITH (
'connector' = 'kafka',
'topic' = '<yourTopic>',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'properties.group.id' = '<yourKafkaConsumerGroupId>',
'format' = 'csv'
);
CREATE TEMPORARY TABLE phoneNumber (
name VARCHAR,
phoneNumber BIGINT,
PRIMARY KEY (name) NOT ENFORCED
) WITH (
'connector' = 'mysql',
'hostname' = '<yourHostname>',
'port' = '3306',
'username' = '<yourUsername>',
'password' = '<yourPassword>',
'database-name' = '<yourDatabaseName>',
'table-name' = '<yourTableName>'
);
CREATE TEMPORARY TABLE result_infor (
id BIGINT,
phoneNumber BIGINT,
name VARCHAR
) WITH (
'connector' = 'blackhole'
);
INSERT INTO result_infor
SELECT
t.id,
w.phoneNumber,
t.name
FROM kafka_input AS t
JOIN phoneNumber FOR SYSTEM_TIME AS OF PROCTIME() AS w
ON t.name = w.name;示例二:通过 OPTIONS hint 启用 Cache 并通过 LOOKUP hint 配置异步查询
调优场景:维表 QPS 不足,需要在维表上开启分区 Cache,同时启用异步 lookup 并增大缓冲队列。
INSERT INTO user_behavior_wide
SELECT /*+ LOOKUP('table' = 't2', 'async' = 'true', 'capacity' = '3072') */
t1.member_id AS member_id,
t2.tag AS tag
FROM user_behavior_datagen AS t1
LEFT JOIN fluss.fluss.user_active_info /*+ OPTIONS(
'lookup.cache' = 'PARTIAL',
'lookup.partial-cache.max-rows' = '1000'
) */
FOR SYSTEM_TIME AS OF PROCTIME() AS t2
ON t1.member_id = t2.member_id;连接器选项(
lookup.cache、lookup.partial-cache.max-rows)通过OPTIONS()hint 写在维表后,覆盖建表时的 WITH 选项。Lookup 行为(
async、capacity)通过LOOKUP()hint 写在 SELECT 之后。OPTIONS 与 LOOKUP 是两类不同的 hint,必须分别书写,混写会导致 hint 静默失效。
示例三:通过 SET 调整 Job 级异步 lookup 缓冲
调优场景:作业内多个维表 JOIN 都需要更大的异步 lookup 缓冲,希望全局生效,并配合 LOOKUP hint 启用 async。
SET 'table.exec.async-lookup.buffer-capacity' = '3072';
SET 'table.exec.async-lookup.timeout' = '180s';
INSERT INTO user_behavior_wide
SELECT /*+ LOOKUP('table' = 't2', 'async' = 'true') */
t1.member_id,
t2.tag
FROM user_behavior_datagen AS t1
LEFT JOIN fluss.fluss.user_active_info
FOR SYSTEM_TIME AS OF PROCTIME() AS t2
ON t1.member_id = t2.member_id;table.exec.async-lookup.buffer-capacity等参数为 Job 级 TableConfig,仅可通过 SET 设置,写入 OPTIONS 或 LOOKUP hint 均不生效。若 LOOKUP hint 中未指定
capacity,则使用 SET 设置的全局值;若同时指定,则单个 JOIN 上的 hint 优先生效。