在PolarDB PostgreSQL分布式版集群中,查询性能与SQL的写法密切相关。编写优良的SQL可以最大化地利用分布式架构的优势,将计算下推到数据节点(DN)并行执行,从而避免不必要的数据传输和协调开销。本文为您提供一套实用的查询优化方法。
优化方式
查询场景 | 优化方法 | 核心原则 |
单表点查或小范围扫描 | 在 | 分片剪枝:精确定位到存储数据的单个或少数分片,避免全部分片扫描。 |
多张大表关联(JOIN) |
| 计算下推:将JOIN操作下推到各数据节点本地执行,避免跨节点网络开销。 |
分组聚合(GROUP BY) | 在 | 聚合下推:将大部分聚合运算在数据节点本地完成,仅在协调节点做最终汇总。 |
复杂查询或非亲和JOIN | 使用**CTE(Common Table Expressions)**改写查询。 | 减少数据重分布:通过CTE先过滤出小结果集,再进行后续的分布式操作,降低网络传输量。 |
利用分布列进行分片剪枝(单表查询优化)
分片剪枝是分布式查询中最基础、最高效的优化手段。当查询条件中包含分布列的等值判断时,优化器能直接定位到数据所在的唯一分片,性能接近单机查询。
原理:
当您发起查询时,分布式优化器会检查WHERE子句。如果它发现条件作用于分布列(如t2.a = 1000),优化器会根据1000这个值计算出其哈希值,从而精确推断出数据存储在哪一个物理分片上。随后,它会剪掉所有其他无关的分片,只将查询任务发送给目标分片所在的数据节点。这个过程就是分片剪枝。
示例:
创建
t2表,并以a列为分布列。CREATE TABLE t2 (a INT, b INT); SELECT create_distributed_table('t2', 'a');查询
a = 1000的数据,点查会直接被定位到唯一的分片(在下面的例子中是t2_102134这个分片)。EXPLAIN (COSTS OFF) SELECT * FROM t2 WHERE t2.a = 1000;返回结果如下:
QUERY PLAN ---------------------------------------------------------- Custom Scan (PolarCluster Adaptive) Task Count: 1 -- 任务数仅为1,表示查询已精确定位到单个分片 Tasks Shown: All -> Task Node: host=10.188.92.147 port=3003 dbname=testdb -> Seq Scan on t2_102134 t2 Filter: (a = 1000) (7 rows)
通过亲和组与复制表下推JOIN(多表关联优化)
在分布式环境中,避免跨节点JOIN是提升性能的关键。您可以通过以下两种方式实现JOIN计算下推:
使用亲和组:如果两张或多张大表使用相同的分布列进行JOIN,将它们设置为同一亲和组,可以保证关联数据位于同一节点,从而实现本地JOIN。
使用复制表:如果有一张参与JOIN的表符合以下条件时,可将表创建为复制表。复制表在每个数据节点上都存有副本,同样能实现本地JOIN。
由于某种原因不适合使用连接键作为分布列来建立分布表(例如,已经选择了其他的分布列来建立分布表)。
表的修改比较少,且需要被其他多张表JOIN。
原理:
当优化器发现JOIN的两张表是亲和的(或其中一张是复制表),并且JOIN条件是分布列时,它就能确定关联的数据行都位于同一个数据节点上。因此,优化器会将整个JOIN操作作为一个任务下推到每个数据节点。各个节点在本地完成JOIN运算,并进行并行执行。最后,协调节点(CN)只需收集并汇总各个DN返回的结果即可。这个过程避免了在节点间传输海量的原始数据来进行JOIN,从而实现了高性能的查询。
示例:
使用亲和组
创建
colocation_t1表和colocation_t2表,并均以a列为分布列,且它们是亲和的。CREATE TABLE colocation_t1 (a INT, b INT); SELECT create_distributed_table('colocation_t1', 'a'); CREATE TABLE colocation_t2 (a INT, b INT); SELECT create_distributed_table('colocation_t2', 'a');进行JOIN关联查询。从下述执行计划中可以看到JOIN操作被下推至数据节点(DN)中执行。
EXPLAIN (COSTS OFF) SELECT * FROM colocation_t1 JOIN colocation_t2 ON colocation_t1.a = colocation_t2.a;返回结果如下:
QUERY PLAN ------------------------------------------------------------------------ Custom Scan (PolarCluster Adaptive) Task Count: 4 -- 任务分发到所有分片 Tasks Shown: One of 4 -> Task Node: host=10.xxx.xxx.xxx port=3006 dbname=testdb -> Merge Join -- JOIN操作在数据节点(DN)内部执行 Merge Cond: (colocation_t1.a = colocation_t2.a) -> Sort Sort Key: colocation_t1.a -> Seq Scan on colocation_t1_102137 colocation_t1 -> Sort Sort Key: colocation_t2.a -> Seq Scan on colocation_t2_102141 colocation_t2 (13 rows)子查询:使用亲和组的方式对子查询同样有有效。如果子查询的输出和分布表进行连接,且连接键是子查询内部的涉及到的分布表的分布列,那么也会使得子查询被下推。
EXPLAIN (COSTS OFF) SELECT * FROM colocation_t1 JOIN (SELECT a FROM colocation_t2) sub ON colocation_t1.a = sub.a;返回结果如下:
QUERY PLAN ------------------------------------------------------------------------ Custom Scan (PolarCluster Adaptive) Task Count: 4 Tasks Shown: One of 4 -> Task Node: host=10.xxx.xxx.xxx port=3006 dbname=testdb -> Merge Join Merge Cond: (colocation_t1.a = colocation_t2.a) -> Sort Sort Key: colocation_t1.a -> Seq Scan on colocation_t1_102137 colocation_t1 -> Sort Sort Key: colocation_t2.a -> Seq Scan on colocation_t2_102141 colocation_t2 (13 rows)
使用复制表
创建
colocation_t3表和colocation_t4表。colocation_t3表以a列为分布列,colocation_t4表为复制表。CREATE TABLE colocation_t3 (a INT, b INT); SELECT create_distributed_table('colocation_t3', 'a'); CREATE TABLE colocation_t4 (a INT, b INT); SELECT create_reference_table('colocation_t4');进行JOIN关联查询。从下述执行计划中可以看到JOIN操作被下推至数据节点(DN)中执行。
EXPLAIN (COSTS OFF) SELECT * FROM colocation_t3 JOIN colocation_t4 ON colocation_t3.a = colocation_t4.b;返回结果如下:
QUERY PLAN ------------------------------------------------------------------------ Custom Scan (PolarCluster Adaptive) Task Count: 4 Tasks Shown: One of 4 -> Task Node: host=10.xxx.xxx.xxx port=3006 dbname=testdb -> Merge Join Merge Cond: (colocation_t3.a = colocation_t4.b) -> Sort Sort Key: colocation_t3.a -> Seq Scan on colocation_t3_102145 colocation_t3 -> Sort Sort Key: colocation_t4.b -> Seq Scan on colocation_t4_102149 colocation_t4 (13 rows)
下推聚合与排序
对于聚合和排序操作,将计算下推到数据节点能显著降低协调节点(CN)的压力和网络流量。
聚合下推
当
GROUP BY的KEY包含分布列时,聚合函数(如SUM,COUNT)会完全下推到数据节点。当
GROUP BY的KEY不包含分布列时,优化器会采用两阶段策略,先在数据节点进行预聚合,然后在协调节点做最终聚合。第一阶段(DN本地聚合):每个数据节点先对自己持有的数据分片进行一次聚合运算,得出一个局部的中间结果。
第二阶段(CN最终聚合):协调节点收集所有DN发来的局部结果(这个结果集通常很小),然后执行第二次聚合,计算出最终的全局结果。这种方式,减少了需要通过网络传输到协调节点的数据量。
示例:
创建
group_t表,并以a列为分布列。CREATE TABLE group_t (a INT, b INT); SELECT create_distributed_table('group_t', 'a');包含分布列时,完全下推。聚合函数将完全在 DN 上执行,它的执行计划如下所示:
EXPLAIN (COSTS OFF) SELECT a, SUM(b) FROM group_t GROUP BY a;返回结果如下:
QUERY PLAN --------------------------------------------------------- Custom Scan (PolarCluster Adaptive) Task Count: 4 Tasks Shown: One of 4 -> Task Node: host=10.xxx.xxx.xxx port=3006 dbname=testdb -> HashAggregate Group Key: a -> Seq Scan on group_t_102150 group_t (8 rows)不包含分布列时,采用两阶段策略。会执行两次聚合函数,分别是在DN和CN上执行:
EXPLAIN (COSTS OFF) SELECT SUM(a) FROM group_t;返回结果如下:
QUERY PLAN --------------------------------------------------------------- Aggregate -> Custom Scan (PolarCluster Adaptive) Task Count: 4 Tasks Shown: One of 4 -> Task Node: host=10.xxx.xxx.xxx port=3006 dbname=testdb -> Aggregate -> Seq Scan on group_t_102150 group_t (8 rows)
排序下推
ORDER BY ... LIMIT N子句在很多场景下也能被下推到数据节点,每个节点先返回自己的Top N,协调节点再做最终排序,极大减少了需要排序的数据量。
例如:如果不存在GROUP BY子句,或者GROUP BY子句的KEY包含了分布列,那么ORDER BY ... LIMIT N会下推到分片上。对于每一个分片返回的结果,CN会再次执行ORDER BY ... LIMIT N的操作。
示例:
创建
order_t表,并以a列为分布列。CREATE TABLE order_t (a INT, b INT); SELECT create_distributed_table('order_t', 'a');ORDER BY ... LIMIT N被下推,它的执行计划如下所示:EXPLAIN (COSTS OFF) SELECT * FROM order_t ORDER BY a LIMIT 1;返回结果如下:
QUERY PLAN ------------------------------------------------------------------------ Limit -> Sort Sort Key: remote_scan.a -> Custom Scan (PolarCluster Adaptive) Task Count: 4 Tasks Shown: One of 4 -> Task Node: host=10.xxx.xxx.xxx port=3006 dbname=testdb -> Limit -> Sort Sort Key: a -> Seq Scan on order_t_102154 order_t (12 rows)
使用CTE改写复杂查询
对于无法通过亲和组优化的复杂JOIN(例如JOIN条件不是分布列),优化器默认可能选择一种低效的执行计划(如全量拉取一张表的数据到协调节点再下发)。此时,可以使用CTE(WITH子句)来手动优化。
原理:
先通过CTE对大表进行高选择性的过滤,生成一个小的中间结果集,然后再用这个小结果集去关联另一张表。这样可以大幅减少节点间需要传输的数据量。
示例:
数据准备:创建
cte_t1表和cte_t2表,并以a列为分布列。在cte_t1表的b列上建立本地索引。-- 在不亲和的表或列之间进行JOIN时,允许查询优化器生成数据重分区的执行计划。 SET polar_cluster.enable_recursively_plan_non_colocated_relations TO ON; -- 创建普通表 CREATE TABLE cte_t1(a INT, b INT); CREATE TABLE cte_t2(a INT, b INT); -- 转换分布表 SELECT create_distributed_table('cte_t1', 'a'); SELECT create_distributed_table('cte_t2', 'a'); -- 创建本地索引 CREATE INDEX local_idx_t1_b ON cte_t1(b); -- 插入测试数据 INSERT INTO cte_t1(a, b) SELECT i , i / 10 FROM generate_series(1, 1000000)i; INSERT INTO cte_t2(a, b) SELECT i , i / 10 FROM generate_series(1, 1000000)i; -- 收集并更新这两张表的统计信息。 ANALYZE cte_t1, cte_t2;优化前:如果直接执行非亲和JOIN查询,会产生下面的执行计划,它会首先在
cte_t2上进行全表扫描,将结果返回到CN,然后再下发到DN上,进行JSON:EXPLAIN ANALYZE SELECT * FROM cte_t1, cte_t2 WHERE cte_t1.a = cte_t2.b AND cte_t1.b = 1;返回结果如下:
QUERY PLAN -------------------------------------------------------------------------------------------------------------------------------------------- Custom Scan (PolarCluster Adaptive) (cost=0.00..0.00 rows=100000 width=16) (actual time=334.511..334.519 rows=100 loops=1) -> Distributed Subplan 46_1 Subplan Duration: 1488.57 ms Intermediate Data Size: 17 MB Result destination: Send to 2 nodes -> Custom Scan (PolarCluster Adaptive) (cost=0.00..0.00 rows=100000 width=8) (actual time=419.049..505.639 rows=1000000 loops=1) Task Count: 4 Tuple data received from nodes: 7813 kB Tasks Shown: One of 4 -> Task Tuple data received from node: 1950 kB Node: host=10.xxx.xxx.xxx port=3003 dbname=testdb -> Seq Scan on cte_t2_102165 cte_t2 (cost=0.00..4032.51 rows=249651 width=8) (actual time=0.004..12.029 rows=249651 l oops=1) Planning Time: 0.013 ms Execution Time: 48.372 ms Planning Time: 0.000 ms Execution Time: 531.747 ms Task Count: 4 Tuple data received from nodes: 1600 bytes Tasks Shown: One of 4 -> Task Tuple data received from node: 480 bytes Node: host=10.xxx.xxx.xxx port=3003 dbname=testdb -> Hash Join (cost=2.71..10950.10 rows=1 width=16) (actual time=179.605..329.055 rows=30 loops=1) Hash Cond: (intermediate_result.b = cte_t1.a) -> Function Scan on read_intermediate_result intermediate_result (cost=0.00..7197.27 rows=1000000 width=8) (actual time=179 .565..278.108 rows=1000000 loops=1) -> Hash (cost=2.68..2.68 rows=3 width=8) (actual time=0.024..0.025 rows=3 loops=1) Buckets: 1024 Batches: 1 Memory Usage: 9kB -> Index Scan using local_idx_t1_b_102161 on cte_t1_102161 cte_t1 (cost=0.42..2.68 rows=3 width=8) (actual time=0.021 ..0.021 rows=3 loops=1) Index Cond: (b = 1) Planning Time: 0.365 ms Execution Time: 332.955 ms Planning Time: 0.625 ms Execution Time: 1823.139 ms (34 rows)优化后:通过CTE先过滤
t1表(强制对cte_t1进行了索引扫描),并将结果返回到CN,既减少了扫描时间,也减少了网络传输:EXPLAIN ANALYZE WITH s AS (SELECT * FROM cte_t1 WHERE cte_t1.b = 1) SELECT * FROM s, cte_t2 WHERE s.a = cte_t2.b;返回结果如下:优化后的查询速度会得到数十倍的提升,因为它避免了大规模的数据重分布。
QUERY PLAN -------------------------------------------------------------------------------------------------------------------------------------------- Custom Scan (PolarCluster Adaptive) (cost=0.00..0.00 rows=100000 width=16) (actual time=38.689..38.693 rows=100 loops=1) -> Distributed Subplan 49_1 Subplan Duration: 0.94 ms Intermediate Data Size: 180 bytes Result destination: Send to 2 nodes -> Custom Scan (PolarCluster Adaptive) (cost=0.00..0.00 rows=100000 width=8) (actual time=0.760..0.761 rows=10 loops=1) Task Count: 4 Tuple data received from nodes: 80 bytes Tasks Shown: One of 4 -> Task Tuple data received from node: 32 bytes Node: host=10.xxx.xxx.xxx port=3003 dbname=testdb -> Index Scan using local_idx_t1_b_102159 on cte_t1_102159 cte_t1 (cost=0.42..2.68 rows=3 width=8) (actual time=0.005 ..0.005 rows=4 loops=1) Index Cond: (b = 1) Planning Time: 0.020 ms Execution Time: 0.014 ms Planning Time: 0.000 ms Execution Time: 0.779 ms Task Count: 4 Tuple data received from nodes: 1600 bytes Tasks Shown: One of 4 -> Task Tuple data received from node: 496 bytes Node: host=10.xxx.xxx.xxx port=3003 dbname=testdb -> Gather (cost=1000.20..4872.21 rows=3148 width=16) (actual time=0.450..19.685 rows=31 loops=1) Workers Planned: 1 Workers Launched: 1 -> Hash Join (cost=0.20..3557.41 rows=1852 width=16) (actual time=5.368..13.807 rows=16 loops=2) Hash Cond: (cte_t2.b = intermediate_result.a) -> Parallel Seq Scan on cte_t2_102163 cte_t2 (cost=0.00..3005.84 rows=146984 width=8) (actual time=0.005..6.544 rows= 124936 loops=2) -> Hash (cost=0.07..0.07 rows=10 width=8) (actual time=1.451..1.452 rows=10 loops=2) Buckets: 1024 Batches: 1 Memory Usage: 9kB -> Function Scan on read_intermediate_result intermediate_result (cost=0.00..0.07 rows=10 width=8) (actual time =1.444..1.448 rows=10 loops=2) Planning Time: 0.068 ms Execution Time: 19.729 ms Planning Time: 0.514 ms Execution Time: 39.672 ms (37 rows)
SELECT ... FOR UPDATE限制
出于全局锁协调的复杂性,SELECT ... FOR UPDATE语句不支持跨分片执行。如果查询条件无法将范围限定在单个分片内,该语句将会报错。
SELECT * FROM t1 WHERE a > 1 FOR UPDATE;
ERROR: FOR UPDATE is not supported for query of more than one share