对于每条流式数据,可以关联一个外部维表数据源,为实时计算Flink版提供数据关联查询。
背景信息
大部分连接器的维表Join都可以使用Cache策略,不同连接器对Cache策略的支持情况稍有不同,请查看对应的连接器文档确定具体的支持情况。通用的Cache策略详情如下:
None(默认值):无缓存。
LRU:缓存维表里的部分数据。源表的每条数据都会触发系统先在Cache中查找数据,如果没有找到,则去物理维表中查找。
ALL:缓存维表里的所有数据。在Job运行前,系统会将维表中所有数据加载到Cache中,之后所有的维表查找数据都会通过Cache进行。如果在Cache中无法找到数据,则KEY不存在。全量的Cache有一个过期时间,过期后会重新加载一遍全量Cache。适用于远程表数据量小且MISS KEY(源表数据和维表JOIN时,ON条件无法关联)特别多的场景。
您需要根据具体业务需求,在平衡实时性和性能之间进行权衡。如果对数据实时性要求非常高,需要实时更新,可以不使用Cache,直接从维表读取。
如果使用Cache策略,可以配合LRU和TTL来实现较新的缓存数据。TTL可以设置的较短,例如几秒至几十秒,定期从源表加载数据。
使用ALL缓存策略时,请注意节点内存大小,防止出现OOM。
因为系统会异步加载维表数据,所以在使用ALL缓存策略时,需要增加维表JOIN节点的内存,增加的内存大小为远程表数据量的两倍。
使用限制
维表JOIN仅支持对当前时刻维表快照的关联。
维表支持INNER JOIN和LEFT JOIN,不支持RIGHT JOIN或FULL JOIN。
注意事项
如果您有一对一JOIN需求,请确保连接条件中包含了维表中具有唯一性字段的等值连接条件。
对每条流式数据,只会关联当时维表的最新版本数据,即JOIN行为只发生在处理时间(Processing Time)。如果JOIN行为发生后,维表中的数据发生了变化(新增、更新或删除),则已关联的维表数据不会被同步变化。具体的维表的行为请参见对应连接器行为。
维表JOIN语法
SELECT column-names
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF PROCTIME() [AS <alias2>]
ON table1.column-name1 = table2.key-name1;
必须加上FOR SYSTEM_TIME AS OF PROCTIME(),表示JOIN维表当前时刻所看到的每条数据。
ON条件中必须包含维表实际能支持随机查找的字段的等值条件。
ON条件中维表字段不能使用CAST等类型转换函数。如果您有类型转换需求,请在源表字段进行操作。
维表JOIN Hints
您可以通过使用维表Hints(Hint功能参见Flink SQL Hints)对维表Join的策略进行配置。维表Hints包含Lookup Hint与其他Join Hints。
仅VVR 8.0及以上版本支持Lookup Hint。
仅VVR 8.0.8及以上版本支持通过Lookup Hint配置是否开启shuffle策略。
VVR 8.0以上支持使用别名,如果维表定义了别名,Hint中必须使用别名。
仅VVR 4.0及以上版本支持其他Join Hints。
Lookup Hint
Lookup Hint功能和社区保持一致,可以用于配置维表的同步、异步、重试查找策略,详情参见Lookup Hint。VVR 8.0.8及以上版本对Lookup Hint的功能进行了扩展,支持配置通过'shuffle' = 'true'
选项配置维表联接时的shuffle策略,不同场景的shuffle策略如下表所示。
场景 | 联接策略 |
不配置'shuffle' = 'true'选项 | 使用引擎默认的shuffle策略。 |
不配置'shuffle' = 'true'选项,且维表连接器不提供自定义联接策略 | |
配置'shuffle' = 'true' 选项,且维表连接器不提供自定义联接策略 | 默认使用SHUFFLE_HASH策略,含义请参见SHUFFLE_HASH。 |
配置'shuffle' = 'true' 选项,且维表连接器提供自定义联接策略 | 使用表连接器的自定义shuffle策略。 |
目前仅流式数据湖仓Paimon会提供自定义shuffle策略,具体会在Join字段包含全部分桶字段的情况下基于bucket进行shuffle。
对维表配置联接时的shuffle策略代码示例如下。
-- 只对维表dim1配置维表联接shuffle策略。
SELECT /*+ LOOKUP('table'='dim1', 'shuffle' = 'true') */
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() ON T.b = dim2.b
-- 同时对维表dim1, dim2配置维表联接shuffle策略。
SELECT /*+ LOOKUP('table'='dim1', 'shuffle' = 'true'),LOOKUP('table'='dim2', 'shuffle' = 'true') */
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() ON T.b = dim2.b
-- 对维表dim1必须使用别名D1配置维表联接shuffle策略。
SELECT /*+ LOOKUP('table'='D1', 'shuffle' = 'true') */
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.b
-- 同时对维表dim1, dim2通过别名配置维表联接shuffle策略。
SELECT /*+ LOOKUP('table'='D1', 'shuffle' = 'true'),LOOKUP('table'='D2', 'shuffle' = 'true') */
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.b
其他Join Hints
维表Join Hints仅用于配置维表联接策略,包括SHUFFLE_HASH、REPLICATED_SHUFFLE_HASH和SKEW。维表Cache策略和联接策略之间的适用场景详情如下表所示。
Cache策略 | SHUFFLE_HASH | REPLICATED_SHUFFLE_HASH (和SKEW等价) |
None | 不建议使用该联接策略提示,主流会引入额外的网络开销。 | 不建议使用该联接策略提示,主流会引入额外的网络开销。 |
LRU | 在维表查找IO成为瓶颈时,建议考虑使用该联接策略提示。当主流数据在Join Key上有时间局部性时,可以提高Cache命中率,减少IO请求数,从而提升总吞吐。 重要 主流会引入额外的网络开销,当主流数据在Join Key上有倾斜,遇到性能瓶颈时,建议考虑REPLICATED_SHUFFLE_HASH。 | 在维表查找IO成为瓶颈且主流数据在Join Key上有倾斜时,建议考虑该联接策略提示。当主流数据在Join Key上有时间局部性时,可以提高Cache命中率,减少IO请求数,从而提升总吞吐。 |
ALL | 在维表内存使用量成为瓶颈时,建议使用该联接策略提示。内存使用率可降低为1/并发度。 重要 主流会引入额外的网络开销,当主流数据在Join Key上有倾斜,遇到性能瓶颈时,建议考虑REPLICATED_SHUFFLE_HASH。 | 在维表内存使用量成为瓶颈且主流数据在Join Key上有倾斜时,建议使用该联接策略提示。内存使用率降低为分桶数/并发度。 |
SHUFFLE_HASH
使用效果
在维表Join中使用Shuffle Hash策略,可以将主流数据在Join之前根据Join Key做一次shuffle。在使用LRU Cache策略时可以提高Cache命中率,减少IO请求数;在使用ALL Cache策略时可以减少内存使用量。每个SHUFFLE_HASH联接提示可指定多张维表。
使用限制
虽然SHUFFLE_HASH可以减少内存开销,但是由于上游数据需要按照Join Key做一次shuffle,引入额外的网络开销,因此下面两种场景不适合使用SHUFFLE_HASH联接策略。
主流数据在Join Key上存在严重的数据倾斜,这种场景下如果使用SHUFFLE_HASH联接,会因为数据倾斜导致Join节点成为性能瓶颈,从而会导致流作业出现严重反压或是批场景出现严重长尾,此时建议使用REPLICATED_SHUFFLE_HASH联接。
维表数据较小,ALL Cache策略加载没有内存瓶颈时,如果使用SHUFFLE_HASH联接,节约的内存开销和额外引入的网络开销相比,可能并不划算。
代码示例
-- 只对维表dim1开启SHUFFLE_HASH联接。 SELECT /*+ SHUFFLE_HASH(dim1) */ FROM src AS T LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() ON T.b = dim2.b -- 同时对维表dim1, dim2均开启SHUFFLE_HASH联接。 SELECT /*+ SHUFFLE_HASH(dim1, dim2) */ FROM src AS T LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() ON T.b = dim2.b -- 对维表dim1必须使用别名D1开启SHUFFLE_HASH联接。 SELECT /*+ SHUFFLE_HASH(D1) */ FROM src AS T LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.b -- 同时对维表dim1, dim2通过别名开启SHUFFLE_HASH联接。 SELECT /*+ SHUFFLE_HASH(D1, D2) */ FROM src AS T LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.b
REPLICATED_SHUFFLE_HASH
使用效果
在维表Join中使用Replicated Shuffle Hash策略,其效果基本与SHUFFLE_HASH一致,但不同点是其会将主流具有相同key的数据随机打散到指定的N个并发上,可以解决数据倾斜导致的性能瓶颈。每个REPLICATED_SHUFFLE_HASH联接提示中可指定多张维表。
使用限制
需要配置倾斜数据分桶数量参数
table.exec.skew-join.replicate-num
,其默认值为16,取值不能大于维表联接节点的并发。配置方法请参见如何配置作业运行参数?。当前不支持更新流,当主流是更新流时,使用REPLICATED_SHUFFLE_HASH策略会报错。
代码示例
-- 对维表dim1开启REPLICATED_SHUFFLE_HASH联接 SELECT /*+ REPLICATED_SHUFFLE_HASH(dim1) */ FROM src AS T LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a -- 对维表dim1通过别名开启REPLICATED_SHUFFLE_HASH联接 SELECT /*+ REPLICATED_SHUFFLE_HASH(D1) */ FROM src AS T LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
SKEW
使用效果
当指定表存在数据倾斜时,优化器会在维表Join中使用Replicated Shuffle Hash策略(Skew只是一个语法糖,底层的实现是用的Replicated Shuffle Hash策略)。
使用限制
每个SKEW提示只能指定1张表。
表名需要为存在数据倾斜的主表名称,而不是维表名称。
当前不支持更新流,当主流是更新流时,使用SKEW策略会报错。
代码示例
SELECT /*+ SKEW(src) */ FROM src AS T LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
当前LOOKUP Hint的shuffle选项已能覆盖 SHUFFLE_HASH hint功能,两者同时使用时,会优先采纳LOOKUP hint的shuffle选项。
当前LOOKUP Hint的shuffle选项还未支持解决数据倾斜的功能,当和REPLICATED_SHUFFLE_HASH、SKEW同时使用时,会优先采纳REPLICATED_SHUFFLE_HASH、SKEW对应的shuffle策略。
使用示例
测试数据
表1 kafka_input
id(bigint)
name(varchar)
age(bigint)
1
lilei
22
2
hanmeimei
20
3
libai
28
表2 phoneNumber
name(varchar)
phoneNumber(bigint)
dufu
1390000111
baijuyi
1390000222
libai
1390000333
lilei
1390000444
测试语句
CREATE TEMPORARY TABLE kafka_input ( id BIGINT, name VARCHAR, age BIGINT ) WITH ( 'connector' = 'kafka', 'topic' = '<yourTopic>', 'properties.bootstrap.servers' = '<yourKafkaBrokers>', 'properties.group.id' = '<yourKafkaConsumerGroupId>', 'format' = 'csv' ); CREATE TEMPORARY TABLE phoneNumber( name VARCHAR, phoneNumber BIGINT, PRIMARY KEY(name) NOT ENFORCED ) WITH ( 'connector' = 'mysql', 'hostname' = '<yourHostname>', 'port' = '3306', 'username' = '<yourUsername>', 'password' = '<yourPassword>', 'database-name' = '<yourDatabaseName>', 'table-name' = '<yourTableName>' ); CREATE TEMPORARY TABLE result_infor( id BIGINT, phoneNumber BIGINT, name VARCHAR ) WITH ( 'connector' = 'blackhole' ); INSERT INTO result_infor SELECT t.id, w.phoneNumber, t.name FROM kafka_input as t JOIN phoneNumber FOR SYSTEM_TIME AS OF PROCTIME() as w ON t.name = w.name;
测试结果
id(bigint)
phoneNumber(bigint)
name(varchar)
1
1390000444
lilei
3
1390000333
libai