阿里云首页 Databricks数据洞察

并发控制

说明

详情请参考Databricks官网文章:并发控制

Delta Lake在读取和写入之间提供ACID事务保证。这意味着:

  • 跨多个集群的多个编写器可以同时修改表分区,并查看表的一致性快照视图,并且这些写入操作将具有序列顺序。

  • 即使在作业过程中修改了某个表,读取器仍会继续查看Databricks 作业开始使用的表的一致快照视图。

乐观并发控制

Delta Lake使用乐观并发控制 来提供两次写入之间的事务保证。在这种机制下,写操作分为三个阶段:

  1. 读取:读取(如果需要)表的最新可用版本,以标识需要修改(即重写)的文件。

  2. 写入:通过写入新数据文件来暂存所有更改

  3. 验证并提交:在提交更改之前,检查建议的更改是否与自读取快照后并发提交的任何其他更改冲突。如果没有冲突,则所有已转移的更改都将作为新版本的快照提交,并且成功写入操作。但是,如果存在冲突,则写入操作失败,并出现并发修改异常,而不是像对Parquet表执行写操作那样损坏表。

表的隔离级别定义了必须将事务与并发操作所做的修改隔离的程度。有关在Databricks上的Delta Lake支持的隔离级别的信息,请参阅隔离级别

写冲突

下表描述了在每个隔离级别中哪些写操作对可能发生冲突。

插入

更新,删除,合并

平台

插入

不能冲突

更新,删除,合并

在可序列化时可能会冲突,WriteSerializable 中不能发生冲突

在可序列化和 WriteSerializable 中可能发生冲突

平台

不能冲突

在可序列化和 WriteSerializable 中可能发生冲突

在可序列化和 WriteSerializable 中可能发生冲突

使用分区和非连续命令条件来避免冲突

在所有标记为“可能冲突”的情况下,这两个操作是否会发生冲突取决于它们是否对同一组文件进行操作。 可以通过将表分区为与操作条件中使用的列相同的列来使两组文件不相交。 例如, UPDATE table WHERE date > '2010-01-01' ... DELETE table WHERE date < '2010-01-01' 如果表未按日期分区,则这两个命令和将发生冲突,因为这两个命令都可以尝试修改相同的一组文件。 对表进行分区 date 将避免冲突。 因此,根据通常用于命令的条件对表进行分区可以显著减少冲突。 不过,按基数较高的列对表进行分区可能会导致由大量子目录引起的其他性能问题。

冲突例外

发生事务冲突时,您将观察到以下异常之一:

  • ConcurrentAppendException

  • ConcurrentDeleteReadException

  • ConcurrentDeleteDeleteException

  • MetadataChangedException

  • ConcurrentTransactionException

  • ProtocolChangedException

ConcurrentAppendException

当并发操作在操作读取的同一分区(或未分区表中的任何位置)中添加文件时,会发生异常。该文件增加的部分可以由插入、删除、更新或合并操作引起。

使用默认隔离级别为WriteSerializable,盲 INSERT操作(即,在未读取任何数据的情况下盲目追加数据)添加的文件不会与任何操作冲突,即使它们接触相同的分区(或未分区表中的任何位置)也是如此。如果隔离级别设置为Serializable,则盲追加加可能会冲突。

DELETE,UPDATE或MERGE并发操作经常引发异常。尽管并发操作可能会物理上更新不同的分区目录,但其中一个操作可能会读取与其他分区目录同时更新的同一分区,从而导致冲突。可以通过在操作条件中进行分隔显式来避免这种情况。 请考虑以下示例。

Scala

%spark
// Target 'deltaTable' is partitioned by date and country
deltaTable.as("t").merge(
    source.as("s"),
    "s.user_id = t.user_id AND s.date = t.date AND s.country = t.country")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()

假设您在不同的日期或国家/地区同时运行上述代码。由于每个作业都在目标Delta表上的独立分区上运行,因此您不会遇到任何冲突。但是,该条件不够明确,可以扫描整个表,并且可能与更新任何其他分区的并发操作冲突。相反,您可以重写语句以将特定日期和国家/地区添加到合并条件中,如以下示例所示。

Scala

%spark
// Target 'deltaTable' is partitioned by date and country
deltaTable.as("t").merge(
    source.as("s"),
    "s.user_id = t.user_id AND s.date = t.date AND s.country = t.country AND t.date = '" + <date> + "' AND t.country = '" + <country> + "'")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()

现在可以安全地在不同日期和国家/地区同时运行此操作。

ConcurrentDeleteReadException

当并发操作删除您的操作读取的文件时,会发生此异常。常见的原因是DELETE,UPDATE或MERGE操作时重写文件。

ConcurrentDeleteDeleteException

当并发操作删除了操作还删除的文件时,会发生此异常。 这可能是由两个并发 操作重写相同文件引起的。

MetadataChangedException

当并发事务更新增量表的元数据时,将发生此异常。 常见的原因是 ALTER TABLE 操作或写入Delta表,用于更新表的架构

ConcurrentTransactionException

如果使用相同检查点位置的流式处理查询同时启动多次,并尝试同时写入Delta表。 永远不应让两个流式处理查询使用相同的检查点位置并同时运行。

ProtocolChangedException

当您的Delta表升级到新版本时,就会发生这种情况。为了使将来的操作成功,您可能需要升级Delta Lake版本。