PolarDB IMCI执行器默认用行号表示执行的中间结果,当大查询所需数据量无法完全存放于内存时则可能会引发大量随机且重复IO,从而影响执行效率。为了解决上述问题,IMCI执行器实现了基于中间结果物化的算子集合,本文介绍了HashJoin算子的物化版本HashMatch的实现细节。
设计方案
HashMatch实现主要分为build与probe两个阶段,其中build阶段将左表每一行按join谓词作为key构建出散列表,而probe阶段则遍历右表每一行并根据其对应的join谓词查找散列表,最终针对不同join类型依匹配结果输出对应结果集。
对于build阶段,可以将左表所有数据构建出一个散列表,但只用一个散列表存放全部数据会导致该散列表比较大,在构建过程中可能存在相对严重冲突和不断扩容。为避免此问题,build阶段可以将左表数据按一定规则进行分区,每一个分区各自构建独立散列表,而probe阶段则根据右表每一行所在分区查找对应分区上的散列表进行相应处理。
Build阶段
在IMCI中HashMatch的build功能是在DoOpen中完成,实际分为DoBuild与DoMerge两阶段,每一阶段均采用线程组并发处理。
DoBuild
DoBuild阶段线程组Workers各自向左表取数据,并按照数据分区Partition来构建每一分区的独立散列表:
Worker\Partition | Partition0 | Partition1 | ... | PartitionN |
Worker0 | HashMap00 | HashMap01 | ... | HashMap0N |
Worker1 | HashMap10 | HashMap11 | ... | HashMap1N |
... | ... | ... | ... | ... |
WorkerM | HashMapM0 | HashMapM1 | ... | HashMapMN |
即每一个Worker在每一个Partition均构建一个散列表HashMap。其实除HashMap外,还保存着一组chunk对象,其保存物化后真正结果,而HashMap的uint64类型value只标记当前key所对应chunk位置,其中uint64按位分拆为uint16/uint16/uint32三部分,分别表示所属Worker/chunk内偏移/chunk数组索引等。 每一Worker并行从左表中取到元组,并按分区规则将该元组无须加锁直接插入到该Worker和Partition所对应HashMap中,不断重复该build步骤直到所有Worker取完左表为止。
DoMerge
DoBuild阶段完成后,每一Worker在每一个Partition均构建出一张散列表HashMap。 Build阶段构建出散列表主要用于Probe阶段进行查找判断是否匹配,既然Build阶段数据是按分区构建,那Probe阶段也需要根据分区规则到指定分区的散列表中查找。 而目前DoBuild构建出来的每一个分区均有Worker个散列表,当然Probe时可以依次查找该Partition的所有Worker散列表,但为了后期Probe阶段的便利性和查找性能,HashMatch在DoBuild后进行DoMerge,即将每一Partition上所有Worker散列表合成一个散列表。
Build\Partition | Partition0 | Partition1 | ... | PartitionN |
Merge | HashMap0 | HashMap1 | ... | HashMapN |
DoMerge由线程组来完成,为了避免无意义锁同步操作,采用每一线程独自合并一个分区方案,由于Partition数目往往远大于Worker数目,DoMerge阶段各线程承担工作量基本一致。
Build落盘
由于每一个Worker处理比较均衡,因此可以假设每个Worker处理数据量大致相同,直接将总内存均分值作为Worker内存配额。
限于内存容量,HashMatch并非总能将所有分区的HashMap与chunks维持在内存中,需要能够按一定规则进行落盘。由于HashMap与chunks均按分区隔开,因此当内存不足时按分区落盘比较直观。
当出现内存不足时,需要按一定规则将一些分区数据落盘,以便内存中分区能够正常进行Build与Probe阶段。目前HashMatch采用从最高分区开始整区落盘,直到能够完成处理前面分区,若出现连一个分区均无法处理时则直接抛出OOM。
在DoBuild不断构建的过程中,若当前Worker出现内存不足导致HashMap无法插入KV或不能保存chunk数据时,需要将该Worker内存中编号最高分区的数据进行落盘,即将chunks集合按chunk写入临时文件中并释放chunks内存,同时直接删除HashMap而不需要落盘,后面处理该分区时再从临时文件中加载chunks集合并通过chunks数据构建出该分区的HashMap。 对于一个Worker在内存中的最高分区号,其它Worker也是可见的。当一个Worker看到其它Worker的内存最高分区号比自己的小时,该Worker也会更新自己的最高分区号,并在适当时机进行内存释放,在DoBuild阶段也会不再构建大于最高分区号的分区中HashMap,但还是会将数据保存到chunk中,当chunk满后直接落盘。
Probe阶段
Build阶段读取左表并构建出散列表,而Probe阶段读取右表数据后查找散列表并根据匹配情况进行输出, 既然Build阶段已经将数据进行分区构建,那Probe阶段也需要按Build阶段所采用的数据分区规则来进行分区处理。
DoFetch
Probe阶段同样采用线程组处理方式,由父结点的Fetch操作来驱动。在DoFetch过程中,HashMatch的每个Worker同样不断fetch右表数据,对于fetch到的每一元组按分区规则到指定分区的HashMap中查找,然后根据匹配情况进行处理,不断重复该probe步骤直到所有Worker取完右表为止。
Probe落盘
若Build阶段中内存无法保存所有分区时,Probe阶段也需要针对内存分区和磁盘分区进行分别处理。
在DoFetch过程中,当Worker取到右表数据后,若该元组对应的分区在内存中则直接查找HashMap进行匹配处理,若该分区在磁盘中,则需要将该元组保存到该Worker所属Partition的chunk中,当该chunk满时则需要刷盘并释放chunk内存。当Worker取完右表并probe完成后,则表示内存中分区数据已经处理完成,可以释放内存中所有分区。
当全部处理完内存中的分区后,开始处理磁盘中的分区,由于磁盘中分区的数据已经按分区保存在不同临时文件中,为了避免锁同步,probe阶段仍采用一个磁盘分区由单独Worker独立完成,由于Partition数目往往远大于Worker数目,因此一般不会存在Worker处理不均问题。
当Worker开始处理磁盘中分区时,主要也是分为build与probe两阶段:
build阶段:先从该分区的临时文件中不断读取左表数据并序列化出chunk,然后根据左表chunk数据不断构建HashMap,不断重复该build步骤直到该Worker读取完左表数据为止。
probe阶段:从该分区的临时文件中不断读取右表数据并序列化出chunk,然后对其每一元组在该HashMap中查找根据匹配情况进行处理,不断重复该probe步骤直到该Worker读取完右表数据为止。
当所有Worker处理完所有磁盘分区后则整个HashMatch结束。虽然文档中按内存分区与磁盘分区进行不同处理说明,但实现时统一到了一套代码中。
Probe流程
HashMatch中probe主要由ProbeMem、ProbeLeft与ProbeDisk等三个步骤组成,但其真正probe处理均由Probe函数完成:
ProbeMem用于从右表读取数据并根据数据分区在内存或磁盘分别进行处理。若在内存中直接调用Probe处理,否则将数据保存到临时文件,以便ProbeDisk处理指定磁盘分区时重新加载后再调用Probe处理。
ProbeLeft主要用于LeftOuter/LeftSemi/LeftAntiSemi等Left类型的Join,其遍历整个HashMap所有KV并过滤出已匹配或未匹配过的元组。
ProbeDisk用于磁盘分区的probe操作,按分区来处理,处理指定磁盘分区时先从该分区的临时文件中加载chunk,然后直接调用Probe处理,若为Left类型的Join,还需要调用ProbeLeft对该分区进行处理。
Join逻辑
HashMatch实现Inner/LeftOuter/RightOuter/LeftSemi/LeftAntiSemi/RightSemi/RightAntiSemi及其PostFilter功能。所有join类型主体逻辑均可分为build与probe两个阶段,其中build阶段基本相同(区别在于对null的处理),主要区别在于probe阶段。在此只简单描述不同join类型的处理逻辑。
Inner
对于右表每一元组,若该元组非null且匹配左表HashMap中元组则输出该左表和右表元组。
LeftOuter
对于右表每一元组,若该元组非null且匹配左表HashMap中的元组,则输出该左表和右表元组。 遍历完右表后,对左表中所有均没被匹配过的元组输出该左表元组,而右表元组位置为null。 对于存在PostFilter的LeftOuter,若匹配左表HashMap后还需要经过PostFilter来判断其是否真正匹配。
RightOuter
对于右表每一元组,若该元组非null且匹配左表HashMap中元组则输出该左表和右表元组,若不匹配则输出右表元组,而左表元组位置置null。对于存在PostFilter的RightOuter,若匹配左表HashMap后还需要经过PostFilter来判断其是否真正匹配。
LeftSemi
LeftSemi流程类似LeftOuter,但其并不真正输出左表和右表元组,而根据下面真值表输出左表元组和NULL/TRUE/FALSE值,或仅输出左表元组,或不输出等。 对于存在PostFilter的LeftSemi,若匹配左表HashMap后还需要经过PostFilter来判断其是否真正匹配。
//+------------------------------+--------------+----------------+
//| mathched | semi_probe_ | ! semi_probe_ |
//+------------------------------+--------------+----------------+
//| normal true | (left, TRUE) | (left, ONLY) |
//+------------------------------+--------------+----------------+
//+------------------------------+--------------+----------------+
//| ! mathched | semi_probe_ | ! semi_probe_ |
//+------------------------------+--------------+----------------+
//|NULL v.s. (empty set) | | |
//|e.g., NULL IN (empty set) | (left, FALSE)| NO_OUTPUT |
//+------------------------------+--------------+----------------+
//|NULL v.s. (set) | | |
//|e.g., NULL IN (1, 2, 3) | (left, NULL) | NO_OUTPUT |
//+------------------------------+--------------+----------------+
//|left_row v.s. (set with NULL) | | |
//|e.g., 10 IN (1, NULL, 3) | (left, NULL) | NO_OUTPUT |
//+------------------------------+--------------+----------------+
//|normal false | | |
//|e.g., 10 IN (1, 2, 3) | (left, FALSE)| NO_OUTPUT |
//+------------------------------+--------------+----------------+
LeftAntiSemi
LeftAntiSemi流程类似LeftOuter,但其并不真正输出左表和右表元组,而根据下面真值表仅输出左表元组,或不输出等。 对于存在PostFilter的LeftAntiSemi,若匹配左表HashMap后还需要经过PostFilter来判断其是否真正匹配。
//+------------------------------+----------------+
//| ! mathched | ! semi_probe_ |
//+------------------------------+----------------+
//|NULL v.s. (empty set) | |
//|e.g., NULL NOT IN (empty set) | (left, ONLY) |
//+------------------------------+----------------+
//|NULL v.s. (set) | |
//|e.g., NULL NOT IN (1, 2, 3) | (left, ONLY) |
//+------------------------------+----------------+
//|left_row v.s. (set with NULL) | |
//|e.g., 10 NOT IN (1, NULL, 3) | (left, ONLY) |
//+------------------------------+----------------+
//|normal false | |
//|e.g., 10 NOT IN (1, 2, 3) | (left, ONLY) |
//+------------------------------+----------------+
RightSemi
RightSemi流程类似RightOuter,但其并不真正输出左表和右表元组,根据下面真值表输出右表元组和NULL/TRUE/FALSE值,或仅输出右表元组,或不输出等。 对于存在PostFilter的RightSemi,若匹配左表HashMap后还需要经过PostFilter来判断其是否真正匹配。
//+------------------------------+--------------+----------------+
//| mathched | semi_probe_ | ! semi_probe_ |
//+------------------------------+--------------+----------------+
//| normal true | (right, TRUE)| (right, ONLY) |
//+------------------------------+--------------+----------------+
//+------------------------------+--------------+----------------+
//| ! mathched | semi_probe_ | ! semi_probe_ |
//+------------------------------+--------------+----------------+
//|NULL v.s. (empty set) | | |
//|e.g., NULL IN (empty set) |(right, FALSE)| NO_OUTPUT |
//+------------------------------+--------------+----------------+
//|NULL v.s. (set) | | |
//|e.g., NULL IN (1, 2, 3) |(right, NULL) | NO_OUTPUT |
//+------------------------------+--------------+----------------+
//|left_row v.s. (set with NULL) | | |
//|e.g., 10 IN (1, NULL, 3) |(right, NULL) | NO_OUTPUT |
//+------------------------------+--------------+----------------+
//|normal false | | |
//|e.g., 10 IN (1, 2, 3) |(right, FALSE)| NO_OUTPUT |
//+------------------------------+--------------+----------------+
RightAntiSemi
RightAntiSemi流程类似RightOuter,但其并不真正输出左表和右表元组,而根据下面真值表仅输出右表元组,或不输出等。 对于存在PostFilter的RightAntiSemi,若匹配左表HashMap后还需要经过PostFilter来判断其是否真正匹配。
//+------------------------------+----------------+
//| ! mathched | ! semi_probe_ |
//+------------------------------+----------------+
//|NULL v.s. (empty set) | |
//|e.g., NULL NOT IN (empty set) | (right, ONLY) |
//+------------------------------+----------------+
//|NULL v.s. (set) | |
//|e.g., NULL NOT IN (1, 2, 3) | (right, ONLY) |
//+------------------------------+----------------+
//|left_row v.s. (set with NULL) | |
//|e.g., 10 NOT IN (1, NULL, 3) | (right, ONLY) |
//+------------------------------+----------------+
//|normal false | |
//|e.g., 10 NOT IN (1, 2, 3) | (right, ONLY) |
//+------------------------------+----------------+
实现
HashMatch将内存与磁盘处理、不同join类型与是否带PostFilter功能均抽象为一套处理流程。
HashMap
HashMap为散列表实现,主要提供插入和查找两个接口:
size_t PutValue(uint64_t hash_code, const char *key_buf, uint64_t key_len, const uint64_t tuple);
ValueIterator FindValue(uint64_t hash_code, const char *key_data, const uint64_t key_len, const bool need_mark = false);
同时提供两个迭代器用于遍历整张散列表:
enum IteratorType { Normal = 0, NoneMark = 1, Mark = 2, END };
class TableIterator {
public:
void Next();
bool IsValid() const { return valid_; }
ValueIterator GetIterator(IteratorType type);
private:
IteratorType type_ = IteratorType::END;
};
class ValueIterator {
struct Listener {
virtual void BlockEvent() {}
};
void SetListener(Listener *listener) { listener_ = listener; }
void Next();
bool IsValid() const { return valid_; }
private:
IteratorType type_ = IteratorType::Normal;
Listener *listener_ = nullptr;
};
TableIterator迭代器用于遍历HashMap的全部KV;而ValueIterator迭代器用于遍历当前KV的全部数据块;其中TableIterator与ValueIterator均提供Normal/NoneMark/Mark三种迭代模型,用于不同join类型。
由于TableIterator用于遍历全部HashMap,其主要用于LEFT_OUTER/LEFT_ANTI_SEMI/LEFT_SEMI等。
Info
HMInfo主要用于存放所有Worker共享全局数据,如内存分区号与分区对象,其中分区中保存当前分区合并后的HashMap、左右表chunks集合与左右表临时文件对象。每一个分区HMPartition均有单独临时文件,通过pread/pwrite函数与offset原子变量来提供给所有Worker进行原子读写操作。
Local Info
HMLocalInfo主要用于存放当前Worker的私有数据,如当前Worker的内存分区号与左右分区对象HMLocalPartitions,其中每一分区对象HMLocalPartition保存当前Worker当前Partition的HashMap、chunks集合与正在写待完整的chunk对象等。
Fetcher
HashMatch存在向左表取数据、右表取数据、从Info/LocalInfo内存chunks集合中读取chunk对象、从临时文件中读取并序列化出chunk对象等多种不同fetch方式,虽然方式各异但其fetch数据均为chunk对象,其用于Build与Probe阶段。
class HashMatchFetcher final {
bool Fetch(Context &context, TupleChunk *&mat_chunk);
// fetch from left or right child
bool FetchMem(Context &context);
// fetch from info chunks (include load from temp files)
bool FetchDisk(Context &context, TupleChunk *&mat_chunk);
size_t part_index_ = 0;
TupleChunk chunk_;
};
Builder
Builder类用于Build阶段DoBuild操作,统一处理内存分区与磁盘分区的build功能。
左表构建(MemBuilder类):从左表中fetch到数据保存到chunks集合中,若该元组属于内存分区的则插入HashMap,属于磁盘分区则将chunk数据落盘。
磁盘构建(DiskBuilder类):从临时表中读取chunks集合,并构建该分区的HashMap。
class HashMatchBuilder {
void Build();
virtual void ChunkResult(const size_t offset, const bool is_null,
const size_t part_index, const uint64_t hash_val,
const char *key_data, const size_t key_len) = 0;
virtual void ChunkDone() = 0;
HashMatchFetcher fetcher_;
};
class HashMatchMemBuilder final : public HashMatchBuilder {
void ChunkResult(const size_t offset, const bool is_null,
const size_t part_index, const uint64_t hash_val,
const char *key_data, const size_t key_len) override;
void ChunkDone() override;
TupleChunk origin_chunk_;
};
class HashMatchDiskBuilder final : public HashMatchBuilder {
void ChunkResult(const size_t offset, const bool is_null,
const size_t part_index, const uint64_t hash_val,
const char *key_data, const size_t key_len) override;
void ChunkDone() override;
const size_t part_index_ = 0;
};
Prober
Probe阶段ProbeMem/ProbeLeft/ProbeDisk操作均由Prober类完成,其统一处理内存分区与磁盘分区的probe功能。
class HashMatchProber final {
public:
void ProbeResult(TupleChunk *tpchunk, size_t &chunk_off, const size_t chunk_size);
bool ProbeIter(Context &context, TupleChunk *tpchunk, size_t &chunk_off, const size_t chunk_size);
bool Probe(Context &context, TupleChunk *tpchunk, size_t &chunk_off, const size_t chunk_size, const bool disk);
private:
const HashMatch &join_;
HMInfo *info_ = nullptr;
HMLocalInfo *local_info_ = nullptr;
size_t part_index_ = 0;
PostFilter filter_;
LeftIterator lit_;
RightIterator rit_;
TraverseIterator tit_; // used for probe left
};
HashMatchProber::PostFilter类主要对带PostFilter的Join类型的Probe后期进行处理,即经过Probe得到结果集还需要再由PostFilter处理后才能确定其是否真正匹配。
struct PostFilter final {
bool Evaluate();
bool Probe(TupleChunk *tpchunk, size_t &chunk_off, const size_t chunk_size);
const HashMatchProber &prober_;
const RTExprTreePtr &post_expr_;
const HashMatchExpr &left_expr_;
const HashMatchExpr &right_expr_;
std::shared_ptr<Expressions::ExprEnv> post_env_ = nullptr;
};
Prober提供LeftIterator/RightIterator/TraverseIterator等三种类型迭代器。
struct Iterator {
virtual void InitExpr() {}
virtual void FiniExpr() {}
virtual void Init(const size_t part_index);
virtual void Fini();
virtual bool Valid(Context &context) { return false; }
virtual void Next() = 0;
HashMatchProber &prober_;
PostFilter &filter_;
};
HashMatchProber::LeftIterator类使用HashMap::ValueIterator来遍历HashMap指定key的所有value并根据value定位到指定chunk元组,即对外提供指定key的所有元组功能,统一处理不同join类型和PostFilter功能。
// for Probe
struct LeftIterator final : public Iterator, public ValueIterator::Listener {
void BlockEvent() override;
bool Valid(Context &context) override;
void Next() override;
bool Find(const size_t part_index, const uint64_t hash_val,
const char *key_data, const uint64_t key_len);
ValueIterator it_;
};
HashMatchProber::RightIterator类不断使用HashMatchFetcher从右表或临时文件中获取chunk集合并对外提供遍历所有chunk所有元组的功能。 所有类型Join均由ProbeMem/ProbeDisk使用RightIterator获取chunk,然后在该分区中查找HashMap,若找到则构造LeftIterator对象来遍历该key的所有元组,另外Left类型Join还需要ProbeLeft处理。
// for Probe
struct RightIterator : public Iterator {
bool Valid(Context &context) override;
void Next() override;
HashMatchFetcher fetcher_;
TupleChunk origin_chunk_;
size_t chunk_size_ = 0;
};
HashMatchProber::TraverseIterator类主要用于Left类型Join,如LeftOuter/LeftSemi/LeftAntiSemi等,其使用HashMap::TableIterator遍历整个HashMap并过滤出已匹配或未匹配的key,然后使用LeftIterator来遍历该key的所有元组。 Left类型Join处理流程是先使用ProbeMem/ProbeDisk查找HashMap并进行匹配处理,若匹配则在HashMap中标记该KV,然后由ProbeLeft来使用TraverseIterator来整个HashMap并过滤出已匹配或未匹配的KV处理即可。
// for ProbeLeft
struct TraverseIterator final : public Iterator {
bool Valid(Context &context) override;
void Next() override;
TableIterator tit_;
LeftIterator lit_;
IteratorType it_type_ = IteratorType::END;
};
测试
在TPC-H,1 TB数据集,Q14上对比HashJoin与HashMatch的性能,其中HashJoin算法与HashMatch基本一致,主要区分在于中间结果是否物化。
select
100.00 * sum(case
when p_type like 'PROMO%'
then l_extendedprice * (1 - l_discount)
else 0
end) / sum(l_extendedprice * (1 - l_discount)) as promo_revenue
from
lineitem,
part
where
l_partkey = p_partkey
and l_shipdate >= date '1995-09-01'
and l_shipdate < date '1995-09-01' + interval '1' month;
在LRU缓存与执行器内存均为100 GB配置下进行查询:
Query(TPCH1T)
HashJoin
HashMatch
Q14
23.96秒
12.56秒
在LRU缓存与执行器内存均为32 GB配置下进行查询:
Query(TPCH1T)
HashJoin
HashMatch
Q14
>10分
35.73秒