本文介绍了PolarDB PostgreSQL版的并行DML功能。

前提条件

支持的PolarDB PostgreSQL版的版本如下:

PostgreSQL 11(内核小版本1.1.17及以上)

说明 您可通过如下语句查看PolarDB PostgreSQL版的内核小版本的版本号:
show polar_version;

简介

PolarDB PostgreSQL版提供了一款强大的分析型查询引擎Parallel eXecution(简称:PX),通过利用集群中多个只读节点来提升查询性能。同时,PX针对DML(INSERT / UPDATE / DELETE)也可以做到并行读并行写的加速。其中:
  • 并行读:指借助多个只读节点上的多进程来加速DML中的查找操作。
  • 并行写:指在一个PolarDB唯一的读写节点上利用多进程实现并行写入。

术语

  • QC:Query Coordinator,发起PX并行查询的进程角色。
  • PX Worker:参与PX跨节点并行查询的工作进程角色。
  • DML:数据操作语句,包含 INSERT / UPDATE / DELETE
  • Slice:指每个PX Worker负责执行的计划分片。
  • RW/RO:读写节点/只读节点。

功能介绍

  • Parallel Insert

    为了加速INSERT ... SELECT ... 这种既有读取又有写入的DML SQL,PolarDB PostgreSQL版使用Parallel Insert来提升性能。对于SELECT子查询,PolarDB PostgreSQL版使用多个 PX Worker 并行加速查询;对于INSERT的写入操作,由于PolarDB PostgreSQL版只有一个RW节点,则会在RW节点上启动多个执行写入的PX Worker进程,通过Motion算子来接收RO节点上读取的数据,实现加速并行写入。

    RO节点上的PX Worker只能执行只读操作,但是在RW节点上的PX Worker可以执行写入操作。Parallel Insert在读写数据量均衡的情况下,最高能提升3倍的性能。Parallel Insert已支持:
    • 普通表
    • 分区表
    • 强制有序
    • 并行度动态调整
  • Parallel Update

    与Parallel Insert类似,针对UPDATE ... SET ...PolarDB PostgreSQL版使用多个PX Worker来执行并行查询,实现加速筛选需要更新的行;同时,在RW节点上启动多个PX Worker进程来执行更新操作。在读写数据量均衡的情况下,最高能提升3倍的性能。Parallel Update不支持分区表,支持并行度动态调整。

  • Parallel Delete

    与Parallel Update基本相同,针对DELETE FROM ...PolarDB PostgreSQL版通过多个PX Worker来执行并行查询,实现加速筛选需要删除的行;同时,在RW节点启动多个PX Worker来执行删除操作。Parallel Delete不支持分区表,支持并行度动态调整。

原理介绍

  • Parallel Insert
    Parallel Insert的总体框架如下所示:总框架图
    Parallel Insert的处理步骤如下:
    1. QC进程接收到INSERT ... SELECT
    2. QC进程对SQL进行解析、重写,生成查询树,通过PX优化器生成计划树。
    3. 通过bitmap标志来指定每个PX Worker负责执行哪部分执行计划。
    4. 将完整的计划树分发到RO节点和RW节点,并创建PX Worker进程,不同的是PX Workers根据自己的ID在bitmap中查找自己负责执行的计划。
    5. RO节点上的PX Workers执行查询计划,从存储中并行读取各自负责的数据分片。
    6. RO节点上的PX Workers通过Motion算子将查询数据发送给RW节点上的PX Workers。
    7. RW节点上的PX Workers并行向存储写入数据。
    说明 其中,步骤5、6、7是全流水线执行的。
    以最简单的并行DMLINSERT INTO t1 SELECT * FROM t2为例。表t1t2都是只有两列的表。
                       QUERY PLAN
    -------------------------------------------------
     Insert on public.t1
       ->  Result
             Output: t2.c1, t2.c2
             ->  PX Hash 6:6  (slice1; segments: 6)
                   Output: t2.c1, t2.c2, (1)
                   ->  Partial Seq Scan on public.t2
                         Output: t2.c1, t2.c2, 1
     Optimizer: PolarDB PX Optimizer
    (8 rows)
    说明 在执行计划中,Partial Seq Scan表示每个PX Workers并行读取的数据分片。PX Hash 6:6说明有6个负责读取的PX Workers和6个负责写入的PX Workers。Hash表示负责读取的PX Worker所读取到的数据会hash重分布到RW节点中负责写入的PX Worker上。
    Parallel Insert也支持单个写Worker,多个读Worker的执行计划:
                          QUERY PLAN
    -------------------------------------------------------
     Insert on public.t1
       ->  Result
             Output: t2.c1, t2.c2
             ->  PX Coordinator 6:1  (slice1; segments: 6)
                   Output: t2.c1, t2.c2
                   ->  Partial Seq Scan on public.t2
                         Output: t2.c1, t2.c2
     Optimizer: PolarDB PX Optimizer
    (8 rows)
    说明 由于只有一个写Worker,所以计划中显示的是PX Coordinator 6:1,将RO节点上的数据汇聚到RW节点上。
    以数据流的方式展示Parallel Insert的执行过程如下图所示:执行过程执行过程如下:
    1. 每个负责读取的PX Worker执行一部分的顺序扫描操作,读取数据,进入到RedistributeMotionRandom,将读取到的每条数据重分布,发送给各个负责写入的PX Worker。
    2. 通过SendMotion向RW节点上的PX Worker发送数据,RO节点上的每个PX Worker会从所有RW节点上的PX Worker中选择一个进行数据重分布,重分布的策略有哈希分布和随机分布两种。
    3. RW节点上被选中的PX Worker通过RecvMotion来接收数据,然后将数据通过ModifyTable算子写入存储。
  • Parallel Update

    由于Parallel Update和Parallel Delete在SQL解析、重写的过程和Parallel Insert相同,下面只说明Parallel Update的执行计划和数据流动方式。

    不带子查询的Parallel Update计划,示例如下:
                                                QUERY PLAN
    --------------------------------------------------------------------------------------------------------
     Update (segment: 6) on public.t1
       ->  Result
             Output: t1_1.c1, t1_1.c2, (DMLAction), t1_1.ctid
             ->  PX Hash 6:6  (slice1; segments: 6)
                   Output: t1_1.c1, t1_1.c2, t1_1.ctid, t1_1._px_worker_id, (DMLAction), ('16397'::oid)
                   ->  Result
                         Output: t1_1.c1, t1_1.c2, t1_1.ctid, t1_1._px_worker_id, (DMLAction), '16397'::oid
                         ->  Split
                               Output: t1_1.c1, t1_1.c2, t1_1.ctid, t1_1._px_worker_id, DMLAction
                               ->  Partial Seq Scan on public.t1 t1_1
                                     Output: t1_1.c1, t1_1.c2, 3, t1_1.ctid, t1_1._px_worker_id
     Optimizer: PolarDB PX Optimizer
    (12 rows)
    说明 从执行计划中可以看出,从RO节点读取数据到RW节点写入数据之前存在一个Split算子。算子中还包含了一个DMLAction的标志,用于表示当前正在进行的DML操作类型(DML_INSERT / DML_DELETE)。Split算子用于将UPDATE拆分为DELETEINSERT两个阶段,表示需要删除哪些行、插入哪些行。
    对于带有子查询的Parallel Update计划,除写入计划分片之外加入了自查询的执行计划分片。示例如下:
                                                  QUERY PLAN
    ------------------------------------------------------------------------------------------------------------
     Update (segment: 6) on public.t1
       ->  Result
             Output: t1_1.c1, t1_1.c2, (DMLAction), t1_1.ctid
             ->  PX Hash 6:6  (slice1; segments: 6)
                   Output: t1_1.c1, t1_1.c2, t1_1.ctid, t1_1._px_worker_id, (DMLAction), ('16397'::oid)
                   ->  Result
                         Output: t1_1.c1, t1_1.c2, t1_1.ctid, t1_1._px_worker_id, (DMLAction), '16397'::oid
                         ->  Split
                               Output: t1_1.c1, t1_1.c2, t1_1.ctid, t1_1._px_worker_id, DMLAction
                               ->  Partial Seq Scan on public.t1 t1_1
                                     Output: t1_1.c1, t1_1.c2, int4((SubPlan 1)), t1_1.ctid, t1_1._px_worker_id
                                     SubPlan 1
                                       ->  Materialize
                                             Output: (count())
                                             ->  PX Broadcast 1:6  (slice2)
                                                   Output: (count())
                                                   ->  Aggregate
                                                         Output: count()
                                                         ->  PX Coordinator 6:1  (slice3; segments: 6)
                                                               ->  Partial Seq Scan on public.t2
     Optimizer: PolarDB PX Optimizer
    (21 rows)
    Parallel Update处理数据流图如下图所示:数据流图
    • 对于不带子查询的情况,例如,UPDATE t1 SET c1=3,流程如下:
      1. 每个负责写入的PX Worker并行查找要更新的行。
      2. 通过Split算子,拆分成DELETEINSERT操作。
      3. 执行ExecDeleteExecInsert
    • 带子查询的情况,例如,UPDATE t1 SET c1=(SELECT COUNT(*) FROM t2),流程如下:
      1. 每个负责读取的PX Worker从共享存储上并行读取自己负责的数据分片,然后通过SendMotion将自己读到的数据汇聚到QC进程。
      2. QC进程将数据(过滤条件)广播给RW节点上的各个负责写入的PX Worker。
      3. 各个负责写入的PX Worker分别扫描各自负责的数据分片,查找待更新的数据。
      4. 通过Split算子,拆分成DELETEINSERT操作。
      5. 执行ExecDeleteExecInsert
  • Parallel Delete
    不带子查询的Parallel Delete 计划,示例如下:
                                      QUERY PLAN
    --------------------------------------------------------------------------------
     Delete (segment: 6) on public.t1
       ->  Result
             Output: t1_1.c1, t1_1.c2, t1_1.ctid
             ->  PX Hash 6:6  (slice1; segments: 6)
                   Output: t1_1.c1, t1_1.c2, t1_1.ctid, t1_1._px_worker_id, (0)
                   ->  Partial Seq Scan on public.t1 t1_1
                         Output: t1_1.c1, t1_1.c2, t1_1.ctid, t1_1._px_worker_id, 0
                         Filter: (t1_1.c1 < 10)
     Optimizer: PolarDB PX Optimizer
    (9 rows)
    带子查询的Parallel Delete 计划,示例如下:
                                        QUERY PLAN
    -----------------------------------------------------------------------------------
     Delete (segment: 6) on public.t1
       ->  Result
             Output: t1_1.c1, t1_1.c2, t1_1.ctid
             ->  PX Hash 6:6  (slice1; segments: 6)
                   Output: t1_1.c1, t1_1.c2, t1_1.ctid, t1_1._px_worker_id, (0)
                   ->  Hash Semi Join
                         Output: t1_1.c1, t1_1.c2, t1_1.ctid, t1_1._px_worker_id, 0
                         Hash Cond: (t1_1.c1 = t2.c1)
                         ->  Partial Seq Scan on public.t1 t1_1
                               Output: t1_1.c1, t1_1.c2, t1_1.ctid, t1_1._px_worker_id
                         ->  Hash
                               Output: t2.c1
                               ->  Full Seq Scan on public.t2
                                     Output: t2.c1
     Optimizer: PolarDB PX Optimizer
    (15 rows)
    负责读写的PX Workers数量:
                                        QUERY PLAN
    -----------------------------------------------------------------------------------
     Delete (segment: 10) on public.t1
       ->  Result
             Output: t1_1.c1, t1_1.c2, t1_1.ctid
             ->  PX Hash 6:10  (slice1; segments: 6)
                   Output: t1_1.c1, t1_1.c2, t1_1.ctid, t1_1._px_worker_id, (0)
                   ->  Hash Semi Join
                         Output: t1_1.c1, t1_1.c2, t1_1.ctid, t1_1._px_worker_id, 0
                         Hash Cond: (t1_1.c1 = t2.c1)
                         ->  Partial Seq Scan on public.t1 t1_1
                               Output: t1_1.c1, t1_1.c2, t1_1.ctid, t1_1._px_worker_id
                         ->  Hash
                               Output: t2.c1
                               ->  Full Seq Scan on public.t2
                                     Output: t2.c1
     Optimizer: PolarDB PX Optimizer
    (15 rows)
    从以上结果可以看到Parallel Delete的计划与Parallel Update类似,区别如下:
    • 由于Parallel Delete只执行删除操作,不执行插入操作,所以不需要Split算子。
    • 顶层的DML算子由Update变为Delete算子。
    Parallel Delete的数据流图如下所示:delete执行过程如下:
    1. 每个负责读取的PX Workers扫描属于自己的数据分片,找出要删除的行。
    2. 将待删除的行通过Motion算子传输给每个负责写入的PX Workers,并行执行Delete操作。

使用指南

  • Parallel Insert

    Parallel Insert默认关闭。

    • 使用Parallel Insert前,需要开启PX。
      SET polar_enable_px = ON;
    • 开启Parallel Insert功能。
      SET polar_px_enable_insert_select = ON;
    • 开启Parallel Insert写入分区表,默认关闭。
      SET polar_px_enable_insert_partition_table = ON;
    • 写入并行度控制。取值为1~128。默认为6,表示RW节点上会启动6个PX Workers来执行写入操作。
      SET polar_px_insert_dop_num = 6;
    • 支持开启无表查询,默认关闭。
      SET polar_px_enable_insert_from_tableless = ON;
    说明 由于Parallel Insert无法保证写入顺序,提供以下开关强制保证写入结果有序。
    -- 默认打开,关闭后则不保证并行Insert结果有序
    SET polar_px_enable_insert_order_sensitive = ON;
  • Parallel Update
    通过polar_px_enable_update参数开启Parallel Update功能,默认关闭。
    SET polar_px_enable_update = ON;
    通过polar_px_update_dop_num参数设置Parallel Update的写入并行度。默认为6,范围为 1~128。
    -- 启动6个PX Workers进行写入
    SET polar_px_update_dop_num = 6;
  • Parallel Delete
    通过polar_px_enable_delete参数开启Parallel Delete功能,默认关闭。
    SET polar_px_enable_delete = ON;
    通过polar_px_delete_dop_num参数设置Parallel Delete的写入并行度。默认为6,范围为 1~128。
    -- 启动6个PX Workers进行删除
    SET polar_px_delete_dop_num = 6;

性能对比

  • Parallel Insert
    在读写数据量相同的情况下,总数据量为75 GB时,Parallel Insert的性能表现如下图所示:insert
    当读数据量远大于写数据量的情况下,总数据量为75 GB时,写入数据量占读数据量的0.25%时,Parallel Insert的性能表现如下图所示:insert
    由以上两张图可知:
    • 在读写数据量相同的情况下,Parallel Insert最高能提升3倍的性能。
    • 读数据量越大,Parallel Insert性能提升幅度越大,最高能有4倍左右的提升。
    • 提升写入并行度对性能提升不大,主要原因是PX Worker必须在RW上执行并行写入,数据库中的表扩展锁成为性能瓶颈。
  • Parallel Update
    在读写数据量相同的情况下,总数据量为75 GB时,Parallel Update的性能表现如下图示:update
    在读数据量远大于写数据量的情况下,读写数据比例为100:1时,Parallel Update的性能表现如下图示:update
    由以上两张图可知:
    • 当读写数据量相同的情况下,Parallel Update最高能提升3倍的性能。
    • 读数据量越大,Parallel Update性能提升幅度越大,最高能到达10倍的提升。
    • 提升写入并行度对性能提升不大,主要原因是PX Worker必须在RW上执行并行写入,数据库中的表扩展锁成为性能瓶颈。
  • Parallel Delete

    Parallel Delete的性能表现和结论与Parallel Update基本一致,此处不再赘述。