全部产品

通过文件管理优化性能

为了提高查询速度,阿里云Databricks上的Delta Lake支持优化存储在云存储中的数据布局的功能。Databricks上的Delta Lake支持两种布局算法:bin-packing和Z-Ordering。

说明

详细内容可参考Databricks官网文章:通过文件管理优化性能

  • 本文介绍如何运行优化命令,两种布局算法工作原理以及如何清除陈旧的表快照。

  • 该FAQ解释了为什么优化是不是自动的,包括如何运行优化命令的建议。

  • 对于演示优化优势的笔记本,请参阅优化示例。

压缩 (bin-packing)

Databricks 上的 Delta Lake 可以将小文件合并为较大的文件,从而提高表中读取查询的速度。 通过运行 OPTIMIZE 命令触发压缩:

SQL

OPTIMIZE delta.`/data/events`

or

OPTIMIZE events

如果您有大量数据,并且只想优化其中的一个子集,则可以使用WHERE指定可选的分区谓词:

OPTIMIZE events WHERE date >= '2017-01-01'
说明

Bin-packing是幂等的,这意味着如果在同一个数据集上运行两次,第二次运行没有效果。

Bin-packing的目标是根据磁盘大小生成均衡的数据文件,但不一定是每个文件的元组数。然而,这两个度量值通常是相互关联的。

Delta表的读取器使用快照隔离,这意味着当OPTIMIZE从事务日志中删除不必要的文件时,它们不会被中断。OPTIMIZE 不会对表进行任何数据相关更改,因此,在 OPTIMIZE 之前和之后读取都具有相同的结果。 对作为流式处理源的表执行 OPTIMIZE 不会影响将此表视为源的任何当前或未来的流。OPTIMIZE操作返回删除的文件和添加的文件的文件统计信息(最小值、最大值、总计等)。Optimize stats还包含Z-Ordering统计信息、批处理数量和优化的分区。

注意

在Databricks Runtime 6.0及更高版本中可用。

您还可以使用“自动优化”自动压缩小文件。

Data skipping

将数据写入Delta表时,将自动收集data skipping信息。Databricks上的Delta Lake在查询时利用此信息(最小值和最大值)来提供更快的查询。您无需配置data skipping。该功能会在适用时激活。但是,其有效性取决于数据的布局。为了获得最佳效果,请应用Z-Ordering。

有关Delta Lake在Databricks data skipping和Z-Ordering方面的优点的示例,请参阅“优化示例”中的notebook。默认情况下,Databricks上的Delta Lake会收集有关表架构中定义的前32列的统计信息。您可以使用table属性dataSkippingNumIndexedCols更改此值。添加更多列来收集统计信息将在编写文件时增加额外的开销。

收集长字符串的统计信息成本高昂。若要避免收集有关长字符串的统计信息,可以将表属性datakippingnumindexedcols配置为避免包含长字符串的列,也可以使用ALTER table CHANGE coln将包含长字符串的列移动到大于dataskipingnumindexedcols的列。为了收集统计信息,嵌套列中的每个字段都被视为一个单独的列。

Z-Ordering (多维聚类)

Z-Ordering 是并置同一组文件中相关信息的方法。Databricks 上的Delta Lake data skipping算法会自动使用了这种并置,从而大大减少了需要读取的数据量。从而显著减少需要读取的数据量。对于Z-Order数据,可以在ZORDER-BY子句中指定要排序的列:

SQL

OPTIMIZE events
WHERE date >= current_timestamp() - INTERVAL 1 day
ZORDER BY (eventType)

如果您希望在查询predicates中经常使用一个列,并且该列具有较高的基数(即,有大量不同的值),那么请使用zorderby。

可以将ZORDER BY的多个列指定为逗号分隔的列表。然而,每增加一列,局部性的有效性就会下降。对没有收集到统计信息的列进行Z-Ordering将是无效的,并且由于数据跳过需要列本地统计信息(如min、max和count),因此会浪费资源。通过对架构中的列重新排序或增加要收集统计信息的列数,可以在某些列上配置统计信息收集。有关详细信息,请参阅数据跳过部分。

注意

  • Z-Ordering不是幂等的,而是一种增量操作。多次运行间不能保证Z-Ordering所需的时间减少。但是,如果没有新的数据添加到一个只有Z-Ordering的分区中,那么该分区的另一个Z-Ordering将不会有任何效果。

  • Z-Ordering旨在根据元组的数量生成均匀平衡的数据文件,但不一定是磁盘上的数据大小。这两个度量值通常是相互关联的,但也有可能出现这种情况,导致优化任务时间出现偏差。

    例如,如果您按日期排序,并且您最近的记录都比过去的记录宽得多(例如,数组或字符串值更长),则OPTIMIZE作业的任务持续时间和生成的文件大小都会出现偏差。但是,这只是OPTIMIZE命令本身的问题;它不应对后续查询产生任何负面影响。

Notebooks

有关优化的好处的示例,请参阅以下Notebook:

优化实例

提高交互式查询性能

Delta Engine提供了一些其他机制来提高查询性能。

管理数据实效性

在每个查询开始时,Delta表会自动更新到表的最新版本。当命令状态报告: Updating the Delta table's state时,可以在笔记本中观察到这个过程。但是,在表上运行历史分析时,您可能不需要最新的数据,特别是在频繁引入流式处理数据的表中。 在这些情况下,可以在 Delta 表的过时快照上运行查询。 这会降低从查询获取结果的延迟时间。

可以通过将 Spark 会话配置 spark.databricks.delta.stalenessLimit 设置为时间字符串值(例如 1h、15m、1d 分别为 1 小时、15 分钟和 1 天)来配置表数据的过时程度。此配置是特定session,因此不会影响其他用户从其他笔记本、作业或BI工具访问此表。另外,此设置不会更新表。它只会阻止查询等待表更新。该更新仍在后台进行,并将在整个集群之间公平地共享资源。如果超过过期限制,则查询将在表状态更新上阻止。

用于低延迟查询的增强检查点

Delta Lake 写入检查点作为 Delta 表的聚合状态,每 10 次提交写入一次。这些检查点用作计算表的最新状态的起点。如果没有检查点 ,Delta Lake将不得不读取大量的JSON文件(“Delta”文件),表示提交到事务日志以计算表的状态。此外,此外,列级统计信息 Delta Lake 用于执行存储在检查点中的数据跳过操作。

警告

Delta Lake 检查点与结构化流checkpoints不同。

在Databricks Runtime 7.2及更低版本中,列级别的统计信息作为JSON列存储在Delta Lake 检查点中。在Databricks Runtime 7.3LTS及更高版本中,列级别的统计信息存储作为结构。结构格式使Delta Lake读取速度更快,因为:

  • Delta Lake不会执行昂贵的JSON解析来获取列级统计信息。

  • Parquet 列修剪功能可以显著减少读取列的统计信息所需的 I/O

结构格式可启用一系列优化,这些优化可将Delta Lake读取操作的开销从几秒降低到数十毫秒,从而显着减少短查询的延迟。

管理检查点中的列级统计信息 您可以使用表属性delta.checkpoint.writestatsassjson以及delta.checkpoint.writeStatsassTrust.管理如何在检查点中写入统计信息。如果两个表属性都为false,则Delta-Lake无法执行跳过数据。

在Databricks Runtime 7.3 LTS及更高版本中:

  • 批量写入JSON和结构格式编写写入统计信息。delta.checkpoint.writeStatsAsJson默认值为ture。

  • 流式处理只以JSON格式写入统计信息(以最大程度地减少检查点对写入延迟的影响)。若要同时编写结构格式,请参见为结构化流式查询启用增强的检查点。

  • 在这两种情况下,都默认未定义 delta.checkpoint.writeStatsAsStruct。

  • 读取器在可用时使用结构列,否则退回到使用JSON列。

在Databricks运行时7.2及以下版本中,读者只使用JSON列。因此,如果delta.checkpoint.writestatsassjson为false,此类读取器无法执行跳过数据。

警告

增强的检查点不会破坏与开源Delta-Lake 读取器的兼容性。但是,设置delta.checkpoint.writestatsassjson为false可能会影响到Delta Lake的专有读取器。请与您的供应商联系以了解有关性能影响的更多信息。

检查点中统计信息的权衡

由于在检查点中编写统计信息会产生成本(即使对于大型表,通常也要不到一分钟),因此需要权衡在编写检查点所花费的时间与与Databricks Runtime 7.2及更低版本的兼容性。如果您能够将所有工作负载升级到Databricks Runtime 7.3 LTS或更高版本,则可以通过禁用旧版JSON统计信息来降低编写checkpoints的成本。下表总结了这一折衷方案。

如果跳过数据不适用于你的应用程序,则可以将两个属性都设置为false,并且不收集或写入任何统计信息。我们不建议这种配置。

writeStatsAsStruct

false

true

writeStatsAsJson

false

  • 没有数据skipping

  • 在Databricks Runtime 7.3及更高版本上的查询更快

  • checkpoints稍慢

  • 在Databricks Runtime 7.2及更低版本的阅读器中,没有数据skipping

true

  • Databricks运行时7.2及以下

  • 较慢的查询

  • 在Databricks Runtime 7.3及更高版本上的查询更快

  • 保持与Databricks Runtime 7.2及更低版本上的阅读器的兼容性

  • checkpoints的延迟最高(秒级)

为结构化流查询启用增强的checkpoints

如果您的结构化流工作负载没有低延迟要求(要求延迟在一分钟以内),则可以通过运行以下SQL命令来启用增强检查点:

SQL

ALTER TABLE [<table-name>|delta.`<path-to-table>`] SET TBLPROPERTIES
('delta.checkpoint.writeStatsAsStruct' = 'true')

如果您不使用Databricks Runtime 7.2或更低版本来查询数据,则还可以通过设置以下表属性来改善检查点写入延迟:

SQL

ALTER TABLE [<table-name>|delta.`<path-to-table>`] SET TBLPROPERTIES
(
 'delta.checkpoint.writeStatsAsStruct' = 'true',
 'delta.checkpoint.writeStatsAsJson' = 'false'
)

禁止从没有统计结构的检查点的集群中写入

Databricks Runtime 7.2及更低版本中的编写器会写入无统计结构检查点,从而妨碍了对Databricks Runtime 7.3 LTS阅读器的优化。要阻止运行Databricks Runtime 7.2及更低版本的集群写入Delta表,可以使用以下upgradeTableProtocol方法升级Delta表:

Python

from delta.tables import DeltaTable
delta = DeltaTable.forPath(spark, "path_to_table") # or DeltaTable.forName
delta.upgradeTableProtocol(1, 3)

Scala

import io.delta.tables.DeltaTable
val delta = DeltaTable.forPath(spark, "path_to_table") // or DeltaTable.forName
delta.upgradeTableProtocol(1, 3)
警告

应用该upgradeTableProtocol方法可防止运行Databricks Runtime 7.2及更低版本的集群写入表,并且此更改是不可逆的。我们建议仅在您采用新格式后才升级表。您可以通过使用Databricks Runtime 7.3为表创建浅表克隆来尝试这些优化。

升级表写入程序版本后,写入程序必须遵守'delta.checkpoint.writeStatsassTrust'和'delta.checkpoint.writestatsassjson'.的设置

下表总结了如何在不同版本的Databricks Runtime、表协议版本和编写器类型中利用增强的检查点。

Without Protocol Upgrade

With Protocol Upgrade

Databricks Runtime 7.2及以下编写器

Databricks Runtime 7.3及更高版本的批处理编写器

Databricks Runtime 7.3及更高版本的流编写器

Databricks Runtime 7.2及以下编写器

Databricks Runtime 7.3及更高版本的批处理编写器

Databricks Runtime 7.3及更高版本的流编写器

Databricks Runtime 7.2及以下读取器性能

没有得到改进

没有得到改进

没有得到改进

不能使用编写器

没有得到改进

没有得到改进

Databricks Runtime 7.3及更高版本的读取器性能

没有得到改进

默认情况下改进

通过表格属性选择Opt-in(1)

不能使用编写器

默认情况下改进

通过表格属性选择Opt-in(1)

(1)设置表属性'delta.checkpoint.writeStatsAsStruct' = 'true'

禁用使用旧检查点格式的从集群中写入 Databricks Runtime 7.2及更低版本的编写器可以编写旧格式的检查点,这将妨碍对Databricks Runtime 7.3编写器的优化。要阻止运行Databricks Runtime 7.2及更低版本的集群写入Delta表,可以使用upgradeTableProtocol方法升级Delta表:

Python

from delta.tables import DeltaTable
delta = DeltaTable.forPath(spark, "path_to_table") # or DeltaTable.forName
delta.upgradeTableProtocol(1, 3)

Scala

import io.delta.tables.DeltaTable
val delta = DeltaTable.forPath(spark, "path_to_table") // or DeltaTable.forName
delta.upgradeTableProtocol(1, 3)
注意

应用该upgradeTableProtocol方法可防止运行Databricks Runtime 7.2及更低版本的集群写入表。这种更改是不可逆的。因此,我们建议仅在提交新格式后才升级表。您可以通过使用Databricks Runtime 7.3创建表的浅表克隆来尝试这些优化:

常见问题(FAQ)

为什么OPTIMIZE不是自动的?

OPTIMIZE操作启动了多个Spark作业,以便通过压缩来优化文件大小(并可以选择执行 Z-Ordering)。由于OPTIMIZE执行的内容大部分是压缩小文件,因此您必须先累积许多小文件,然后此操作才能生效。因此,该OPTIMIZE操作不会自动运行。

此外,运行 OPTIMIZE(特别是 ZORDER)是时间和资源成本高昂的操作。如果Databricks自动运行OPTIMIZE或等待分批写出数据,则将无法运行(以Delta表是源)低延迟的Delta-Lake流。许多客户的Delta表从未进行过优化,因为他们只从这些表流式传输数据,从而避免了优化所带来的查询好处。

最后,Delta Lake会自动收集有关写入表的文件(无论是否通过OPTIMIZE操作)的统计信息。这意味着从Delta表的读取将利用此信息,无论该表或分区是否运行了OPTIMIZE操作

我应该多久跑步一次OPTIMIZE?

当您选择运行OPTIMUZE的频率时,性能和成本之间就需要权衡取舍。 如果希望获得更好的最终用户查询性能,则应更频繁地运行 OPTIMIZE(根据资源使用量,可能需要较高的成本)。如果要优化成本,应该减少运行频率。

运行 OPTIMIZE(二进制打包和 Z 排序)的最佳实例类型是什么?

这两个操作都是执行大量 Parquet 解码和编码的 CPU 密集型操作。

对于这些工作负载,建议采用 F 或 Fsv2 系列。