优化分布表查询

更新时间:
复制为 MD 格式

PolarDB PostgreSQL分布式版集群中,查询性能与SQL的写法密切相关。编写优良的SQL可以最大化地利用分布式架构的优势,将计算下推到数据节点(DN)并行执行,从而避免不必要的数据传输和协调开销。本文为您提供一套实用的查询优化方法。

优化方式

查询场景

优化方法

核心原则

单表点查或小范围扫描

WHERE子句中使用分布列进行等值或范围过滤。

分片剪枝:精确定位到存储数据的单个或少数分片,避免全部分片扫描。

多张大表关联(JOIN)

  • 使用相同的分布列,并放入亲和组。

  • 将小表改造为复制表。

计算下推:将JOIN操作下推到各数据节点本地执行,避免跨节点网络开销。

分组聚合(GROUP BY)

GROUP BY子句中包含分布列。

聚合下推:将大部分聚合运算在数据节点本地完成,仅在协调节点做最终汇总。

复杂查询或非亲和JOIN

使用**CTE(Common Table Expressions)**改写查询。

减少数据重分布:通过CTE先过滤出小结果集,再进行后续的分布式操作,降低网络传输量。

利用分布列进行分片剪枝(单表查询优化)

分片剪枝是分布式查询中最基础、最高效的优化手段。当查询条件中包含分布列的等值判断时,优化器能直接定位到数据所在的唯一分片,性能接近单机查询。

原理:

当您发起查询时,分布式优化器会检查WHERE子句。如果它发现条件作用于分布列(如t2.a = 1000),优化器会根据1000这个值计算出其哈希值,从而精确推断出数据存储在哪一个物理分片上。随后,它会剪掉所有其他无关的分片,只将查询任务发送给目标分片所在的数据节点。这个过程就是分片剪枝。

示例

  1. 创建t2表,并以a列为分布列。

    CREATE TABLE t2 (a INT, b INT);
    SELECT create_distributed_table('t2', 'a');
  2. 查询a = 1000的数据,点查会直接被定位到唯一的分片(在下面的例子中是t2_102134这个分片)。

    EXPLAIN (COSTS OFF) SELECT * FROM t2 WHERE t2.a = 1000;

    返回结果如下:

                            QUERY PLAN                        
    ----------------------------------------------------------
     Custom Scan (PolarCluster Adaptive)
       Task Count: 1 -- 任务数仅为1,表示查询已精确定位到单个分片
       Tasks Shown: All
       ->  Task
             Node: host=10.188.92.147 port=3003 dbname=testdb
             ->  Seq Scan on t2_102134 t2
                   Filter: (a = 1000)
    (7 rows)

通过亲和组与复制表下推JOIN(多表关联优化)

在分布式环境中,避免跨节点JOIN是提升性能的关键。您可以通过以下两种方式实现JOIN计算下推:

  • 使用亲和组:如果两张或多张大表使用相同的分布列进行JOIN,将它们设置为同一亲和组,可以保证关联数据位于同一节点,从而实现本地JOIN。

  • 使用复制表:如果有一张参与JOIN的表符合以下条件时,可将表创建为复制表。复制表在每个数据节点上都存有副本,同样能实现本地JOIN。

    1. 由于某种原因不适合使用连接键作为分布列来建立分布表(例如,已经选择了其他的分布列来建立分布表)。

    2. 表的修改比较少,且需要被其他多张表JOIN。

原理

当优化器发现JOIN的两张表是亲和的(或其中一张是复制表),并且JOIN条件是分布列时,它就能确定关联的数据行都位于同一个数据节点上。因此,优化器会将整个JOIN操作作为一个任务下推到每个数据节点。各个节点在本地完成JOIN运算,并进行并行执行。最后,协调节点(CN)只需收集并汇总各个DN返回的结果即可。这个过程避免了在节点间传输海量的原始数据来进行JOIN,从而实现了高性能的查询。

示例

使用亲和组

  1. 创建colocation_t1表和colocation_t2表,并均以a列为分布列,且它们是亲和的。

    CREATE TABLE colocation_t1 (a INT, b INT);
    SELECT create_distributed_table('colocation_t1', 'a');
    
    CREATE TABLE colocation_t2 (a INT, b INT);
    SELECT create_distributed_table('colocation_t2', 'a');
  2. 进行JOIN关联查询。从下述执行计划中可以看到JOIN操作被下推至数据节点(DN)中执行。

     EXPLAIN (COSTS OFF) SELECT * FROM colocation_t1 JOIN colocation_t2 ON colocation_t1.a = colocation_t2.a;

    返回结果如下:

                                   QUERY PLAN                               
    ------------------------------------------------------------------------
     Custom Scan (PolarCluster Adaptive)
       Task Count: 4 -- 任务分发到所有分片
       Tasks Shown: One of 4 
       ->  Task
             Node: host=10.xxx.xxx.xxx port=3006 dbname=testdb
             ->  Merge Join -- JOIN操作在数据节点(DN)内部执行
                   Merge Cond: (colocation_t1.a = colocation_t2.a)
                   ->  Sort
                         Sort Key: colocation_t1.a
                         ->  Seq Scan on colocation_t1_102137 colocation_t1
                   ->  Sort
                         Sort Key: colocation_t2.a
                         ->  Seq Scan on colocation_t2_102141 colocation_t2
    (13 rows)
  3. 子查询:使用亲和组的方式对子查询同样有有效。如果子查询的输出和分布表进行连接,且连接键是子查询内部的涉及到的分布表的分布列,那么也会使得子查询被下推。

    EXPLAIN (COSTS OFF) SELECT * FROM colocation_t1 JOIN (SELECT a FROM colocation_t2) sub ON colocation_t1.a = sub.a;

    返回结果如下:

                                   QUERY PLAN                               
    ------------------------------------------------------------------------
     Custom Scan (PolarCluster Adaptive)
       Task Count: 4
       Tasks Shown: One of 4
       ->  Task
             Node: host=10.xxx.xxx.xxx port=3006 dbname=testdb
             ->  Merge Join
                   Merge Cond: (colocation_t1.a = colocation_t2.a)
                   ->  Sort
                         Sort Key: colocation_t1.a
                         ->  Seq Scan on colocation_t1_102137 colocation_t1
                   ->  Sort
                         Sort Key: colocation_t2.a
                         ->  Seq Scan on colocation_t2_102141 colocation_t2
    (13 rows)

使用复制表

  1. 创建colocation_t3表和colocation_t4表。colocation_t3表以a列为分布列,colocation_t4表为复制表。

    CREATE TABLE colocation_t3 (a INT, b INT);
    SELECT create_distributed_table('colocation_t3', 'a');
    
    CREATE TABLE colocation_t4 (a INT, b INT);
    SELECT create_reference_table('colocation_t4');
  2. 进行JOIN关联查询。从下述执行计划中可以看到JOIN操作被下推至数据节点(DN)中执行。

    EXPLAIN (COSTS OFF) SELECT * FROM colocation_t3 JOIN colocation_t4 ON colocation_t3.a = colocation_t4.b;

    返回结果如下:

                                   QUERY PLAN                               
    ------------------------------------------------------------------------
     Custom Scan (PolarCluster Adaptive)
       Task Count: 4
       Tasks Shown: One of 4
       ->  Task
             Node: host=10.xxx.xxx.xxx port=3006 dbname=testdb
             ->  Merge Join
                   Merge Cond: (colocation_t3.a = colocation_t4.b)
                   ->  Sort
                         Sort Key: colocation_t3.a
                         ->  Seq Scan on colocation_t3_102145 colocation_t3
                   ->  Sort
                         Sort Key: colocation_t4.b
                         ->  Seq Scan on colocation_t4_102149 colocation_t4
    (13 rows)

下推聚合与排序

对于聚合和排序操作,将计算下推到数据节点能显著降低协调节点(CN)的压力和网络流量。

聚合下推

  • GROUP BYKEY包含分布列时,聚合函数(如SUMCOUNT)会完全下推到数据节点。

  • GROUP BYKEY不包含分布列时,优化器会采用两阶段策略,先在数据节点进行预聚合,然后在协调节点做最终聚合。

    1. 第一阶段(DN本地聚合):每个数据节点先对自己持有的数据分片进行一次聚合运算,得出一个局部的中间结果。

    2. 第二阶段(CN最终聚合):协调节点收集所有DN发来的局部结果(这个结果集通常很小),然后执行第二次聚合,计算出最终的全局结果。这种方式,减少了需要通过网络传输到协调节点的数据量。

示例

  1. 创建group_t表,并以a列为分布列。

    CREATE TABLE group_t (a INT, b INT);
    SELECT create_distributed_table('group_t', 'a');
  2. 包含分布列时,完全下推。聚合函数将完全在 DN 上执行,它的执行计划如下所示:

    EXPLAIN (COSTS OFF) SELECT a, SUM(b) FROM group_t GROUP BY a;

    返回结果如下:

                           QUERY PLAN                        
    ---------------------------------------------------------
     Custom Scan (PolarCluster Adaptive)
       Task Count: 4
       Tasks Shown: One of 4
       ->  Task
             Node: host=10.xxx.xxx.xxx port=3006 dbname=testdb
             ->  HashAggregate
                   Group Key: a
                   ->  Seq Scan on group_t_102150 group_t
    (8 rows)
  3. 不包含分布列时,采用两阶段策略。会执行两次聚合函数,分别是在DNCN上执行:

    EXPLAIN (COSTS OFF) SELECT SUM(a) FROM group_t;

    返回结果如下:

                              QUERY PLAN                           
    ---------------------------------------------------------------
     Aggregate
       ->  Custom Scan (PolarCluster Adaptive)
             Task Count: 4
             Tasks Shown: One of 4
             ->  Task
                   Node: host=10.xxx.xxx.xxx port=3006 dbname=testdb
                   ->  Aggregate
                         ->  Seq Scan on group_t_102150 group_t
    (8 rows)

排序下推

ORDER BY ... LIMIT N子句在很多场景下也能被下推到数据节点,每个节点先返回自己的Top N,协调节点再做最终排序,极大减少了需要排序的数据量。

例如:如果不存在GROUP BY子句,或者GROUP BY子句的KEY包含了分布列,那么ORDER BY ... LIMIT N会下推到分片上。对于每一个分片返回的结果,CN会再次执行ORDER BY ... LIMIT N的操作。

示例

  1. 创建order_t表,并以a列为分布列。

    CREATE TABLE order_t (a INT, b INT);
    SELECT create_distributed_table('order_t', 'a');
  2. ORDER BY ... LIMIT N被下推,它的执行计划如下所示:

    EXPLAIN (COSTS OFF) SELECT * FROM order_t ORDER BY a LIMIT 1;

    返回结果如下:

                                   QUERY PLAN                               
    ------------------------------------------------------------------------
     Limit
       ->  Sort
             Sort Key: remote_scan.a
             ->  Custom Scan (PolarCluster Adaptive)
                   Task Count: 4
                   Tasks Shown: One of 4
                   ->  Task
                         Node: host=10.xxx.xxx.xxx port=3006 dbname=testdb
                         ->  Limit
                               ->  Sort
                                     Sort Key: a
                                     ->  Seq Scan on order_t_102154 order_t
    (12 rows)

使用CTE改写复杂查询

对于无法通过亲和组优化的复杂JOIN(例如JOIN条件不是分布列),优化器默认可能选择一种低效的执行计划(如全量拉取一张表的数据到协调节点再下发)。此时,可以使用CTE(WITH子句)来手动优化。

原理

先通过CTE对大表进行高选择性的过滤,生成一个小的中间结果集,然后再用这个小结果集去关联另一张表。这样可以大幅减少节点间需要传输的数据量。

示例

  1. 数据准备:创建cte_t1表和cte_t2表,并以a列为分布列。在cte_t1表的b列上建立本地索引。

    -- 在不亲和的表或列之间进行JOIN时,允许查询优化器生成数据重分区的执行计划。
    SET polar_cluster.enable_recursively_plan_non_colocated_relations TO ON;
    -- 创建普通表
    CREATE TABLE cte_t1(a INT, b INT);
    CREATE TABLE cte_t2(a INT, b INT);
    -- 转换分布表
    SELECT create_distributed_table('cte_t1', 'a');
    SELECT create_distributed_table('cte_t2', 'a');
    -- 创建本地索引
    CREATE INDEX local_idx_t1_b ON cte_t1(b);
    -- 插入测试数据
    INSERT INTO cte_t1(a, b) SELECT i , i / 10 FROM generate_series(1, 1000000)i;
    INSERT INTO cte_t2(a, b) SELECT i , i / 10 FROM generate_series(1, 1000000)i;
    -- 收集并更新这两张表的统计信息。
    ANALYZE cte_t1, cte_t2;
  2. 优化前:如果直接执行非亲和JOIN查询,会产生下面的执行计划,它会首先在cte_t2上进行全表扫描,将结果返回到CN,然后再下发到DN上,进行JSON:

    EXPLAIN ANALYZE SELECT * FROM cte_t1, cte_t2 WHERE cte_t1.a = cte_t2.b AND cte_t1.b = 1;

    返回结果如下:

                                                                                       QUERY PLAN                                               
                                        
    --------------------------------------------------------------------------------------------------------------------------------------------
     Custom Scan (PolarCluster Adaptive)  (cost=0.00..0.00 rows=100000 width=16) (actual time=334.511..334.519 rows=100 loops=1)
       ->  Distributed Subplan 46_1
             Subplan Duration: 1488.57 ms
             Intermediate Data Size: 17 MB
             Result destination: Send to 2 nodes
             ->  Custom Scan (PolarCluster Adaptive)  (cost=0.00..0.00 rows=100000 width=8) (actual time=419.049..505.639 rows=1000000 loops=1)
                   Task Count: 4
                   Tuple data received from nodes: 7813 kB
                   Tasks Shown: One of 4
                   ->  Task
                         Tuple data received from node: 1950 kB
                         Node: host=10.xxx.xxx.xxx port=3003 dbname=testdb
                         ->  Seq Scan on cte_t2_102165 cte_t2  (cost=0.00..4032.51 rows=249651 width=8) (actual time=0.004..12.029 rows=249651 l
    oops=1)
                             Planning Time: 0.013 ms
                             Execution Time: 48.372 ms
             Planning Time: 0.000 ms
             Execution Time: 531.747 ms
       Task Count: 4
       Tuple data received from nodes: 1600 bytes
       Tasks Shown: One of 4
       ->  Task
             Tuple data received from node: 480 bytes
             Node: host=10.xxx.xxx.xxx port=3003 dbname=testdb
             ->  Hash Join  (cost=2.71..10950.10 rows=1 width=16) (actual time=179.605..329.055 rows=30 loops=1)
                   Hash Cond: (intermediate_result.b = cte_t1.a)
                   ->  Function Scan on read_intermediate_result intermediate_result  (cost=0.00..7197.27 rows=1000000 width=8) (actual time=179
    .565..278.108 rows=1000000 loops=1)
                   ->  Hash  (cost=2.68..2.68 rows=3 width=8) (actual time=0.024..0.025 rows=3 loops=1)
                         Buckets: 1024  Batches: 1  Memory Usage: 9kB
                         ->  Index Scan using local_idx_t1_b_102161 on cte_t1_102161 cte_t1  (cost=0.42..2.68 rows=3 width=8) (actual time=0.021
    ..0.021 rows=3 loops=1)
                               Index Cond: (b = 1)
                 Planning Time: 0.365 ms
                 Execution Time: 332.955 ms
     Planning Time: 0.625 ms
     Execution Time: 1823.139 ms
    (34 rows)
  3. 优化后:通过CTE先过滤t1表(强制对cte_t1进行了索引扫描),并将结果返回到CN,既减少了扫描时间,也减少了网络传输:

    EXPLAIN ANALYZE WITH s AS (SELECT * FROM cte_t1 WHERE cte_t1.b = 1) SELECT * FROM s, cte_t2 WHERE s.a = cte_t2.b;

    返回结果如下:优化后的查询速度会得到数十倍的提升,因为它避免了大规模的数据重分布。

                                                                                    QUERY PLAN                                                  
                                   
    --------------------------------------------------------------------------------------------------------------------------------------------
     Custom Scan (PolarCluster Adaptive)  (cost=0.00..0.00 rows=100000 width=16) (actual time=38.689..38.693 rows=100 loops=1)
       ->  Distributed Subplan 49_1
             Subplan Duration: 0.94 ms
             Intermediate Data Size: 180 bytes
             Result destination: Send to 2 nodes
             ->  Custom Scan (PolarCluster Adaptive)  (cost=0.00..0.00 rows=100000 width=8) (actual time=0.760..0.761 rows=10 loops=1)
                   Task Count: 4
                   Tuple data received from nodes: 80 bytes
                   Tasks Shown: One of 4
                   ->  Task
                         Tuple data received from node: 32 bytes
                         Node: host=10.xxx.xxx.xxx port=3003 dbname=testdb
                         ->  Index Scan using local_idx_t1_b_102159 on cte_t1_102159 cte_t1  (cost=0.42..2.68 rows=3 width=8) (actual time=0.005
    ..0.005 rows=4 loops=1)
                               Index Cond: (b = 1)
                             Planning Time: 0.020 ms
                             Execution Time: 0.014 ms
             Planning Time: 0.000 ms
             Execution Time: 0.779 ms
       Task Count: 4
       Tuple data received from nodes: 1600 bytes
       Tasks Shown: One of 4
       ->  Task
             Tuple data received from node: 496 bytes
             Node: host=10.xxx.xxx.xxx port=3003 dbname=testdb
             ->  Gather  (cost=1000.20..4872.21 rows=3148 width=16) (actual time=0.450..19.685 rows=31 loops=1)
                   Workers Planned: 1
                   Workers Launched: 1
                   ->  Hash Join  (cost=0.20..3557.41 rows=1852 width=16) (actual time=5.368..13.807 rows=16 loops=2)
                         Hash Cond: (cte_t2.b = intermediate_result.a)
                         ->  Parallel Seq Scan on cte_t2_102163 cte_t2  (cost=0.00..3005.84 rows=146984 width=8) (actual time=0.005..6.544 rows=
    124936 loops=2)
                         ->  Hash  (cost=0.07..0.07 rows=10 width=8) (actual time=1.451..1.452 rows=10 loops=2)
                               Buckets: 1024  Batches: 1  Memory Usage: 9kB
                               ->  Function Scan on read_intermediate_result intermediate_result  (cost=0.00..0.07 rows=10 width=8) (actual time
    =1.444..1.448 rows=10 loops=2)
                 Planning Time: 0.068 ms
                 Execution Time: 19.729 ms
     Planning Time: 0.514 ms
     Execution Time: 39.672 ms
    (37 rows)

SELECT ... FOR UPDATE限制

出于全局锁协调的复杂性,SELECT ... FOR UPDATE语句不支持跨分片执行。如果查询条件无法将范围限定在单个分片内,该语句将会报错。

SELECT * FROM t1 WHERE a > 1 FOR UPDATE;
ERROR:  FOR UPDATE is not supported for query of more than one share