当前云原生数据仓库 AnalyticDB PostgreSQL 版对Recursive CTE支持较为有限。在分布式场景下,为确保计划及执行层结果的正确性,云原生数据仓库 AnalyticDB PostgreSQL 版对Recursive CTE执行过程中的WorkTableScan算子施加了限制:禁止出现Motion,且该算子必须位于JOIN
的最左侧。因此,当Recursive CTE中出现数据量较大的表且实例的计算节点数量较多时,其执行性能表现极为不佳。云原生数据仓库 AnalyticDB PostgreSQL 版对临时表的执行计划没有任何限制,因此建议您通过PL/SQL函数加临时表的方案改写Recursive CTE。在每次迭代计算后,改写后的方案都可给出较优的执行计划,从而提升查询性能,满足业务需求。
改写示例
测试数据
准备测试表和测试数据如下。
CREATE TABLE city(id varchar(4), pid varchar(4), name varchar(10), gdp int);
INSERT INTO city VALUES('33', NULL, '浙江省', 20134);
INSERT INTO city VALUES('3301', '33', '杭州市', 5112);
INSERT INTO city VALUES('3302', '33', '宁波市', 3992);
INSERT INTO city VALUES('3303', '33', '温州市', 2125);
INSERT INTO city VALUES('3304', '33', '嘉兴市', 1688);
INSERT INTO city VALUES('3306', '33', '绍兴市', 1852);
INSERT INTO city VALUES('3305', '33', '湖州市', 964);
INSERT INTO city VALUES('3307', '33', '金华市', 1445);
INSERT INTO city VALUES('3308', '33', '衢州市', 507);
INSERT INTO city VALUES('3309', '33', '舟山市', 491);
INSERT INTO city VALUES('3310', '33', '台州市', 1486);
INSERT INTO city VALUES('3311', '33', '丽水市', 472);
INSERT INTO city VALUES('32', NULL, '江苏省', 30862);
INSERT INTO city VALUES('3201', '32', '南京市', 4359);
INSERT INTO city VALUES('3202', '32', '无锡市', 3584);
INSERT INTO city VALUES('3203', '32', '徐州市', 2118);
INSERT INTO city VALUES('3204', '32', '常州市', 2269);
INSERT INTO city VALUES('3205', '32', '苏州市', 5548);
INSERT INTO city VALUES('3206', '32', '南通市', 2982);
INSERT INTO city VALUES('3207', '32', '连云港市', 976);
INSERT INTO city VALUES('3208', '32', '淮安市', 1257);
INSERT INTO city VALUES('3209', '32', '盐城市', 1796);
INSERT INTO city VALUES('3210', '32', '扬州市', 1868);
INSERT INTO city VALUES('3211', '32', '镇江市', 1372);
INSERT INTO city VALUES('3212', '32', '泰州市', 1752);
INSERT INTO city VALUES('3213', '32', '宿迁市', 981);
原始查询
通过Recursive CTE找出目标省份及其下属所有城市的GDP。
WITH RECURSIVE CTE AS
(
SELECT id, CAST(name AS varchar(100)), gdp FROM city WHERE name = '浙江省'
UNION ALL
SELECT son.id, CAST(parent.name || '>' || son.name AS varchar(100)), son.gdp AS name
FROM city son INNER JOIN CTE parent ON son.pid = parent.id
)
SELECT id, name, gdp FROM CTE ORDER BY gdp DESC;
执行计划
在以上原始查询SQL前加上EXPLAIN
语句即可查看执行计划,执行计划详情如下。
从执行计划可以看出,city表的数据被广播到所有计算节点上。当city表中的数据量较大时,执行性能将显著降低。此外,ORCA优化器不支持Recursive CTE,因此只能回退至planner优化器。
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------
Gather Motion 3:1 (slice1; segments: 3) (cost=13568.80..13971.85 rows=28451 width=242)
Merge Key: city.gdp
-> Sort (cost=13568.80..13592.51 rows=9484 width=242)
Sort Key: city.gdp DESC
-> Recursive Union (cost=0.00..12942.36 rows=9484 width=242)
-> Seq Scan on city (cost=0.00..155.67 rows=10 width=242)
Filter: ((name)::text = '浙江省'::text)
-> Hash Join (cost=885.67..1259.70 rows=947 width=242)
Hash Cond: ((parent.id)::text = (son.pid)::text)
-> WorkTable Scan on CTE parent (cost=0.00..1.95 rows=97 width=238)
-> Hash (cost=520.67..520.67 rows=29200 width=82)
-> Broadcast Motion 3:3 (slice2; segments: 3) (cost=0.00..520.67 rows=29200 width=82)
-> Seq Scan on city son (cost=0.00..131.33 rows=9733 width=82)
Optimizer: Postgres-based planner
(14 rows)
查询结果
查询结果详情如下。
id name gdp
33 浙江省 20134
3301 浙江省>杭州市 5112
3302 浙江省>宁波市 3992
3303 浙江省>温州市 2125
3306 浙江省>绍兴市 1852
3304 浙江省>嘉兴市 1688
3310 浙江省>台州市 1486
3307 浙江省>金华市 1445
3305 浙江省>湖州市 964
3308 浙江省>衢州市 507
3309 浙江省>舟山市 491
3311 浙江省>丽水市 472
改写查询
UNION ALL场景下的改写
以下示例将为您展示在UNION ALL
场景下对Recursive CTE的改写方法。通过PL/SQL函数改写原始查询,改写详情如下。
-- 函数的参数为可变参数的值
CREATE OR REPLACE FUNCTION city_gdp(
target_name varchar(10)
) RETURNS TABLE(
id varchar(4),
name varchar(100),
gdp int
) AS $$
-- 函数执行过程中用到的中间变量
DECLARE prev_count INT := 0;
curr_count INT := 0;
curr_level INT := 1;
BEGIN
-- 创建临时表,表结构相较于Recursive CTE中的字段多了level字段
CREATE TEMP TABLE temp_result(
id varchar(4),
name varchar(100),
gdp int,
level int
) ON COMMIT DROP DISTRIBUTED BY(id);
-- 向临时表中写入Rescursive CTE的非recursive部分
INSERT INTO temp_result
SELECT
parent.id,
CAST(parent.name AS varchar(100)),
parent.gdp,
1 AS level
FROM city parent
WHERE parent.name = target_name;
-- 统计当前临时表的行数,用于后续终止循环
prev_count := (
SELECT COUNT(*) FROM temp_result
);
LOOP
-- 可选,analyze临时表temp_result是为了后续能出更优的执行计划
ANALYZE temp_result;
-- 将Recursive CTE的Recursive中的部分写入到临时表中,注意level需要+1且WHERE条件中需要过滤属于当前level的数据
INSERT INTO temp_result
SELECT
son.id,
CAST(parent.name || '>' || son.name AS varchar(100)),
son.gdp,
parent.level + 1 AS level
FROM city son
INNER JOIN temp_result parent ON parent.id = son.pid
WHERE parent.level = curr_level;
-- 再次统计当前临时表的行数
curr_count := (
SELECT COUNT(*) FROM temp_result
);
-- 若临时表无数据新增,则退出循环
IF curr_count = prev_count THEN EXIT;
END IF;
-- 在下一次循环前更新prev_count和curr_level
prev_count := curr_count;
curr_level := curr_level + 1;
END LOOP;
-- SQL的主查询部分,将临时表整体作为Recursive CTE引用即可
RETURN QUERY
SELECT
CTE.id,
CTE.name,
CTE.gdp
FROM temp_result CTE
ORDER BY gdp DESC;
END;
$$ LANGUAGE plpgsql;
执行计划
查看函数执行过程中的计划可以发现,每次循环时,ORCA优化器会自动生成更优的执行计划。
-- 第一次循环
LOG: Insert on temp_result (cost=0.00..862.04 rows=1 width=24)
-> Result (cost=0.00..0.00 rows=0 width=0)
-> Redistribute Motion 3:3 (slice1; segments: 3) (cost=0.00..862.00 rows=1 width=28)
Hash Key: city.id
-> Hash Join (cost=0.00..862.00 rows=1 width=34)
Hash Cond: ((city.pid)::text = (temp_result_1.id)::text)
-> Redistribute Motion 3:3 (slice2; segments: 3) (cost=0.00..431.00 rows=1 width=28)
Hash Key: city.pid
-> Seq Scan on city (cost=0.00..431.00 rows=1 width=28)
-> Hash (cost=431.00..431.00 rows=1 width=17)
-> Seq Scan on temp_result temp_result_1 (cost=0.00..431.00 rows=1 width=17)
Filter: (level = 1)
Optimizer: GPORCA
-- 第二次循环
LOG: Insert on temp_result (cost=0.00..862.04 rows=1 width=24)
-> Result (cost=0.00..0.00 rows=0 width=0)
-> Redistribute Motion 3:3 (slice1; segments: 3) (cost=0.00..862.00 rows=1 width=28)
Hash Key: city.id
-> Hash Join (cost=0.00..862.00 rows=1 width=43)
Hash Cond: ((temp_result_1.id)::text = (city.pid)::text)
-> Seq Scan on temp_result temp_result_1 (cost=0.00..431.00 rows=5 width=27)
Filter: (level = 2)
-> Hash (cost=431.00..431.00 rows=1 width=28)
-> Redistribute Motion 3:3 (slice2; segments: 3) (cost=0.00..431.00 rows=1 width=28)
Hash Key: city.pid
-> Seq Scan on city (cost=0.00..431.00 rows=1 width=28)
Optimizer: GPORCA
由于未对city表执行ANALYZE
(ANALYZE
语法详情请参见ANALYZE用法),上述展示的执行计划并非最优,仅为表明每次循环可生成不同的执行计划。
查询结果
查询时传入合理的参数值,例如“浙江省”。
SELECT * FROM city_gdp('浙江省');
结果如下。
id name gdp
33 浙江省 20134
3301 浙江省>杭州市 5112
3302 浙江省>宁波市 3992
3303 浙江省>温州市 2125
3306 浙江省>绍兴市 1852
3304 浙江省>嘉兴市 1688
3310 浙江省>台州市 1486
3307 浙江省>金华市 1445
3305 浙江省>湖州市 964
3308 浙江省>衢州市 507
3309 浙江省>舟山市 491
3311 浙江省>丽水市 472
UNION场景下的改写
Recursive CTE使用UNION可以自动去重,避免无限递归。
基于上文的示例,我们写入两条重复数据,模拟一个需要去重的场景,以便在下文体验UNION场景的改写。
INSERT INTO city VALUES('1111', '1111', '虚拟市', 1000);
INSERT INTO city VALUES('1111', '1111', '虚拟市', 1000);
在这个模拟场景中,Recursive CTE使用UNION去除重复数据,示例如下。
WITH RECURSIVE CTE AS
(
SELECT id, gdp FROM city WHERE name = '虚拟市'
UNION
SELECT son.id, son.gdp AS name FROM city son INNER JOIN CTE parent ON son.pid = parent.id
)
SELECT id, gdp FROM CTE ORDER BY gdp DESC;
改写时,您可以对临时表创建唯一索引,在去重后通过INSERT INTO ON CONFLICT
方式写入数据。具体改写如下。
-- 函数的参数为可变参数的值
CREATE OR REPLACE FUNCTION city_gdp(
target_name varchar(10)
) RETURNS TABLE(
id varchar(4),
gdp int
) AS $$
-- 函数执行过程中用到的中间变量
DECLARE prev_count INT := 0;
curr_count INT := 0;
curr_level INT := 1;
BEGIN
-- 创建临时表,表结构相较于Recursive CTE里的字段多了个level字段
CREATE TEMP TABLE temp_result(
id varchar(4),
gdp int,
level int
) ON COMMIT DROP DISTRIBUTED BY(id);
-- UNION场景新增:创建唯一索引
CREATE UNIQUE INDEX ON temp_result(id, gdp);
-- 向临时表中插入Rescursive CTE的非Recursive部分
INSERT INTO temp_result
SELECT
parent.id,
parent.gdp,
1 AS level
FROM city parent
WHERE parent.name = target_name
-- UNION场景新增
GROUP BY 1,2;
-- 统计当前临时表的行数,用于后续终止循环
prev_count := (
SELECT COUNT(*) FROM temp_result
);
LOOP
-- 可选,ANALYZE临时表目的是为了后续能出更优的执行计划
ANALYZE temp_result;
-- 将Recursive CTE的Recursive中的部分写入到临时表中,注意level需要+1且WHERE条件中需要过滤属于当前level的数据
INSERT INTO temp_result
SELECT
son.id,
son.gdp,
parent.level + 1 AS level
FROM city son
INNER JOIN temp_result parent ON parent.id = son.pid
WHERE parent.level = curr_level
-- UNION场景新增
GROUP BY 1,2,3
ON CONFLICT DO NOTHING;
-- 再次统计当前临时表的行数
curr_count := (
SELECT COUNT(*) FROM temp_result
);
-- 若临时表无数据新增,则退出循环
IF curr_count = prev_count THEN EXIT;
END IF;
-- 在下一次循环前更新prev_count和curr_level
prev_count := curr_count;
curr_level := curr_level + 1;
END LOOP;
-- SQL的主查询部分,将临时表整体作为Recursive CTE引用即可
RETURN QUERY
SELECT
CTE.id,
CTE.gdp
FROM temp_result CTE ORDER BY gdp DESC;
END;
$$ LANGUAGE plpgsql;
改写完成后,通过SELECT * FROM city_gdppp('虚拟市')
查询到与改写前相同的结果。
性能对比
测试数据
准备测试表和测试数据如下。
CREATE TABLE test_table(
id varchar(100),
parent_id varchar(100),
float1 float,
float2 float,
varchar1 varchar(100),
varchar2 varchar(100)) DISTRIBUTED BY (id);
INSERT INTO test_table VALUES('1-CCCCCCCCCCCCCCCCCCCC', '', 1.01, 1.01, 'AAAAAAAAAAAAAAAAAAAA', 'BBBBBBBBBBBBBBBBBBBB');
INSERT INTO test_table SELECT i || '-CCCCCCCCCCCCCCCCCCCC', '1-CCCCCCCCCCCCCCCCCCCC', 1.01, 1.01, 'AAAAAAAAAAAAAAAAAAAA', 'BBBBBBBBBBBBBBBBBBBB' FROM generate_series(2, 10000) i;
INSERT INTO test_table SELECT i || '-CCCCCCCCCCCCCCCCCCCC', 'test2-CCCCCCCCCCCCCCCCCCCC', 1.01, 1.01, 'AAAAAAAAAAAAAAAAAAAA', 'BBBBBBBBBBBBBBBBBBBB' FROM generate_series(10001, 5000000) i;
原始查询
改写前查询如下。
WITH RECURSIVE CTE AS (
SELECT
parent.id,
parent.parent_id,
CAST(parent.id AS varchar(4000)) id_seq,
parent.float1,
parent.float2,
parent.varchar1,
parent.varchar2
FROM test_table parent
WHERE parent.id IN ('1-CCCCCCCCCCCCCCCCCCCC')
UNION ALL
SELECT
son.id,
son.parent_id,
CAST(CTE.id_seq || '>' || son.id AS varchar(4000)) id_seq,
son.float1,
son.float2,
son.varchar1,
son.varchar2
FROM test_table son
INNER JOIN CTE ON CTE.id = son.parent_id
) SELECT * FROM CTE;
执行计划如下。
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------------------
Gather Motion 2:1 (slice1; segments: 2) (cost=0.00..4008018.27 rows=50000002 width=1404) (actual time=1698.654..1733.896 rows=10000 loops=1)
-> Recursive Union (cost=0.00..3258018.24 rows=25000001 width=628) (actual time=0.062..1694.406 rows=10000 loops=1)
-> Seq Scan on test_table parent (cost=0.00..42563.00 rows=1 width=628) (actual time=0.058..80.130 rows=1 loops=1)
Filter: ((id)::text = '1-CCCCCCCCCCCCCCCCCCCC'::text)
Rows Removed by Filter: 2499158
-> Hash Join (cost=194565.00..271545.52 rows=2500000 width=628) (actual time=792.825..806.534 rows=5000 loops=2)
Hash Cond: ((cte.id)::text = (son.parent_id)::text)
Extra Text: (seg1) Hash chain length 1666666.7 avg, 4990000 max, using 3 of 8388608 buckets.
-> WorkTable Scan on cte (cost=0.00..0.20 rows=10 width=734) (actual time=0.002..0.415 rows=5000 loops=2)
-> Hash (cost=111313.00..111313.00 rows=5000000 width=112) (actual time=1591.270..1591.270 rows=5000000 loops=1)
Buckets: 8388608 Batches: 1 Memory Usage: 0kB
-> Broadcast Motion 2:2 (slice2; segments: 2) (cost=0.00..111313.00 rows=5000000 width=112) (actual time=0.138..359.646 rows=5000000 loops=1)
-> Seq Scan on test_table son (cost=0.00..36313.00 rows=2500000 width=112) (actual time=0.081..179.119 rows=2500841 loops=1)
Optimizer: Postgres-based planner
Planning Time: 0.523 ms
(slice0) Executor memory: 581K bytes.
(slice1) Executor memory: 1224280K bytes avg x 2 workers, 1225892K bytes max (seg1). Work_mem: 1223292K bytes max.
(slice2) Executor memory: 672K bytes avg x 2 workers, 672K bytes max (seg0).
Memory used: 8388608kB
Query Id: 5832811209844029710
Execution Time: 1773.102 ms
(21 rows)
改写查询
改写前需要将执行内存调整到8 GB。
SET statement_mem to '8GB';
改写详情如下。
CREATE OR REPLACE FUNCTION rewrite_query(
parent_id_arr character varying []
) RETURNS TABLE(
id varchar(100),
parent_id varchar(100),
id_seq varchar(4000),
float1 float,
float2 float2,
varchar1 varchar(100),
varchar2 varchar(100)
) AS $$
DECLARE prev_count INT := 0;
curr_count INT := 0;
curr_level INT := 1;
BEGIN
CREATE TEMP TABLE temp_result(
id varchar(100),
parent_id varchar(100),
id_seq varchar(4000),
float1 float,
float2 float2,
varchar1 varchar(100),
varchar2 varchar(100),
level int
) ON COMMIT DROP DISTRIBUTED BY(id);
INSERT INTO temp_result
SELECT
parent.id,
parent.parent_id,
CAST(parent.id AS varchar(4000)) id_seq,
parent.float1,
parent.float2,
parent.varchar1,
parent.varchar2,
1 AS level
FROM test_table parent
WHERE parent.id = ANY(parent_id_arr);
prev_count := (
SELECT COUNT(*) FROM temp_result
);
LOOP
ANALYZE temp_result;
INSERT INTO temp_result
SELECT
son.id,
son.parent_id,
CAST(temp_result.id_seq || '>' || son.id AS varchar(4000)) id_seq,
son.float1,
son.float2,
son.varchar1,
son.varchar2,
temp_result.level + 1 AS level
FROM test_table son
INNER JOIN temp_result ON temp_result.id = son.parent_id
WHERE temp_result.level = curr_level;
curr_count := (
SELECT COUNT(*) FROM temp_result
);
IF curr_count = prev_count THEN EXIT;
END IF;
prev_count := curr_count;
curr_level := curr_level + 1;
END LOOP;
RETURN QUERY
SELECT
CTE.id,
CTE.parent_id,
CTE.id_seq,
CTE.float1,
CTE.float2,
CTE.varchar1,
CTE.varchar2
FROM temp_result CTE;
END;
$$ LANGUAGE plpgsql;
从以下执行计划中看出,改写后执行时间约0.9s,执行性能提升约一倍。
explain analyze SELECT * FROM rewrite_query(array['1-CCCCCCCCCCCCCCCCCCCC']);
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------
Function Scan on rewrite_query (cost=0.25..10.25 rows=1000 width=170) (actual time=875.001..875.652 rows=10000 loops=1)
Optimizer: Postgres-based planner
Planning Time: 0.040 ms
(slice0) Executor memory: 2644K bytes. Work_mem: 2785K bytes max.
Memory used: 1048576kB
Query Id: 338320616919474816
Execution Time: 875.896 ms
(7 rows)
真实案例
以下案例来源于某真实客户的生产场景。该场景相较于测试场景,计算节点数量更多且数据量更大,经改写查询后,执行时间由186s提升到2s内。
原始查询
WITH RECURSIVE CTE AS (
SELECT
parent.id,
parent.parent_id,
CAST(parent.parent_id AS varchar(4000)) id_seq,
parent.float1,
parent.float2,
parent.varchar1,
parent.varchar2
FROM test_table parent
WHERE
parent.parent_id IN ('test_id1','test_id2')
UNION ALL
SELECT
son.id,
son.parent_id,
CAST(CTE.id_seq || '>' || son.parent_id AS varchar(4000)) id_seq,
son.float1,
son.float2,
son.varchar1,
son.varchar2
FROM test_table son
INNER JOIN CTE ON CTE.id = son.parent_id
)
SELECT
m.varchar1,
m.varchar3,
CTE.parent_id,
CTE.id,
split_part(CTE.id_seq, '>', 1) id_1,
CTE.float1,
SUM(CTE.float2) sum_float2
FROM CTE
INNER JOIN other_table m ON m.varchar1 = CTE.varchar1
GROUP BY
m.varchar1,
m.varchar3,
CTE.parent_id,
CTE.id,
id_1,
CTE.float1
ORDER BY 7 DESC;
改写查询
CREATE OR REPLACE FUNCTION rewrite_query(
parent_id_arr character varying []
) RETURNS TABLE(
varchar1 varchar(100),
varchar3 varchar(100),
id varchar(100),
parent_id varchar(100),
id_1 text,
float1 float,
sum_float2 float
) AS $$
DECLARE prev_count INT := 0;
curr_count INT := 0;
curr_level INT := 1;
BEGIN
CREATE TEMP TABLE temp_result(
id varchar(100),
parent_id varchar(100),
id_seq varchar(4000),
float1 float,
float2 float2,
varchar1 varchar(100),
varchar2 varchar(100),
level int
) ON COMMIT DROP DISTRIBUTED BY(id);
INSERT INTO temp_result
SELECT
parent.id,
parent.parent_id,
CAST(parent.parent_id AS varchar(4000)) id_seq,
parent.float1,
parent.float2,
parent.varchar1,
parent.varchar2,
1 AS level
FROM test_table parent
WHERE parent.parent_id = ANY(parent_id_arr);
prev_count := (
SELECT COUNT(*) FROM temp_result
);
LOOP
ANALYZE temp_result;
INSERT INTO temp_result
SELECT
son.id,
son.parent_id,
CAST(temp_result.id_seq || '>' || son.parent_id as varchar(4000)) id_seq,
son.float1,
son.float2,
son.varchar1,
son.varchar2,
temp_result.level + 1 AS level
FROM test_table son
INNER JOIN temp_result ON temp_result.id = son.parent_id
WHERE temp_result.level = curr_level;
curr_count := (
SELECT COUNT(*) FROM temp_result
);
IF curr_count = prev_count THEN EXIT;
END IF;
prev_count := curr_count;
curr_level := curr_level + 1;
END LOOP;
RETURN QUERY
SELECT
m.varchar1,
m.varchar3,
CTE.parent_id,
CTE.id,
split_part(CTE.id_seq, '>', 1) id_1,
CTE.float1,
SUM(CTE.float2) sum_float2
FROM temp_result CTE
INNER JOIN other_table m ON m.varchar1 = CTE.varchar1
GROUP BY
m.varchar1,
m.varchar3,
CTE.parent_id,
CTE.id,
id_1,
CTE.float1
ORDER BY 7 DESC;
END;
$$ LANGUAGE plpgsql;
SELECT * FROM rewrite_query(ARRAY['test_id1', 'test_id2']);