维表JOIN语句

更新时间:
复制为 MD 格式

对于每条流式数据,可以关联一个外部维表数据源,为实时计算Flink版提供数据关联查询。

维表 JOIN 概述

维表 JOIN(Lookup Join)通过对每条流数据在处理时间查询外部维表,将维表字段补全到主流中。常用于事件流补充字典、维度信息等场景。

维表 JOIN 的核心环节:

  • 语法:使用 FOR SYSTEM_TIME AS OF PROCTIME() 表示对每条流数据查询维表的当前数据,而非维表快照。

  • Cache 策略:在连接器层缓存维表数据,降低外部访问压力。

  • Lookup 行为:通过 LOOKUP hint 配置同步/异步、缓冲容量、重试策略等。

  • Join 物理策略:通过 SHUFFLE_HASH 等 hint 控制 Shuffle 行为,优化数据倾斜。

  • Job 级配置:通过 SET 命令调整 table.exec.* 系列全局参数。

维表 JOIN 语法

SELECT column-names
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF PROCTIME() [AS <alias2>]
ON table1.column-name1 = table2.key-name1;
说明
  • 必须加上 FOR SYSTEM_TIME AS OF PROCTIME(),表示 JOIN 维表当前时刻所看到的每条数据。

  • ON 条件中必须包含维表实际能支持随机查找的字段的等值条件。

  • ON 条件中支持对源表字段使用 CAST 等类型转换函数。如源表与维表字段类型不一致,可在源表字段上进行类型转换以匹配维表字段类型。

使用限制与注意事项

  • 维表 JOIN 仅支持对当前时刻维表快照的关联。

  • 维表支持 INNER JOIN 和 LEFT JOIN,不支持 RIGHT JOIN 或 FULL JOIN。

  • 如有一对一 JOIN 需求,请确保连接条件中包含了维表中具有唯一性字段的等值连接条件。

  • 对每条流式数据,只会关联当时维表的最新版本数据,即JOIN行为只发生在处理时间(Processing Time)。如果JOIN行为发生后,维表中的数据发生了变化(新增、更新或删除),则已关联的维表数据不会被同步变化。具体的维表的行为请参见对应连接器行为。

维表 Cache 策略

大部分连接器的维表 JOIN 支持 Cache 策略,不同连接器的支持情况略有差异,请查阅对应连接器文档确认。通用 Cache 策略如下:

策略

行为

None(默认)

无缓存。

LRU

缓存维表里的部分数据。源表的每条数据都会触发系统先在 Cache 中查找数据,未命中时再查物理维表。

ALL

缓存维表里的所有数据。Job 运行前,系统会将维表中所有数据加载到 Cache 中,之后所有查找都通过 Cache 完成;未命中视为 KEY 不存在。可通过连接器参数配置周期性或定时重新加载维表数据。适用于维表数据量较小、对实时性要求不高且需要极致吞吐的场景。

说明
  • 需根据业务需求在实时性和性能之间权衡。如对实时性要求极高,可不使用 Cache,直接从维表读取。

  • 使用 Cache 时,可配合 LRU 和 TTL 维持较新的缓存数据。TTL 可设置较短(例如几秒至几十秒),定期从源表加载数据。

  • 使用 ALL 缓存策略时,注意节点内存大小,防止 OOM。

  • ALL 缓存策略下系统异步加载维表数据,需为维表 JOIN 节点增加内存,建议增加的内存大小为远程表数据量的两倍。

维表 JOIN 调优

调优参数分类与传递方式

维表 JOIN 调优涉及三类参数,每类参数的传递方式不同,用错方式 hint 会被静默忽略

参数类别

典型参数

传递方式

作用域

连接器 WITH 选项

lookup.cachelookup.partial-cache.max-rowslookup.partial-cache.expire-after-write

建表语句的 WITH (...);或在 SQL 中用 /*+ OPTIONS('key'='value') */ 覆盖

单张表

LOOKUP hint 选项

tableasynccapacitytimeoutoutput-moderetry-predicateretry-strategyfixed-delaymax-attemptsshuffle

/*+ LOOKUP('table'='dim', 'async'='true', 'capacity'='100') */

单个 Join 操作

Flink Job 级 TableConfig

table.exec.*table.optimizer.* 开头的全局配置

作业开头使用 SET 'table.exec.async-lookup.buffer-capacity' = '3072';

整个作业

重要
  • OPTIONS() hint 仅用于覆盖建表语句的 WITH 选项,不接受 table.exec.* 类 TableConfig;写错会被静默忽略。

  • LOOKUP() hint 只识别上表列出的固定 key,不接受 Flink 配置项全名;例如调整 async 缓冲,必须写 'capacity'='100',写 'table.exec.async-lookup.buffer-capacity'='100' 不生效。

  • 如需调整 hint 范围外的全局参数,请使用 SET 'xxx' = 'yyy';

通过 OPTIONS hint 覆盖连接器选项

OPTIONS() hint 用于在 SQL 中临时覆盖某张表的 WITH 选项,无需修改建表语句。常用于调整 Cache 行为。

SELECT t.id, t.name, w.phoneNumber
FROM kafka_input AS t
LEFT JOIN phoneNumber /*+ OPTIONS(
  'lookup.cache' = 'PARTIAL',
  'lookup.partial-cache.max-rows' = '1000'
) */ FOR SYSTEM_TIME AS OF PROCTIME() AS w
ON t.name = w.name;

OPTIONS 中传入的 key 必须为连接器本身支持的 WITH 选项。

说明

上例中的 lookup.cachelookup.partial-cache.max-rows 是基于 Flink 通用 LookupCache 接口(FLIP-221)的连接器选项,仅适用于实现了该接口的新版连接器(如 Fluss、JDBC 等)。前文「维表 Cache 策略」中描述的 None/LRU/ALL 策略对应历史接口的连接器,两者通过不同的 WITH 选项配置。请查阅对应连接器文档确认其支持的 Cache 选项与策略,避免使用不匹配的参数导致配置不生效。

通过 LOOKUP hint 配置 Lookup 行为

LOOKUP hint 功能与社区保持一致,用于在单个 Join 操作上配置维表的同步、异步、重试及 shuffle 策略。详情参见 Apache Flink Lookup Hint

  • 仅 VVR 8.0 及以上版本支持 LOOKUP hint。

  • 仅 VVR 8.0.8 及以上版本支持通过 'shuffle' = 'true' 配置 shuffle 策略。

  • VVR 8.0 以上支持使用别名;如果维表定义了别名,hint 中必须使用别名。

支持的选项

选项

含义

取值

table

指定 hint 作用的维表名或别名

表名/别名字符串

async

是否启用异步 lookup

true / false

output-mode

异步 lookup 的输出顺序

ordered / allow_unordered

capacity

异步 lookup 缓冲队列容量

整数

timeout

异步 lookup 超时时间

时长(如 180s

retry-predicate

触发重试的条件

当前仅支持 lookup_miss

retry-strategy

重试策略

当前仅支持 fixed_delay

fixed-delay

固定重试间隔

时长(如 10s

max-attempts

最大重试次数

整数

shuffle

是否在维表 Join 前做 shuffle

true / false

shuffle 选项行为

shuffle 选项影响维表 Join 的 shuffle 策略,不同场景表现如下。

场景

联接策略

不配置 'shuffle' = 'true' 选项

使用引擎默认的 shuffle 策略。

不配置 'shuffle' = 'true' 选项,且维表连接器不提供自定义联接策略

使用引擎默认的 shuffle 策略。

配置 'shuffle' = 'true' 选项,且维表连接器不提供自定义联接策略

默认使用 SHUFFLE_HASH 策略,含义请参见《SHUFFLE_HASH》。

配置 'shuffle' = 'true' 选项,且维表连接器提供自定义联接策略

使用表连接器的自定义 shuffle 策略。

说明

目前仅流式数据湖仓Paimon会提供自定义shuffle策略,具体会在Join字段包含全部分桶字段的情况下基于bucket进行shuffle。

代码示例
-- 只对维表 dim1 配置维表联接 shuffle 策略
SELECT /*+ LOOKUP('table'='dim1', 'shuffle' = 'true') */ ...
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() ON T.b = dim2.b;

-- 同时对维表 dim1、dim2 配置维表联接 shuffle 策略
SELECT /*+ LOOKUP('table'='dim1', 'shuffle' = 'true'), LOOKUP('table'='dim2', 'shuffle' = 'true') */ ...
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() ON T.b = dim2.b;

-- 维表 dim1 使用别名 D1 时,hint 中必须使用别名
SELECT /*+ LOOKUP('table'='D1', 'shuffle' = 'true') */ ...
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.b;

-- 同时对维表 dim1、dim2 通过别名配置维表联接 shuffle 策略
SELECT /*+ LOOKUP('table'='D1', 'shuffle' = 'true'), LOOKUP('table'='D2', 'shuffle' = 'true') */ ...
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.b;

通过 SET 配置 Job 级 TableConfig

table.exec.* 等 TableConfig 属于 Job 级全局配置,必须通过 SET 命令传递。LOOKUP hint 和 OPTIONS hint 均不接受这类参数。

常用维表 JOIN 相关 TableConfig:

配置项

含义

默认值

table.exec.async-lookup.buffer-capacity

异步 lookup 的缓冲队列容量(Job 级默认值)

100

table.exec.async-lookup.timeout

异步 lookup 超时时间(Job 级默认值)

3min

table.exec.async-lookup.output-mode

异步 lookup 的输出顺序(Job 级默认值)

ordered

-- 在作业开头设置 Job 级默认值
SET 'table.exec.async-lookup.buffer-capacity' = '3072';
SET 'table.exec.async-lookup.timeout' = '180s';

INSERT INTO sink_table
SELECT ...
FROM src_table
LEFT JOIN dim_table FOR SYSTEM_TIME AS OF PROCTIME() AS d
ON src_table.key = d.key;
说明

当 SET 与 LOOKUP hint 同时设置同名参数时,单个 Join 操作上的 LOOKUP hint 优先级更高。

通过 Join 策略 Hints 控制 Shuffle

维表 Join 策略 Hints 用于控制 Shuffle 行为,包括 SHUFFLE_HASH、REPLICATED_SHUFFLE_HASH 和 SKEW。维表 Cache 策略和联接策略的适用场景如下。

Cache 策略

SHUFFLE_HASH

REPLICATED_SHUFFLE_HASH(和 SKEW 等价)

None

不建议使用该联接策略提示,主流会引入额外的网络开销。

不建议使用该联接策略提示,主流会引入额外的网络开销。

LRU

在维表查找IO成为瓶颈时,建议考虑使用该联接策略提示。当主流数据在Join Key上有时间局部性时,可以提高Cache命中率,减少IO请求数,从而提升总吞吐。

重要

主流会引入额外的网络开销,当主流数据在Join Key上有倾斜,遇到性能瓶颈时,建议考虑REPLICATED_SHUFFLE_HASH。

在维表查找IO成为瓶颈且主流数据在Join Key上有倾斜时,建议考虑该联接策略提示。当主流数据在Join Key上有时间局部性时,可以提高Cache命中率,减少IO请求数,从而提升总吞吐。

ALL

在维表内存使用量成为瓶颈时,建议使用该联接策略提示。内存使用率可降低为1/并发度。

重要

主流会引入额外的网络开销,当主流数据在Join Key上有倾斜,遇到性能瓶颈时,建议考虑REPLICATED_SHUFFLE_HASH。

在维表内存使用量成为瓶颈且主流数据在Join Key上有倾斜时,建议使用该联接策略提示。内存使用率降低为分桶数/并发度。

重要
  • 当前 LOOKUP hint 的 shuffle 选项已能覆盖 SHUFFLE_HASH hint 功能,两者同时使用时,会优先采纳 LOOKUP hint 的 shuffle 选项。

  • 当前 LOOKUP hint 的 shuffle 选项还未支持解决数据倾斜的功能,当和 REPLICATED_SHUFFLE_HASH、SKEW 同时使用时,会优先采纳 REPLICATED_SHUFFLE_HASH、SKEW 对应的 shuffle 策略。

SHUFFLE_HASH

使用效果:在维表 Join 中使用 Shuffle Hash 策略,可将主流数据在 Join 之前根据 Join Key 做一次 shuffle。在使用 LRU Cache 策略时可提高 Cache 命中率、减少 IO 请求数;在使用 ALL Cache 策略时可减少内存使用量。每个 SHUFFLE_HASH 联接提示可指定多张维表。

使用限制:SHUFFLE_HASH 可减少内存开销,但上游数据需按 Join Key 做一次 shuffle,引入额外网络开销,因此以下两种场景不适合使用:

  • 主流数据在 Join Key 上存在严重的数据倾斜。这种场景下使用 SHUFFLE_HASH,会因数据倾斜导致 Join 节点成为性能瓶颈,造成流作业严重反压或批场景严重长尾,此时建议使用 REPLICATED_SHUFFLE_HASH。

  • 维表数据较小,ALL Cache 策略加载无内存瓶颈时。这种场景下使用 SHUFFLE_HASH,节约的内存开销和额外引入的网络开销相比并不划算。

代码示例

-- 只对维表 dim1 开启 SHUFFLE_HASH 联接
SELECT /*+ SHUFFLE_HASH(dim1) */ ...

-- 同时对维表 dim1、dim2 均开启 SHUFFLE_HASH 联接
SELECT /*+ SHUFFLE_HASH(dim1, dim2) */ ...

-- 维表 dim1 使用别名 D1 时,hint 中必须使用别名
SELECT /*+ SHUFFLE_HASH(D1) */ ...

REPLICATED_SHUFFLE_HASH

使用效果:在维表 Join 中使用 Replicated Shuffle Hash 策略,效果基本与 SHUFFLE_HASH 一致,区别在于会将主流具有相同 key 的数据随机打散到指定的 N 个并发上,可解决数据倾斜导致的性能瓶颈。每个 REPLICATED_SHUFFLE_HASH 联接提示中可指定多张维表。

使用限制

  • 需配置倾斜数据分桶数量参数 table.exec.skew-join.replicate-num,默认值 16,取值不能大于维表联接节点的并发。配置方法请参见《作业级别 SQL 调优》。

  • 当前不支持更新流,当主流是更新流时使用会报错。

代码示例

SELECT /*+ REPLICATED_SHUFFLE_HASH(dim1) */ ...

SKEW

使用效果:当指定表存在数据倾斜时,优化器会在维表 Join 中使用 Replicated Shuffle Hash 策略(SKEW 是语法糖,底层用 Replicated Shuffle Hash 实现)。

使用限制

  • 每个 SKEW 提示只能指定 1 张表。

  • 表名需为存在数据倾斜的主表名称,而非维表名称。

  • 当前不支持更新流,当主流是更新流时使用会报错。

代码示例

SELECT /*+ SKEW(src) */  ...

使用示例

示例一:基础维表 JOIN

最基础的写法,使用 MySQL 维表补全 Kafka 流的字段。无任何调优 hint。

CREATE TEMPORARY TABLE kafka_input (
  id   BIGINT,
  name VARCHAR,
  age  BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = '<yourTopic>',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'properties.group.id' = '<yourKafkaConsumerGroupId>',
  'format' = 'csv'
);

CREATE TEMPORARY TABLE phoneNumber (
  name        VARCHAR,
  phoneNumber BIGINT,
  PRIMARY KEY (name) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>'
);

CREATE TEMPORARY TABLE result_infor (
  id          BIGINT,
  phoneNumber BIGINT,
  name        VARCHAR
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO result_infor
SELECT
  t.id,
  w.phoneNumber,
  t.name
FROM kafka_input AS t
JOIN phoneNumber FOR SYSTEM_TIME AS OF PROCTIME() AS w
ON t.name = w.name;

示例二:通过 OPTIONS hint 启用 Cache 并通过 LOOKUP hint 配置异步查询

调优场景:维表 QPS 不足,需要在维表上开启分区 Cache,同时启用异步 lookup 并增大缓冲队列。

INSERT INTO user_behavior_wide
SELECT /*+ LOOKUP('table' = 't2', 'async' = 'true', 'capacity' = '3072') */
  t1.member_id AS member_id,
  t2.tag AS tag
FROM user_behavior_datagen AS t1
LEFT JOIN fluss.fluss.user_active_info /*+ OPTIONS(
    'lookup.cache' = 'PARTIAL',
    'lookup.partial-cache.max-rows' = '1000'
  ) */
FOR SYSTEM_TIME AS OF PROCTIME() AS t2
ON t1.member_id = t2.member_id;
  • 连接器选项lookup.cachelookup.partial-cache.max-rows)通过 OPTIONS() hint 写在维表后,覆盖建表时的 WITH 选项。

  • Lookup 行为asynccapacity)通过 LOOKUP() hint 写在 SELECT 之后。OPTIONS 与 LOOKUP 是两类不同的 hint,必须分别书写,混写会导致 hint 静默失效。

示例三:通过 SET 调整 Job 级异步 lookup 缓冲

调优场景:作业内多个维表 JOIN 都需要更大的异步 lookup 缓冲,希望全局生效,并配合 LOOKUP hint 启用 async。

SET 'table.exec.async-lookup.buffer-capacity' = '3072';
SET 'table.exec.async-lookup.timeout' = '180s';

INSERT INTO user_behavior_wide
SELECT /*+ LOOKUP('table' = 't2', 'async' = 'true') */
  t1.member_id,
  t2.tag
FROM user_behavior_datagen AS t1
LEFT JOIN fluss.fluss.user_active_info
FOR SYSTEM_TIME AS OF PROCTIME() AS t2
ON t1.member_id = t2.member_id;
  • table.exec.async-lookup.buffer-capacity 等参数为 Job 级 TableConfig,仅可通过 SET 设置,写入 OPTIONS 或 LOOKUP hint 均不生效。

  • 若 LOOKUP hint 中未指定 capacity,则使用 SET 设置的全局值;若同时指定,则单个 JOIN 上的 hint 优先生效。