动态文件剪枝(Dynamic File Pruning, DFP)可以大幅改善许多Delta表查询的性能。动态文件剪枝对于未分区的表或者未分区列的join来说更加有效。DFP的性能提升通常还和数据的聚集相关联,可以考虑使用ZOrdering来最大化DFP的性能收益。
详细内容可参考Databricks官网文章:动态文件修剪
想要了解更多的DFP的背景知识及使用案例,可以参考Databricks的官方博客:使用DFP加速SQL查询。
在Databricks Runtime 6.1及更高版本中可用。
DFP主要由如下几个Spark配置项控制:
spark.databricks.optimizer.dynamicFilePruning(默认值为true):表示是否使用DFP,如果为true,则启动DFP,下沉DFP的过滤器,减少扫描的数据量。如果设置为false,则不启用DFP。
spark.databricks.optimizer.deltaTableSizeThreshold(默认值为1000000000字节(10 GB)):表示在进行join时,触发DFP的最小的表的大小。如果表不够大,可能使用DFP不如直接扫描全表。你可以使用命令:
DESCRIBE DETAIL table_name
,然后查看sizeInBytes
列获取到表的大小。spark.databricks.optimizer.deltaTableFilesThreshold(在Databricks 8.3之前是1000,在Databricks 8.4及之后为10):表示在进行join时,触发DFP的最小的表文件的数量。如果表中的文件数量小于该阈值,则DFP不会被触发。如果表中的文件数量过少,则扫描全表的开销可能会更小一些。你可以使用命令:
DESCRIBE DETAIL table_name
,然后查看numFiles
列获取到表中的文件数量。
使用案例
测试数据生成:
在本节中我们使用TPCDS数据集作为测试数据,主要使用到store_sales和item表,下载包请联系Databricks运维,并上传到您的OSS中,然后再DDI的项目空间中创建Spark作业生成测试数据:
--class com.databricks.spark.sql.perf.tpcds.GenTPCDSData
--deploy-mode cluster
--name generate_dataset
--queue default
--master yarn
--conf spark.yarn.submit.waitAppCompletion=true
--conf spark.driver.cores=6
--conf spark.driver.memory=12G
--conf spark.executor.cores=3
--conf spark.executor.memory=6G
--conf spark.executor.instances=20
--conf spark.yarn.executor.memoryOverhead=1024
--conf spark.default.parallelism=100
--conf spark.shuffle.service.enabled=true
--conf spark.sql.autoBroadcastJoinThreshold=-1
oss://[your_path]/jars/spark-sql-perf-assembly-0.5.1-SNAPSHOT.jar
-m yarn
--dsdgenDir /home/hadoop/tpcds-kit/tools
--scaleFactor 100
--location oss://[your_path]/
--format delta
--overwrite true
--numPartitions 1000
-y 0
-g true
在上述的Spark作业中,需要对jar文件的路径和数据产生的路径修改为您的OSS中的路径,其他参数可以保持不变,scaleFactor表示产生的数据量,单位为GB,在这里我们产生100GB的数据。如果出现报错请联系DDI的开发人员。
创建一个Notebook,在该Notebook中进行试验,先创建数据库,并在数据库中基于以上生成的数据创建表,对表store_sales的ss_item_sk
列做ZOrdering优化:
%sql
-- 创建数据库
CREATE DATABASE dfp;
-- 使用刚创建的数据库
USE dfp;
-- 基于上述生成的数据,创建数据表,创建item表:
CREATE TABLE item USING DELTA
LOCATION "oss://[your_path]/data/item";
-- 创建store_sales表:
CREATE TABLE store_sales USING DELTA
LOCATION "oss://[your_path]/data/store_sales";
-- 对store_sales表的ss_item_sk字段进行ZOrdering优化
OPTIMIZE store_sales ZORDER BY (ss_item_sk);
案例1:静态文件剪枝
由于我们在store_sales表的ss_item_sk
列上执行了ZOrdering优化,且Delta Lake保存了每一列的min-max统计信息,因此,当我们执行查询如下查询时:
%sql
SELECT sum(ss_quantity) FROM store_sales WHERE ss_item_sk IN (40, 41, 42);
由于我们使用了ss_item_sk IN (40, 41, 42)
作为where查询条件,Delta Engine会利用表文件的统计信息对表文件剪枝,跳过不包含ss_item_sk IN (40, 41, 42)
的表文件。上述查询的逻辑执行计划如下图所示,过滤条件被往下推,很大程度减少了扫描的数据量。
而对于join条件来说,比如常见的星形表连接,join的过滤条件在查询编译阶段对事实表是未知的,因此,就需要一种和where查询条件不一样的方式来进行文件剪枝。
案例2:不使用DFP的星型表连接
在Notebook的第一个paragraph中设置spark.conf,关闭DFP(默认开启):
%spark.conf
spark.databricks.optimizer.dynamicFilePruning false
然后执行一个典型的星型查询:
%sql
USE dfp;
SELECT sum(ss_quantity) FROM store_sales
JOIN item ON ss_item_sk = i_item_sk
WHERE i_item_id = 'AAAAAAAAICAAAAAA';
这个查询和案例1的查询的返回值相同,但这里的where查询条件是针对维表的,而非事实表。这也意味着对表store_sales的过滤会作为Join操作的一部分,因为在扫描和过滤表item之后才能获取到ss_item_sk
的值。
上述查询的逻辑执行计划如下图所示,从图中可以看出,虽然仅过滤出了4万条数据,但是却扫描了表store_sales的80多亿条记录。如果我们能将Join的条件像案例1中的where条件一样往下推,那么将可以大大减少需要扫描的数据量,提升查询效率,这正是DFP的动机和实现原理。
案例3:使用DFP的星型表连接
重启Zeppelin的Interpreter,启动DFP(删除 %spark.conf 的paragraph),然后重新执行查询:
%sql
USE dfp;
SELECT sum(ss_quantity) FROM store_sales
JOIN item ON ss_item_sk = i_item_sk
WHERE i_item_id = 'AAAAAAAAICAAAAAA';
将会生成如下图所示的逻辑执行计划,由于将过滤条件下推到SCAN Operator中,扫描的数据量从80亿条变为600多万条,大幅降低了扫描的数据量。由于DFP是以文件为粒度进行剪枝的,因此相对于最终的结果来说,还是扫描了很多无关数据,但和未使用DFP相比,已经有很大的性能提升。