本文主要介绍基于代价进行并行优化、并行执行的云数据库的并行查询引擎的关键问题和核心技术。

背景信息

随着数据规模的不断扩大,用户SQL的执行时间越来越长,这不仅对数据库的优化能力提出更高的要求,并且对数据库的执行模式也提出了新的挑战。随着数据库在云上的蓬勃发展,越来越多的传统用户迁移到云上,享受云上弹性扩展的红利,但是随着业务的快速扩张,却发现即使动态增加了很多资源,但SQL的执行时间还是越来越慢,并没有随着资源的投入达到预期的效果。显而易见,虽然新增了很多资源,但这些资源并没用被充分利用,很多传统的商业数据库,如Oracle、SQL Server等都提供对并行查询引擎的支持,以充分利用系统资源,达到加速SQL执行的效果。

如何将查询并行起来

对于一个类OLAP的查询,它通常是对大批量数据的查询,数据量大意味着数据远大于数据库的内存容量,大部分数据可能无法缓存到数据库的缓冲区中,而必须在查询执行时才动态加载到缓冲区中,这样就会造成大量IO操作,而IO操作又是最耗时的,因此首先要考虑的就是如何能加速IO操作。

由于硬件的限制,每次IO的耗时基本是固定的,虽然有顺序IO和随机IO的区别,但由于如今SSD已经普遍应用,两者的差异也在逐渐接近。那么还有没有其它方式可以加速IO呢? 显然并行IO是一个简单易行的方法,如果多个线程可以同时发起IO,每个线程只读取部分数据,这样就可以快速的将数据读到数据库的缓冲区中。并行查询

并行读取数据的示意图如上所示,每个worker代表一个线程,如果数据已经有partition分区,可以每个线程读取一个partition;也可以将全部数据按固定大小进行分片,例如按一个数据页面大小,然后每个线程以Round-robin模式轮询读取一个分片。

这里需要注意的是,按已有partition分配给不同worker可能会导致每个worker处理的数据不均匀,而按Round-robin模式进行轮询,如果分片设置的比较小,相对来说就比较容易做到每个worker处理的数据比较均匀。

如果只是将数据读取到缓冲区中,而不是立即进行后续处理,那么这些数据就会因缓冲区爆满导致数据被换出,从而失去加速IO的意义。因此,在并行读取数据的同时,必须同时并行的处理这些数据,这是并行查询加速的基础。

传统的优化器只能生成串行的执行计划,为了实现并行读取数据,同时并行处理数据,首先必须对现有的优化器进行改造,让优化器可以生成需要的并行计划。例如,选择哪个表或哪些表可以并行读取,并且通过并行读取会带来足够的收益;或者哪些操作可以并行执行,并且可以带来足够的收益。

并不是说并行化改造一定会有收益。例如,对一个数据量很小的表,可能只是几行,如果也对它进行并行读取的话,并行执行所需要的多线程构建再加上线程间的数据同步等所需要的代价可能远大于所得到的收益,总体来说,并行执行会需要更多的资源和时间,这就得不偿失了。因此查询计划的并行化必须是基于代价的,否则可能会导致更严重的性能退化问题。

如何选择并行扫描的表

选择并行扫描的表是生成并行计划的重要基础,通过基于并行扫描代价的计算和比较,选择可以并行扫描的表作为候选,是并行执行计划迭代的第一步。基于新的并行代价,也许会有更优的JOIN顺序选择,尤其是当参与JOIN的表的数量比较多时,这需要更多额外的迭代空间,为防止优化过程消耗太多的时间,保持原有计划的JOIN顺序是一个不错的选择。另外,对于参与JOIN的每张表,因为表的访问方法不同,例如全表扫描、Ref索引扫描、Range索引扫描等,这些都会影响到最终并行扫描的代价。

通常我们选择最大的那张表作为并行表,这样并行扫描的收益最大,当然也可以选择多个表同时做并行扫描,后面会继续讨论更复杂的情况。

以查询年度消费TOP 10的用户为例:
SELECT c.c_name, sum(o.o_totalprice) as s 
FROM customer c, orders o 
WHERE c.c_custkey = o.o_custkey 
      AND o_orderdate >= '1996-01-01' 
      AND o_orderdate <= '1996-12-31' 
GROUP BY c.c_name 
ORDER BY s DESC 
LIMIT 10;
说明 其中,orders表为订单表,数据很多,这类表也被称之为事实表;customer表为客户表,数据相对较少,这类表也被称之为维度表。
此SQL的并行执行计划如下图所示:执行计划从计划中可以看出orders表会进行并行扫描,由32个workers线程来执行,每个worker只扫描orders表的一部分数据分片,然后与customer表按o_custkey做index lookup进行JOIN,JOIN的结果发送到一个collector组件,然后由collector组件继续做后续的GROUP BY、ORDER BY及LIMIT操作。

选择多表并行的JOIN

并行扫描不仅支持一张表,也支持SQL中有2张或更多的FACT表进行并行扫描,示例如下:
SELECT o.o_custkey, sum(l.l_extendedprice) as s 
FROM orders o, lineitem l 
WHERE o.o_custkey = l.l_orderkey 
GROUP BY o.o_custkey 
ORDER BY s 
LIMIT 10;
说明 其中,orders表和lineitem表都是数据量很大的事实表。
此SQL的并行执行计划如下图所示:执行计划2从计划中可以看到orders表和lineitem表都会进行并行扫描,都由32个workers线程来执行。我们以2个表为例,当2个表执行JOIN时,通常的JOIN方式有Nested Loop JOIN、HASH JOIN等,对于不同的JOIN方式,为保证结果的正确性,必须选择合理的表扫描方式。
以HASH JOIN为例,对于串行执行的HASH JOIN来说,首先选择一个表创建HASH表称之为Build表,然后读取另一个Probe表,计算HASH,并在Build表中进行HASH匹配,若匹配成功,输出结果,否则继续读取。如果改为并行HASH JOIN,并行优化器会对串行执行的HASH JOIN进行并行化改造,使之成为并行HASH JOIN,并行化改造的方案可以有以下两种解决方案:
  • 方案一:是将2个表都按HASH key进行分区,相同HASH值的数据处于同一个分区内,由同一个线程执行HASH JOIN。
  • 方案二:是创建一个共享的Build表,由所有执行HASH JOIN的线程共享,然后每个线程并行读取属于自己线程的另外一个表的分片,再执行HASH JOIN。
最终选择哪种方案,通过代价估算来决定。hash
  • 对于方案一,需要读取表中的所有数据,根据选中的HASH key,对数据进行分区,并将数据发送到不同的处理线程中,这需要额外增加一个Repartition算子,负责根据分区规则将数据发送到不同的处理线程。
  • 对于方案二,需要并行创建共享的HASH build表,当build表创建成功后,每个线程读取Probe表的一个分片,分别执行HASH JOIN,这里的分片并不需要按照HASH key进行分片,每个线程分别读取互不相交的分片即可。

分析统计的复杂算子的并行

对于一个分析统计的需求,GROUP BY操作是不可避免的操作,尤其对大量的JOIN结果再做GROUP BY操作,是整个SQL中最费时的一个过程,因此GROUP BY的并行也是并行查询引擎必须优先解决的问题。

以年度消费TOP10客户的SQL为例,对GROUP BY并行化后的并行执行计划如下图所示:复杂算子并行与之前的执行计划相比,新的执行计划中多了一个collector组件,总共有2个collector组件。首先查看第二行的collector组件,它的extra信息中有2条Using temporary; Using filesort,这表示它是对从workers接收到的数据执行GROUP BY,然后再按ORDER排序,因为只有第一个collector组件在用户的session中,所以这个collector也是在worker中并行执行,也就是说并行的做Group by和Order by以及Limit;然后看第一行的collector组件,它的extra信息中只有一条Merge sort,表示session线程对从workers接收到的数据执行一次merge sort,然后将结果返回给用户。
由于explain计划显示的问题,在常规模式下不显示LIMIT操作,但在Tree模式下会显示LIMIT操作。如下所示:limit从Tree型计划树上可以清楚的看到LIMIT操作有2处,一处在计划的顶端,也就是在session上,进行完limit后将数据返回给用户;另外一处在计划树的中间位置,它其实是在worker线程的执行计划上,在每个worker线程中在排序完成后也会进行一次limit,这样就可以极大减少worker返回给session线程的数据量,从而提升整体性能。

通常来说,每个worker只有所有数据的一个分片,只在一个数据分片上做GROUP BY是有极大的风险得到错误的GROUP BY结果的,因为同一GROUP分组的数据可能不只是在本WORKER的数据分片上,也可能在其它WORKER的数据分片中,被其它WORKER所持有。但是如果我们可以保证同一GROUP分组的数据一定位于同一个数据分片,并且这个数据分片只被一个WORKER线程所持有,那么就可以保证GROUP BY结果的正确性。通过Tree型执行计划可以看到,在并行JOIN之后,将JOIN的结果按GROUP分组的KEY值:c.c_name进行Repartition操作,将相同分组的数据分发到相同的WORKER,从而保证每个WORKER拥有的数据分片互不交叉,保证GROUP BY结果的正确性。

因为每个WORKER的GROUP BY操作已经是最终结果,所以还可以将ORDER BY和LIMIT也下推到WORKER来执行,进一步提升了并行执行的效率。

并行查询引擎对TPCH的线性加速

下图是一个并行查询引擎对TPCH的加速效果,TPC-H中100%的SQL可以被加速,70%的SQL加速比超过8倍,总和加速近13倍,Q6和Q12加速甚至超过32倍。线性加速

总结

数据库是应用系统的核心,而优化器是数据库的核心,优化器的好坏几乎可以决定一个数据库产品的成败。开发一个全新的优化器,对任何团队都是一个巨大的挑战,技术的复杂度暂且不提,单单想做到产品的足够稳定就是一个非常难以克服的困难。因此即使传统商业数据库,也是在现有优化器的基础上不断改进,逐渐增加对并行的支持,最终成为一个成熟的并行优化器。对PolarDB也是如此,在设计和开发并行查询引擎时,我们充分利用现有优化器的技术积累和实现基础,不断改进,不断打磨,最终形成了一个持续迭代的技术方案,以保证新的优化器的稳定运行和技术革新。