本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。
为了提升查询性能,Delta Engine对数据的存储和布局进行了优化,目前支持两种布局算法:bin-packing和Z-Ordering。在本文中,我们会介绍如何使用这两种布局算法并给出使用案例。此外我们还介绍了Delta Engine的Data skipping功能,以及该功能如何自动提升您的查询性能。最后,我们介绍如何使用Delta Engine的表文件自动调整功能,优化表文件的存储和查询效率。
详细内容可参考Databricks官网文章:通过文件管理优化性能
压缩 (bin-packing)
在流处理场景下不断向表中数据插入数据,或者merge,update等操作,会产生大量的小文件,过多的小文件会导致查询变慢,并且会引起系统扩展性问题。bin-packing的设计就是为了解决这些问题的。
如何使用?
为了改善查询性能,Delta Engine提供了OPTIMIZE命令来对表中的数据布局进行优化,将小文件进行合并:
%sql
OPTIMIZE [table_name | delta.`/table/path`]
该命令不但支持全表小文件的合并,还支持特定partition的合并,例如我们可以仅对date大于2017-01-01的分区中的小文件进行合并:
%sql
OPTIMIZE [table_name | delta.`/table/path`] WHERE date >= '2017-01-01'
除了手动执行OPTIMIZE外,你还可以使用Auto-optimize来对表中的数据布局进行优化。
Bin-packing是幂等的,这意味着在同一数据集上运行1次Optimize和运行N次的效果是相同的。
Bin-packing的目标是表中的数据量生成大小均衡的数据文件。
使用案例
测试数据生成:创建10,000个小文件,每个文件中包含10,000行连接数据:(src_ip, src_port, dst_ip, dst_port),基于这些文件创建外部表:conn_rand
%spark
import spark.implicits._
import scala.util.Random
val numRecords = 100*1000*1000L
val numFiles = 10000
// 连接数据
case class ConnRecord(src_ip: String, src_port: Int, dst_ip: String, dst_port: Int)
// 生成随机的ip和port
def randomIPv4(r: Random) = Seq.fill(4)(r.nextInt(256)).mkString(".")
def randomPort(r: Random) = r.nextInt(65536)
// 生成一条随机的连接数据
def randomConnRecord(r: Random) = ConnRecord(
src_ip = randomIPv4(r), src_port = randomPort(r),
dst_ip = randomIPv4(r), dst_port = randomPort(r))
// 生成10000个partition,每个partition中包含10000条连接数据
val df = spark.range(0, numFiles, 1, numFiles).mapPartitions { it =>
val partitionID = it.toStream.head
val r = new Random(seed = partitionID)
Iterator.fill((numRecords / numFiles).toInt)(randomConnRecord(r))
}
// 将数据保存到oss中,并基于数据建立table
(df.write
.mode("overwrite")
.format("delta")
.option("path", "oss://databricks-delta-demo/ip_demo")
.saveAsTable("conn_rand"))
查询:157开头的源IP和216开头的目的IP的连接数量
SELECT COUNT(*)
FROM conn_rand
WHERE src_ip LIKE '157.%' AND dst_ip LIKE '216.%'
时间消耗为:56s,下面我们使用OPTIMIZE命令对表中的小文件进行合并:
OPTIMIZE conn_rand;
在进行合并之后,在OSS中生成两个877MB的大文件(OPTIMIZE生成的文件最大为1GB)。
在执行OPTIMIZE之后,重新执行上述查询,查询时间为7s。可以看出,在优化之后查询性能得到很大的提升。
在Databricks Runtime 6.0及更高版本中可用。
Data skipping
当你向Delta表中写入数据时,Delta Engine会自动收集表的前32列的统计信息(最小最大值,为空的行的数量)以提升查询效率。该特性是自动开启的,不需要进行任何配置。
收集长字符串列的统计信息开销会很大,为了避免Delta Engine自动收集长字符串列的统计信息,可以配置表特性 dataSkippingNumIndexedCols避免,使得该特性小于长字符串所在列的索引,或者配置该值后,将长字符串列移动到该特性值之后的位置。该表特性的默认值为32,即默认收集前32列的统计信息。
Data skipping的原理:我们以一张Delta表的x列为例,假设给定的表文件x列的最小值为5,最大值为10,如果查询条件为 where x < 3,则根据表文件的统计信息,我们可以得出结论:该表文件中一定不包含我们需要的数据,因此我们可以直接跳过该表文件,减少扫描的数据量,进而提升查询性能。
Data skipping的实现原理和布隆过滤器类似,通过查询条件判断表文件中是否可能存在需要查询的数据,从而减少需要扫描的数据量。如果不可能存在查询的数据,则可以直接跳过,如果可能存在被查询的数据,则需要扫描表文件,但被扫描的表文件中不一定包含查询的数据,我们将这种判断表文件中包含查询数据,但实际并不存在的情况称为假阳性。
为了能尽可能多的跳过和查询无关的表文件,我们需要尽可能缩小该表中min-max的差距,使得相近的数据尽可能在文件中聚集。举一个简单的例子,假设一张表包含10个表文件,对于表中的x列,它的取值为[1, 10],如果每个表文件的x列的分布均为[1, 10],则对于查询条件:where x < 3,无法跳过任何一个表文件,因此,也无法实现性能提升,而如果每个表文件的min-max均为0,即在表文件1的x列分布为[1, 1],表文件2的x列分布为[2, 2]...,则对于查询条件:where x < 3,可以跳过80%的表文件。受该思想的启发,Delta Engine支持使用Z-Ordering来对数据进行聚集,缩小表文件的min-max差距,提升查询性能。下面我们介绍Z-Ordering的使用。
Z-Ordering (多维聚类)
Z-Ordering将相关联的信息存储到同一组文件中,这种聚集会自动被Delta Engine的Data-Skipping算法使用,显著减少需要扫描的数据数量。
如何使用?
想要使用ZOrder来优化你的数据布局,仅需要在OPTIMIZE时,增加ZORDER BY子句即可。
OPTIMIZE events
ZORDER BY (eventType)
Z-Order支持在多个维度(多列)优化数据布局,在Z-Ordering多列时,使用逗号分隔:
OPTIMIZE events
ZORDER BY (eventType, generateTime)
如果您经常在where语句中使用到某个列,且该列的基数很大(有很多取值,值域很宽),那么使用Z-Ordering可以显著提升您的查询性能。
Z-Ordering只对已经收集了统计信息的列生效,在上一节我们介绍过,Delta Engine默认仅为前32列自动生成统计信息,意味着Z-Ordering也只能被用于前32列,如果您查询的列索引大于32,可以将该列索引调到32以内。
Z-Ordering不是幂等的,而是一种增量操作。多次运行间不能保证Z-Ordering所需的时间减少。但是,如果没有数据添加到刚被Z-Order过的数据,则再次执行Z-Ordering不会改变上次执行完Z-Ordering的数据布局,执行时间理论上会减少。
使用案例
在本案例中,我们使用和Bin-packing压缩相同的数据集,创建10000个小文件,每个文件中包含10w条网络连接数据,遵循 (src_ip, src_port, dst_ip, dst_port) 的格式,在生成的数据之上创建一张表:
%spark
import spark.implicits._
import scala.util.Random
val seed = 0
val numRecords = 1000*1000*1000L
val numFiles = 10000
case class ConnRecord(src_ip: String, src_port: Int, dst_ip: String, dst_port: Int)
def randomIPv4(r: Random) = Seq.fill(4)(r.nextInt(256)).mkString(".")
def randomPort(r: Random) = r.nextInt(65536)
def randomConnRecord(r: Random) = ConnRecord(
src_ip = randomIPv4(r), src_port = randomPort(r),
dst_ip = randomIPv4(r), dst_port = randomPort(r))
val df = spark.range(0, numFiles, 1, numFiles).mapPartitions { it =>
val partitionID = it.toStream.head
val r = new Random(seed = partitionID)
Iterator.fill((numRecords / numFiles).toInt)(randomConnRecord(r))
}
(df.write
.mode("overwrite")
.format("delta")
.option("path", "oss://databricks-delta-demo/conn_record")
.saveAsTable("conn_record"))
执行如下SQL,我们可以得到每个表文件的统计信息:
SELECT row_number() OVER (ORDER BY file) AS file_id,
count(*) as numRecords, min(src_ip), max(src_ip), min(src_port),
max(src_port), min(dst_ip), max(dst_ip), min(dst_port), max(dst_port)
FROM (
SELECT input_file_name() AS file, * FROM conn_record)
GROUP BY file
从表中统计信息来看,由于每一列的min-max的范围都比较宽,特别是端口号,几乎覆盖了端口列的值域,对于这种情况,Delta Engine无法使用文件的统计信息来跳过查询无关文件,因此无法实现有效优化。
我们执行如下查询:
SELECT COUNT(*) FROM conn_record
WHERE src_ip like '157.%' AND dst_ip like '216.%'
AND src_port = 10000 AND dst_port = 10000;
当我们执行上述查询时,Delta Engine扫描了全表,耗时为3.5min。
下面我们对 (src_ip, src_port, dst_ip, dst_port) 四列进行Z-Order优化:
OPTIMIZE conn_record
ZORDER BY (src_ip, src_port, dst_ip, dst_port);
重新执行表文件信息统计的SQL:
SELECT row_number() OVER (ORDER BY file) AS file_id,
count(*) as numRecords, min(src_ip), max(src_ip), min(src_port),
max(src_port), min(dst_ip), max(dst_ip), min(dst_port), max(dst_port)
FROM (
SELECT input_file_name() AS file, * FROM conn_record)
GROUP BY file
可以发现执行Optimize之后,文件数量减少为26个,合并了大量的小文件,另一方面,数据的min-max range变窄很多,可以更好的实现Data-Skipping,我们重跑上面的查询SQL:
SELECT COUNT(*) FROM conn_record
WHERE src_ip like '157.%' AND dst_ip like '216.%'
AND src_port = 10000 AND dst_port = 10000;
在优化后,执行该查询扫描的数据量仅有889.5MB,向比未优化少了30倍,并且查询时间减少为5s,提升了42倍。
本次使用的示例数据量较少(使用delta格式压缩存储,26.6GB左右),性能提升效果还不是那么明显,当数据量较大时,性能提升会更加显著,甚至可以达到百倍的性能提升。想要了解更多Optimize的使用案例,可以参考使用Delta在秒级内处理PB级数据。
表文件大小调优
设置目标文件大小
如果想要调整Delta表的文件大小,可以通过设置表属性:delta.targetFileSize 来实现。一旦设置了该属性,所有的数据布局优化操作(如:小文件合并,Z-Ordering和写优化)都会尽可能产生给定大小的文件。
针对新创建的表:
CREATE TABLE student USING delta
LOCATION "oss://delta-demo/student"
TBLPROPERTIES ("delta.targetFileSize" = "100MB")
针对现存表:
ALTER TABLE student
SET TBLPROPERTIES ("delta.targetFileSize" = "100MB")
基于负载自动调整文件大小
Delta Engine会根据对表执行的操作对delta表的文件大小进行自动调整。Delta engine会自动检测出Delta表最近是否有频繁的MERGE操作重写文件,如果出现频繁的文件重写,则Delta Engine会减小重写的文件大小以提升未来再次被重写的性能。
例如,在执行MERGE操作时,如果之前的10个操作,有9次操作都是MERGE,则本次MERGE操作会生成相对较小的文件,从而提升未来的MERGE操作的性能。
自动调整会在几次表文件重写操作之后才会被激活,但如果你的使用场景本身就会频繁的执行MERGE,UPDATE和DELETE操作,你想要立刻激活自动调整文件大小这一特性,则可以通过设置表属性:delta.tuneFileSizesForRewrites
实现,如果将该表属性设置为true,则在该表上执行任何数据布局优化操作都会使用相对较小的文件大小。如果将该表属性设置为false,则会关闭Delta Engine的自动检测。
基于表大小调整表文件大小
Delta Engine会根据表的大小自动调整表文件的大小,对于比较小的表,Delta Engine会使用较小的文件,对于较大的表,Delta Engine会使用较大的文件,从而防止表中的文件数量变得非常多。
具体来看,当整张表的大小小于2.56TB时,会以256MB作为目标表文件大小,当表的大小介于2.56TB-10TB之间时,目标文件大小线性增长,当表的大小大于10TB后,以1GB作为目标表文件大小。
需要注意的是,如果设置了表属性:delta.targetFileSize
或者delta.tuneFileSizesForRewrites
,则该Delta Engine的该特性会自动失效。
提高交互式查询性能
Delta Engine提供了一些其他机制来提高查询性能。
管理数据实效性
在每个查询开始时,Delta表会自动更新到表的最新版本。当命令状态报告: Updating the Delta table's state时,可以在笔记本中观察到这个过程。但是,在表上运行历史分析时,您可能不需要最新的数据,特别是在频繁引入流式处理数据的表中。 在这些情况下,可以在 Delta 表的过时快照上运行查询。 这会降低从查询获取结果的延迟时间。
可以通过将 Spark 会话配置 spark.databricks.delta.stalenessLimit 设置为时间字符串值(例如 1h、15m、1d 分别为 1 小时、15 分钟和 1 天)来配置表数据的过时程度。此配置是特定session,因此不会影响其他用户从其他笔记本、作业或BI工具访问此表。另外,此设置不会更新表。它只会阻止查询等待表更新。该更新仍在后台进行,并将在整个集群之间公平地共享资源。如果超过过期限制,则查询将在表状态更新上阻止。
用于低延迟查询的增强检查点
Delta Lake 写入检查点作为 Delta 表的聚合状态,每 10 次提交写入一次。这些检查点用作计算表的最新状态的起点。如果没有检查点 ,Delta Lake将不得不读取大量的JSON文件(“Delta”文件),表示提交到事务日志以计算表的状态。此外,此外,列级统计信息 Delta Lake 用于执行存储在检查点中的数据跳过操作。
Delta Lake 检查点与结构化流checkpoints不同。
在Databricks Runtime 7.2及更低版本中,列级别的统计信息作为JSON列存储在Delta Lake 检查点中。在Databricks Runtime 7.3LTS及更高版本中,列级别的统计信息存储作为结构。结构格式使Delta Lake读取速度更快,因为:
Delta Lake不会执行昂贵的JSON解析来获取列级统计信息。
Parquet 列修剪功能可以显著减少读取列的统计信息所需的 I/O
结构格式可启用一系列优化,这些优化可将Delta Lake读取操作的开销从几秒降低到数十毫秒,从而显着减少短查询的延迟。
管理检查点中的列级统计信息您可以使用表属性delta.checkpoint.writestatsassjson以及delta.checkpoint.writeStatsassTrust.管理如何在检查点中写入统计信息。如果两个表属性都为false,则Delta-Lake无法执行跳过数据。
在Databricks Runtime 7.3 LTS及更高版本中:
批量写入JSON和结构格式编写写入统计信息。delta.checkpoint.writeStatsAsJson默认值为true。
流式处理只以JSON格式写入统计信息(以最大程度地减少检查点对写入延迟的影响)。若要同时编写结构格式,请参见为结构化流式查询启用增强的检查点。
在这两种情况下,都默认未定义 delta.checkpoint.writeStatsAsStruct。
读取器在可用时使用结构列,否则退回到使用JSON列。
在Databricks运行时7.2及以下版本中,读者只使用JSON列。因此,如果delta.checkpoint.writestatsassjson为false,此类读取器无法执行跳过数据。
增强的检查点不会破坏与开源Delta-Lake 读取器的兼容性。但是,设置delta.checkpoint.writestatsassjson为false可能会影响到Delta Lake的专有读取器。请与您的供应商联系以了解有关性能影响的更多信息。
检查点中统计信息的权衡
由于在检查点中编写统计信息会产生成本(即使对于大型表,通常也要不到一分钟),因此需要权衡在编写检查点所花费的时间与Databricks Runtime 7.2及更低版本的兼容性。如果您能够将所有工作负载升级到Databricks Runtime 7.3 LTS或更高版本,则可以通过禁用旧版JSON统计信息来降低编写checkpoints的成本。下表总结了这一折衷方案。
如果跳过数据不适用于你的应用程序,则可以将两个属性都设置为false,并且不收集或写入任何统计信息。我们不建议这种配置。
writeStatsAsStruct | |||
false | true | ||
writeStatsAsJson | false |
|
|
true |
|
|
为结构化流查询启用增强的checkpoints
如果您的结构化流工作负载没有低延迟要求(要求延迟在一分钟以内),则可以通过运行以下SQL命令来启用增强检查点:
SQL
%sql
ALTER TABLE [<table-name>|delta.`<path-to-table>`] SET TBLPROPERTIES
('delta.checkpoint.writeStatsAsStruct' = 'true')
如果您不使用Databricks Runtime 7.2或更低版本来查询数据,则还可以通过设置以下表属性来改善检查点写入延迟:
SQL
%sql
ALTER TABLE [<table-name>|delta.`<path-to-table>`] SET TBLPROPERTIES
(
'delta.checkpoint.writeStatsAsStruct' = 'true',
'delta.checkpoint.writeStatsAsJson' = 'false'
)
禁止从没有统计结构的检查点的集群中写入
Databricks Runtime 7.2及更低版本中的编写器会写入无统计结构检查点,从而妨碍了对Databricks Runtime 7.3 LTS阅读器的优化。要阻止运行Databricks Runtime 7.2及更低版本的集群写入Delta表,可以使用以下upgradeTableProtocol方法升级Delta表:
Python
%pyspark
from delta.tables import DeltaTable
delta = DeltaTable.forPath(spark, "path_to_table") # or DeltaTable.forName
delta.upgradeTableProtocol(1, 3)
Scala
%spark
import io.delta.tables.DeltaTable
val delta = DeltaTable.forPath(spark, "path_to_table") // or DeltaTable.forName
delta.upgradeTableProtocol(1, 3)
应用该upgradeTableProtocol方法可防止运行Databricks Runtime 7.2及更低版本的集群写入表,并且此更改是不可逆的。我们建议仅在您采用新格式后才升级表。您可以通过使用Databricks Runtime 7.3为表创建浅表克隆来尝试这些优化。
升级表写入程序版本后,写入程序必须遵守'delta.checkpoint.writeStatsassTrust'和'delta.checkpoint.writestatsassjson'.的设置
下表总结了如何在不同版本的Databricks Runtime、表协议版本和编写器类型中利用增强的检查点。
Without Protocol Upgrade | With Protocol Upgrade | |||||
Databricks Runtime 7.2及以下编写器 | Databricks Runtime 7.3及更高版本的批处理编写器 | Databricks Runtime 7.3及更高版本的流编写器 | Databricks Runtime 7.2及以下编写器 | Databricks Runtime 7.3及更高版本的批处理编写器 | Databricks Runtime 7.3及更高版本的流编写器 | |
Databricks Runtime 7.2及以下读取器性能 | 没有得到改进 | 没有得到改进 | 没有得到改进 | 不能使用编写器 | 没有得到改进 | 没有得到改进 |
Databricks Runtime 7.3及更高版本的读取器性能 | 没有得到改进 | 默认情况下改进 | 通过表格属性选择Opt-in(1) | 不能使用编写器 | 默认情况下改进 | 通过表格属性选择Opt-in(1) |
(1)设置表属性'delta.checkpoint.writeStatsAsStruct' = 'true'
禁用使用旧检查点格式的从集群中写入Databricks Runtime 7.2及更低版本的编写器可以编写旧格式的检查点,这将妨碍对Databricks Runtime 7.3编写器的优化。要阻止运行Databricks Runtime 7.2及更低版本的集群写入Delta表,可以使用upgradeTableProtocol方法升级Delta表:
Python
%pyspark
from delta.tables import DeltaTable
delta = DeltaTable.forPath(spark, "path_to_table") # or DeltaTable.forName
delta.upgradeTableProtocol(1, 3)
Scala
%spark
import io.delta.tables.DeltaTable
val delta = DeltaTable.forPath(spark, "path_to_table") // or DeltaTable.forName
delta.upgradeTableProtocol(1, 3)
应用该upgradeTableProtocol方法可防止运行Databricks Runtime 7.2及更低版本的集群写入表。这种更改是不可逆的。因此,我们建议仅在提交新格式后才升级表。您可以通过使用Databricks Runtime 7.3创建表的浅表克隆来尝试这些优化:
Databricks Runtime 7.x:CLONE(Databricks上的Delta Lake)
常见问题(FAQ)
为什么OPTIMIZE不是自动的?
OPTIMIZE操作启动了多个Spark作业,以便通过压缩来优化文件大小(并可以选择执行 Z-Ordering)。由于OPTIMIZE执行的内容大部分是压缩小文件,因此您必须先累积许多小文件,然后此操作才能生效。因此,该OPTIMIZE操作不会自动运行。
此外,运行 OPTIMIZE(特别是 ZORDER)是时间和资源成本高昂的操作。如果Databricks自动运行OPTIMIZE或等待分批写出数据,则将无法运行(以Delta表是源)低延迟的Delta-Lake流。许多客户的Delta表从未进行过优化,因为他们只从这些表流式传输数据,从而避免了优化所带来的查询好处。
最后,Delta Lake会自动收集有关写入表的文件(无论是否通过OPTIMIZE
操作)的统计信息。这意味着从Delta表的读取将利用此信息,无论该表或分区是否运行了OPTIMIZE
操作
我应该多久跑步一次OPTIMIZE?
当您选择运行OPTIMIZE的频率时,性能和成本之间就需要权衡取舍。 如果希望获得更好的最终用户查询性能,则应更频繁地运行 OPTIMIZE
(根据资源使用量,可能需要较高的成本)。如果要优化成本,应该减少运行频率。
运行 OPTIMIZE
(二进制打包和 Z 排序)的最佳实例类型是什么?
这两个操作都是执行大量 Parquet 解码和编码的 CPU 密集型操作。
对于这些工作负载,建议采用 F 或 Fsv2 系列。