本文主要介绍基于代价进行并行优化、并行执行的云数据库的并行查询引擎的关键问题和核心技术。
背景信息
随着数据规模的不断扩大,用户SQL的执行时间越来越长,这不仅对数据库的优化能力提出更高的要求,并且对数据库的执行模式也提出了新的挑战。随着数据库在云上的蓬勃发展,越来越多的传统用户迁移到云上,享受云上弹性扩展的红利,但是随着业务的快速扩张,却发现即使动态增加了很多资源,但SQL的执行时间还是越来越慢,并没有随着资源的投入达到预期的效果。显而易见,虽然新增了很多资源,但这些资源并没用被充分利用,很多传统的商业数据库,如Oracle、SQL Server等都提供对并行查询引擎的支持,以充分利用系统资源,达到加速SQL执行的效果。
如何将查询并行起来
对于一个类OLAP的查询,它通常是对大批量数据的查询,数据量大意味着数据远大于数据库的内存容量,大部分数据可能无法缓存到数据库的缓冲区中,而必须在查询执行时才动态加载到缓冲区中,这样就会造成大量IO操作,而IO操作又是最耗时的,因此首先要考虑的就是如何能加速IO操作。
并行读取数据的示意图如上所示,每个worker代表一个线程,如果数据已经有partition分区,可以每个线程读取一个partition;也可以将全部数据按固定大小进行分片,例如按一个数据页面大小,然后每个线程以Round-robin模式轮询读取一个分片。
这里需要注意的是,按已有partition分配给不同worker可能会导致每个worker处理的数据不均匀,而按Round-robin模式进行轮询,如果分片设置的比较小,相对来说就比较容易做到每个worker处理的数据比较均匀。
如果只是将数据读取到缓冲区中,而不是立即进行后续处理,那么这些数据就会因缓冲区爆满导致数据被换出,从而失去加速IO的意义。因此,在并行读取数据的同时,必须同时并行的处理这些数据,这是并行查询加速的基础。
传统的优化器只能生成串行的执行计划,为了实现并行读取数据,同时并行处理数据,首先必须对现有的优化器进行改造,让优化器可以生成需要的并行计划。例如,选择哪个表或哪些表可以并行读取,并且通过并行读取会带来足够的收益;或者哪些操作可以并行执行,并且可以带来足够的收益。
并不是说并行化改造一定会有收益。例如,对一个数据量很小的表,可能只是几行,如果也对它进行并行读取的话,并行执行所需要的多线程构建再加上线程间的数据同步等所需要的代价可能远大于所得到的收益,总体来说,并行执行会需要更多的资源和时间,这就得不偿失了。因此查询计划的并行化必须是基于代价的,否则可能会导致更严重的性能退化问题。
如何选择并行扫描的表
选择并行扫描的表是生成并行计划的重要基础,通过基于并行扫描代价的计算和比较,选择可以并行扫描的表作为候选,是并行执行计划迭代的第一步。基于新的并行代价,也许会有更优的JOIN顺序选择,尤其是当参与JOIN的表的数量比较多时,这需要更多额外的迭代空间,为防止优化过程消耗太多的时间,保持原有计划的JOIN顺序是一个不错的选择。另外,对于参与JOIN的每张表,因为表的访问方法不同,例如全表扫描、Ref索引扫描、Range索引扫描等,这些都会影响到最终并行扫描的代价。
通常我们选择最大的那张表作为并行表,这样并行扫描的收益最大,当然也可以选择多个表同时做并行扫描,后面会继续讨论更复杂的情况。
SELECT c.c_name, sum(o.o_totalprice) as s
FROM customer c, orders o
WHERE c.c_custkey = o.o_custkey
AND o_orderdate >= '1996-01-01'
AND o_orderdate <= '1996-12-31'
GROUP BY c.c_name
ORDER BY s DESC
LIMIT 10;
选择多表并行的JOIN
SELECT o.o_custkey, sum(l.l_extendedprice) as s
FROM orders o, lineitem l
WHERE o.o_custkey = l.l_orderkey
GROUP BY o.o_custkey
ORDER BY s
LIMIT 10;
- 方案一:是将2个表都按HASH key进行分区,相同HASH值的数据处于同一个分区内,由同一个线程执行HASH JOIN。
- 方案二:是创建一个共享的Build表,由所有执行HASH JOIN的线程共享,然后每个线程并行读取属于自己线程的另外一个表的分片,再执行HASH JOIN。
- 对于方案一,需要读取表中的所有数据,根据选中的HASH key,对数据进行分区,并将数据发送到不同的处理线程中,这需要额外增加一个Repartition算子,负责根据分区规则将数据发送到不同的处理线程。
- 对于方案二,需要并行创建共享的HASH build表,当build表创建成功后,每个线程读取Probe表的一个分片,分别执行HASH JOIN,这里的分片并不需要按照HASH key进行分片,每个线程分别读取互不相交的分片即可。
分析统计的复杂算子的并行
对于一个分析统计的需求,GROUP BY操作是不可避免的操作,尤其对大量的JOIN结果再做GROUP BY操作,是整个SQL中最费时的一个过程,因此GROUP BY的并行也是并行查询引擎必须优先解决的问题。
Using temporary; Using filesort
,这表示它是对从workers接收到的数据执行GROUP BY,然后再按ORDER排序,因为只有第一个collector组件在用户的session中,所以这个collector也是在worker中并行执行,也就是说并行的做Group
by和Order by以及Limit;然后看第一行的collector组件,它的extra信息中只有一条Merge sort
,表示session线程对从workers接收到的数据执行一次merge sort,然后将结果返回给用户。
通常来说,每个worker只有所有数据的一个分片,只在一个数据分片上做GROUP BY是有极大的风险得到错误的GROUP BY结果的,因为同一GROUP分组的数据可能不只是在本WORKER的数据分片上,也可能在其它WORKER的数据分片中,被其它WORKER所持有。但是如果我们可以保证同一GROUP分组的数据一定位于同一个数据分片,并且这个数据分片只被一个WORKER线程所持有,那么就可以保证GROUP BY结果的正确性。通过Tree型执行计划可以看到,在并行JOIN之后,将JOIN的结果按GROUP分组的KEY值:c.c_name进行Repartition操作,将相同分组的数据分发到相同的WORKER,从而保证每个WORKER拥有的数据分片互不交叉,保证GROUP BY结果的正确性。
因为每个WORKER的GROUP BY操作已经是最终结果,所以还可以将ORDER BY和LIMIT也下推到WORKER来执行,进一步提升了并行执行的效率。
并行查询引擎对TPCH的线性加速
总结
数据库是应用系统的核心,而优化器是数据库的核心,优化器的好坏几乎可以决定一个数据库产品的成败。开发一个全新的优化器,对任何团队都是一个巨大的挑战,技术的复杂度暂且不提,单单想做到产品的足够稳定就是一个非常难以克服的困难。因此即使传统商业数据库,也是在现有优化器的基础上不断改进,逐渐增加对并行的支持,最终成为一个成熟的并行优化器。对PolarDB也是如此,在设计和开发并行查询引擎时,我们充分利用现有优化器的技术积累和实现基础,不断改进,不断打磨,最终形成了一个持续迭代的技术方案,以保证新的优化器的稳定运行和技术革新。