在多表 Join 查询中,未参与 Join 匹配的数据会增加 I/O 开销并降低查询性能。Runtime Filter 在多表 Join 场景下自动生成轻量过滤器,在数据扫描阶段提前裁剪无效数据,减少 I/O 开销并提升查询性能。该功能自 Hologres V2.0 开始支持 Hash Join,V4.2 扩展至 Cross Join 场景。
背景信息
应用场景
Hologres 从 V2.0 版本开始支持 Runtime Filter,通常应用在多表(两表及以上)Join 的 Hash Join 场景,尤其是大表 Join 小表的场景中,无需手动设置,优化器和执行引擎会在查询时自动优化 Join 过程的过滤行为,从而降低 I/O 开销,提升 Join 的查询性能。
从 V4.2 版本开始,Runtime Filter 的能力进一步扩展到 Cross Join 场景,专门优化「标量子查询聚合结果作为大表过滤条件」这类高频 SQL 模式(详见Cross Join 支持 Runtime Filter(V4.2 新增))。
原理介绍
Hash Join 场景下的 Runtime Filter
两个表 Join 时,会通过一张表构建 Hash 表,然后匹配另一张表的数据。Join 过程涉及两个端:
-
build 端:构建 Hash 表的一侧,对应执行计划中的 Hash 节点。
-
probe 端:读取数据并与 build 端的 Hash 表进行匹配的一侧。
通常小表作为 build 端,大表作为 probe 端。
Runtime Filter 的原理是:利用 build 端的数据分布生成轻量过滤器,发送给 probe 端对数据进行裁剪,减少 probe 端参与 Hash Join 及网络传输的数据量,从而提升 Join 性能。 因此该功能更适用于大小表 Join 且表数据量相差较大的场景,性能将会比普通 Join 有更多提升。
Cross Join 场景下的 Runtime Filter(V4.2 新增)
V4.2 之前,Runtime Filter 仅覆盖 Hash Join。但用户经常使用标量子查询计算聚合值(如 min / max),并将结果作为大表的过滤条件,这类 SQL 在执行计划中产生的是 Cross Join:
-
build 端:标量子查询结果,恰好 1 行。
-
probe 端:大表扫描。
V4.2 引入了新的 Filter 类型 —— ScalarFilter,由优化器自动识别并将 build 端求值后的标量值下推给 probe 端 ScanNode,扫描阶段即完成行级与 RowGroup 级过滤,避免大表全表扫描后再逐行比较。
使用限制和触发条件
使用限制
-
仅 Hologres V2.0 及以上版本支持 Runtime Filter。
-
Hash Join 场景下,V2.0 版本仅支持 Join 条件中只有一个字段。从 V2.1 版本开始,支持多个字段的 Runtime Filter。
-
仅 V4.0 及以上版本支持 TopN Runtime Filter,用以提升单表 TopN 计算场景的性能。
-
仅 V4.2 及以上版本支持 Cross Join Runtime Filter(ScalarFilter)。
触发条件
Hash Join 场景
Runtime Filter 由引擎自动触发,需同时满足以下条件:
-
probe 端数据量在 100,000 行及以上。
-
扫描数据量比例:build 端 / probe 端 ≤ 0.1(比例越小,越容易触发)。
-
Join 输出数据量比例:build 端 / probe 端 ≤ 0.1(比例越小,越容易触发)。
Cross Join 场景(V4.2+)
由优化器自动生成 ScalarFilter,需满足:
-
Cross Join 的 build 端统计行数为 1,且分布为 Replicated。
-
probe 端的过滤谓词中包含 build 端表达式的比较条件(
>、<、>=、<=或BETWEEN)。
Runtime Filter 的类型
Runtime Filter 可从以下两个维度进行分类。
按 Shuffle 维度划分(适用于 Hash Join)
|
类型 |
支持版本 |
适用场景 |
|
Local |
V2.0+ |
probe 端数据不需要 Shuffle 时使用。build 端和 probe 端的 Join Key 为同一种分布方式,或 build 端数据 broadcast 给 probe 端,或 build 端数据按照 probe 端的分布方式 Shuffle 给 probe 端,均可使用 Local 类型。仅减少数据扫描量及参与 Hash Join 计算的数据量。 |
|
Global |
V2.2+ |
probe 端数据需要 Shuffle 时使用。Runtime Filter 在数据 Shuffle 之前做过滤,可减少数据的网络传输量。 |
类型无需手动指定,引擎会自适应选择。
按 Filter 类型划分
|
类型 |
支持版本 |
说明 |
|
Bloom Filter |
V2.0+ |
具有一定假阳性,可能少过滤部分数据,但应用范围广,在 build 端数据量较多时仍有较高的过滤效率。 |
|
In Filter |
V2.0+ |
build 端数据 NDV(非重复值个数)较小时使用。使用 build 端数据构建 HashSet 发送给 probe 端过滤,可过滤所有应过滤的数据,且能与 Bitmap 索引结合使用。 |
|
MinMax Filter |
V2.0+ |
根据 build 端数据的最大值和最小值发送给 probe 端做过滤。可根据元数据信息直接过滤掉文件或一个 Batch 的数据,减少 I/O 成本。 |
|
ScalarFilter |
V4.2+ |
专用于 Cross Join 场景。当 build 端恰好为 1 行时,引擎将标量值下推至 probe 端 ScanNode,在扫描阶段直接过滤。 |
Filter 类型无需手动指定,Hologres 会根据运行时 Join 情况自适应使用。
Cross Join 支持 Runtime Filter(V4.2 新增)
需求背景
用户经常使用标量子查询计算聚合值,并将结果作为大表的过滤条件。这类 SQL 在 V4.2 之前的执行计划中存在如下问题:
-
probe 端 ScanNode 无法利用 build 端的值提前过滤,必须全表扫描。
-
扫描完成后由 Cross Join 逐行比较,造成大量无效 I/O。
-
已有的 Runtime Filter 仅覆盖 Hash Join,不支持 Cross Join。
V4.2 引入 ScalarFilter,从根本上解决该问题,用户无需任何额外操作,功能默认开启。
典型 SQL 与执行计划对比
典型 SQL:
-- t1 大表,t2 聚合后 min/max 结果恰好 1 行
SELECT * FROM t1
WHERE a >= (SELECT min(a) FROM t2 WHERE b BETWEEN 0 AND 1);
优化前(无 Runtime Filter):
Cross Join
-> Seq Scan on t1 -- 全表扫描,无法提前过滤
-> Aggregate -- build 端 min/max,结果 1 行
-> Seq Scan on t2
probe 端必须扫描全量数据,再由 Cross Join 逐行比较,存在大量无效 I/O。
优化后(V4.2 ScalarFilter):
Cross Join
Runtime Filter Build Expr: (min(t2.a)), (max(t2.a))
-> Seq Scan on t1 -- 收到 ScalarFilter,扫描时直接过滤不满足条件的行和 RowGroup
Runtime Filter Target Expr: (t1.a >= ${1}) AND (t1.a <= ${2})
-> Aggregate
-> Seq Scan on t2
build 端求值后,将实际值下推给 probe 端 ScanNode,扫描阶段即完成过滤。
更多典型场景
-- 场景一:单表 + 标量子查询
SELECT * FROM t1
WHERE a >= (SELECT min(a) FROM t2 WHERE b BETWEEN 0 AND 1);
-- 场景二:多表 + 标量子查询
SELECT * FROM t1, t3
WHERE (SELECT min(a) FROM t2) <= t1.a
AND t3.a <= (SELECT max(a) FROM t2);
-- 场景三:CTE + Cross Join
WITH r AS (SELECT MIN(a) AS lo, MAX(a) AS hi FROM t2)
SELECT t1.* FROM t1, r
WHERE t1.a >= r.lo AND t1.a <= r.hi;
工作原理
-
优化器识别:当 Cross Join 的 build 端统计行数为 1 且分布为 Replicated 时,从谓词中提取比较表达式(
BETWEEN展开为>=+<=),生成 ScalarFilter 候选。 -
执行期下推:Cross Join 在 Open 阶段消费 build 端数据后,对
build_expr求值得到标量值,构建 ScalarFilter 发布给 probe 端 ScanNode。 -
ScanNode 提前过滤:probe 端 NiagaraScan 接收到 ScalarFilter 后,用实际值替换
target_expr中的占位符,同时生成两级过滤:-
行级过滤(conjunct eval)。
-
RowGroup 级过滤(存储引擎层直接跳过不满足条件的数据块)。
-
使用限制
-
build 端必须恰好 1 行。优化器仅在 build 端统计行数为 1 且为 Replicated 分布时生成 ScalarFilter。如果运行时 build 端非 1 行,会触发
RT_CHECK报错。 -
build 端值为 NULL 时,发布
FILTER_ALL(过滤所有行),同时清空 build 端数据短路 Cross Join,直接返回空结果。 -
不支持字典列。如果 probe 端
target_expr引用的列是字典编码列(DICTIONARY type),则不会应用 ScalarFilter。 -
支持的数据类型:
INT8/UINT8/INT16/UINT16/INT32/UINT32/INT64/UINT64/DATE32/TIMESTAMP/FLOAT/DOUBLE/STRING。不支持的类型会跳过 ScalarFilter(不报错,仅不下推)。 -
支持的比较操作符:
>、<、>=、<=。BETWEEN会被展开为两个范围比较。
GUC 参数
|
参数名 |
类型 |
默认值 |
级别 |
说明 |
|
|
bool |
|
|
是否为 Cross Join 生成 ScalarFilter 类型的 Runtime Filter。默认开启,用户无需额外操作。 |
关闭该功能:
SET hg_experimental_generate_runtime_scalar_filter = off;
EXPLAIN 输出新增字段
启用 ScalarFilter 后,Cross Join 节点会显示:
-
Runtime Filter Build Expr:build 端表达式,例如(min(t2.a))、(max(t2.a))。 -
Runtime Filter Target Expr:probe 端目标过滤表达式,例如(t1.a >= ${1}) AND (t1.a <= ${2}),其中${N}为运行时替换的filter_id对应的 build 端值占位符。
性能收益
在 TPC-DS 10TB 数据集中,存在如下典型 SQL 模式:
DELETE FROM inventory
WHERE inv_date_sk >= (SELECT min(d_date_sk) FROM date_dim WHERE d_date BETWEEN 'INV_S_1' AND 'INV_E_1')
AND inv_date_sk <= (SELECT max(d_date_sk) FROM date_dim WHERE d_date BETWEEN 'INV_S_1' AND 'INV_E_1');
经 V4.2 ScalarFilter 优化后,性能由 2.672 秒 提升至 0.552 秒,提升约 4.8 倍。
验证 Runtime Filter
以下示例帮助您验证 Runtime Filter 在不同场景下的效果。
示例 1:Join 条件中只有 1 列(Local 类型)
BEGIN;
CREATE TABLE test1 (x int, y int);
CALL set_table_property('test1', 'distribution_key', 'x');
CREATE TABLE test2 (x int, y int);
CALL set_table_property('test2', 'distribution_key', 'x');
END;
INSERT INTO test1 SELECT t, t FROM generate_series(1, 100000) t;
INSERT INTO test2 SELECT t, t FROM generate_series(1, 1000) t;
ANALYZE test1;
ANALYZE test2;
EXPLAIN ANALYZE SELECT * FROM test1 JOIN test2 ON test1.x = test2.x;
执行计划。
QUERY PLAN
Gather (cost=0.00..10.20 rows=1000 width=16)
[40:1 id=100002 dop=1 time=9/9/9ms rows=1000(1000/1000/1000) mem=16/16/16KB open=2/2/2ms get_next=7/7/7ms]
-> Hash Join (cost=0.00..10.16 rows=1000 width=16)
Hash Cond: (test1.x = test2.x)
Runtime Filter Cond: (test1.x = test2.x)
[id=8 dop=40 time=9/4/2ms rows=1000(39/25/17) mem=6/5/5KB open=4/1/0ms get_next=6/2/0ms]
-> Local Gather (cost=0.00..5.11 rows=1000000 width=8)
[id=3 dop=40 time=6/1/0ms rows=1000(39/25/17) mem=600/600/600B open=1/0/0ms get_next=6/1/0ms local_dop=1/1/1]
-> Seq Scan on test1 (cost=0.00..5.10 rows=1000000 width=8)
Runtime Filter Target Expr: test1.x
[id=2 split_count=40 time=11/7/7ms rows=1000(39/25/17) mem=41/41/41KB open=11/7/7ms get_next=0/0/0ms scan_rows=1000000 25270/25000/24697)]
-> Hash (cost=5.00..5.00 rows=1000 width=8)
[id=7 dop=40 time=3/1/0ms rows=1000(39/25/17) mem=396/396/396KB open=3/1/0ms get_next=1/0/0ms rehash=1/1/1 hash_mem=384/384/384KB]
-> Local Gather (cost=0.00..5.00 rows=1000 width=8)
[id=5 dop=40 time=1/0/0ms rows=1000(39/25/17) mem=0/0/0B open=1/0/0ms get_next=1/0/0ms local_dop=0/0/0]
-> Seq Scan on test2 (cost=0.00..5.00 rows=1000 width=8)
[id=4 split_count=40 time=1/0/0ms rows=1000(39/25/17) mem=528/528/528B open=1/0/0ms get_next=1/0/0ms scan_rows=1000(39/25/17)]
-
test2表 1,000 行,test1表 100,000 行,build 端和 probe 端的数据量比例为 0.01(小于 0.1),满足 Runtime Filter 的默认触发条件。 -
probe 端的
test1表出现Runtime Filter Target Expr节点,表示 Runtime Filter 已下推。 -
probe 端
scan_rows为 100,000 行(从存储中读取的数据),rows为 1,000 行(过滤后的行数),可据此观察过滤效果。
示例 2:Join 条件中有多列(V2.1+,Local 类型)
DROP TABLE IF EXISTS test1, test2;
BEGIN;
CREATE TABLE test1 (x int, y int);
CREATE TABLE test2 (x int, y int);
END;
INSERT INTO test1 SELECT t, t FROM generate_series(1, 1000000) t;
INSERT INTO test2 SELECT t, t FROM generate_series(1, 1000) t;
ANALYZE test1;
ANALYZE test2;
EXPLAIN ANALYZE SELECT * FROM test1 JOIN test2 ON test1.x = test2.x AND test1.y = test2.y;
执行计划
QUERY PLAN
Gather (cost=0.00..10.46 rows=1000 width=16)
[40:1 id=100003 dop=1 time=6/6/6ms rows=1000(1000/1000/1000) mem=600/600/600B open=0/0/0ms get_next=6/6/6ms]
-> Hash Join (cost=0.00..10.43 rows=1000 width=16)
Hash Cond: ((test1.x = test2.x) AND (test1.y = test2.y))
Runtime Filter Cond: ((test1.x = test2.x) AND (test1.y = test2.y))
[id=8 dop=40 time=5/3/3ms rows=1000(1000/25/0) mem=40/2/1KB open=1/0/0ms get_next=4/3/3ms]
-> Local Gather (cost=0.00..5.11 rows=1000000 width=8)
[id=5 dop=40 time=4/3/3ms rows=1000(1000/25/0) mem=600/600/600B open=0/0/0ms get_next=4/3/3ms local_dop=1/1/1]
-> Seq Scan on test1 (cost=0.00..5.10 rows=1000000 width=8)
Runtime Filter Target Expr: (test1.x AND test1.y)
[id=4 split_count=40 time=7/5/5ms rows=1000(1000/25/0) mem=49/11/9KB open=7/5/5ms get_next=1/0/0ms scan_rows=1000000(32768/25000/24576)]
-> Hash (cost=5.02..5.02 rows=40000 width=8)
[id=7 dop=40 time=1/0/0ms rows=40000(1000/1000/1000) mem=417/417/417KB open=1/0/0ms get_next=1/0/0ms rehash=1/1/1 hash_mem=384/384/384KB]
-> Broadcast (cost=0.00..5.02 rows=40000 width=8)
[40:40 id=100002 dop=40 time=1/0/0ms rows=40000(1000/1000/1000) mem=0/0/0B open=1/0/0ms get_next=0/0/0ms * ]
-> Local Gather (cost=0.00..5.00 rows=1000 width=8)
[id=3 dop=40 time=1/0/0ms rows=1000(1000/25/0) mem=600/600/600B open=1/0/0ms get_next=0/0/0ms local_dop=1/1/1]
-> Seq Scan on test2 (cost=0.00..5.00 rows=1000 width=8)
[id=2 split_count=40 time=1/0/0ms rows=1000(1000/25/0) mem=528/528/528B open=0/0/0ms get_next=1/0/0ms scan_rows=1000(1000/1000/1000)]
-
Join 条件有多列,Runtime Filter 也生成了多列。
-
build 端数据采用 broadcast 方式,可使用 Local 类型的 Runtime Filter。
示例 3:Global 类型(V2.2+,支持 Shuffle Join)
SET hg_experimental_enable_result_cache = OFF;
DROP TABLE IF EXISTS test1, test2;
BEGIN;
CREATE TABLE test1 (x int, y int);
CREATE TABLE test2 (x int, y int);
END;
INSERT INTO test1 SELECT t, t FROM generate_series(1, 100000) t;
INSERT INTO test2 SELECT t, t FROM generate_series(1, 1000) t;
ANALYZE test1;
ANALYZE test2;
EXPLAIN ANALYZE SELECT * FROM test1 JOIN test2 ON test1.x = test2.x;
执行计划
QUERY PLAN
-> Hash Join (cost=0.00..10.08 rows=1000 width=16)
Hash Cond: (test1.x = test2.x)
Runtime Filter Cond: (test1.x = test2.x)
[id=9 dop=40 time=10/8/8ms rows=1000(34/25/13) mem=6/6/5KB open=2/1/1ms get_next=8/7/7ms]
-> Redistribution (cost=0.00..5.07 rows=100000 width=8)
Hash Key: test1.x
[40:40 id=100002 dop=40 time=8/7/7ms rows=1289(46/32/20) mem=512/432/0B open=0/0/0ms get_next=8/7/7ms * ]
-> Local Gather (cost=0.00..5.01 rows=100000 width=8)
[id=3 dop=40 time=9/2/0ms rows=1289(1042/32/0) mem=600/600/600B open=0/0/0ms get_next=9/2/0ms local_dop=1/1/1]
-> Seq Scan on test1 (cost=0.00..5.01 rows=100000 width=8)
Runtime Filter Target Expr: test1.x
[id=2 split_count=40 time=11/3/0ms rows=1289(1042/32/0) mem=50512/11849/528B open=11/3/0ms get_next=1/0/0ms scan_rows=100000(8192/7692/1696)]
-> Hash (cost=5.00..5.00 rows=1000 width=8)
[id=8 dop=40 time=2/1/1ms rows=1000(34/25/13) mem=396/396/396KB open=2/1/1ms get_next=0/0/0ms rehash=1/1/1 hash_mem=384/384/384KB]
-> Redistribution (cost=0.00..5.00 rows=1000 width=8)
Hash Key: test2.x
[40:40 id=100003 dop=40 time=2/1/1ms rows=1000(34/25/13) mem=0/0/0B open=0/0/0ms get_next=2/1/1ms * ]
-> Local Gather (cost=0.00..5.00 rows=1000 width=8)
[id=5 dop=40 time=1/0/0ms rows=1000(1000/25/0) mem=600/600/600B open=1/0/0ms get_next=1/0/0ms local_dop=1/1/1]
-> Seq Scan on test2 (cost=0.00..5.00 rows=1000 width=8)
[id=4 split_count=40 time=0/0/0ms rows=1000(1000/25/0) mem=528/528/528B open=0/0/0ms get_next=0/0/0ms scan_rows=1000(1000/1000/1000)]
probe 端数据被 Shuffle 到 Hash Join 算子,引擎自动使用 Global Runtime Filter 加速查询。
示例 4:In 类型结合 Bitmap 索引(V2.2+)
SET hg_experimental_enable_result_cache = OFF;
DROP TABLE IF EXISTS test1, test2;
BEGIN;
CREATE TABLE test1 (x text, y text);
CALL set_table_property('test1', 'distribution_key', 'x');
CALL set_table_property('test1', 'bitmap_columns', 'x');
CALL set_table_property('test1', 'dictionary_encoding_columns', '');
CREATE TABLE test2 (x text, y text);
CALL set_table_property('test2', 'distribution_key', 'x');
END;
INSERT INTO test1 SELECT t::text, t::text FROM generate_series(1, 10000000) t;
INSERT INTO test2 SELECT t::text, t::text FROM generate_series(1, 50) t;
ANALYZE test1;
ANALYZE test2;
EXPLAIN ANALYZE SELECT * FROM test1 JOIN test2 ON test1.x = test2.x;
执行计划。
QUERY PLAN
Gather (cost=0.00..11.70 rows=50 width=14)
[40:1 id=100002 dop=1 time=16/16/16ms rows=50(50/50/50) mem=2/2/2KB open=0/0ms get_next=16/16/16ms]
-> Hash Join (cost=0.00..11.70 rows=50 width=14)
Hash Cond: (test1.x = test2.x)
Runtime Filter Cond: (test1.x = test2.x)
[id=7 dop=40 time=15/9/3ms rows=50(3/1/0) mem=5132/3774/264B open=1/0/0ms get_next=14/8/3ms]
-> Local Gather (cost=0.00..6.26 rows=10000000 width=12)
[id=3 dop=40 time=14/8/3ms rows=50(3/1/0) mem=600/600/600B open=1/0/0ms get_next=14/8/3ms local_dop=1/1/1]
-> Seq Scan on test1 (cost=0.00..6.06 rows=10000000 width=12)
Runtime Filter Target Expr: test1.x
[id=2 split_count=40 time=16/10/5ms rows=50(3/1/0) mem=67544/48945/528B open=16/9/5ms get_next=1/0/0ms scan_rows=7247692(250875/249920/248982) bitmap_used=50]
-> Hash (cost=5.00..5.00 rows=50 width=2)
[id=6 dop=40 time=1/0/0ms rows=61(3/1/1) mem=534/530/521KB open=1/0/0ms get_next=1/0/0ms rehash=1/1/1 hash_mem=512/512/512KB]
-> Local Gather (cost=0.00..5.00 rows=50 width=2)
[id=5 dop=40 time=1/0/0ms rows=50(3/1/0) mem=600/600/600B open=0/0/0ms get_next=1/0/0ms local_dop=1/1/1]
-> Seq Scan on test2 (cost=0.00..5.00 rows=50 width=2)
[id=4 split_count=40 time=1/0/0ms rows=50(3/1/0) mem=528/528/528B open=1/0/0ms get_next=0/0/0ms scan_rows=50(3/1/1)]
probe 端的 scan 算子使用了 bitmap。In Filter 可精确过滤,过滤后仅剩 50 行。scan 算子中的 scan_rows 为 700 多万(低于原始行数 1,000 万),这是因为 In Filter 可下推到存储引擎,减少 I/O 成本。In 类型 Runtime Filter 结合 bitmap 在 Join Key 为 STRING 类型时效果显著。
示例 5:MinMax 类型减少 I/O(V2.2+)
SET hg_experimental_enable_result_cache = OFF;
DROP TABLE IF EXISTS test1, test2;
BEGIN;
CREATE TABLE test1 (x int, y int);
CALL set_table_property('test1', 'distribution_key', 'x');
CREATE TABLE test2 (x int, y int);
CALL set_table_property('test2', 'distribution_key', 'x');
END;
INSERT INTO test1 SELECT t::int, t::int FROM generate_series(1, 10000000) t;
INSERT INTO test2 SELECT t::int, t::int FROM generate_series(1, 100000) t;
ANALYZE test1;
ANALYZE test2;
EXPLAIN ANALYZE SELECT * FROM test1 JOIN test2 ON test1.x = test2.x;
执行计划
QUERY PLAN
Gather (cost=0.00..15.68 rows=100000 width=16)
[40:1 id=100002 dop=1 time=5/5/5ms rows=100000(100000/100000/100000) mem=600/600/600B open=0/0/0ms get_next=5/5/5ms]
-> Hash Join (cost=0.00..11.98 rows=100000 width=16)
Hash Cond: (test1.x = test2.x)
Runtime Filter Cond: (test1.x = test2.x)
[id=7 dop=40 time=5/4/4ms rows=100000(2639/2500/2406) mem=97/92/89KB open=1/0/0ms get_next=4/3/3ms]
-> Local Gather (cost=0.00..6.14 rows=10000000 width=8)
[id=3 dop=40 time=5/3/3ms rows=100000(2639/2500/2406) mem=600/600/600B open=1/0/0ms get_next=4/3/3ms local_dop=1/1/1]
-> Seq Scan on test1 (cost=0.00..6.00 rows=10000000 width=8)
Runtime Filter Target Expr: test1.x
[id=2 split_count=40 time=6/6/5ms rows=100000(2639/2500/2406) mem=61/60/59KB open=6/5/5ms get_next=0/0/0ms scan_rows=327680(8192/8192/8192)]
-> Hash (cost=5.01..5.01 rows=100000 width=8)
[id=6 dop=40 time=1/0/0ms rows=100000(2639/2500/2406) mem=463/460/458KB open=1/0/0ms get_next=1/0/0ms rehash=1/1/1 hash_mem=384/384/384KB]
-> Local Gather (cost=0.00..5.01 rows=100000 width=8)
[id=5 dop=40 time=1/0/0ms rows=100000(2639/2500/2406) mem=600/600/600B open=0/0/0ms get_next=1/0/0ms local_dop=1/1/1]
-> Seq Scan on test2 (cost=0.00..5.01 rows=100000 width=8)
[id=4 split_count=40 time=1/0/0ms rows=100000(2639/2500/2406) mem=528/528/528B open=1/0/0ms get_next=0/0/0ms scan_rows=100000(2639/2500/2406)]
probe 端 scan 算子从存储引擎读取的行数为 32 万多,远低于原始行数 1,000 万。这是因为 Runtime Filter 被下推到存储引擎,利用一个 Batch 数据的 meta 信息整批过滤,有可能大量减少 I/O 成本。该类型通常在 Join Key 为数值类型且 build 端值域范围小于 probe 端时效果明显。
示例 6:TopN Runtime Filter(V4.0+)
当 SQL 语句包含 topN 算子时,Hologres 不会计算所有结果,而是生成动态 Filter 提前对数据进行过滤。
SELECT o_orderkey FROM orders ORDER BY o_orderdate LIMIT 5;
执行计划:
QUERY PLAN
Limit (cost=0.00..116554.70 rows=0 width=8)
-> Sort (cost=0.00..116554.70 rows=100 width=12)
Sort Key: o_orderdate
[id=6 dop=1 time=317/317/317ms rows=5(5/5/5) mem=1/1/1KB open=317/317/317ms get_next=0/0/0ms]
-> Gather (cost=0.00..116554.25 rows=100 width=12)
[20:1 id=100002 dop=1 time=317/317/317ms rows=100(100/100/100) mem=6/6/6KB open=0/0/0ms get_next=317/317/317ms * ]
-> Limit (cost=0.00..116554.25 rows=0 width=12)
-> Sort (cost=0.00..116554.25 rows=150000000 width=12)
Sort Key: o_orderdate
Runtime Filter Sort Column: o_orderdate
[id=3 dop=20 time=318/282/258ms rows=100(5/5/5) mem=96/96/96KB open=318/282/258ms get_next=1/0/0ms]
-> Local Gather (cost=0.00..9.59 rows=150000000 width=12)
[id=2 dop=20 time=316/280/256ms rows=1372205(68691/68610/68498) mem=0/0/0B open=0/0/0ms get_next=316/280/256ms local_dop=1/1/1 * ]
-> Seq Scan on orders (cost=0.00..8.24 rows=150000000 width=12)
Runtime Filter Target Expr: o_orderdate
[id=1 split_count=20 time=286/249/222ms rows=1372205(68691/68610/68498) mem=179/179/179KB open=0/0/0ms get_next=286/249/222ms physical_reads=27074(1426/1353/1294) scan_rows=144867963(7324934/7243398/7172304)]
Query id:[1001003033996040311]
QE version: 2.0
Query Queue: init_warehouse.default_queue
======================cost======================
Total cost:[343] ms
Optimizer cost:[13] ms
Build execution plan cost:[0] ms
Init execution plan cost:[6] ms
Start query cost:[6] ms
- Queue cost: [0] ms
- Wait schema cost:[0] ms
- Lock query cost:[0] ms
- Create dataset reader cost:[0] ms
- Create split reader cost:[0] ms
Get result cost:[318] ms
- Get the first block cost:[318] ms
====================resource====================
Memory: total 7 MB. Worker stats: max 3 MB, avg 3 MB, min 3 MB, max memory worker id: 189*****.
CPU time: total 5167 ms. Worker stats: max 2610 ms, avg 2583 ms, min 2557 ms, max CPU time worker id: 189*****.
DAG CPU time stats: max 5165 ms, avg 2582 ms, min 0 ms, cnt 2, max CPU time dag id: 1.
Fragment CPU time stats: max 5137 ms, avg 1721 ms, min 0 ms, cnt 3, max CPU time fragment id: 2.
Ec wait time: total 90 ms. Worker stats: max 46 ms, max(max) 2 ms, avg 45 ms, min 44 ms, max ec wait time worker id: 189*****, max(max) ec wait time worker id: 189*****.
Physical read bytes: total 799 MB. Worker stats: max 400 MB, avg 399 MB, min 399 MB, max physical read bytes worker id: 189*****.
Read bytes: total 898 MB. Worker stats: max 450 MB, avg 449 MB, min 448 MB, max read bytes worker id: 189*****.
DAG instance count: total 3. Worker stats: max 2, avg 1, min 1, max DAG instance count worker id: 189*****.
Fragment instance count: total 41. Worker stats: max 21, avg 20, min 20, max fragment instance count worker id: 189*****.
没有 TopN Filter 时,Scan 节点读取 orders 表的每个数据块并传给 TopN 节点,TopN 节点用堆排序维护当前已见数据中排名前 5 的行。
例如:每个数据块约含 8,192 行。处理完第一个块后,TopN 就知道该块中第 5 名的 o_orderdate。假设它是 1995-01-01。Scan 节点在读第二个块时,就用 1995-01-01 作过滤条件,只发送 o_orderdate <= 1995-01-01 的行给 TopN。阈值会动态更新,如果第二个块中第 5 名的 o_orderdate 更小,TopN 就用新值替换旧阈值。
通过 EXPLAIN 命令可查看优化器生成的 TopN Runtime Filter:
-> Limit (cost=0.00..116554.25 rows=0 width=12)
-> Sort (cost=0.00..116554.25 rows=150000000 width=12)
Sort Key: o_orderdate
Runtime Filter Sort Column: o_orderdate
[id=3 dop=20 time=318/282/258ms rows=100(5/5/5) mem=96/96/96KB open=318/282/258ms get_next=1/0/0ms]
TopN 节点上显示 Runtime Filter Sort Column,表示该节点会产生 TopN Runtime Filter。
示例 7:Cross Join 使用 ScalarFilter(V4.2+)
SET hg_experimental_enable_result_cache = OFF;
DROP TABLE IF EXISTS t1, t2;
BEGIN;
CREATE TABLE t1 (a int, b int);
CREATE TABLE t2 (a int, b int);
END;
INSERT INTO t1 SELECT t, t FROM generate_series(1, 1000000) t;
INSERT INTO t2 SELECT t, t FROM generate_series(1, 1000) t;
ANALYZE t1;
ANALYZE t2;
EXPLAIN ANALYZE
SELECT * FROM t1
WHERE a >= (SELECT min(a) FROM t2 WHERE b BETWEEN 0 AND 1)
AND a <= (SELECT max(a) FROM t2 WHERE b BETWEEN 0 AND 1);
执行计划要点:
-
Cross Join 节点输出
Runtime Filter Build Expr: (min(t2.a)), (max(t2.a))。 -
t1 的 Scan 节点输出
Runtime Filter Target Expr: (t1.a >= ${1}) AND (t1.a <= ${2})。 -
t1 的
scan_rows仍为存储中读取的行数,经 ScalarFilter 过滤后rows显著下降,可据此验证过滤效果。
关闭功能验证(用于回归对比):
SET hg_experimental_generate_runtime_scalar_filter = off;
EXPLAIN ANALYZE
SELECT * FROM t1
WHERE a >= (SELECT min(a) FROM t2 WHERE b BETWEEN 0 AND 1)
AND a <= (SELECT max(a) FROM t2 WHERE b BETWEEN 0 AND 1);
关闭后执行计划中不再出现 Runtime Filter Build Expr / Target Expr 字段,t1 退化为全表扫描。
版本演进总结
|
版本 |
新增能力 |
|
V2.0 |
支持 Hash Join 的 Runtime Filter(Local 类型,Bloom / In / MinMax Filter) |
|
V2.1 |
支持多字段 Join 的 Runtime Filter |
|
V2.2 |
支持 Global 类型 Runtime Filter(Shuffle Join);In Filter 可结合 Bitmap 索引 |
|
V4.0 |
支持 TopN Runtime Filter |
|
V4.2 |
支持 Cross Join Runtime Filter(ScalarFilter),优化「标量子查询 + 大表过滤」场景 |