In multi-table join queries, rows that do not match the join condition are still read and processed, which increases I/O overhead and degrades query performance. Runtime filters address this by automatically generating lightweight filters that prune non-matching data during the scan phase, reducing I/O overhead and improving query performance. Hologres has supported runtime filters for Hash Join since V2.0 and extended them to Cross Join scenarios in V4.2.
Background
Use cases
Hologres has supported runtime filters since V2.0. This feature is commonly used in Hash Join scenarios involving two or more tables, especially when a large table joins a small table. No manual configuration is required. The optimizer and execution engine automatically optimize the join filtering behavior at query time, reducing I/O overhead and improving join performance.
Starting from V4.2, the runtime filter capability extends to Cross Join scenarios, specifically optimizing the common SQL pattern where a scalar subquery computes an aggregate result and uses it as a filter condition on a large table. For more information, see Cross Join support for runtime filters (New in V4.2).
How it works
Runtime filters for hash join
When two tables are joined, one table's data is loaded into a hash table, and the other table's data is matched against it. The join process has two sides:
-
Build side: The side that constructs the hash table, corresponding to the Hash node in the execution plan.
-
Probe side: The side that reads data and matches it against the build side hash table.
Typically, the small table serves as the build side and the large table as the probe side.
Runtime filters work by building a lightweight filter from the build-side data and pushing it to the probe side, where it prunes non-matching rows before they reach the join. This reduces the amount of probe-side data that the Hash Join processes and, when probe-side data is shuffled, the amount of data sent over the network. The benefit is therefore greatest when a large table is joined with a much smaller table, because a small build side yields a compact filter that can discard most of the probe side.
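To make the build/probe flow concrete, here is a minimal Python sketch of a hash join whose probe side is pruned by a runtime filter. It is not Hologres code: the function name hash_join, the row format (lists of dicts), and the use of an exact key set as the filter are illustrative assumptions. It shows that the join result is unchanged while far fewer probe rows reach the join.

```python
def hash_join(build_rows, probe_rows, key, use_runtime_filter=True):
    """Equi-join two lists of dicts on `key`.

    Returns (joined_pairs, number_of_probe_rows_that_reached_the_join).
    """
    # Build side: load the smaller input into a hash table keyed by the join key.
    hash_table = {}
    for row in build_rows:
        hash_table.setdefault(row[key], []).append(row)

    # Runtime filter: derived from the build side and applied to the probe scan.
    # An exact key set is used here purely for simplicity.
    if use_runtime_filter:
        build_keys = set(hash_table)
        probe_rows = [row for row in probe_rows if row[key] in build_keys]

    # Probe side: only rows that survive the filter are matched against the hash table.
    joined = [
        (probe_row, build_row)
        for probe_row in probe_rows
        for build_row in hash_table.get(probe_row[key], [])
    ]
    return joined, len(probe_rows)


small = [{"x": i} for i in range(1, 1_001)]      # build side
large = [{"x": i} for i in range(1, 100_001)]    # probe side

joined_filtered, probed_filtered = hash_join(small, large, "x")
joined_plain, probed_plain = hash_join(small, large, "x", use_runtime_filter=False)
print(len(joined_filtered), probed_filtered)   # 1000 1000
print(len(joined_plain), probed_plain)         # 1000 100000
```

Both runs return the same 1,000 joined pairs; only the amount of probe-side work differs.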
Runtime filters for cross join (New in V4.2)
Before V4.2, runtime filters covered only Hash Join. However, users often use scalar subqueries to compute aggregate values, such as min or max, and then use the result as a filter condition on a large table. In the execution plan, this SQL pattern produces a Cross Join:
-
Build side: The scalar subquery result, which is exactly one row.
-
Probe side: The large table scan.
V4.2 introduces a new filter type, ScalarFilter. The optimizer automatically identifies this pattern and pushes the evaluated scalar value from the build side down to the probe-side ScanNode. This enables row-level and RowGroup-level filtering during the scan phase, avoiding a full table scan followed by row-by-row comparisons.
Limitations and trigger conditions
Limitations
-
Runtime filters are supported only in Hologres V2.0 and later.
-
In Hash Join scenarios, V2.0 supports runtime filters only when the join condition contains a single field. Starting from V2.1, runtime filters support multiple fields.
-
TopN runtime filter is supported only in V4.0 and later and is used to improve performance for single-table TopN calculations.
-
The Cross Join runtime filter (ScalarFilter) is supported only in V4.2 and later.
Trigger conditions
Hash Join scenarios
The engine triggers a runtime filter automatically when all of the following conditions are met:
-
The probe side has 100,000 rows or more.
-
The ratio of scanned data from the build side to the probe side is 0.1 or less. The lower the ratio, the more likely the filter is triggered.
-
The ratio of join output data to probe-side data is 0.1 or less. The lower the ratio, the more likely the filter is triggered.
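The three thresholds above can be expressed as a simple predicate. The following sketch is a hypothetical helper (runtime_filter_eligible is not a Hologres API) that uses row counts as a stand-in for scanned data size. The real engine bases its decision on optimizer estimates, so treat this as an approximation of the documented rules rather than a reimplementation.

```python
MIN_PROBE_ROWS = 100_000  # probe side must have at least this many rows
MAX_RATIO = 0.1           # upper bound for both ratios below


def runtime_filter_eligible(build_rows: int, probe_rows: int, join_output_rows: int) -> bool:
    """Hypothetical check of the documented Hash Join trigger thresholds."""
    if probe_rows < MIN_PROBE_ROWS:
        return False
    build_ratio = build_rows / probe_rows         # build-side vs probe-side data
    output_ratio = join_output_rows / probe_rows  # join output vs probe-side data
    return build_ratio <= MAX_RATIO and output_ratio <= MAX_RATIO


print(runtime_filter_eligible(1_000, 1_000_000, 1_000))    # True
print(runtime_filter_eligible(1_000, 50_000, 1_000))       # False: probe side too small
print(runtime_filter_eligible(200_000, 1_000_000, 1_000))  # False: build side too large
```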
Cross Join scenarios (V4.2+)
The optimizer automatically generates a ScalarFilter when the following conditions are met:
-
The build side of the Cross Join has a statistics row count of 1 and its distribution is Replicated.
-
The filter predicates on the probe side contain comparison conditions with expressions from the build side, such as >, <, >=, <=, or BETWEEN.
Types of runtime filters
Runtime filters can be classified along the following two dimensions.
By shuffle scope (for hash join)
| Type | Supported versions | Scenarios |
| --- | --- | --- |
| Local | V2.0+ | Used when the probe-side data does not need to be shuffled. A Local runtime filter can be used if the join keys of the build and probe sides share the same distribution, if the build-side data is broadcast to the probe side, or if the build-side data is shuffled to match the probe side's distribution. This type reduces only the amount of scanned data and the data processed by the Hash Join. |
| Global | V2.2+ | Used when the probe-side data needs to be shuffled. The runtime filter is applied before the data shuffle, which also reduces network traffic. |
You do not need to specify the type. The engine selects it adaptively.
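The advantage of a Global runtime filter is that pruning happens before the shuffle. The sketch below simulates this by routing probe keys to 40 hypothetical workers with hash(key) % 40 and counting how many rows would be sent with and without a filter applied first. The routing scheme, the worker count, and the function name shuffle are assumptions for illustration only.

```python
from collections import Counter


def shuffle(probe_keys, num_workers, key_filter=None):
    """Route each key to worker hash(key) % num_workers; return rows per worker."""
    if key_filter is not None:
        # Global-style runtime filter: prune before rows leave the scan node.
        probe_keys = [k for k in probe_keys if key_filter(k)]
    return Counter(hash(k) % num_workers for k in probe_keys)


probe_keys = range(1, 100_001)        # probe-side join keys
build_keys = set(range(1, 1_001))     # build-side join keys

sent_without = sum(shuffle(probe_keys, 40).values())
sent_with = sum(shuffle(probe_keys, 40, key_filter=lambda k: k in build_keys).values())
print(sent_without, sent_with)  # 100000 1000
```

A Local runtime filter would instead be applied after the rows have already arrived at the join's worker, so it saves scan and join work but not network transfer.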
By filter type
| Type | Supported versions | Description |
| --- | --- | --- |
| Bloom filter | V2.0+ | A probabilistic filter that may produce false positives, so some non-matching rows can pass through. It is widely applicable and maintains high filtering efficiency even when the build side contains a large amount of data. |
| In filter | V2.0+ | Used when the build side has a small NDV (number of distinct values). It constructs a HashSet from the build-side data and sends it to the probe side for filtering. It filters exactly, with no false positives, and can be used in conjunction with a bitmap index. |
| MinMax filter | V2.0+ | Sends the minimum and maximum values of the build-side data to the probe side for filtering. It can use metadata to skip entire files or data batches, reducing I/O costs. |
| ScalarFilter | V4.2+ | Designed specifically for Cross Join scenarios. When the build side has exactly one row, the engine pushes the scalar value down to the probe-side ScanNode for filtering during the scan phase. |
You do not need to specify the filter type. Hologres selects a type adaptively based on the join conditions at runtime.
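Of the filter types above, the Bloom filter is the least intuitive. This minimal Python sketch assumes a single SHA-256 digest split into three hash positions over a 16,384-bit array; these parameters are chosen for illustration and are not taken from Hologres. It shows the property described in the table: build-side keys are never rejected, while a small fraction of non-matching probe keys pass as false positives.

```python
import hashlib


class BloomFilter:
    """Minimal Bloom filter: k bit positions per key in an m-bit array."""

    def __init__(self, num_bits: int = 16_384, num_hashes: int = 3):
        if num_hashes > 8:
            raise ValueError("one SHA-256 digest supplies at most 8 positions")
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray((num_bits + 7) // 8)

    def _positions(self, key):
        # Derive k positions from 4-byte slices of one SHA-256 digest of the key.
        digest = hashlib.sha256(repr(key).encode()).digest()
        for i in range(self.num_hashes):
            chunk = digest[4 * i: 4 * i + 4]
            yield int.from_bytes(chunk, "little") % self.num_bits

    def add(self, key):
        for pos in self._positions(key):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def might_contain(self, key) -> bool:
        # False means definitely absent; True may be a false positive.
        return all(self.bits[pos // 8] & (1 << (pos % 8)) for pos in self._positions(key))


# Build side: insert the join keys.
bf = BloomFilter()
build_keys = range(1, 1_001)
for k in build_keys:
    bf.add(k)

# Probe side: keep only rows whose key might match.
probe_keys = range(1, 100_001)
kept = [k for k in probe_keys if bf.might_contain(k)]
print(len(kept))  # at least 1000; any extra rows are false positives
```

The Hash Join still checks every surviving row against the hash table, so false positives cost some extra work but never produce wrong results.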
Cross join support for runtime filters (New in V4.2)
Motivation
Users often compute aggregate values with scalar subqueries and use the results as filter conditions on a large table. Before V4.2, this SQL pattern had the following issues in the execution plan:
-
The probe-side ScanNode could not leverage the build-side value for early filtering and had to perform a full table scan.
-
After the scan, the Cross Join performed row-by-row comparison, which caused significant I/O waste.
-
The existing runtime filter only covered Hash Join and did not support Cross Join.
V4.2 introduces the ScalarFilter to solve this problem. This feature is enabled by default and requires no user action.
Typical SQL and execution plan comparison
Typical SQL:
-- t1 is a large table; the aggregated min/max result from t2 is exactly 1 row.
SELECT * FROM t1
WHERE a >= (SELECT min(a) FROM t2 WHERE b BETWEEN 0 AND 1)
  AND a <= (SELECT max(a) FROM t2 WHERE b BETWEEN 0 AND 1);
Before optimization (without runtime filter):
Cross Join
-> Seq Scan on t1 -- Full table scan, no early filtering
-> Aggregate -- Build side min/max, 1-row result
-> Seq Scan on t2
The probe side must scan all data, and then the Cross Join compares rows one by one, causing significant I/O waste.
After optimization (V4.2 with ScalarFilter):
Cross Join
Runtime Filter Build Expr: (min(t2.a)), (max(t2.a))
-> Seq Scan on t1 -- Receives ScalarFilter, filters non-matching rows and RowGroups during scan
Runtime Filter Target Expr: (t1.a >= ${1}) AND (t1.a <= ${2})
-> Aggregate
-> Seq Scan on t2
After the build side is evaluated, the actual values are pushed down to the probe-side ScanNode, allowing filtering to be completed during the scan phase.
More typical scenarios
-- Scenario 1: Single table + scalar subquery
SELECT * FROM t1
WHERE a >= (SELECT min(a) FROM t2 WHERE b BETWEEN 0 AND 1);
-- Scenario 2: Multi-table + scalar subquery
SELECT * FROM t1, t3
WHERE (SELECT min(a) FROM t2) <= t1.a
AND t3.a <= (SELECT max(a) FROM t2);
-- Scenario 3: CTE + Cross Join
WITH r AS (SELECT MIN(a) AS lo, MAX(a) AS hi FROM t2)
SELECT t1.* FROM t1, r
WHERE t1.a >= r.lo AND t1.a <= r.hi;
How it works
-
Optimizer identification: When the build side of a Cross Join has a statistics row count of 1 and a Replicated distribution, the optimizer extracts comparison expressions from the predicates (expanding BETWEEN into >= and <=) and generates ScalarFilter candidates.
-
Execution-time pushdown: During the Open phase, the Cross Join consumes the build-side data, evaluates the build_expr to produce a scalar value, builds a ScalarFilter, and publishes it to the probe-side ScanNode.
-
Early filtering in ScanNode: After the probe-side NiagaraScan receives the ScalarFilter, it replaces the placeholders in the target_expr with actual values and applies two levels of filtering: row-level filtering (conjunct evaluation) and RowGroup-level filtering, in which the storage engine skips data blocks that contain no matching rows.
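The three steps above can be sketched in Python as follows. This is an illustrative model, not Hologres code: evaluate_build_side and scan_with_scalar_filter are hypothetical names, each RowGroup is a plain list of 8,192 values, and its min/max is computed on the fly where a real storage engine would read it from metadata. The data mirrors the Cross Join example under Verifying runtime filters, where both subquery bounds evaluate to 1.

```python
def evaluate_build_side(build_rows):
    """Return the single build-side row; the pattern requires exactly one row."""
    if len(build_rows) != 1:
        # Hologres reports an RT_CHECK error here; a Python error stands in for it.
        raise RuntimeError("build side must produce exactly one row")
    return build_rows[0]


def scan_with_scalar_filter(row_groups, lo, hi):
    """Scan RowGroups keeping values in [lo, hi]; return (matches, groups_skipped)."""
    if lo is None or hi is None:
        # NULL build value: behaves like FILTER_ALL, so nothing is returned.
        return [], len(row_groups)
    matches, skipped = [], 0
    for group in row_groups:
        # RowGroup level: skip groups whose value range cannot overlap [lo, hi].
        if max(group) < lo or min(group) > hi:
            skipped += 1
            continue
        # Row level: evaluate (a >= lo) AND (a <= hi) on each remaining row.
        matches.extend(v for v in group if lo <= v <= hi)
    return matches, skipped


# Probe table t1.a: values 1..1,000,000 split into RowGroups of 8,192 rows.
values = list(range(1, 1_000_001))
row_groups = [values[i:i + 8192] for i in range(0, len(values), 8192)]

# Build side: min(a), max(a) FROM t2 WHERE b BETWEEN 0 AND 1 yields one row (1, 1).
lo, hi = evaluate_build_side([(1, 1)])
matches, skipped = scan_with_scalar_filter(row_groups, lo, hi)
print(len(matches), skipped, len(row_groups))  # 1 122 123
```

Only the first RowGroup is read at the row level; the other 122 are skipped on their value range alone, which is where the I/O savings come from.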
Limitations
-
The build side must have exactly one row. The optimizer generates a ScalarFilter only when the build side has a statistics row count of 1 and a Replicated distribution. If the build side does not produce exactly one row at runtime, Hologres reports an RT_CHECK error.
-
If the build-side value is NULL, Hologres publishes a FILTER_ALL filter (which filters out all rows), clears the build-side data to short-circuit the Cross Join, and returns an empty result set.
-
Dictionary columns are not supported. If the probe-side column referenced by target_expr is a dictionary-encoded column (DICTIONARY type), the ScalarFilter is not applied.
-
Supported data types include INT8, UINT8, INT16, UINT16, INT32, UINT32, INT64, UINT64, DATE32, TIMESTAMP, FLOAT, DOUBLE, and STRING. If a data type is unsupported, the ScalarFilter is not pushed down, and no error is reported.
-
Supported comparison operators include >, <, >=, and <=. BETWEEN is expanded into two range comparisons.
GUC parameters
| Parameter | Type | Default | Level | Description |
| --- | --- | --- | --- | --- |
| hg_experimental_generate_runtime_scalar_filter | bool | on | | Controls whether to generate a ScalarFilter for Cross Joins. This feature is enabled by default and requires no user action. |
To disable this feature, execute the following command:
SET hg_experimental_generate_runtime_scalar_filter = off;
New fields in EXPLAIN output
When ScalarFilter is enabled, the EXPLAIN output shows the following fields. Runtime Filter Build Expr appears on the Cross Join node, and Runtime Filter Target Expr appears on the probe-side scan node:
-
Runtime Filter Build Expr: The build-side expression, such as (min(t2.a)) and (max(t2.a)).
-
Runtime Filter Target Expr: The target filter expression on the probe side, such as (t1.a >= ${1}) AND (t1.a <= ${2}). In this expression, ${N} is a placeholder that is replaced at runtime with the build-side value corresponding to the filter_id.
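Placeholder binding can be illustrated with a small string substitution. This sketch assumes a mapping from filter_id to the evaluated build-side value and operates on the printed form of the expression purely for readability; it is not a claim about how Hologres represents expressions internally. bind_placeholders is a hypothetical name.

```python
import re


def bind_placeholders(target_expr: str, values: dict) -> str:
    """Replace each ${N} placeholder with the build-side value for filter_id N."""
    def substitute(match):
        filter_id = int(match.group(1))
        return repr(values[filter_id])
    return re.sub(r"\$\{(\d+)\}", substitute, target_expr)


expr = "(t1.a >= ${1}) AND (t1.a <= ${2})"
print(bind_placeholders(expr, {1: 1, 2: 1000}))
# (t1.a >= 1) AND (t1.a <= 1000)
```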
Performance benefits
In the TPC-DS 10 TB dataset, the following SQL pattern is common:
DELETE FROM inventory
WHERE inv_date_sk >= (SELECT min(d_date_sk) FROM date_dim WHERE d_date BETWEEN 'INV_S_1' AND 'INV_E_1')
AND inv_date_sk <= (SELECT max(d_date_sk) FROM date_dim WHERE d_date BETWEEN 'INV_S_1' AND 'INV_E_1');
With the V4.2 ScalarFilter optimization, query time drops from 2.672 seconds to 0.552 seconds, a performance increase of approximately 4.8x.
Verifying runtime filters
The following examples demonstrate how to verify the effects of runtime filters in different scenarios.
Example 1: Single-column join condition (local type)
BEGIN;
CREATE TABLE test1 (x int, y int);
CALL set_table_property('test1', 'distribution_key', 'x');
CREATE TABLE test2 (x int, y int);
CALL set_table_property('test2', 'distribution_key', 'x');
END;
INSERT INTO test1 SELECT t, t FROM generate_series(1, 1000000) t;
INSERT INTO test2 SELECT t, t FROM generate_series(1, 1000) t;
ANALYZE test1;
ANALYZE test2;
EXPLAIN ANALYZE SELECT * FROM test1 JOIN test2 ON test1.x = test2.x;
Execution plan:
QUERY PLAN
Gather (cost=0.00..10.20 rows=1000 width=16)
[40:1 id=100002 dop=1 time=9/9/9ms rows=1000(1000/1000/1000) mem=16/16/16KB open=2/2/2ms get_next=7/7/7ms]
-> Hash Join (cost=0.00..10.16 rows=1000 width=16)
Hash Cond: (test1.x = test2.x)
Runtime Filter Cond: (test1.x = test2.x)
[id=8 dop=40 time=9/4/2ms rows=1000(39/25/17) mem=6/5/5KB open=4/1/0ms get_next=6/2/0ms]
-> Local Gather (cost=0.00..5.11 rows=1000000 width=8)
[id=3 dop=40 time=6/1/0ms rows=1000(39/25/17) mem=600/600/600B open=1/0/0ms get_next=6/1/0ms local_dop=1/1/1]
-> Seq Scan on test1 (cost=0.00..5.10 rows=1000000 width=8)
Runtime Filter Target Expr: test1.x
[id=2 split_count=40 time=11/7/7ms rows=1000(39/25/17) mem=41/41/41KB open=11/7/7ms get_next=0/0/0ms scan_rows=1000000(25270/25000/24697)]
-> Hash (cost=5.00..5.00 rows=1000 width=8)
[id=7 dop=40 time=3/1/0ms rows=1000(39/25/17) mem=396/396/396KB open=3/1/0ms get_next=1/0/0ms rehash=1/1/1 hash_mem=384/384/384KB]
-> Local Gather (cost=0.00..5.00 rows=1000 width=8)
[id=5 dop=40 time=1/0/0ms rows=1000(39/25/17) mem=0/0/0B open=1/0/0ms get_next=1/0/0ms local_dop=0/0/0]
-> Seq Scan on test2 (cost=0.00..5.00 rows=1000 width=8)
[id=4 split_count=40 time=1/0/0ms rows=1000(39/25/17) mem=528/528/528B open=1/0/0ms get_next=1/0/0ms scan_rows=1000(39/25/17)]
-
The test2 table has 1,000 rows and the test1 table has 1,000,000 rows. The build-to-probe data size ratio is 0.001 (less than 0.1), which meets the default trigger condition for runtime filters.
-
The probe-side scan on test1 shows Runtime Filter Target Expr, which indicates that a runtime filter was pushed down.
-
On the probe side, scan_rows is 1,000,000 (the number of rows read from storage), while rows is 1,000 (the number of rows after filtering). This difference demonstrates the filtering effect.
Example 2: Multi-column join condition (V2.1+, local type)
DROP TABLE IF EXISTS test1, test2;
BEGIN;
CREATE TABLE test1 (x int, y int);
CREATE TABLE test2 (x int, y int);
END;
INSERT INTO test1 SELECT t, t FROM generate_series(1, 1000000) t;
INSERT INTO test2 SELECT t, t FROM generate_series(1, 1000) t;
ANALYZE test1;
ANALYZE test2;
EXPLAIN ANALYZE SELECT * FROM test1 JOIN test2 ON test1.x = test2.x AND test1.y = test2.y;
Execution plan:
QUERY PLAN
Gather (cost=0.00..10.46 rows=1000 width=16)
[40:1 id=100003 dop=1 time=6/6/6ms rows=1000(1000/1000/1000) mem=600/600/600B open=0/0/0ms get_next=6/6/6ms]
-> Hash Join (cost=0.00..10.43 rows=1000 width=16)
Hash Cond: ((test1.x = test2.x) AND (test1.y = test2.y))
Runtime Filter Cond: ((test1.x = test2.x) AND (test1.y = test2.y))
[id=8 dop=40 time=5/3/3ms rows=1000(1000/25/0) mem=40/2/1KB open=1/0/0ms get_next=4/3/3ms]
-> Local Gather (cost=0.00..5.11 rows=1000000 width=8)
[id=5 dop=40 time=4/3/3ms rows=1000(1000/25/0) mem=600/600/600B open=0/0/0ms get_next=4/3/3ms local_dop=1/1/1]
-> Seq Scan on test1 (cost=0.00..5.10 rows=1000000 width=8)
Runtime Filter Target Expr: (test1.x AND test1.y)
[id=4 split_count=40 time=7/5/5ms rows=1000(1000/25/0) mem=49/11/9KB open=7/5/5ms get_next=1/0/0ms scan_rows=1000000(32768/25000/24576)]
-> Hash (cost=5.02..5.02 rows=40000 width=8)
[id=7 dop=40 time=1/0/0ms rows=40000(1000/1000/1000) mem=417/417/417KB open=1/0/0ms get_next=1/0/0ms rehash=1/1/1 hash_mem=384/384/384KB]
-> Broadcast (cost=0.00..5.02 rows=40000 width=8)
[40:40 id=100002 dop=40 time=1/0/0ms rows=40000(1000/1000/1000) mem=0/0/0B open=1/0/0ms get_next=0/0/0ms * ]
-> Local Gather (cost=0.00..5.00 rows=1000 width=8)
[id=3 dop=40 time=1/0/0ms rows=1000(1000/25/0) mem=600/600/600B open=1/0/0ms get_next=0/0/0ms local_dop=1/1/1]
-> Seq Scan on test2 (cost=0.00..5.00 rows=1000 width=8)
[id=2 split_count=40 time=1/0/0ms rows=1000(1000/25/0) mem=528/528/528B open=0/0/0ms get_next=1/0/0ms scan_rows=1000(1000/1000/1000)]
-
The join condition contains multiple columns, and the runtime filter is also generated for multiple columns.
-
The build-side data is broadcast, so a Local runtime filter is used.
Example 3: Global type (V2.2+, shuffle join)
SET hg_experimental_enable_result_cache = OFF;
DROP TABLE IF EXISTS test1, test2;
BEGIN;
CREATE TABLE test1 (x int, y int);
CREATE TABLE test2 (x int, y int);
END;
INSERT INTO test1 SELECT t, t FROM generate_series(1, 100000) t;
INSERT INTO test2 SELECT t, t FROM generate_series(1, 1000) t;
ANALYZE test1;
ANALYZE test2;
EXPLAIN ANALYZE SELECT * FROM test1 JOIN test2 ON test1.x = test2.x;
Execution plan:
QUERY PLAN
-> Hash Join (cost=0.00..10.08 rows=1000 width=16)
Hash Cond: (test1.x = test2.x)
Runtime Filter Cond: (test1.x = test2.x)
[id=9 dop=40 time=10/8/8ms rows=1000(34/25/13) mem=6/6/5KB open=2/1/1ms get_next=8/7/7ms]
-> Redistribution (cost=0.00..5.07 rows=100000 width=8)
Hash Key: test1.x
[40:40 id=100002 dop=40 time=8/7/7ms rows=1289(46/32/20) mem=512/432/0B open=0/0/0ms get_next=8/7/7ms * ]
-> Local Gather (cost=0.00..5.01 rows=100000 width=8)
[id=3 dop=40 time=9/2/0ms rows=1289(1042/32/0) mem=600/600/600B open=0/0/0ms get_next=9/2/0ms local_dop=1/1/1]
-> Seq Scan on test1 (cost=0.00..5.01 rows=100000 width=8)
Runtime Filter Target Expr: test1.x
[id=2 split_count=40 time=11/3/0ms rows=1289(1042/32/0) mem=50512/11849/528B open=11/3/0ms get_next=1/0/0ms scan_rows=100000(8192/7692/1696)]
-> Hash (cost=5.00..5.00 rows=1000 width=8)
[id=8 dop=40 time=2/1/1ms rows=1000(34/25/13) mem=396/396/396KB open=2/1/1ms get_next=0/0/0ms rehash=1/1/1 hash_mem=384/384/384KB]
-> Redistribution (cost=0.00..5.00 rows=1000 width=8)
Hash Key: test2.x
[40:40 id=100003 dop=40 time=2/1/1ms rows=1000(34/25/13) mem=0/0/0B open=0/0/0ms get_next=2/1/1ms * ]
-> Local Gather (cost=0.00..5.00 rows=1000 width=8)
[id=5 dop=40 time=1/0/0ms rows=1000(1000/25/0) mem=600/600/600B open=1/0/0ms get_next=1/0/0ms local_dop=1/1/1]
-> Seq Scan on test2 (cost=0.00..5.00 rows=1000 width=8)
[id=4 split_count=40 time=0/0/0ms rows=1000(1000/25/0) mem=528/528/528B open=0/0/0ms get_next=0/0/0ms scan_rows=1000(1000/1000/1000)]
The probe-side data is shuffled to the Hash Join operator. The engine automatically uses a Global runtime filter to accelerate the query.
Example 4: In filter with a bitmap index (V2.2+)
SET hg_experimental_enable_result_cache = OFF;
DROP TABLE IF EXISTS test1, test2;
BEGIN;
CREATE TABLE test1 (x text, y text);
CALL set_table_property('test1', 'distribution_key', 'x');
CALL set_table_property('test1', 'bitmap_columns', 'x');
CALL set_table_property('test1', 'dictionary_encoding_columns', '');
CREATE TABLE test2 (x text, y text);
CALL set_table_property('test2', 'distribution_key', 'x');
END;
INSERT INTO test1 SELECT t::text, t::text FROM generate_series(1, 10000000) t;
INSERT INTO test2 SELECT t::text, t::text FROM generate_series(1, 50) t;
ANALYZE test1;
ANALYZE test2;
EXPLAIN ANALYZE SELECT * FROM test1 JOIN test2 ON test1.x = test2.x;
Execution plan:
QUERY PLAN
Gather (cost=0.00..11.70 rows=50 width=14)
[40:1 id=100002 dop=1 time=16/16/16ms rows=50(50/50/50) mem=2/2/2KB open=0/0ms get_next=16/16/16ms]
-> Hash Join (cost=0.00..11.70 rows=50 width=14)
Hash Cond: (test1.x = test2.x)
Runtime Filter Cond: (test1.x = test2.x)
[id=7 dop=40 time=15/9/3ms rows=50(3/1/0) mem=5132/3774/264B open=1/0/0ms get_next=14/8/3ms]
-> Local Gather (cost=0.00..6.26 rows=10000000 width=12)
[id=3 dop=40 time=14/8/3ms rows=50(3/1/0) mem=600/600/600B open=1/0/0ms get_next=14/8/3ms local_dop=1/1/1]
-> Seq Scan on test1 (cost=0.00..6.06 rows=10000000 width=12)
Runtime Filter Target Expr: test1.x
[id=2 split_count=40 time=16/10/5ms rows=50(3/1/0) mem=67544/48945/528B open=16/9/5ms get_next=1/0/0ms scan_rows=7247692(250875/249920/248982) bitmap_used=50]
-> Hash (cost=5.00..5.00 rows=50 width=2)
[id=6 dop=40 time=1/0/0ms rows=61(3/1/1) mem=534/530/521KB open=1/0/0ms get_next=1/0/0ms rehash=1/1/1 hash_mem=512/512/512KB]
-> Local Gather (cost=0.00..5.00 rows=50 width=2)
[id=5 dop=40 time=1/0/0ms rows=50(3/1/0) mem=600/600/600B open=0/0/0ms get_next=1/0/0ms local_dop=1/1/1]
-> Seq Scan on test2 (cost=0.00..5.00 rows=50 width=2)
[id=4 split_count=40 time=1/0/0ms rows=50(3/1/0) mem=528/528/528B open=1/0/0ms get_next=0/0/0ms scan_rows=50(3/1/1)]
The probe-side scan operator uses a bitmap index. The In filter provides precise filtering, leaving only 50 rows. The scan_rows value in the scan operator is over 7 million, which is lower than the original 10 million rows. This is because the In filter can be pushed down to the storage engine, reducing I/O overhead. Combining an In filter with a bitmap index delivers significant gains when the join key is of the STRING type.
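The In filter plus bitmap index combination can be modeled as set lookups. This sketch is a simplification under stated assumptions, not the storage engine's implementation: each distinct value maps to a set of row IDs that stands in for its bitmap, and the In filter's HashSet selects which bitmaps to union. build_bitmap_index and in_filter_lookup are hypothetical names, and the data is scaled down from the example above.

```python
from collections import defaultdict


def build_bitmap_index(column):
    """Map each distinct value to the set of row IDs holding it (stand-in for a bitmap)."""
    index = defaultdict(set)
    for row_id, value in enumerate(column):
        index[value].add(row_id)
    return index


def in_filter_lookup(index, in_set):
    """Union the row-ID sets of every value in the In filter's HashSet."""
    matched = set()
    for value in in_set:
        matched |= index.get(value, set())
    return sorted(matched)


probe_column = [str(i) for i in range(1, 10_001)]   # like test1.x, a text column
in_set = {str(i) for i in range(1, 51)}             # 50 distinct build-side keys
index = build_bitmap_index(probe_column)
row_ids = in_filter_lookup(index, in_set)
print(len(row_ids))  # 50
```

Because the lookup works on exact values, the result contains only matching rows, which is why the In filter has no false positives.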
Example 5: MinMax filter for I/O reduction (V2.2+)
SET hg_experimental_enable_result_cache = OFF;
DROP TABLE IF EXISTS test1, test2;
BEGIN;
CREATE TABLE test1 (x int, y int);
CALL set_table_property('test1', 'distribution_key', 'x');
CREATE TABLE test2 (x int, y int);
CALL set_table_property('test2', 'distribution_key', 'x');
END;
INSERT INTO test1 SELECT t::int, t::int FROM generate_series(1, 10000000) t;
INSERT INTO test2 SELECT t::int, t::int FROM generate_series(1, 100000) t;
ANALYZE test1;
ANALYZE test2;
EXPLAIN ANALYZE SELECT * FROM test1 JOIN test2 ON test1.x = test2.x;
Execution plan:
QUERY PLAN
Gather (cost=0.00..15.68 rows=100000 width=16)
[40:1 id=100002 dop=1 time=5/5/5ms rows=100000(100000/100000/100000) mem=600/600/600B open=0/0/0ms get_next=5/5/5ms]
-> Hash Join (cost=0.00..11.98 rows=100000 width=16)
Hash Cond: (test1.x = test2.x)
Runtime Filter Cond: (test1.x = test2.x)
[id=7 dop=40 time=5/4/4ms rows=100000(2639/2500/2406) mem=97/92/89KB open=1/0/0ms get_next=4/3/3ms]
-> Local Gather (cost=0.00..6.14 rows=10000000 width=8)
[id=3 dop=40 time=5/3/3ms rows=100000(2639/2500/2406) mem=600/600/600B open=1/0/0ms get_next=4/3/3ms local_dop=1/1/1]
-> Seq Scan on test1 (cost=0.00..6.00 rows=10000000 width=8)
Runtime Filter Target Expr: test1.x
[id=2 split_count=40 time=6/6/5ms rows=100000(2639/2500/2406) mem=61/60/59KB open=6/5/5ms get_next=0/0/0ms scan_rows=327680(8192/8192/8192)]
-> Hash (cost=5.01..5.01 rows=100000 width=8)
[id=6 dop=40 time=1/0/0ms rows=100000(2639/2500/2406) mem=463/460/458KB open=1/0/0ms get_next=1/0/0ms rehash=1/1/1 hash_mem=384/384/384KB]
-> Local Gather (cost=0.00..5.01 rows=100000 width=8)
[id=5 dop=40 time=1/0/0ms rows=100000(2639/2500/2406) mem=600/600/600B open=0/0/0ms get_next=1/0/0ms local_dop=1/1/1]
-> Seq Scan on test2 (cost=0.00..5.01 rows=100000 width=8)
[id=4 split_count=40 time=1/0/0ms rows=100000(2639/2500/2406) mem=528/528/528B open=1/0/0ms get_next=0/0/0ms scan_rows=100000(2639/2500/2406)]
The probe-side scan operator reads just over 320,000 rows from the storage engine, far fewer than the original 10 million. This occurs because the runtime filter is pushed down to the storage engine, which uses the metadata of a data batch to filter entire batches at once. This can significantly reduce I/O overhead. This filter type is most effective when the join key is numeric and the build-side value range is smaller than the probe-side range.
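Block skipping with a MinMax filter can be sketched as follows. The sketch assumes blocks of 8,192 rows with precomputed (min, max) metadata and a sorted probe column, which makes skipping as effective as possible; make_blocks and minmax_scan are hypothetical names. The returned scan_rows counts only rows in blocks that could not be skipped, analogous to the scan_rows field in the plan.

```python
BLOCK_SIZE = 8192


def make_blocks(values, block_size=BLOCK_SIZE):
    """Split a column into blocks and record (min, max) metadata for each."""
    blocks = []
    for start in range(0, len(values), block_size):
        block = values[start:start + block_size]
        blocks.append((min(block), max(block), block))
    return blocks


def minmax_scan(blocks, build_keys):
    """Return (rows read from non-skipped blocks, rows inside the build range)."""
    lo, hi = min(build_keys), max(build_keys)
    scan_rows, kept = 0, []
    for b_min, b_max, block in blocks:
        if b_max < lo or b_min > hi:
            continue  # whole block skipped using metadata alone
        scan_rows += len(block)
        kept.extend(v for v in block if lo <= v <= hi)
    return scan_rows, kept


probe = list(range(1, 1_000_001))
build = list(range(1, 10_001))
scan_rows, kept = minmax_scan(make_blocks(probe), build)
print(scan_rows, len(kept))  # 16384 10000
```

If the probe column were not clustered on the join key, most blocks would span the whole value range and little could be skipped, which is consistent with the observation that the filter works best when the build-side range is narrow relative to the probe side.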
Example 6: TopN runtime filter (V4.0+)
When a query contains a TopN operator (ORDER BY ... LIMIT N), Hologres does not need to sort all rows. Instead, it generates a dynamic filter that prunes, early in the scan, rows that cannot enter the top N.
SELECT o_orderkey FROM orders ORDER BY o_orderdate LIMIT 5;
Execution plan:
QUERY PLAN
Limit (cost=0.00..116554.70 rows=0 width=8)
-> Sort (cost=0.00..116554.70 rows=100 width=12)
Sort Key: o_orderdate
[id=6 dop=1 time=317/317/317ms rows=5(5/5/5) mem=1/1/1KB open=317/317/317ms get_next=0/0/0ms]
-> Gather (cost=0.00..116554.25 rows=100 width=12)
[20:1 id=100002 dop=1 time=317/317/317ms rows=100(100/100/100) mem=6/6/6KB open=0/0/0ms get_next=317/317/317ms * ]
-> Limit (cost=0.00..116554.25 rows=0 width=12)
-> Sort (cost=0.00..116554.25 rows=150000000 width=12)
Sort Key: o_orderdate
Runtime Filter Sort Column: o_orderdate
[id=3 dop=20 time=318/282/258ms rows=100(5/5/5) mem=96/96/96KB open=318/282/258ms get_next=1/0/0ms]
-> Local Gather (cost=0.00..9.59 rows=150000000 width=12)
[id=2 dop=20 time=316/280/256ms rows=1372205(68691/68610/68498) mem=0/0/0B open=0/0/0ms get_next=316/280/256ms local_dop=1/1/1 * ]
-> Seq Scan on orders (cost=0.00..8.24 rows=150000000 width=12)
Runtime Filter Target Expr: o_orderdate
[id=1 split_count=20 time=286/249/222ms rows=1372205(68691/68610/68498) mem=179/179/179KB open=0/0/0ms get_next=286/249/222ms physical_reads=27074(1426/1353/1294) scan_rows=144867963(7324934/7243398/7172304)]
Query id:[1001003033996040311]
QE version: 2.0
Query Queue: init_warehouse.default_queue
======================cost======================
Total cost:[343] ms
Optimizer cost:[13] ms
Build execution plan cost:[0] ms
Init execution plan cost:[6] ms
Start query cost:[6] ms
- Queue cost: [0] ms
- Wait schema cost:[0] ms
- Lock query cost:[0] ms
- Create dataset reader cost:[0] ms
- Create split reader cost:[0] ms
Get result cost:[318] ms
- Get the first block cost:[318] ms
====================resource====================
Memory: total 7 MB. Worker stats: max 3 MB, avg 3 MB, min 3 MB, max memory worker id: 189*****.
CPU time: total 5167 ms. Worker stats: max 2610 ms, avg 2583 ms, min 2557 ms, max CPU time worker id: 189*****.
DAG CPU time stats: max 5165 ms, avg 2582 ms, min 0 ms, cnt 2, max CPU time dag id: 1.
Fragment CPU time stats: max 5137 ms, avg 1721 ms, min 0 ms, cnt 3, max CPU time fragment id: 2.
Ec wait time: total 90 ms. Worker stats: max 46 ms, max(max) 2 ms, avg 45 ms, min 44 ms, max ec wait time worker id: 189*****, max(max) ec wait time worker id: 189*****.
Physical read bytes: total 799 MB. Worker stats: max 400 MB, avg 399 MB, min 399 MB, max physical read bytes worker id: 189*****.
Read bytes: total 898 MB. Worker stats: max 450 MB, avg 449 MB, min 448 MB, max read bytes worker id: 189*****.
DAG instance count: total 3. Worker stats: max 2, avg 1, min 1, max DAG instance count worker id: 189*****.
Fragment instance count: total 41. Worker stats: max 21, avg 20, min 20, max fragment instance count worker id: 189*****.
Without the TopN runtime filter, the ScanNode would read every data block from the orders table and pass it to the TopN node, which would then use a heap sort to maintain the top 5 rows seen so far.
For example, suppose each data block contains approximately 8,192 rows. After the first block is processed, TopN knows the 5th-smallest o_orderdate seen so far. Suppose it is 1995-01-01. When the Scan node reads the second block, it uses 1995-01-01 as a filter condition and sends only rows where o_orderdate <= 1995-01-01 to TopN. The threshold is updated dynamically: whenever rows from later blocks push the 5th-smallest o_orderdate lower, TopN replaces the old threshold with the new value, so the filter only becomes tighter as the scan proceeds.
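The dynamic threshold described above can be sketched with a bounded max-heap. This Python model is illustrative only: topn_with_runtime_filter is a hypothetical name, blocks are lists of random integers standing in for o_orderdate values, and the threshold is applied per block as described. The result matches a full sort while far fewer rows are handed to TopN.

```python
import heapq
import random


def topn_with_runtime_filter(blocks, n):
    """Return (n smallest values, rows handed to TopN) using a dynamic threshold."""
    heap = []          # negated values: a max-heap of the n smallest rows kept so far
    threshold = None   # n-th smallest value seen so far; None until n rows are kept
    passed = 0         # rows that the scan hands to TopN
    for block in blocks:
        if threshold is not None:
            # Scan side: drop rows that cannot enter the top n.
            block = [v for v in block if v <= threshold]
        passed += len(block)
        for v in block:
            if len(heap) < n:
                heapq.heappush(heap, -v)
            elif v < -heap[0]:
                heapq.heapreplace(heap, -v)
        if len(heap) == n:
            threshold = -heap[0]  # never increases, so the filter only tightens
    return sorted(-x for x in heap), passed


rng = random.Random(0)
blocks = [[rng.randint(0, 1_000_000) for _ in range(8192)] for _ in range(20)]
top5, passed = topn_with_runtime_filter(blocks, 5)
print(top5 == sorted(v for b in blocks for v in b)[:5])  # True
print(passed, "of", 20 * 8192, "rows reached TopN")
```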
You can use the EXPLAIN output to view the TopN runtime filter generated by the optimizer:
-> Limit (cost=0.00..116554.25 rows=0 width=12)
-> Sort (cost=0.00..116554.25 rows=150000000 width=12)
Sort Key: o_orderdate
Runtime Filter Sort Column: o_orderdate
[id=3 dop=20 time=318/282/258ms rows=100(5/5/5) mem=96/96/96KB open=318/282/258ms get_next=1/0/0ms]
The presence of Runtime Filter Sort Column on the TopN node indicates that this node generates a TopN runtime filter.
Example 7: Cross join with ScalarFilter (V4.2+)
SET hg_experimental_enable_result_cache = OFF;
DROP TABLE IF EXISTS t1, t2;
BEGIN;
CREATE TABLE t1 (a int, b int);
CREATE TABLE t2 (a int, b int);
END;
INSERT INTO t1 SELECT t, t FROM generate_series(1, 1000000) t;
INSERT INTO t2 SELECT t, t FROM generate_series(1, 1000) t;
ANALYZE t1;
ANALYZE t2;
EXPLAIN ANALYZE
SELECT * FROM t1
WHERE a >= (SELECT min(a) FROM t2 WHERE b BETWEEN 0 AND 1)
AND a <= (SELECT max(a) FROM t2 WHERE b BETWEEN 0 AND 1);
Execution plan highlights:
-
The Cross Join node shows Runtime Filter Build Expr: (min(t2.a)), (max(t2.a)).
-
The ScanNode on t1 shows Runtime Filter Target Expr: (t1.a >= ${1}) AND (t1.a <= ${2}).
-
For t1, scan_rows reflects the number of rows read from storage, while rows drops significantly after the ScalarFilter is applied. This difference verifies the filter's effect. With this data, only the t2 row where b = 1 satisfies the subquery predicate, so both bounds evaluate to 1 and a single row of t1 matches.
Disabling the feature for comparison:
SET hg_experimental_generate_runtime_scalar_filter = off;
EXPLAIN ANALYZE
SELECT * FROM t1
WHERE a >= (SELECT min(a) FROM t2 WHERE b BETWEEN 0 AND 1)
AND a <= (SELECT max(a) FROM t2 WHERE b BETWEEN 0 AND 1);
After the feature is disabled, the execution plan no longer shows the Runtime Filter Build Expr and Target Expr fields, and the scan on t1 reverts to a full table scan.
Version history
| Version | New features |
| --- | --- |
| V2.0 | Support for runtime filters in Hash Join (Local type, including Bloom, In, and MinMax filters). |
| V2.1 | Support for runtime filters with multi-column join conditions. |
| V2.2 | Support for Global runtime filters (for shuffle joins); In filters can be combined with bitmap indexes. |
| V4.0 | Support for TopN runtime filter. |
| V4.2 | Support for Cross Join runtime filters (ScalarFilter) to optimize scenarios that use a scalar subquery to filter a large table. |