JOIN是分布式系统中常见的操作,同时也是一个耗时、耗资源的操作,因为其涉及到的Shuffle操作尤其在海量数据场景下,会耗费较多的资源和时间。针对Shuffle操作,MaxCompute可以利用JOIN本身的等值连接属性进行优化。

优化思路

一个典型的包含JOIN的SQL语句如下:
select * from (table1) A join (table2) B on A.a= B.b;

基于JOIN等值连接的特性,MaxCompute可以通过表A的数据生成一个过滤器,在Shuffle或JOIN之前提前过滤表B的数据,甚至可以将过滤器下推到底层存储,在源头过滤数据。这种在作业运行时动态生成过滤器的功能称为动态过滤器DF(Dynamic Filter)

上述SQL语句在打开动态过滤器前后的执行计划示意图如下。

JOIN

应用场景

动态过滤器功能是利用等值JOIN的特性,基于运行时动态生成过滤器,以便在Shuffle或JOIN之前提前过滤数据,实现加速查询运行。该功能适用于维度表和事实表执行JOIN的场景。

动态范围过滤器或布隆过滤器(Dynamic Range|Bloom Filter)

从上图可知,在原始的执行计划中不存在过滤器,过滤器是由系统根据JOIN的特性自动产生的,它的作用就是判断B表中的元素是否存在于A表生成的集合中,如不存在,则过滤掉。

从空间和时间效率上看,Bloom Filter正是上述功能的有效选择。但在实现中,动态过滤器不仅包含Bloom Filter,还可以包含Range Filter,即利用[min, max]来过滤数据。

动态过滤器是一个典型的生产者和消费者模式,示意图如下。动态过滤器
  • DFP(Dynamic Filter Producer)算子:动态过滤器的生产者(Producer),利用小表侧的数据生成Bloom Filter及获取JOIN Key对应的minmax值(Range Filter),然后发送至DFC。
  • DFC(Dynamic Filter Consumer)算子:动态过滤器的消费者(Consumer),利用Bloom Filter和Range Filter过滤大表侧的数据。Range Filter会尽可能将过滤条件下推到底层存储,以便从源头过滤数据。
对于不同类型的JOIN语义,JOIN对象可担任的角色不同:
  • A JOIN B:A或B都能作为生产者、消费者。
  • A LEFT JOIN B:A只能作为生产者,B只能作为消费者。
  • A RIGHT JOIN B:A只能作为消费者,B只能作为生产者。
  • A FULL OUTER JOIN B:无法使用动态过滤器功能。

动态过滤器的使用方法请参见动态过滤器的使用方法

动态分区裁剪(Dynamic Partition Pruning)

上述Bloom Filter或Range Filter的例子是基于非分区表的优化,即JOIN Key是非分区列。当JOIN Key为分区列时,动态范围过滤器或布隆过滤器(Dynamic Range|Bloom Filter)仍然可用,但MaxCompute会读取完整个分区的数据后再过滤数据,读取分区数据的过程可以进一步优化。即在读取数据前,将无用的分区裁剪掉,即动态分区裁剪DPP(Dynamic Partition Pruning)功能。

例如包含JOIN的SQL语句如下:
--A为非分区表,表中a列的值为20200701。
--B为分区表,表中ds列的值包含3个分区20200701、20200702、20200703。
select * from (table1) A join (table2) B on A.a= B.ds;

打开动态分区裁剪功能后,优化器会根据表是否是分区表来决定是否采用动态分区裁剪功能。动态分区裁剪功能生效后,MaxCompute会采集小表侧数据生成Bloom Filter,然后过滤大表侧的分区列表,再把需要读取的分区列表聚合,裁剪掉不需要扫描的分区。如果一个运行进程所有待读的分区都被裁剪了,则该进程不被调度。

在上述示例中,由于A表中的a列值只有20200701,因而打开动态分区裁剪功能后,B表中的20200702和20200703分区会被裁剪掉,既节省了资源,也降低了作业运行时长。

动态分区裁剪功能的使用方法请参见动态分区裁剪的使用方法

动态过滤器的使用方法

MaxComopute提供了如下打开动态过滤器的方式:
  • 方式一:在Session级别通过开关强制打开动态过滤器,与SQL语句一起提交执行。
    set odps.optimizer.force.dynamic.filter=true;
    说明 该属性也支持在Project级别设置,但推荐您在Session级别设置。由于Project级别开启后,对所有JOIN作业都会启用动态过滤器,当JOIN无法过滤数据时,处理效率反而更低。

    使用该方式时,对所有能打开动态过滤器的JOIN,都插入动态过滤器。

  • 方式二:在Session级别通过开关智能打开动态过滤器。
    set odps.optimizer.enable.dynamic.filter=true;

    使用该方式时,优化器会智能地估计插入动态过滤器是否有足够的资源或时间获益,如果有收益则插入动态过滤器,否则不会插入。

    说明 该方式依赖元数据统计,例如ndv,更多元数据统计信息,请参见优化器信息收集。因为元数据统计是优化器的估算结果,可能不准确,因此会存在无法如预期地插入动态过滤器。
  • 方式三:在SQL语句中通过HINT方式打开动态过滤器。
    HINT格式为/*+dynamicfilter(Producer, Consumer1[, Consumer2,...])*/,允许一个生产者过滤多个消费者。命令示例如下:
    select /*+dynamicfilter(A, B)*/ * from (table1) A join (table2) B on A.a= B.b;

动态分区裁剪的使用方法

在Session级别通过开关打开动态分区裁剪功能,与SQL语句一起提交执行。

set odps.optimizer.dynamic.filter.dpp.enable=true;
说明 该属性也支持在Project级别设置,但推荐您在Session级别设置。由于Project级别开启后,对所有JOIN作业中涉及到的分区表都会启用动态分区裁剪功能,当JOIN无法过滤数据时,处理效率反而更低。

验证方法

如果您已按照动态过滤器的使用方法动态分区裁剪的使用方法完成配置,运行SQL作业后,查看作业的Logview信息。如果Logview中出现类似DynamicFilterConsumer1的算子,说明动态过滤器已生效。

确认过滤器结果