Runtime Filter 多表 Join 加速

更新时间:
复制 MD 格式

在多表 Join 查询中,未参与 Join 匹配的数据会增加 I/O 开销并降低查询性能。Runtime Filter 在多表 Join 场景下自动生成轻量过滤器,在数据扫描阶段提前裁剪无效数据,减少 I/O 开销并提升查询性能。该功能自 Hologres V2.0 开始支持 Hash Join,V4.2 扩展至 Cross Join 场景。

背景信息

应用场景

Hologres 从 V2.0 版本开始支持 Runtime Filter,通常应用在多表(两表及以上)Join 的 Hash Join 场景,尤其是大表 Join 小表的场景中,无需手动设置,优化器和执行引擎会在查询时自动优化 Join 过程的过滤行为,从而降低 I/O 开销,提升 Join 的查询性能。

V4.2 版本开始,Runtime Filter 的能力进一步扩展到 Cross Join 场景,专门优化「标量子查询聚合结果作为大表过滤条件」这类高频 SQL 模式(详见Cross Join 支持 Runtime Filter(V4.2 新增))。

原理介绍

Hash Join 场景下的 Runtime Filter

两个表 Join 时,会通过一张表构建 Hash 表,然后匹配另一张表的数据。Join 过程涉及两个端:

  • build 端:构建 Hash 表的一侧,对应执行计划中的 Hash 节点。

  • probe 端:读取数据并与 build 端的 Hash 表进行匹配的一侧。

通常小表作为 build 端,大表作为 probe 端。

Runtime Filter 的原理是:利用 build 端的数据分布生成轻量过滤器,发送给 probe 端对数据进行裁剪,减少 probe 端参与 Hash Join 及网络传输的数据量,从而提升 Join 性能。 因此该功能更适用于大小表 Join 且表数据量相差较大的场景,性能将会比普通 Join 有更多提升。

Cross Join 场景下的 Runtime Filter(V4.2 新增)

V4.2 之前,Runtime Filter 仅覆盖 Hash Join。但用户经常使用标量子查询计算聚合值(如 min / max),并将结果作为大表的过滤条件,这类 SQL 在执行计划中产生的是 Cross Join:

  • build 端:标量子查询结果,恰好 1 行。

  • probe 端:大表扫描。

V4.2 引入了新的 Filter 类型 —— ScalarFilter,由优化器自动识别并将 build 端求值后的标量值下推给 probe 端 ScanNode,扫描阶段即完成行级与 RowGroup 级过滤,避免大表全表扫描后再逐行比较。

使用限制和触发条件

使用限制

  • 仅 Hologres V2.0 及以上版本支持 Runtime Filter。

  • Hash Join 场景下,V2.0 版本仅支持 Join 条件中只有一个字段。从 V2.1 版本开始,支持多个字段的 Runtime Filter。

  • 仅 V4.0 及以上版本支持 TopN Runtime Filter,用以提升单表 TopN 计算场景的性能。

  • 仅 V4.2 及以上版本支持 Cross Join Runtime Filter(ScalarFilter)

触发条件

Hash Join 场景

Runtime Filter 由引擎自动触发,需同时满足以下条件:

  • probe 端数据量在 100,000 行及以上。

  • 扫描数据量比例:build 端 / probe 端 ≤ 0.1(比例越小,越容易触发)。

  • Join 输出数据量比例:build 端 / probe 端 ≤ 0.1(比例越小,越容易触发)。

Cross Join 场景(V4.2+)

由优化器自动生成 ScalarFilter,需满足:

  • Cross Join 的 build 端统计行数为 1,且分布为 Replicated。

  • probe 端的过滤谓词中包含 build 端表达式的比较条件(><>=<=BETWEEN)。

Runtime Filter 的类型

Runtime Filter 可从以下两个维度进行分类。

按 Shuffle 维度划分(适用于 Hash Join)

类型

支持版本

适用场景

Local

V2.0+

probe 端数据不需要 Shuffle 时使用。build 端和 probe 端的 Join Key 为同一种分布方式,或 build 端数据 broadcast 给 probe 端,或 build 端数据按照 probe 端的分布方式 Shuffle 给 probe 端,均可使用 Local 类型。仅减少数据扫描量及参与 Hash Join 计算的数据量。

Global

V2.2+

probe 端数据需要 Shuffle 时使用。Runtime Filter 在数据 Shuffle 之前做过滤,可减少数据的网络传输量。

说明

类型无需手动指定,引擎会自适应选择。

按 Filter 类型划分

类型

支持版本

说明

Bloom Filter

V2.0+

具有一定假阳性,可能少过滤部分数据,但应用范围广,在 build 端数据量较多时仍有较高的过滤效率。

In Filter

V2.0+

build 端数据 NDV(非重复值个数)较小时使用。使用 build 端数据构建 HashSet 发送给 probe 端过滤,可过滤所有应过滤的数据,且能与 Bitmap 索引结合使用。

MinMax Filter

V2.0+

根据 build 端数据的最大值和最小值发送给 probe 端做过滤。可根据元数据信息直接过滤掉文件或一个 Batch 的数据,减少 I/O 成本。

ScalarFilter

V4.2+

专用于 Cross Join 场景。当 build 端恰好为 1 行时,引擎将标量值下推至 probe 端 ScanNode,在扫描阶段直接过滤。

说明

Filter 类型无需手动指定,Hologres 会根据运行时 Join 情况自适应使用。

Cross Join 支持 Runtime Filter(V4.2 新增)

需求背景

用户经常使用标量子查询计算聚合值,并将结果作为大表的过滤条件。这类 SQL 在 V4.2 之前的执行计划中存在如下问题:

  • probe 端 ScanNode 无法利用 build 端的值提前过滤,必须全表扫描

  • 扫描完成后由 Cross Join 逐行比较,造成大量无效 I/O。

  • 已有的 Runtime Filter 仅覆盖 Hash Join,不支持 Cross Join

V4.2 引入 ScalarFilter,从根本上解决该问题,用户无需任何额外操作,功能默认开启

典型 SQL 与执行计划对比

典型 SQL

-- t1 大表,t2 聚合后 min/max 结果恰好 1 行
SELECT * FROM t1
WHERE a >= (SELECT min(a) FROM t2 WHERE b BETWEEN 0 AND 1);

优化前(无 Runtime Filter):

 Cross Join
   ->  Seq Scan on t1             -- 全表扫描,无法提前过滤
   ->  Aggregate                  -- build 端 min/max,结果 1 行
         ->  Seq Scan on t2

probe 端必须扫描全量数据,再由 Cross Join 逐行比较,存在大量无效 I/O。

优化后(V4.2 ScalarFilter):

 Cross Join
   Runtime Filter Build Expr: (min(t2.a)), (max(t2.a))
   ->  Seq Scan on t1             -- 收到 ScalarFilter,扫描时直接过滤不满足条件的行和 RowGroup
         Runtime Filter Target Expr: (t1.a >= ${1}) AND (t1.a <= ${2})
   ->  Aggregate
         ->  Seq Scan on t2

build 端求值后,将实际值下推给 probe 端 ScanNode,扫描阶段即完成过滤。

更多典型场景

-- 场景一:单表 + 标量子查询
SELECT * FROM t1
WHERE a >= (SELECT min(a) FROM t2 WHERE b BETWEEN 0 AND 1);
-- 场景二:多表 + 标量子查询
SELECT * FROM t1, t3
WHERE (SELECT min(a) FROM t2) <= t1.a
  AND t3.a <= (SELECT max(a) FROM t2);
-- 场景三:CTE + Cross Join
WITH r AS (SELECT MIN(a) AS lo, MAX(a) AS hi FROM t2)
SELECT t1.* FROM t1, r
WHERE t1.a >= r.lo AND t1.a <= r.hi;

工作原理

  1. 优化器识别:当 Cross Join 的 build 端统计行数为 1 且分布为 Replicated 时,从谓词中提取比较表达式(BETWEEN 展开为 >= + <=),生成 ScalarFilter 候选。

  2. 执行期下推:Cross Join 在 Open 阶段消费 build 端数据后,对 build_expr 求值得到标量值,构建 ScalarFilter 发布给 probe 端 ScanNode。

  3. ScanNode 提前过滤:probe 端 NiagaraScan 接收到 ScalarFilter 后,用实际值替换 target_expr 中的占位符,同时生成两级过滤:

    • 行级过滤(conjunct eval)。

    • RowGroup 级过滤(存储引擎层直接跳过不满足条件的数据块)。

使用限制

  • build 端必须恰好 1 行。优化器仅在 build 端统计行数为 1 且为 Replicated 分布时生成 ScalarFilter。如果运行时 build 端非 1 行,会触发 RT_CHECK 报错。

  • build 端值为 NULL 时,发布 FILTER_ALL(过滤所有行),同时清空 build 端数据短路 Cross Join,直接返回空结果。

  • 不支持字典列。如果 probe 端 target_expr 引用的列是字典编码列(DICTIONARY type),则不会应用 ScalarFilter。

  • 支持的数据类型:INT8 / UINT8 / INT16 / UINT16 / INT32 / UINT32 / INT64 / UINT64 / DATE32 / TIMESTAMP / FLOAT / DOUBLE / STRING。不支持的类型会跳过 ScalarFilter(不报错,仅不下推)。

  • 支持的比较操作符:><>=<=BETWEEN 会被展开为两个范围比较。

GUC 参数

参数名

类型

默认值

级别

说明

hg_experimental_generate_runtime_scalar_filter

bool

true

PGC_USERSET

是否为 Cross Join 生成 ScalarFilter 类型的 Runtime Filter。默认开启,用户无需额外操作。

关闭该功能:

SET hg_experimental_generate_runtime_scalar_filter = off;

EXPLAIN 输出新增字段

启用 ScalarFilter 后,Cross Join 节点会显示:

  • Runtime Filter Build Expr:build 端表达式,例如 (min(t2.a))(max(t2.a))

  • Runtime Filter Target Expr:probe 端目标过滤表达式,例如 (t1.a >= ${1}) AND (t1.a <= ${2}),其中 ${N} 为运行时替换的 filter_id 对应的 build 端值占位符。

性能收益

在 TPC-DS 10TB 数据集中,存在如下典型 SQL 模式:

DELETE FROM inventory
WHERE inv_date_sk >= (SELECT min(d_date_sk) FROM date_dim WHERE d_date BETWEEN 'INV_S_1' AND 'INV_E_1')
  AND inv_date_sk <= (SELECT max(d_date_sk) FROM date_dim WHERE d_date BETWEEN 'INV_S_1' AND 'INV_E_1');

经 V4.2 ScalarFilter 优化后,性能由 2.672 秒 提升至 0.552 秒,提升约 4.8 倍

验证 Runtime Filter

以下示例帮助您验证 Runtime Filter 在不同场景下的效果。

示例 1:Join 条件中只有 1 列(Local 类型)

BEGIN;
CREATE TABLE test1 (x int, y int);
CALL set_table_property('test1', 'distribution_key', 'x');
CREATE TABLE test2 (x int, y int);
CALL set_table_property('test2', 'distribution_key', 'x');
END;
INSERT INTO test1 SELECT t, t FROM generate_series(1, 100000) t;
INSERT INTO test2 SELECT t, t FROM generate_series(1, 1000) t;
ANALYZE test1;
ANALYZE test2;
EXPLAIN ANALYZE SELECT * FROM test1 JOIN test2 ON test1.x = test2.x;

执行计划。

QUERY PLAN
Gather  (cost=0.00..10.20 rows=1000 width=16)
[40:1 id=100002 dop=1 time=9/9/9ms rows=1000(1000/1000/1000) mem=16/16/16KB open=2/2/2ms get_next=7/7/7ms]
  -> Hash Join  (cost=0.00..10.16 rows=1000 width=16)
      Hash Cond: (test1.x = test2.x)
      Runtime Filter Cond: (test1.x = test2.x)
      [id=8 dop=40 time=9/4/2ms rows=1000(39/25/17) mem=6/5/5KB open=4/1/0ms get_next=6/2/0ms]
      -> Local Gather  (cost=0.00..5.11 rows=1000000 width=8)
          [id=3 dop=40 time=6/1/0ms rows=1000(39/25/17) mem=600/600/600B open=1/0/0ms get_next=6/1/0ms local_dop=1/1/1]
          -> Seq Scan on test1  (cost=0.00..5.10 rows=1000000 width=8)
              Runtime Filter Target Expr: test1.x
              [id=2 split_count=40 time=11/7/7ms rows=1000(39/25/17) mem=41/41/41KB open=11/7/7ms get_next=0/0/0ms scan_rows=1000000 25270/25000/24697)]
      -> Hash  (cost=5.00..5.00 rows=1000 width=8)
          [id=7 dop=40 time=3/1/0ms rows=1000(39/25/17) mem=396/396/396KB open=3/1/0ms get_next=1/0/0ms rehash=1/1/1 hash_mem=384/384/384KB]
          -> Local Gather  (cost=0.00..5.00 rows=1000 width=8)
              [id=5 dop=40 time=1/0/0ms rows=1000(39/25/17) mem=0/0/0B open=1/0/0ms get_next=1/0/0ms local_dop=0/0/0]
              -> Seq Scan on test2  (cost=0.00..5.00 rows=1000 width=8)
                  [id=4 split_count=40 time=1/0/0ms rows=1000(39/25/17) mem=528/528/528B open=1/0/0ms get_next=1/0/0ms scan_rows=1000(39/25/17)]
  • test2 表 1,000 行,test1 表 100,000 行,build 端和 probe 端的数据量比例为 0.01(小于 0.1),满足 Runtime Filter 的默认触发条件。

  • probe 端的 test1 表出现 Runtime Filter Target Expr 节点,表示 Runtime Filter 已下推。

  • probe 端 scan_rows 为 100,000 行(从存储中读取的数据),rows 为 1,000 行(过滤后的行数),可据此观察过滤效果。

示例 2:Join 条件中有多列(V2.1+,Local 类型)

DROP TABLE IF EXISTS test1, test2;
BEGIN;
CREATE TABLE test1 (x int, y int);
CREATE TABLE test2 (x int, y int);
END;
INSERT INTO test1 SELECT t, t FROM generate_series(1, 1000000) t;
INSERT INTO test2 SELECT t, t FROM generate_series(1, 1000) t;
ANALYZE test1;
ANALYZE test2;
EXPLAIN ANALYZE SELECT * FROM test1 JOIN test2 ON test1.x = test2.x AND test1.y = test2.y;

执行计划

QUERY PLAN
Gather  (cost=0.00..10.46 rows=1000 width=16)
[40:1 id=100003 dop=1 time=6/6/6ms rows=1000(1000/1000/1000) mem=600/600/600B open=0/0/0ms get_next=6/6/6ms]
  -> Hash Join  (cost=0.00..10.43 rows=1000 width=16)
       Hash Cond: ((test1.x = test2.x) AND (test1.y = test2.y))
       Runtime Filter Cond: ((test1.x = test2.x) AND (test1.y = test2.y))
       [id=8 dop=40 time=5/3/3ms rows=1000(1000/25/0) mem=40/2/1KB open=1/0/0ms get_next=4/3/3ms]
       -> Local Gather  (cost=0.00..5.11 rows=1000000 width=8)
            [id=5 dop=40 time=4/3/3ms rows=1000(1000/25/0) mem=600/600/600B open=0/0/0ms get_next=4/3/3ms local_dop=1/1/1]
            -> Seq Scan on test1  (cost=0.00..5.10 rows=1000000 width=8)
                 Runtime Filter Target Expr: (test1.x AND test1.y)
                 [id=4 split_count=40 time=7/5/5ms rows=1000(1000/25/0) mem=49/11/9KB open=7/5/5ms get_next=1/0/0ms scan_rows=1000000(32768/25000/24576)]
       -> Hash  (cost=5.02..5.02 rows=40000 width=8)
            [id=7 dop=40 time=1/0/0ms rows=40000(1000/1000/1000) mem=417/417/417KB open=1/0/0ms get_next=1/0/0ms rehash=1/1/1 hash_mem=384/384/384KB]
            -> Broadcast  (cost=0.00..5.02 rows=40000 width=8)
                 [40:40 id=100002 dop=40 time=1/0/0ms rows=40000(1000/1000/1000) mem=0/0/0B open=1/0/0ms get_next=0/0/0ms * ]
                 -> Local Gather  (cost=0.00..5.00 rows=1000 width=8)
                      [id=3 dop=40 time=1/0/0ms rows=1000(1000/25/0) mem=600/600/600B open=1/0/0ms get_next=0/0/0ms local_dop=1/1/1]
                      -> Seq Scan on test2  (cost=0.00..5.00 rows=1000 width=8)
                           [id=2 split_count=40 time=1/0/0ms rows=1000(1000/25/0) mem=528/528/528B open=0/0/0ms get_next=1/0/0ms scan_rows=1000(1000/1000/1000)]
  • Join 条件有多列,Runtime Filter 也生成了多列。

  • build 端数据采用 broadcast 方式,可使用 Local 类型的 Runtime Filter。

示例 3:Global 类型(V2.2+,支持 Shuffle Join)

SET hg_experimental_enable_result_cache = OFF;
DROP TABLE IF EXISTS test1, test2;
BEGIN;
CREATE TABLE test1 (x int, y int);
CREATE TABLE test2 (x int, y int);
END;
INSERT INTO test1 SELECT t, t FROM generate_series(1, 100000) t;
INSERT INTO test2 SELECT t, t FROM generate_series(1, 1000) t;
ANALYZE test1;
ANALYZE test2;
EXPLAIN ANALYZE SELECT * FROM test1 JOIN test2 ON test1.x = test2.x;

执行计划

QUERY PLAN
   -> Hash Join  (cost=0.00..10.08 rows=1000 width=16)
      Hash Cond: (test1.x = test2.x)
      Runtime Filter Cond: (test1.x = test2.x)
      [id=9 dop=40 time=10/8/8ms rows=1000(34/25/13) mem=6/6/5KB open=2/1/1ms get_next=8/7/7ms]
      -> Redistribution  (cost=0.00..5.07 rows=100000 width=8)
         Hash Key: test1.x
         [40:40 id=100002 dop=40 time=8/7/7ms rows=1289(46/32/20) mem=512/432/0B open=0/0/0ms get_next=8/7/7ms * ]
      -> Local Gather  (cost=0.00..5.01 rows=100000 width=8)
         [id=3 dop=40 time=9/2/0ms rows=1289(1042/32/0) mem=600/600/600B open=0/0/0ms get_next=9/2/0ms local_dop=1/1/1]
         -> Seq Scan on test1  (cost=0.00..5.01 rows=100000 width=8)
            Runtime Filter Target Expr: test1.x
            [id=2 split_count=40 time=11/3/0ms rows=1289(1042/32/0) mem=50512/11849/528B open=11/3/0ms get_next=1/0/0ms scan_rows=100000(8192/7692/1696)]
      -> Hash  (cost=5.00..5.00 rows=1000 width=8)
         [id=8 dop=40 time=2/1/1ms rows=1000(34/25/13) mem=396/396/396KB open=2/1/1ms get_next=0/0/0ms rehash=1/1/1 hash_mem=384/384/384KB]
         -> Redistribution  (cost=0.00..5.00 rows=1000 width=8)
            Hash Key: test2.x
            [40:40 id=100003 dop=40 time=2/1/1ms rows=1000(34/25/13) mem=0/0/0B open=0/0/0ms get_next=2/1/1ms * ]
         -> Local Gather  (cost=0.00..5.00 rows=1000 width=8)
            [id=5 dop=40 time=1/0/0ms rows=1000(1000/25/0) mem=600/600/600B open=1/0/0ms get_next=1/0/0ms local_dop=1/1/1]
            -> Seq Scan on test2  (cost=0.00..5.00 rows=1000 width=8)
               [id=4 split_count=40 time=0/0/0ms rows=1000(1000/25/0) mem=528/528/528B open=0/0/0ms get_next=0/0/0ms scan_rows=1000(1000/1000/1000)]

probe 端数据被 Shuffle 到 Hash Join 算子,引擎自动使用 Global Runtime Filter 加速查询。

示例 4:In 类型结合 Bitmap 索引(V2.2+)

SET hg_experimental_enable_result_cache = OFF;
DROP TABLE IF EXISTS test1, test2;
BEGIN;
CREATE TABLE test1 (x text, y text);
CALL set_table_property('test1', 'distribution_key', 'x');
CALL set_table_property('test1', 'bitmap_columns', 'x');
CALL set_table_property('test1', 'dictionary_encoding_columns', '');
CREATE TABLE test2 (x text, y text);
CALL set_table_property('test2', 'distribution_key', 'x');
END;
INSERT INTO test1 SELECT t::text, t::text FROM generate_series(1, 10000000) t;
INSERT INTO test2 SELECT t::text, t::text FROM generate_series(1, 50) t;
ANALYZE test1;
ANALYZE test2;
EXPLAIN ANALYZE SELECT * FROM test1 JOIN test2 ON test1.x = test2.x;

执行计划。

QUERY PLAN
Gather  (cost=0.00..11.70 rows=50 width=14)
[40:1 id=100002 dop=1 time=16/16/16ms rows=50(50/50/50) mem=2/2/2KB open=0/0ms get_next=16/16/16ms]
  -> Hash Join  (cost=0.00..11.70 rows=50 width=14)
       Hash Cond: (test1.x = test2.x)
       Runtime Filter Cond: (test1.x = test2.x)
       [id=7 dop=40 time=15/9/3ms rows=50(3/1/0) mem=5132/3774/264B open=1/0/0ms get_next=14/8/3ms]
       -> Local Gather  (cost=0.00..6.26 rows=10000000 width=12)
            [id=3 dop=40 time=14/8/3ms rows=50(3/1/0) mem=600/600/600B open=1/0/0ms get_next=14/8/3ms local_dop=1/1/1]
            -> Seq Scan on test1  (cost=0.00..6.06 rows=10000000 width=12)
                 Runtime Filter Target Expr: test1.x
                 [id=2 split_count=40 time=16/10/5ms rows=50(3/1/0) mem=67544/48945/528B open=16/9/5ms get_next=1/0/0ms scan_rows=7247692(250875/249920/248982) bitmap_used=50]
       -> Hash  (cost=5.00..5.00 rows=50 width=2)
            [id=6 dop=40 time=1/0/0ms rows=61(3/1/1) mem=534/530/521KB open=1/0/0ms get_next=1/0/0ms rehash=1/1/1 hash_mem=512/512/512KB]
            -> Local Gather  (cost=0.00..5.00 rows=50 width=2)
                 [id=5 dop=40 time=1/0/0ms rows=50(3/1/0) mem=600/600/600B open=0/0/0ms get_next=1/0/0ms local_dop=1/1/1]
                 -> Seq Scan on test2  (cost=0.00..5.00 rows=50 width=2)
                      [id=4 split_count=40 time=1/0/0ms rows=50(3/1/0) mem=528/528/528B open=1/0/0ms get_next=0/0/0ms scan_rows=50(3/1/1)]

probe 端的 scan 算子使用了 bitmap。In Filter 可精确过滤,过滤后仅剩 50 行。scan 算子中的 scan_rows 为 700 多万(低于原始行数 1,000 万),这是因为 In Filter 可下推到存储引擎,减少 I/O 成本。In 类型 Runtime Filter 结合 bitmap 在 Join Key 为 STRING 类型时效果显著。

示例 5:MinMax 类型减少 I/O(V2.2+)

SET hg_experimental_enable_result_cache = OFF;
DROP TABLE IF EXISTS test1, test2;
BEGIN;
CREATE TABLE test1 (x int, y int);
CALL set_table_property('test1', 'distribution_key', 'x');
CREATE TABLE test2 (x int, y int);
CALL set_table_property('test2', 'distribution_key', 'x');
END;
INSERT INTO test1 SELECT t::int, t::int FROM generate_series(1, 10000000) t;
INSERT INTO test2 SELECT t::int, t::int FROM generate_series(1, 100000) t;
ANALYZE test1;
ANALYZE test2;
EXPLAIN ANALYZE SELECT * FROM test1 JOIN test2 ON test1.x = test2.x;

执行计划

QUERY PLAN
 Gather  (cost=0.00..15.68 rows=100000 width=16)
   [40:1 id=100002 dop=1 time=5/5/5ms rows=100000(100000/100000/100000) mem=600/600/600B open=0/0/0ms get_next=5/5/5ms]
   -> Hash Join  (cost=0.00..11.98 rows=100000 width=16)
        Hash Cond: (test1.x = test2.x)
        Runtime Filter Cond: (test1.x = test2.x)
        [id=7 dop=40 time=5/4/4ms rows=100000(2639/2500/2406) mem=97/92/89KB open=1/0/0ms get_next=4/3/3ms]
        -> Local Gather  (cost=0.00..6.14 rows=10000000 width=8)
             [id=3 dop=40 time=5/3/3ms rows=100000(2639/2500/2406) mem=600/600/600B open=1/0/0ms get_next=4/3/3ms local_dop=1/1/1]
             -> Seq Scan on test1  (cost=0.00..6.00 rows=10000000 width=8)
                  Runtime Filter Target Expr: test1.x
                  [id=2 split_count=40 time=6/6/5ms rows=100000(2639/2500/2406) mem=61/60/59KB open=6/5/5ms get_next=0/0/0ms scan_rows=327680(8192/8192/8192)]
        -> Hash  (cost=5.01..5.01 rows=100000 width=8)
             [id=6 dop=40 time=1/0/0ms rows=100000(2639/2500/2406) mem=463/460/458KB open=1/0/0ms get_next=1/0/0ms rehash=1/1/1 hash_mem=384/384/384KB]
             -> Local Gather  (cost=0.00..5.01 rows=100000 width=8)
                  [id=5 dop=40 time=1/0/0ms rows=100000(2639/2500/2406) mem=600/600/600B open=0/0/0ms get_next=1/0/0ms local_dop=1/1/1]
                  -> Seq Scan on test2  (cost=0.00..5.01 rows=100000 width=8)
                       [id=4 split_count=40 time=1/0/0ms rows=100000(2639/2500/2406) mem=528/528/528B open=1/0/0ms get_next=0/0/0ms scan_rows=100000(2639/2500/2406)]

probe 端 scan 算子从存储引擎读取的行数为 32 万多,远低于原始行数 1,000 万。这是因为 Runtime Filter 被下推到存储引擎,利用一个 Batch 数据的 meta 信息整批过滤,有可能大量减少 I/O 成本。该类型通常在 Join Key 为数值类型且 build 端值域范围小于 probe 端时效果明显。

示例 6:TopN Runtime Filter(V4.0+)

当 SQL 语句包含 topN 算子时,Hologres 不会计算所有结果,而是生成动态 Filter 提前对数据进行过滤。

SELECT o_orderkey FROM orders ORDER BY o_orderdate LIMIT 5;

执行计划:

QUERY PLAN
Limit  (cost=0.00..116554.70 rows=0 width=8)
  ->  Sort  (cost=0.00..116554.70 rows=100 width=12)
        Sort Key: o_orderdate
      [id=6 dop=1 time=317/317/317ms rows=5(5/5/5) mem=1/1/1KB open=317/317/317ms get_next=0/0/0ms]
        ->  Gather  (cost=0.00..116554.25 rows=100 width=12)
            [20:1 id=100002 dop=1 time=317/317/317ms rows=100(100/100/100) mem=6/6/6KB open=0/0/0ms get_next=317/317/317ms * ]
              ->  Limit  (cost=0.00..116554.25 rows=0 width=12)
                    ->  Sort  (cost=0.00..116554.25 rows=150000000 width=12)
                          Sort Key: o_orderdate
                          Runtime Filter Sort Column: o_orderdate
                        [id=3 dop=20 time=318/282/258ms rows=100(5/5/5) mem=96/96/96KB open=318/282/258ms get_next=1/0/0ms]
                          ->  Local Gather  (cost=0.00..9.59 rows=150000000 width=12)
                              [id=2 dop=20 time=316/280/256ms rows=1372205(68691/68610/68498) mem=0/0/0B open=0/0/0ms get_next=316/280/256ms local_dop=1/1/1 * ]
                                ->  Seq Scan on orders  (cost=0.00..8.24 rows=150000000 width=12)
                                      Runtime Filter Target Expr: o_orderdate
                                    [id=1 split_count=20 time=286/249/222ms rows=1372205(68691/68610/68498) mem=179/179/179KB open=0/0/0ms get_next=286/249/222ms physical_reads=27074(1426/1353/1294) scan_rows=144867963(7324934/7243398/7172304)]
Query id:[1001003033996040311]
QE version: 2.0
Query Queue: init_warehouse.default_queue
======================cost======================
Total cost:[343] ms
Optimizer cost:[13] ms
Build execution plan cost:[0] ms
Init execution plan cost:[6] ms
Start query cost:[6] ms
- Queue cost: [0] ms
- Wait schema cost:[0] ms
- Lock query cost:[0] ms
- Create dataset reader cost:[0] ms
- Create split reader cost:[0] ms
Get result cost:[318] ms
- Get the first block cost:[318] ms
====================resource====================
Memory: total 7 MB. Worker stats: max 3 MB, avg 3 MB, min 3 MB, max memory worker id: 189*****.
CPU time: total 5167 ms. Worker stats: max 2610 ms, avg 2583 ms, min 2557 ms, max CPU time worker id: 189*****.
DAG CPU time stats: max 5165 ms, avg 2582 ms, min 0 ms, cnt 2, max CPU time dag id: 1.
Fragment CPU time stats: max 5137 ms, avg 1721 ms, min 0 ms, cnt 3, max CPU time fragment id: 2.
Ec wait time: total 90 ms. Worker stats: max 46 ms, max(max) 2 ms, avg 45 ms, min 44 ms, max ec wait time worker id: 189*****, max(max) ec wait time worker id: 189*****.
Physical read bytes: total 799 MB. Worker stats: max 400 MB, avg 399 MB, min 399 MB, max physical read bytes worker id: 189*****.
Read bytes: total 898 MB. Worker stats: max 450 MB, avg 449 MB, min 448 MB, max read bytes worker id: 189*****.
DAG instance count: total 3. Worker stats: max 2, avg 1, min 1, max DAG instance count worker id: 189*****.
Fragment instance count: total 41. Worker stats: max 21, avg 20, min 20, max fragment instance count worker id: 189*****.

没有 TopN Filter 时,Scan 节点读取 orders 表的每个数据块并传给 TopN 节点,TopN 节点用堆排序维护当前已见数据中排名前 5 的行。

例如:每个数据块约含 8,192 行。处理完第一个块后,TopN 就知道该块中第 5 名的 o_orderdate。假设它是 1995-01-01。Scan 节点在读第二个块时,就用 1995-01-01 作过滤条件,只发送 o_orderdate <= 1995-01-01 的行给 TopN。阈值会动态更新,如果第二个块中第 5 名的 o_orderdate 更小,TopN 就用新值替换旧阈值。

通过 EXPLAIN 命令可查看优化器生成的 TopN Runtime Filter:

->  Limit  (cost=0.00..116554.25 rows=0 width=12)
  ->  Sort  (cost=0.00..116554.25 rows=150000000 width=12)
        Sort Key: o_orderdate
        Runtime Filter Sort Column: o_orderdate
      [id=3 dop=20 time=318/282/258ms rows=100(5/5/5) mem=96/96/96KB open=318/282/258ms get_next=1/0/0ms]

TopN 节点上显示 Runtime Filter Sort Column,表示该节点会产生 TopN Runtime Filter。

示例 7:Cross Join 使用 ScalarFilter(V4.2+)

SET hg_experimental_enable_result_cache = OFF;
DROP TABLE IF EXISTS t1, t2;
BEGIN;
CREATE TABLE t1 (a int, b int);
CREATE TABLE t2 (a int, b int);
END;
INSERT INTO t1 SELECT t, t FROM generate_series(1, 1000000) t;
INSERT INTO t2 SELECT t, t FROM generate_series(1, 1000) t;
ANALYZE t1;
ANALYZE t2;
EXPLAIN ANALYZE
SELECT * FROM t1
WHERE a >= (SELECT min(a) FROM t2 WHERE b BETWEEN 0 AND 1)
  AND a <= (SELECT max(a) FROM t2 WHERE b BETWEEN 0 AND 1);

执行计划要点:

  • Cross Join 节点输出 Runtime Filter Build Expr: (min(t2.a)), (max(t2.a))

  • t1 的 Scan 节点输出 Runtime Filter Target Expr: (t1.a >= ${1}) AND (t1.a <= ${2})

  • t1 的 scan_rows 仍为存储中读取的行数,经 ScalarFilter 过滤后 rows 显著下降,可据此验证过滤效果。

关闭功能验证(用于回归对比):

SET hg_experimental_generate_runtime_scalar_filter = off;
EXPLAIN ANALYZE
SELECT * FROM t1
WHERE a >= (SELECT min(a) FROM t2 WHERE b BETWEEN 0 AND 1)
  AND a <= (SELECT max(a) FROM t2 WHERE b BETWEEN 0 AND 1);

关闭后执行计划中不再出现 Runtime Filter Build Expr / Target Expr 字段,t1 退化为全表扫描。

版本演进总结

版本

新增能力

V2.0

支持 Hash Join 的 Runtime Filter(Local 类型,Bloom / In / MinMax Filter)

V2.1

支持多字段 Join 的 Runtime Filter

V2.2

支持 Global 类型 Runtime Filter(Shuffle Join);In Filter 可结合 Bitmap 索引

V4.0

支持 TopN Runtime Filter

V4.2

支持 Cross Join Runtime Filter(ScalarFilter),优化「标量子查询 + 大表过滤」场景