本文介绍了PolarDB PostgreSQL版的并行DML功能。
前提条件
支持的PolarDB PostgreSQL版的版本如下:PostgreSQL 11(内核小版本1.1.17及以上)
show polar_version;
简介
INSERT
/ UPDATE
/ DELETE
)也可以做到并行读并行写的加速。其中:- 并行读:指借助多个只读节点上的多进程来加速DML中的查找操作。
- 并行写:指在一个PolarDB唯一的读写节点上利用多进程实现并行写入。
术语
- QC:Query Coordinator,发起PX并行查询的进程角色。
- PX Worker:参与PX跨节点并行查询的工作进程角色。
- DML:数据操作语句,包含
INSERT
/UPDATE
/DELETE
。 - Slice:指每个PX Worker负责执行的计划分片。
- RW/RO:读写节点/只读节点。
功能介绍
- Parallel Insert
为了加速
INSERT ... SELECT ...
这种既有读取又有写入的DML SQL,PolarDB PostgreSQL版使用Parallel Insert来提升性能。对于SELECT
子查询,PolarDB PostgreSQL版使用多个 PX Worker 并行加速查询;对于INSERT
的写入操作,由于PolarDB PostgreSQL版只有一个RW节点,则会在RW节点上启动多个执行写入的PX Worker进程,通过Motion算子来接收RO节点上读取的数据,实现加速并行写入。RO节点上的PX Worker只能执行只读操作,但是在RW节点上的PX Worker可以执行写入操作。Parallel Insert在读写数据量均衡的情况下,最高能提升3倍的性能。Parallel Insert已支持:- 普通表
- 分区表
- 强制有序
- 并行度动态调整
- Parallel Update
与Parallel Insert类似,针对
UPDATE ... SET ...
,PolarDB PostgreSQL版使用多个PX Worker来执行并行查询,实现加速筛选需要更新的行;同时,在RW节点上启动多个PX Worker进程来执行更新操作。在读写数据量均衡的情况下,最高能提升3倍的性能。Parallel Update不支持分区表,支持并行度动态调整。 - Parallel Delete
与Parallel Update基本相同,针对
DELETE FROM ...
,PolarDB PostgreSQL版通过多个PX Worker来执行并行查询,实现加速筛选需要删除的行;同时,在RW节点启动多个PX Worker来执行删除操作。Parallel Delete不支持分区表,支持并行度动态调整。
原理介绍
- Parallel InsertParallel Insert的总体框架如下所示:Parallel Insert的处理步骤如下:
- QC进程接收到
INSERT ... SELECT
。 - QC进程对SQL进行解析、重写,生成查询树,通过PX优化器生成计划树。
- 通过bitmap标志来指定每个PX Worker负责执行哪部分执行计划。
- 将完整的计划树分发到RO节点和RW节点,并创建PX Worker进程,不同的是PX Workers根据自己的ID在bitmap中查找自己负责执行的计划。
- RO节点上的PX Workers执行查询计划,从存储中并行读取各自负责的数据分片。
- RO节点上的PX Workers通过Motion算子将查询数据发送给RW节点上的PX Workers。
- RW节点上的PX Workers并行向存储写入数据。
说明 其中,步骤5、6、7是全流水线执行的。以最简单的并行DMLINSERT INTO t1 SELECT * FROM t2
为例。表t1
和t2
都是只有两列的表。QUERY PLAN ------------------------------------------------- Insert on public.t1 -> Result Output: t2.c1, t2.c2 -> PX Hash 6:6 (slice1; segments: 6) Output: t2.c1, t2.c2, (1) -> Partial Seq Scan on public.t2 Output: t2.c1, t2.c2, 1 Optimizer: PolarDB PX Optimizer (8 rows)
说明 在执行计划中,Partial Seq Scan
表示每个PX Workers并行读取的数据分片。PX Hash 6:6说明有6个负责读取的PX Workers和6个负责写入的PX Workers。Hash
表示负责读取的PX Worker所读取到的数据会hash重分布到RW节点中负责写入的PX Worker上。Parallel Insert也支持单个写Worker,多个读Worker的执行计划:QUERY PLAN ------------------------------------------------------- Insert on public.t1 -> Result Output: t2.c1, t2.c2 -> PX Coordinator 6:1 (slice1; segments: 6) Output: t2.c1, t2.c2 -> Partial Seq Scan on public.t2 Output: t2.c1, t2.c2 Optimizer: PolarDB PX Optimizer (8 rows)
说明 由于只有一个写Worker,所以计划中显示的是PX Coordinator 6:1,将RO节点上的数据汇聚到RW节点上。以数据流的方式展示Parallel Insert的执行过程如下图所示:执行过程如下:
- 每个负责读取的PX Worker执行一部分的顺序扫描操作,读取数据,进入到
RedistributeMotionRandom
,将读取到的每条数据重分布,发送给各个负责写入的PX Worker。 - 通过
SendMotion
向RW节点上的PX Worker发送数据,RO节点上的每个PX Worker会从所有RW节点上的PX Worker中选择一个进行数据重分布,重分布的策略有哈希分布和随机分布两种。 - RW节点上被选中的PX Worker通过
RecvMotion
来接收数据,然后将数据通过ModifyTable
算子写入存储。
- QC进程接收到
- Parallel Update
由于Parallel Update和Parallel Delete在SQL解析、重写的过程和Parallel Insert相同,下面只说明Parallel Update的执行计划和数据流动方式。
不带子查询的Parallel Update计划,示例如下:QUERY PLAN -------------------------------------------------------------------------------------------------------- Update (segment: 6) on public.t1 -> Result Output: t1_1.c1, t1_1.c2, (DMLAction), t1_1.ctid -> PX Hash 6:6 (slice1; segments: 6) Output: t1_1.c1, t1_1.c2, t1_1.ctid, t1_1._px_worker_id, (DMLAction), ('16397'::oid) -> Result Output: t1_1.c1, t1_1.c2, t1_1.ctid, t1_1._px_worker_id, (DMLAction), '16397'::oid -> Split Output: t1_1.c1, t1_1.c2, t1_1.ctid, t1_1._px_worker_id, DMLAction -> Partial Seq Scan on public.t1 t1_1 Output: t1_1.c1, t1_1.c2, 3, t1_1.ctid, t1_1._px_worker_id Optimizer: PolarDB PX Optimizer (12 rows)
说明 从执行计划中可以看出,从RO节点读取数据到RW节点写入数据之前存在一个Split算子。算子中还包含了一个DMLAction
的标志,用于表示当前正在进行的DML操作类型(DML_INSERT
/DML_DELETE
)。Split算子用于将UPDATE
拆分为DELETE
和INSERT
两个阶段,表示需要删除哪些行、插入哪些行。对于带有子查询的Parallel Update计划,除写入计划分片之外加入了自查询的执行计划分片。示例如下:QUERY PLAN ------------------------------------------------------------------------------------------------------------ Update (segment: 6) on public.t1 -> Result Output: t1_1.c1, t1_1.c2, (DMLAction), t1_1.ctid -> PX Hash 6:6 (slice1; segments: 6) Output: t1_1.c1, t1_1.c2, t1_1.ctid, t1_1._px_worker_id, (DMLAction), ('16397'::oid) -> Result Output: t1_1.c1, t1_1.c2, t1_1.ctid, t1_1._px_worker_id, (DMLAction), '16397'::oid -> Split Output: t1_1.c1, t1_1.c2, t1_1.ctid, t1_1._px_worker_id, DMLAction -> Partial Seq Scan on public.t1 t1_1 Output: t1_1.c1, t1_1.c2, int4((SubPlan 1)), t1_1.ctid, t1_1._px_worker_id SubPlan 1 -> Materialize Output: (count()) -> PX Broadcast 1:6 (slice2) Output: (count()) -> Aggregate Output: count() -> PX Coordinator 6:1 (slice3; segments: 6) -> Partial Seq Scan on public.t2 Optimizer: PolarDB PX Optimizer (21 rows)
Parallel Update处理数据流图如下图所示:- 对于不带子查询的情况,例如,
UPDATE t1 SET c1=3
,流程如下:- 每个负责写入的PX Worker并行查找要更新的行。
- 通过Split算子,拆分成
DELETE
和INSERT
操作。 - 执行
ExecDelete
和ExecInsert
。
- 带子查询的情况,例如,
UPDATE t1 SET c1=(SELECT COUNT(*) FROM t2)
,流程如下:- 每个负责读取的PX Worker从共享存储上并行读取自己负责的数据分片,然后通过
SendMotion
将自己读到的数据汇聚到QC进程。 - QC进程将数据(过滤条件)广播给RW节点上的各个负责写入的PX Worker。
- 各个负责写入的PX Worker分别扫描各自负责的数据分片,查找待更新的数据。
- 通过Split算子,拆分成
DELETE
和INSERT
操作。 - 执行
ExecDelete
和ExecInsert
。
- 每个负责读取的PX Worker从共享存储上并行读取自己负责的数据分片,然后通过
- 对于不带子查询的情况,例如,
- Parallel Delete不带子查询的Parallel Delete 计划,示例如下:
QUERY PLAN -------------------------------------------------------------------------------- Delete (segment: 6) on public.t1 -> Result Output: t1_1.c1, t1_1.c2, t1_1.ctid -> PX Hash 6:6 (slice1; segments: 6) Output: t1_1.c1, t1_1.c2, t1_1.ctid, t1_1._px_worker_id, (0) -> Partial Seq Scan on public.t1 t1_1 Output: t1_1.c1, t1_1.c2, t1_1.ctid, t1_1._px_worker_id, 0 Filter: (t1_1.c1 < 10) Optimizer: PolarDB PX Optimizer (9 rows)
带子查询的Parallel Delete 计划,示例如下:QUERY PLAN ----------------------------------------------------------------------------------- Delete (segment: 6) on public.t1 -> Result Output: t1_1.c1, t1_1.c2, t1_1.ctid -> PX Hash 6:6 (slice1; segments: 6) Output: t1_1.c1, t1_1.c2, t1_1.ctid, t1_1._px_worker_id, (0) -> Hash Semi Join Output: t1_1.c1, t1_1.c2, t1_1.ctid, t1_1._px_worker_id, 0 Hash Cond: (t1_1.c1 = t2.c1) -> Partial Seq Scan on public.t1 t1_1 Output: t1_1.c1, t1_1.c2, t1_1.ctid, t1_1._px_worker_id -> Hash Output: t2.c1 -> Full Seq Scan on public.t2 Output: t2.c1 Optimizer: PolarDB PX Optimizer (15 rows)
负责读写的PX Workers数量:QUERY PLAN ----------------------------------------------------------------------------------- Delete (segment: 10) on public.t1 -> Result Output: t1_1.c1, t1_1.c2, t1_1.ctid -> PX Hash 6:10 (slice1; segments: 6) Output: t1_1.c1, t1_1.c2, t1_1.ctid, t1_1._px_worker_id, (0) -> Hash Semi Join Output: t1_1.c1, t1_1.c2, t1_1.ctid, t1_1._px_worker_id, 0 Hash Cond: (t1_1.c1 = t2.c1) -> Partial Seq Scan on public.t1 t1_1 Output: t1_1.c1, t1_1.c2, t1_1.ctid, t1_1._px_worker_id -> Hash Output: t2.c1 -> Full Seq Scan on public.t2 Output: t2.c1 Optimizer: PolarDB PX Optimizer (15 rows)
从以上结果可以看到Parallel Delete的计划与Parallel Update类似,区别如下:- 由于Parallel Delete只执行删除操作,不执行插入操作,所以不需要Split算子。
- 顶层的DML算子由Update变为Delete算子。
Parallel Delete的数据流图如下所示:执行过程如下:
- 每个负责读取的PX Workers扫描属于自己的数据分片,找出要删除的行。
- 将待删除的行通过Motion算子传输给每个负责写入的PX Workers,并行执行Delete操作。
使用指南
- Parallel Insert
Parallel Insert默认关闭。
- 使用Parallel Insert前,需要开启PX。
SET polar_enable_px = ON;
- 开启Parallel Insert功能。
SET polar_px_enable_insert_select = ON;
- 开启Parallel Insert写入分区表,默认关闭。
SET polar_px_enable_insert_partition_table = ON;
- 写入并行度控制。取值为1~128。默认为6,表示RW节点上会启动6个PX Workers来执行写入操作。
SET polar_px_insert_dop_num = 6;
- 支持开启无表查询,默认关闭。
SET polar_px_enable_insert_from_tableless = ON;
说明 由于Parallel Insert无法保证写入顺序,提供以下开关强制保证写入结果有序。-- 默认打开,关闭后则不保证并行Insert结果有序 SET polar_px_enable_insert_order_sensitive = ON;
- 使用Parallel Insert前,需要开启PX。
- Parallel Update通过
polar_px_enable_update
参数开启Parallel Update功能,默认关闭。SET polar_px_enable_update = ON;
通过polar_px_update_dop_num
参数设置Parallel Update的写入并行度。默认为6,范围为 1~128。-- 启动6个PX Workers进行写入 SET polar_px_update_dop_num = 6;
- Parallel Delete通过
polar_px_enable_delete
参数开启Parallel Delete功能,默认关闭。SET polar_px_enable_delete = ON;
通过polar_px_delete_dop_num
参数设置Parallel Delete的写入并行度。默认为6,范围为 1~128。-- 启动6个PX Workers进行删除 SET polar_px_delete_dop_num = 6;
性能对比
- Parallel Insert在读写数据量相同的情况下,总数据量为75 GB时,Parallel Insert的性能表现如下图所示:当读数据量远大于写数据量的情况下,总数据量为75 GB时,写入数据量占读数据量的0.25%时,Parallel Insert的性能表现如下图所示:由以上两张图可知:
- 在读写数据量相同的情况下,Parallel Insert最高能提升3倍的性能。
- 读数据量越大,Parallel Insert性能提升幅度越大,最高能有4倍左右的提升。
- 提升写入并行度对性能提升不大,主要原因是PX Worker必须在RW上执行并行写入,数据库中的表扩展锁成为性能瓶颈。
- Parallel Update在读写数据量相同的情况下,总数据量为75 GB时,Parallel Update的性能表现如下图示:在读数据量远大于写数据量的情况下,读写数据比例为100:1时,Parallel Update的性能表现如下图示:由以上两张图可知:
- 当读写数据量相同的情况下,Parallel Update最高能提升3倍的性能。
- 读数据量越大,Parallel Update性能提升幅度越大,最高能到达10倍的提升。
- 提升写入并行度对性能提升不大,主要原因是PX Worker必须在RW上执行并行写入,数据库中的表扩展锁成为性能瓶颈。
- Parallel Delete
Parallel Delete的性能表现和结论与Parallel Update基本一致,此处不再赘述。