为了提高查询速度,阿里云Databricks上的Delta Lake支持优化存储在云存储中的数据布局的功能。Databricks上的Delta Lake支持两种布局算法:bin-packing和Z-Ordering。
详细内容可参考Databricks官网文章:通过文件管理优化性能
本文介绍如何运行优化命令,两种布局算法工作原理以及如何清除陈旧的表快照。
该FAQ解释了为什么优化是不是自动的,包括如何运行优化命令的建议。
对于演示优化优势的笔记本,请参阅优化示例。
Databricks Runtime 7.x:优化(Databricks上的Delta Lake)
Databricks运行时5.5 LTS和6.x:优化(Databricks上的Delta Lake)
压缩 (bin-packing)
Databricks 上的 Delta Lake 可以将小文件合并为较大的文件,从而提高表中读取查询的速度。 通过运行 OPTIMIZE
命令触发压缩:
SQL
OPTIMIZE delta.`/data/events`
or
OPTIMIZE events
如果您有大量数据,并且只想优化其中的一个子集,则可以使用WHERE指定可选的分区谓词:
OPTIMIZE events WHERE date >= '2017-01-01'
Bin-packing是幂等的,这意味着如果在同一个数据集上运行两次,第二次运行没有效果。
Bin-packing的目标是根据磁盘大小生成均衡的数据文件,但不一定是每个文件的元组数。然而,这两个度量值通常是相互关联的。
Delta表的读取器使用快照隔离,这意味着当OPTIMIZE从事务日志中删除不必要的文件时,它们不会被中断。OPTIMIZE 不会对表进行任何数据相关更改,因此,在 OPTIMIZE 之前和之后读取都具有相同的结果。 对作为流式处理源的表执行 OPTIMIZE 不会影响将此表视为源的任何当前或未来的流。OPTIMIZE操作返回删除的文件和添加的文件的文件统计信息(最小值、最大值、总计等)。Optimize stats还包含Z-Ordering统计信息、批处理数量和优化的分区。
在Databricks Runtime 6.0及更高版本中可用。
您还可以使用“自动优化”自动压缩小文件。
Data skipping
将数据写入Delta表时,将自动收集data skipping信息。Databricks上的Delta Lake在查询时利用此信息(最小值和最大值)来提供更快的查询。您无需配置data skipping。该功能会在适用时激活。但是,其有效性取决于数据的布局。为了获得最佳效果,请应用Z-Ordering。
有关Delta Lake在Databricks data skipping和Z-Ordering方面的优点的示例,请参阅“优化示例”中的notebook。默认情况下,Databricks上的Delta Lake会收集有关表架构中定义的前32列的统计信息。您可以使用table属性dataSkippingNumIndexedCols更改此值。添加更多列来收集统计信息将在编写文件时增加额外的开销。
收集长字符串的统计信息成本高昂。若要避免收集有关长字符串的统计信息,可以将表属性datakippingnumindexedcols配置为避免包含长字符串的列,也可以使用ALTER table CHANGE coln将包含长字符串的列移动到大于dataskipingnumindexedcols的列。为了收集统计信息,嵌套列中的每个字段都被视为一个单独的列。
Z-Ordering (多维聚类)
Z-Ordering 是并置同一组文件中相关信息的方法。Databricks 上的Delta Lake data skipping算法会自动使用了这种并置,从而大大减少了需要读取的数据量。从而显著减少需要读取的数据量。对于Z-Order数据,可以在ZORDER-BY子句中指定要排序的列:
SQL
OPTIMIZE events
WHERE date >= current_timestamp() - INTERVAL 1 day
ZORDER BY (eventType)
如果您希望在查询predicates中经常使用一个列,并且该列具有较高的基数(即,有大量不同的值),那么请使用zorderby。
可以将ZORDER BY的多个列指定为逗号分隔的列表。然而,每增加一列,局部性的有效性就会下降。对没有收集到统计信息的列进行Z-Ordering将是无效的,并且由于数据跳过需要列本地统计信息(如min、max和count),因此会浪费资源。通过对架构中的列重新排序或增加要收集统计信息的列数,可以在某些列上配置统计信息收集。有关详细信息,请参阅数据跳过部分。
Z-Ordering不是幂等的,而是一种增量操作。多次运行间不能保证Z-Ordering所需的时间减少。但是,如果没有新的数据添加到一个只有Z-Ordering的分区中,那么该分区的另一个Z-Ordering将不会有任何效果。
Z-Ordering旨在根据元组的数量生成均匀平衡的数据文件,但不一定是磁盘上的数据大小。这两个度量值通常是相互关联的,但也有可能出现这种情况,导致优化任务时间出现偏差。
例如,如果您按日期排序,并且您最近的记录都比过去的记录宽得多(例如,数组或字符串值更长),则OPTIMIZE作业的任务持续时间和生成的文件大小都会出现偏差。但是,这只是OPTIMIZE命令本身的问题;它不应对后续查询产生任何负面影响。
Notebooks
有关优化的好处的示例,请参阅以下Notebook:
优化实例
提高交互式查询性能
Delta Engine提供了一些其他机制来提高查询性能。
管理数据实效性
在每个查询开始时,Delta表会自动更新到表的最新版本。当命令状态报告: Updating the Delta table's state时,可以在笔记本中观察到这个过程。但是,在表上运行历史分析时,您可能不需要最新的数据,特别是在频繁引入流式处理数据的表中。 在这些情况下,可以在 Delta 表的过时快照上运行查询。 这会降低从查询获取结果的延迟时间。
可以通过将 Spark 会话配置 spark.databricks.delta.stalenessLimit 设置为时间字符串值(例如 1h、15m、1d 分别为 1 小时、15 分钟和 1 天)来配置表数据的过时程度。此配置是特定session,因此不会影响其他用户从其他笔记本、作业或BI工具访问此表。另外,此设置不会更新表。它只会阻止查询等待表更新。该更新仍在后台进行,并将在整个集群之间公平地共享资源。如果超过过期限制,则查询将在表状态更新上阻止。
用于低延迟查询的增强检查点
Delta Lake 写入检查点作为 Delta 表的聚合状态,每 10 次提交写入一次。这些检查点用作计算表的最新状态的起点。如果没有检查点 ,Delta Lake将不得不读取大量的JSON文件(“Delta”文件),表示提交到事务日志以计算表的状态。此外,此外,列级统计信息 Delta Lake 用于执行存储在检查点中的数据跳过操作。
Delta Lake 检查点与结构化流checkpoints不同。
在Databricks Runtime 7.2及更低版本中,列级别的统计信息作为JSON列存储在Delta Lake 检查点中。在Databricks Runtime 7.3LTS及更高版本中,列级别的统计信息存储作为结构。结构格式使Delta Lake读取速度更快,因为:
Delta Lake不会执行昂贵的JSON解析来获取列级统计信息。
Parquet 列修剪功能可以显著减少读取列的统计信息所需的 I/O
结构格式可启用一系列优化,这些优化可将Delta Lake读取操作的开销从几秒降低到数十毫秒,从而显着减少短查询的延迟。
管理检查点中的列级统计信息 您可以使用表属性delta.checkpoint.writestatsassjson以及delta.checkpoint.writeStatsassTrust.管理如何在检查点中写入统计信息。如果两个表属性都为false,则Delta-Lake无法执行跳过数据。
在Databricks Runtime 7.3 LTS及更高版本中:
批量写入JSON和结构格式编写写入统计信息。delta.checkpoint.writeStatsAsJson默认值为ture。
流式处理只以JSON格式写入统计信息(以最大程度地减少检查点对写入延迟的影响)。若要同时编写结构格式,请参见为结构化流式查询启用增强的检查点。
在这两种情况下,都默认未定义 delta.checkpoint.writeStatsAsStruct。
读取器在可用时使用结构列,否则退回到使用JSON列。
在Databricks运行时7.2及以下版本中,读者只使用JSON列。因此,如果delta.checkpoint.writestatsassjson为false,此类读取器无法执行跳过数据。
增强的检查点不会破坏与开源Delta-Lake 读取器的兼容性。但是,设置delta.checkpoint.writestatsassjson为false可能会影响到Delta Lake的专有读取器。请与您的供应商联系以了解有关性能影响的更多信息。
检查点中统计信息的权衡
由于在检查点中编写统计信息会产生成本(即使对于大型表,通常也要不到一分钟),因此需要权衡在编写检查点所花费的时间与与Databricks Runtime 7.2及更低版本的兼容性。如果您能够将所有工作负载升级到Databricks Runtime 7.3 LTS或更高版本,则可以通过禁用旧版JSON统计信息来降低编写checkpoints的成本。下表总结了这一折衷方案。
如果跳过数据不适用于你的应用程序,则可以将两个属性都设置为false,并且不收集或写入任何统计信息。我们不建议这种配置。
writeStatsAsStruct | |||
false | true | ||
writeStatsAsJson | false |
|
|
true |
|
|
为结构化流查询启用增强的checkpoints
如果您的结构化流工作负载没有低延迟要求(要求延迟在一分钟以内),则可以通过运行以下SQL命令来启用增强检查点:
SQL
ALTER TABLE [<table-name>|delta.`<path-to-table>`] SET TBLPROPERTIES
('delta.checkpoint.writeStatsAsStruct' = 'true')
如果您不使用Databricks Runtime 7.2或更低版本来查询数据,则还可以通过设置以下表属性来改善检查点写入延迟:
SQL
ALTER TABLE [<table-name>|delta.`<path-to-table>`] SET TBLPROPERTIES
(
'delta.checkpoint.writeStatsAsStruct' = 'true',
'delta.checkpoint.writeStatsAsJson' = 'false'
)
禁止从没有统计结构的检查点的集群中写入
Databricks Runtime 7.2及更低版本中的编写器会写入无统计结构检查点,从而妨碍了对Databricks Runtime 7.3 LTS阅读器的优化。要阻止运行Databricks Runtime 7.2及更低版本的集群写入Delta表,可以使用以下upgradeTableProtocol方法升级Delta表:
Python
from delta.tables import DeltaTable
delta = DeltaTable.forPath(spark, "path_to_table") # or DeltaTable.forName
delta.upgradeTableProtocol(1, 3)
Scala
import io.delta.tables.DeltaTable
val delta = DeltaTable.forPath(spark, "path_to_table") // or DeltaTable.forName
delta.upgradeTableProtocol(1, 3)
应用该upgradeTableProtocol方法可防止运行Databricks Runtime 7.2及更低版本的集群写入表,并且此更改是不可逆的。我们建议仅在您采用新格式后才升级表。您可以通过使用Databricks Runtime 7.3为表创建浅表克隆来尝试这些优化。
升级表写入程序版本后,写入程序必须遵守'delta.checkpoint.writeStatsassTrust'和'delta.checkpoint.writestatsassjson'.的设置
下表总结了如何在不同版本的Databricks Runtime、表协议版本和编写器类型中利用增强的检查点。
Without Protocol Upgrade | With Protocol Upgrade | |||||
Databricks Runtime 7.2及以下编写器 | Databricks Runtime 7.3及更高版本的批处理编写器 | Databricks Runtime 7.3及更高版本的流编写器 | Databricks Runtime 7.2及以下编写器 | Databricks Runtime 7.3及更高版本的批处理编写器 | Databricks Runtime 7.3及更高版本的流编写器 | |
Databricks Runtime 7.2及以下读取器性能 | 没有得到改进 | 没有得到改进 | 没有得到改进 | 不能使用编写器 | 没有得到改进 | 没有得到改进 |
Databricks Runtime 7.3及更高版本的读取器性能 | 没有得到改进 | 默认情况下改进 | 通过表格属性选择Opt-in(1) | 不能使用编写器 | 默认情况下改进 | 通过表格属性选择Opt-in(1) |
(1)设置表属性'delta.checkpoint.writeStatsAsStruct' = 'true'
禁用使用旧检查点格式的从集群中写入 Databricks Runtime 7.2及更低版本的编写器可以编写旧格式的检查点,这将妨碍对Databricks Runtime 7.3编写器的优化。要阻止运行Databricks Runtime 7.2及更低版本的集群写入Delta表,可以使用upgradeTableProtocol方法升级Delta表:
Python
from delta.tables import DeltaTable
delta = DeltaTable.forPath(spark, "path_to_table") # or DeltaTable.forName
delta.upgradeTableProtocol(1, 3)
Scala
import io.delta.tables.DeltaTable
val delta = DeltaTable.forPath(spark, "path_to_table") // or DeltaTable.forName
delta.upgradeTableProtocol(1, 3)
应用该upgradeTableProtocol方法可防止运行Databricks Runtime 7.2及更低版本的集群写入表。这种更改是不可逆的。因此,我们建议仅在提交新格式后才升级表。您可以通过使用Databricks Runtime 7.3创建表的浅表克隆来尝试这些优化:
Databricks Runtime 7.x:CLONE(Databricks上的Delta Lake)
Databricks运行时5.5 LTS和6.x:CLONE(Databricks上的Delta Lake)
常见问题(FAQ)
为什么OPTIMIZE不是自动的?
OPTIMIZE操作启动了多个Spark作业,以便通过压缩来优化文件大小(并可以选择执行 Z-Ordering)。由于OPTIMIZE执行的内容大部分是压缩小文件,因此您必须先累积许多小文件,然后此操作才能生效。因此,该OPTIMIZE操作不会自动运行。
此外,运行 OPTIMIZE(特别是 ZORDER)是时间和资源成本高昂的操作。如果Databricks自动运行OPTIMIZE或等待分批写出数据,则将无法运行(以Delta表是源)低延迟的Delta-Lake流。许多客户的Delta表从未进行过优化,因为他们只从这些表流式传输数据,从而避免了优化所带来的查询好处。
最后,Delta Lake会自动收集有关写入表的文件(无论是否通过OPTIMIZE
操作)的统计信息。这意味着从Delta表的读取将利用此信息,无论该表或分区是否运行了OPTIMIZE
操作
我应该多久跑步一次OPTIMIZE?
当您选择运行OPTIMUZE的频率时,性能和成本之间就需要权衡取舍。 如果希望获得更好的最终用户查询性能,则应更频繁地运行 OPTIMIZE
(根据资源使用量,可能需要较高的成本)。如果要优化成本,应该减少运行频率。
运行 OPTIMIZE
(二进制打包和 Z 排序)的最佳实例类型是什么?
这两个操作都是执行大量 Parquet 解码和编码的 CPU 密集型操作。
对于这些工作负载,建议采用 F 或 Fsv2 系列。
在文档使用中是否遇到以下问题
更多建议
匿名提交