Recursive CTE改写提升查询性能

当前云原生数据仓库 AnalyticDB PostgreSQL 版对Recursive CTE支持较为有限。在分布式场景下,为确保计划及执行层结果的正确性,云原生数据仓库 AnalyticDB PostgreSQL 版对Recursive CTE执行过程中的WorkTableScan算子施加了限制:禁止出现Motion,且该算子必须位于JOIN的最左侧。因此,当Recursive CTE中出现数据量较大的表且实例的计算节点数量较多时,其执行性能表现极为不佳。云原生数据仓库 AnalyticDB PostgreSQL 版对临时表的执行计划没有任何限制,因此建议您通过PL/SQL函数加临时表的方案改写Recursive CTE。在每次迭代计算后,改写后的方案都可给出较优的执行计划,从而提升查询性能,满足业务需求。

改写示例

测试数据

准备测试表和测试数据如下。

CREATE TABLE city(id varchar(4), pid varchar(4), name varchar(10), gdp int);
INSERT INTO city VALUES('33', NULL, '浙江省', 20134);
INSERT INTO city VALUES('3301', '33', '杭州市', 5112);
INSERT INTO city VALUES('3302', '33', '宁波市', 3992);
INSERT INTO city VALUES('3303', '33', '温州市', 2125);
INSERT INTO city VALUES('3304', '33', '嘉兴市', 1688);
INSERT INTO city VALUES('3306', '33', '绍兴市', 1852);
INSERT INTO city VALUES('3305', '33', '湖州市', 964);
INSERT INTO city VALUES('3307', '33', '金华市', 1445);
INSERT INTO city VALUES('3308', '33', '衢州市', 507);
INSERT INTO city VALUES('3309', '33', '舟山市', 491);
INSERT INTO city VALUES('3310', '33', '台州市', 1486);
INSERT INTO city VALUES('3311', '33', '丽水市', 472);
INSERT INTO city VALUES('32', NULL, '江苏省', 30862);
INSERT INTO city VALUES('3201', '32', '南京市', 4359);
INSERT INTO city VALUES('3202', '32', '无锡市', 3584);
INSERT INTO city VALUES('3203', '32', '徐州市', 2118);
INSERT INTO city VALUES('3204', '32', '常州市', 2269);
INSERT INTO city VALUES('3205', '32', '苏州市', 5548);
INSERT INTO city VALUES('3206', '32', '南通市', 2982);
INSERT INTO city VALUES('3207', '32', '连云港市', 976);
INSERT INTO city VALUES('3208', '32', '淮安市', 1257);
INSERT INTO city VALUES('3209', '32', '盐城市', 1796);
INSERT INTO city VALUES('3210', '32', '扬州市', 1868);
INSERT INTO city VALUES('3211', '32', '镇江市', 1372);
INSERT INTO city VALUES('3212', '32', '泰州市', 1752);
INSERT INTO city VALUES('3213', '32', '宿迁市', 981);

原始查询

通过Recursive CTE找出目标省份及其下属所有城市的GDP。

WITH RECURSIVE CTE AS 
(
    SELECT id, CAST(name AS varchar(100)), gdp FROM city WHERE name = '浙江省'
    UNION ALL
    SELECT son.id, CAST(parent.name || '>' || son.name AS varchar(100)), son.gdp AS name 
    FROM city son INNER JOIN CTE parent ON son.pid = parent.id
)
SELECT id, name, gdp FROM CTE ORDER BY gdp DESC;

执行计划

在以上原始查询SQL前加上EXPLAIN语句即可查看执行计划,执行计划详情如下。

从执行计划可以看出,city表的数据被广播到所有计算节点上。当city表中的数据量较大时,执行性能将显著降低。此外,ORCA优化器不支持Recursive CTE,因此只能回退至planner优化器。

                                                     QUERY PLAN
--------------------------------------------------------------------------------------------------------------------
 Gather Motion 3:1  (slice1; segments: 3)  (cost=13568.80..13971.85 rows=28451 width=242)
   Merge Key: city.gdp
   ->  Sort  (cost=13568.80..13592.51 rows=9484 width=242)
         Sort Key: city.gdp DESC
         ->  Recursive Union  (cost=0.00..12942.36 rows=9484 width=242)
               ->  Seq Scan on city  (cost=0.00..155.67 rows=10 width=242)
                     Filter: ((name)::text = '浙江省'::text)
               ->  Hash Join  (cost=885.67..1259.70 rows=947 width=242)
                     Hash Cond: ((parent.id)::text = (son.pid)::text)
                     ->  WorkTable Scan on CTE parent  (cost=0.00..1.95 rows=97 width=238)
                     ->  Hash  (cost=520.67..520.67 rows=29200 width=82)
                           ->  Broadcast Motion 3:3  (slice2; segments: 3)  (cost=0.00..520.67 rows=29200 width=82)
                                 ->  Seq Scan on city son  (cost=0.00..131.33 rows=9733 width=82)
 Optimizer: Postgres-based planner
(14 rows)

查询结果

查询结果详情如下。

id	name	        gdp
33	浙江省	        20134
3301	浙江省>杭州市	5112
3302	浙江省>宁波市	3992
3303	浙江省>温州市	2125
3306	浙江省>绍兴市	1852
3304	浙江省>嘉兴市	1688
3310	浙江省>台州市	1486
3307	浙江省>金华市	1445
3305	浙江省>湖州市	964
3308	浙江省>衢州市	507
3309	浙江省>舟山市	491
3311	浙江省>丽水市	472

改写查询

UNION ALL场景下的改写

以下示例将为您展示在UNION ALL场景下对Recursive CTE的改写方法。通过PL/SQL函数改写原始查询,改写详情如下。

-- 函数的参数为可变参数的值
CREATE OR REPLACE FUNCTION city_gdp(
    target_name varchar(10)
) RETURNS TABLE(
    id varchar(4),
    name varchar(100),
    gdp int
) AS $$
-- 函数执行过程中用到的中间变量
DECLARE prev_count INT := 0;
        curr_count INT := 0;
        curr_level INT := 1;
BEGIN 
-- 创建临时表,表结构相较于Recursive CTE中的字段多了level字段  
CREATE TEMP TABLE temp_result(
    id varchar(4),
    name varchar(100),
    gdp int,
    level int
) ON COMMIT DROP DISTRIBUTED BY(id); 
-- 向临时表中写入Rescursive CTE的非recursive部分
INSERT INTO temp_result
SELECT
    parent.id,
    CAST(parent.name AS varchar(100)),
    parent.gdp,
    1 AS level
FROM city parent
WHERE parent.name = target_name;
-- 统计当前临时表的行数,用于后续终止循环
prev_count := (
    SELECT COUNT(*) FROM temp_result
);
LOOP 
-- 可选,analyze临时表temp_result是为了后续能出更优的执行计划
ANALYZE temp_result;
-- 将Recursive CTE的Recursive中的部分写入到临时表中,注意level需要+1且WHERE条件中需要过滤属于当前level的数据
INSERT INTO temp_result
SELECT
    son.id,
    CAST(parent.name || '>' || son.name AS varchar(100)),
    son.gdp,
    parent.level + 1 AS level
FROM city son
INNER JOIN temp_result parent ON parent.id = son.pid
WHERE parent.level = curr_level;
-- 再次统计当前临时表的行数
curr_count := (
    SELECT  COUNT(*) FROM temp_result
);
-- 若临时表无数据新增,则退出循环
IF curr_count = prev_count THEN EXIT;
END IF;
-- 在下一次循环前更新prev_count和curr_level
prev_count := curr_count;
curr_level := curr_level + 1;
END LOOP;
-- SQL的主查询部分,将临时表整体作为Recursive CTE引用即可
RETURN QUERY
SELECT
    CTE.id,
    CTE.name,
    CTE.gdp
FROM temp_result CTE
ORDER BY gdp DESC;
END;
$$ LANGUAGE plpgsql;

执行计划

查看函数执行过程中的计划可以发现,每次循环时,ORCA优化器会自动生成更优的执行计划。

-- 第一次循环
LOG:  Insert on temp_result  (cost=0.00..862.04 rows=1 width=24)
  ->  Result  (cost=0.00..0.00 rows=0 width=0)
        ->  Redistribute Motion 3:3  (slice1; segments: 3)  (cost=0.00..862.00 rows=1 width=28)
              Hash Key: city.id
              ->  Hash Join  (cost=0.00..862.00 rows=1 width=34)
                    Hash Cond: ((city.pid)::text = (temp_result_1.id)::text)
                    ->  Redistribute Motion 3:3  (slice2; segments: 3)  (cost=0.00..431.00 rows=1 width=28)
                          Hash Key: city.pid
                          ->  Seq Scan on city  (cost=0.00..431.00 rows=1 width=28)
                    ->  Hash  (cost=431.00..431.00 rows=1 width=17)
                          ->  Seq Scan on temp_result temp_result_1  (cost=0.00..431.00 rows=1 width=17)
                                Filter: (level = 1)
Optimizer: GPORCA
-- 第二次循环
LOG:  Insert on temp_result  (cost=0.00..862.04 rows=1 width=24)
  ->  Result  (cost=0.00..0.00 rows=0 width=0)
        ->  Redistribute Motion 3:3  (slice1; segments: 3)  (cost=0.00..862.00 rows=1 width=28)
              Hash Key: city.id
              ->  Hash Join  (cost=0.00..862.00 rows=1 width=43)
                    Hash Cond: ((temp_result_1.id)::text = (city.pid)::text)
                    ->  Seq Scan on temp_result temp_result_1  (cost=0.00..431.00 rows=5 width=27)
                          Filter: (level = 2)
                    ->  Hash  (cost=431.00..431.00 rows=1 width=28)
                          ->  Redistribute Motion 3:3  (slice2; segments: 3)  (cost=0.00..431.00 rows=1 width=28)
                                Hash Key: city.pid
                                ->  Seq Scan on city  (cost=0.00..431.00 rows=1 width=28)
Optimizer: GPORCA
说明

由于未对city表执行ANALYZEANALYZE语法详情请参见ANALYZE用法),上述展示的执行计划并非最优,仅为表明每次循环可生成不同的执行计划。

查询结果

查询时传入合理的参数值,例如“浙江省”。

SELECT * FROM city_gdp('浙江省');

结果如下。

id	name	        gdp
33	浙江省	        20134
3301	浙江省>杭州市	5112
3302	浙江省>宁波市	3992
3303	浙江省>温州市	2125
3306	浙江省>绍兴市	1852
3304	浙江省>嘉兴市	1688
3310	浙江省>台州市	1486
3307	浙江省>金华市	1445
3305	浙江省>湖州市	964
3308	浙江省>衢州市	507
3309	浙江省>舟山市	491
3311	浙江省>丽水市	472

UNION场景下的改写

Recursive CTE使用UNION可以自动去重,避免无限递归。

基于上文的示例,我们写入两条重复数据,模拟一个需要去重的场景,以便在下文体验UNION场景的改写。

INSERT INTO city VALUES('1111', '1111', '虚拟市', 1000);
INSERT INTO city VALUES('1111', '1111', '虚拟市', 1000);

在这个模拟场景中,Recursive CTE使用UNION去除重复数据,示例如下。

WITH RECURSIVE CTE AS
(
    SELECT id,  gdp FROM city WHERE name = '虚拟市'
    UNION
    SELECT son.id, son.gdp AS name FROM city son INNER JOIN  CTE parent ON son.pid = parent.id
)
SELECT id,  gdp FROM CTE ORDER BY gdp DESC;

改写时,您可以对临时表创建唯一索引,在去重后通过INSERT INTO ON CONFLICT方式写入数据。具体改写如下。

-- 函数的参数为可变参数的值
CREATE OR REPLACE FUNCTION city_gdp(
    target_name varchar(10)
) RETURNS TABLE(
    id varchar(4),
    gdp int
) AS $$
-- 函数执行过程中用到的中间变量
DECLARE prev_count INT := 0;
        curr_count INT := 0;
        curr_level INT := 1;
BEGIN 
-- 创建临时表,表结构相较于Recursive CTE里的字段多了个level字段  
CREATE TEMP TABLE temp_result(
    id varchar(4),
    gdp int,
    level int
) ON COMMIT DROP DISTRIBUTED BY(id);
-- UNION场景新增:创建唯一索引
CREATE UNIQUE INDEX ON temp_result(id, gdp);  
-- 向临时表中插入Rescursive CTE的非Recursive部分
INSERT INTO temp_result
SELECT
    parent.id,
    parent.gdp,
    1 AS level
FROM city parent
WHERE parent.name = target_name
-- UNION场景新增
GROUP BY 1,2;
-- 统计当前临时表的行数,用于后续终止循环
prev_count := (
    SELECT  COUNT(*) FROM temp_result
);
LOOP 
-- 可选,ANALYZE临时表目的是为了后续能出更优的执行计划
ANALYZE temp_result;
-- 将Recursive CTE的Recursive中的部分写入到临时表中,注意level需要+1且WHERE条件中需要过滤属于当前level的数据
INSERT INTO temp_result
SELECT
    son.id,
    son.gdp,
    parent.level + 1 AS level
FROM city son
INNER JOIN temp_result parent ON parent.id = son.pid
WHERE parent.level = curr_level
-- UNION场景新增
GROUP BY 1,2,3
ON CONFLICT DO NOTHING;
-- 再次统计当前临时表的行数
curr_count := (
    SELECT COUNT(*) FROM temp_result
);
-- 若临时表无数据新增,则退出循环
IF curr_count = prev_count THEN EXIT;
END IF;
-- 在下一次循环前更新prev_count和curr_level
prev_count := curr_count;
curr_level := curr_level + 1;
END LOOP;
-- SQL的主查询部分,将临时表整体作为Recursive CTE引用即可
RETURN QUERY
SELECT
    CTE.id,
    CTE.gdp
FROM temp_result CTE ORDER BY gdp DESC;
END;
$$ LANGUAGE plpgsql;

改写完成后,通过SELECT * FROM city_gdppp('虚拟市')查询到与改写前相同的结果。

性能对比

测试数据

准备测试表和测试数据如下。

CREATE TABLE test_table(
        id varchar(100),
        parent_id varchar(100),
        float1 float,
        float2 float,
        varchar1 varchar(100),
        varchar2 varchar(100)) DISTRIBUTED BY (id);
INSERT INTO test_table VALUES('1-CCCCCCCCCCCCCCCCCCCC', '', 1.01, 1.01, 'AAAAAAAAAAAAAAAAAAAA', 'BBBBBBBBBBBBBBBBBBBB');
INSERT INTO test_table SELECT i || '-CCCCCCCCCCCCCCCCCCCC', '1-CCCCCCCCCCCCCCCCCCCC', 1.01, 1.01, 'AAAAAAAAAAAAAAAAAAAA', 'BBBBBBBBBBBBBBBBBBBB' FROM generate_series(2, 10000) i;
INSERT INTO test_table SELECT i || '-CCCCCCCCCCCCCCCCCCCC', 'test2-CCCCCCCCCCCCCCCCCCCC', 1.01, 1.01, 'AAAAAAAAAAAAAAAAAAAA', 'BBBBBBBBBBBBBBBBBBBB' FROM generate_series(10001, 5000000) i;

原始查询

改写前查询如下。

WITH RECURSIVE CTE AS (
    SELECT
        parent.id,
        parent.parent_id,
        CAST(parent.id AS varchar(4000)) id_seq,
        parent.float1,
        parent.float2,
        parent.varchar1,
        parent.varchar2
    FROM test_table parent
    WHERE parent.id IN ('1-CCCCCCCCCCCCCCCCCCCC')
    UNION ALL
    SELECT
        son.id,
        son.parent_id,
        CAST(CTE.id_seq || '>' || son.id AS varchar(4000)) id_seq,
        son.float1,
        son.float2,
        son.varchar1,
        son.varchar2
    FROM test_table son
    INNER JOIN CTE ON CTE.id = son.parent_id
) SELECT * FROM CTE;

执行计划如下。

                                                                              QUERY PLAN                                                                              
----------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Gather Motion 2:1  (slice1; segments: 2)  (cost=0.00..4008018.27 rows=50000002 width=1404) (actual time=1698.654..1733.896 rows=10000 loops=1)
   ->  Recursive Union  (cost=0.00..3258018.24 rows=25000001 width=628) (actual time=0.062..1694.406 rows=10000 loops=1)
         ->  Seq Scan on test_table parent  (cost=0.00..42563.00 rows=1 width=628) (actual time=0.058..80.130 rows=1 loops=1)
               Filter: ((id)::text = '1-CCCCCCCCCCCCCCCCCCCC'::text)
               Rows Removed by Filter: 2499158
         ->  Hash Join  (cost=194565.00..271545.52 rows=2500000 width=628) (actual time=792.825..806.534 rows=5000 loops=2)
               Hash Cond: ((cte.id)::text = (son.parent_id)::text)
               Extra Text: (seg1)   Hash chain length 1666666.7 avg, 4990000 max, using 3 of 8388608 buckets.
               ->  WorkTable Scan on cte  (cost=0.00..0.20 rows=10 width=734) (actual time=0.002..0.415 rows=5000 loops=2)
               ->  Hash  (cost=111313.00..111313.00 rows=5000000 width=112) (actual time=1591.270..1591.270 rows=5000000 loops=1)
                     Buckets: 8388608  Batches: 1  Memory Usage: 0kB
                     ->  Broadcast Motion 2:2  (slice2; segments: 2)  (cost=0.00..111313.00 rows=5000000 width=112) (actual time=0.138..359.646 rows=5000000 loops=1)
                           ->  Seq Scan on test_table son  (cost=0.00..36313.00 rows=2500000 width=112) (actual time=0.081..179.119 rows=2500841 loops=1)
 Optimizer: Postgres-based planner
 Planning Time: 0.523 ms
   (slice0)    Executor memory: 581K bytes.
   (slice1)    Executor memory: 1224280K bytes avg x 2 workers, 1225892K bytes max (seg1).  Work_mem: 1223292K bytes max.
   (slice2)    Executor memory: 672K bytes avg x 2 workers, 672K bytes max (seg0).
 Memory used:  8388608kB
 Query Id: 5832811209844029710
 Execution Time: 1773.102 ms
(21 rows)

改写查询

改写前需要将执行内存调整到8 GB。

SET statement_mem to '8GB';

改写详情如下。

CREATE OR REPLACE FUNCTION rewrite_query(
    parent_id_arr character varying []
) RETURNS TABLE(
    id varchar(100),
    parent_id varchar(100),
    id_seq varchar(4000),
    float1 float,
    float2 float2,
    varchar1 varchar(100),
    varchar2 varchar(100)
) AS $$
DECLARE prev_count INT := 0;
        curr_count INT := 0;
        curr_level INT := 1;
BEGIN 
CREATE TEMP TABLE temp_result(
    id varchar(100),
    parent_id varchar(100),
    id_seq varchar(4000),
    float1 float,
    float2 float2,
    varchar1 varchar(100),
    varchar2 varchar(100),
    level int
) ON COMMIT DROP DISTRIBUTED BY(id);
INSERT INTO temp_result
SELECT
    parent.id,
    parent.parent_id,
    CAST(parent.id AS varchar(4000)) id_seq,
    parent.float1,
    parent.float2,
    parent.varchar1,
    parent.varchar2,
    1 AS level
FROM test_table parent
WHERE parent.id = ANY(parent_id_arr);
prev_count := (
    SELECT COUNT(*) FROM temp_result
);
LOOP 
ANALYZE temp_result;
INSERT INTO temp_result
SELECT
    son.id,
    son.parent_id,
    CAST(temp_result.id_seq || '>' || son.id AS varchar(4000)) id_seq,
    son.float1,
    son.float2,
    son.varchar1,
    son.varchar2,
    temp_result.level + 1 AS level
FROM test_table son
    INNER JOIN temp_result ON temp_result.id = son.parent_id
WHERE temp_result.level = curr_level;
curr_count := (
    SELECT  COUNT(*) FROM temp_result
);
IF curr_count = prev_count THEN EXIT;
END IF;
prev_count := curr_count;
curr_level := curr_level + 1;
END LOOP;
RETURN QUERY
SELECT
    CTE.id,
    CTE.parent_id,
    CTE.id_seq,
    CTE.float1,
    CTE.float2,
    CTE.varchar1,
    CTE.varchar2
FROM temp_result CTE;
END;
$$ LANGUAGE plpgsql;

从以下执行计划中看出,改写后执行时间约0.9s,执行性能提升约一倍。

explain analyze SELECT * FROM rewrite_query(array['1-CCCCCCCCCCCCCCCCCCCC']);
                                                        QUERY PLAN                                                        
--------------------------------------------------------------------------------------------------------------------------
 Function Scan on rewrite_query  (cost=0.25..10.25 rows=1000 width=170) (actual time=875.001..875.652 rows=10000 loops=1)
 Optimizer: Postgres-based planner
 Planning Time: 0.040 ms
   (slice0)    Executor memory: 2644K bytes.  Work_mem: 2785K bytes max.
 Memory used:  1048576kB
 Query Id: 338320616919474816
 Execution Time: 875.896 ms
(7 rows)

真实案例

以下案例来源于某真实客户的生产场景。该场景相较于测试场景,计算节点数量更多且数据量更大,经改写查询后,执行时间由186s提升到2s内。

原始查询

WITH RECURSIVE CTE AS (
    SELECT
        parent.id,
        parent.parent_id,
        CAST(parent.parent_id AS varchar(4000)) id_seq,
        parent.float1,
        parent.float2,
        parent.varchar1,
        parent.varchar2
    FROM test_table parent
    WHERE
        parent.parent_id IN ('test_id1','test_id2')
    UNION ALL
    SELECT
        son.id,
        son.parent_id,
        CAST(CTE.id_seq || '>' || son.parent_id AS varchar(4000)) id_seq,
        son.float1,
        son.float2,
        son.varchar1,
        son.varchar2
    FROM test_table son
    INNER JOIN CTE ON CTE.id = son.parent_id
)
SELECT
    m.varchar1,
    m.varchar3,
    CTE.parent_id,
    CTE.id,
    split_part(CTE.id_seq, '>', 1) id_1,
    CTE.float1,
    SUM(CTE.float2) sum_float2
FROM CTE
INNER JOIN other_table m ON m.varchar1 = CTE.varchar1
GROUP BY
    m.varchar1,
    m.varchar3,
    CTE.parent_id,
    CTE.id,
    id_1,
    CTE.float1
ORDER BY 7 DESC;

改写查询

CREATE OR REPLACE FUNCTION rewrite_query(
    parent_id_arr character varying []
) RETURNS TABLE(
    varchar1 varchar(100),
    varchar3 varchar(100),
    id varchar(100),
    parent_id varchar(100),
    id_1 text,
    float1 float,
    sum_float2 float
) AS $$
DECLARE prev_count INT := 0;
        curr_count INT := 0;
        curr_level INT := 1;
BEGIN 
CREATE TEMP TABLE temp_result(
    id varchar(100),
    parent_id varchar(100),
    id_seq varchar(4000),
    float1 float,
    float2 float2,
    varchar1 varchar(100),
    varchar2 varchar(100),
    level int
) ON COMMIT DROP DISTRIBUTED BY(id);
INSERT INTO temp_result
SELECT
    parent.id,
    parent.parent_id,
    CAST(parent.parent_id AS varchar(4000)) id_seq,
    parent.float1,
    parent.float2,
    parent.varchar1,
    parent.varchar2,
    1 AS level
FROM test_table parent
WHERE parent.parent_id = ANY(parent_id_arr);
prev_count := (
    SELECT  COUNT(*)  FROM temp_result
);
LOOP 
ANALYZE temp_result;
INSERT INTO temp_result
SELECT
    son.id,
    son.parent_id,
    CAST(temp_result.id_seq || '>' || son.parent_id as varchar(4000)) id_seq,
    son.float1,
    son.float2,
    son.varchar1,
    son.varchar2,
    temp_result.level + 1 AS level
FROM test_table son
INNER JOIN temp_result ON temp_result.id = son.parent_id
WHERE temp_result.level = curr_level;
curr_count := (
    SELECT COUNT(*) FROM temp_result
);
IF curr_count = prev_count THEN EXIT;
END IF;
prev_count := curr_count;
curr_level := curr_level + 1;
END LOOP;
RETURN QUERY
SELECT
    m.varchar1,
    m.varchar3,
    CTE.parent_id,
    CTE.id,
    split_part(CTE.id_seq, '>', 1) id_1,
    CTE.float1,
    SUM(CTE.float2) sum_float2
FROM temp_result CTE
INNER JOIN other_table m ON m.varchar1 = CTE.varchar1
GROUP BY
    m.varchar1,
    m.varchar3,
    CTE.parent_id,
    CTE.id,
    id_1,
    CTE.float1
ORDER BY 7 DESC;
END;
$$ LANGUAGE plpgsql;
SELECT * FROM rewrite_query(ARRAY['test_id1', 'test_id2']);