表删除,更新和合并

说明

详细内容请参考Databricks官网文章:表删除,更新和合并

有关演示这些功能的Databricks笔记本,请参阅入门笔记本二

Delta Lake支持多个语句,以方便从Delta表中删除数据和更新数据。

从表中删除

从最新版本的Delta表中删除数据,但直到显示清除旧版本后才从物理存储中删除数据。例如,要删除2017年之前的所有事件,可以运行以下命令:

SQL

%sql
DELETE FROM events WHERE date < '2017-01-01'
DELETE FROM delta.`/data/events/` WHERE date < '2017-01-01'

Python

%pyspark
from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, "/data/events/")

deltaTable.delete("date < '2017-01-01'")        # predicate using SQL formatted string

deltaTable.delete(col("date") < "2017-01-01")   # predicate using Spark SQL functions

Scala

%spark
import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/data/events/")

deltaTable.delete("date < '2017-01-01'")        // predicate using SQL formatted string

import org.apache.spark.sql.functions._
import spark.implicits._

deltaTable.delete(col("date") < "2017-01-01")       // predicate using Spark SQL functions and implicits
说明

如果可能,请在分区的Delta表的分区列上提供谓词,因为这样的谓词可以显著加快操作速度。

有关详细信息,请参见API参考

更新表格

可以更新与Delta表中谓词匹配的数据。例如,要修复eventType中的拼写错误,可以运行以下命令:

SQL

%sql
UPDATE events SET eventType = 'click' WHERE eventType = 'clck'

UPDATE delta.`/data/events/` SET eventType = 'click' WHERE eventType = 'clck'
更新表

Python

%pyspark
from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, "/data/events/")

deltaTable.update("eventType = 'clck'", { "eventType": "'click'" } )   # predicate using SQL formatted string

deltaTable.update(col("eventType") == "clck", { "eventType": lit("click") } )   # predicate using Spark SQL functions

Scala

%spark
import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/data/events/")

deltaTable.updateExpr(            // predicate and update expressions using SQL formatted string
  "eventType = 'clck'",
  Map("eventType" -> "'click'")

import org.apache.spark.sql.functions._
import spark.implicits._

deltaTable.update(                // predicate using Spark SQL functions and implicits
  col("eventType") === "clck",
  Map("eventType" -> lit("click")));

有关详细信息,请参见API参考

说明

与删除类似,在分区上使用谓词可以显著提高更新操作的速度。

使用合并操作在表中执行更新插入

您可以使用该合并操作将数据从源表,视图或DataFrame插入目标Delta表。此操作类似于SQL MERGE INTO命令,但对更新、插入和删除中删除操作有额外的支持。

假设您有一个Spark DataFrame,其中包含evenId的事件的新数据,其中一些事件可能已经存在于events表中,需要更新匹配的行(即even Id已经存在)并插入新行(即even Id不存在)。您可以运行以下命令:

SQL

%sql
MERGE INTO events
USING updates
ON events.eventId = updates.eventId
WHEN MATCHED THEN
  UPDATE SET events.data = updates.data
WHEN NOT MATCHED
  THEN INSERT (date, eventId, data) VALUES (date, eventId, data)

Python

%pyspark
from delta.tables import *

deltaTable = DeltaTable.forPath(spark, "/data/events/")
df = spark.createDataFrame([("update-case2", '2020-10-12', 2, 'INFO'),("case25", '2020-10-13', 25, 'INFO')], ['data', 'date', 'eventId', 'eventType'])
updatesDF = df.select('data', to_date('date', 'yyyy-MM-dd').alias('date'), 'eventId', 'eventType')
# 合并表,eventId相等则更新数据,不相等就插入数据
deltaTable.alias("events").merge(
    updatesDF.alias("updates"),
    "events.eventId = updates.eventId") \
  .whenMatchedUpdate(set = { "data" : "updates.data" } ) \
  .whenNotMatchedInsert(values =
    {
      "date": "updates.date",
      "eventId": "updates.eventId",
      "data": "updates.data"
    }
  ) \
  .execute()
合并表

Scala

%spark
import io.delta.tables._
import org.apache.spark.sql.functions._

val updatesDF = ...  // define the updates DataFrame[date, eventId, data]

DeltaTable.forPath(spark, "/data/events/")
  .as("events")
  .merge(
    updatesDF.as("updates"),
    "events.eventId = updates.eventId")
  .whenMatched
  .updateExpr(
    Map("data" -> "updates.data"))
  .whenNotMatched
  .insertExpr(
    Map(
      "date" -> "updates.date",
      "eventId" -> "updates.eventId",
      "data" -> "updates.data"))
  .execute()

操作语义

这是merge编程操作的详细说明。

可以有任意数量的whenMatched和whenNotMatched子句。

重要

在Databricks Runtime 7.2及更低版本中,merge最多可以有2个whenMatched子句,最多可以有1个whenNotMatched子句。

  • 当源行根据匹配条件与目标表行匹配时,将执行whenMatched子句。这些子句具有以下语义。

    • whenMatched子句最多只能有一个更新和一个删除操作。merge中的更新操作只更新匹配的目标行的指定列(类似于更新操作)。删除操作删除匹配的行。

    • 每个whenMatched子句都可以有一个可选条件。如果存在此子句条件,则仅当子句条件为true时,才对任何匹配的源-目标行对行执行更新或删除操作。

    • 如果有多个whenMatched子句,则按照指定的顺序(即,子句的顺序很重要)对它们进行求值。除最后一个外,所有whenMatched子句都必须有条件。

    • 如果两个whenMatched子句都有条件,并且对于匹配的源-目标行对,两个条件都不为true,则匹配的目标行保持不变。

    • 要用源数据集的相应列更新目标Delta表的所有列,请使用whenMatched(…).updateAll()。这相当于:

    Scala

    %spark
    whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))

    对于目标Delta表的所有列。因此,此操作假定源表与目标表中的列相同,否则查询将引发分析错误。

    说明

    当启用自动架构迁移时,此行为将更改。有关详细信息,请参见自动模式演化。

  • 当源行与基于匹配条件的任何目标行都不匹配时,将执行whenNotMatched子句。这些子句具有以下语义。

    • whenNotMatched子句只能有insert操作。新行是根据指定的列和相应的表达式生成的。不需要指定目标表中的所有列。对于未指定的目标列,将插入NULL。

    说明

    在Databricks Runtime 6.5及更低版本中,您必须在目标表中提供该INSERT操作的所有列。

    • 每个whenNotMatched子句可以有一个可选条件。如果存在子句条件,则仅当源条件对该行为true时才插入该行。否则,将忽略源列。

    • 要将目标Delta表的所有列与源数据集的相应列一起插入,请使用whenNotMatched(...).insertAll()。这相当于:

    Scala

    %spark
    whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))

    目标Delta表的所有列。因此,此操作假定源表的列与目标表的列相同,否则查询将引发分析错误。

    重要

    启用自动模式迁移后,此行为将更改。有关详细信息,请参见自动模式演变。

架构验证

merge自动验证通过插入和更新表达式生成的数据的架构是否与表的架构兼容。它使用以下规则来确定merge操作是否兼容:

  • 对于update和insert操作,目标Delta表中必须存在指定的目标列。

  • 对于updateAll和insertAll动作,源数据集必须具有目标Delta表的所有列。源数据集可以包含额外的列,它们将被忽略。

  • 对于所有操作,如果由生成目标列的表达式生成的数据类型与目标Delta表中的对应列不同,merge会尝试将其转换为表中的类型。

自动架构演变

重要

merge 中的架构演变在Databricks Runtime 6.6及更高版本中可用。

默认情况下,updateAll和insertAll使用来自源数据集的同名列来分配目标Delta表中的所有列。将忽略源数据集中与目标表中的列不匹配的任何列。但是,在某些用例中,需要自动将源列添加到目标Delta表中。要在使用updateAll和insertAll(至少其中一个)执行merge操作期间自动更新表架构,可以在运行merge操作之前设置Spark会话配置spark.databricks.delta.schema.autoMerge.enabled为true。

说明

  • 架构演变仅在同时存在一个updateAll或一个insertAll动作或两者同时存在时发生。

  • 在merge中的模式演化过程中,只有顶层列(即不是嵌套字段)会被更改。

  • update和insert操作不能显式引用目标表中不存在的目标列(即使其中有updateAll或insertAll作为子句之一)。请参阅下面的示例。

这里有几个例子说明了在模式演化和没有模式演化的情况下merge操作的效果。

查询(在Scala中)

没有架构演变的行为(默认值)

有架构演变行为

目标列: key, value源列: key, value, newValue

targetDeltaTable.alias("t")
  .merge(
    sourceDataFrame.alias("s"),
    "t.key = s.key")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()

表架构保持不变;仅已更新/插入列key,value

表架构更改为(key,value,newValue)。updateAll更新列value和newValue,insertAll插入行(key、value、newValue)。

目标列: key, oldValue

源列: key, newValue

targetDeltaTable.alias("t")
  .merge(
    sourceDataFrame.alias("s"),
    "t.key = s.key")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()

update和insertAll由于目标列oldValue不在源中,因此操作会引发错误。

表架构更改为(key,oldValue,newValue)。updateAll更新列key和newValue,而oldValue保持不变,insertAll插入行(key、NULL、newValue)(也就是说,oldValue作为NULL插入)。

目标列: key, oldValue

源列: key, newValue

targetDeltaTable.alias("t")
  .merge(
    sourceDataFrame.alias("s"),
    "t.key = s.key")
  .whenMatched().update(Map(
    "newValue" -> col("s.newValue")))
  .whenNotMatched().insertAll()
  .execute()

update引发错误,因为目标表中不存在该列newValue。

update仍然会引发错误,因为目标表中不存在该列newValue。

目标列: key, oldValue

源列: key, newValue

targetDeltaTable.alias("t")
  .merge(
    sourceDataFrame.alias("s"),
    "t.key = s.key")
  .whenMatched().updateAll()
  .whenNotMatched().insert(Map(
    "key" -> col("s.key"),
    "newValue" -> col("s.newValue")))
  .execute()

insert引发错误,因为目标表中不存在该列newValue。

insert由于目标表中不存在该列newValue,因此仍然会引发错误。

性能调优

您可以使用以下方法减少merge所花费的时间:

  • 减少匹配项的搜索空间:默认情况下,merge操作搜索整个Delta表以在源表中查找匹配项。加速merge的一种方法是通过在匹配条件中添加已知约束来减少搜索空间。例如,假设您有一个按countrt/date分区的表,并且希望使用merge更新最后一天和特定国家/地区的信息。添加条件

    SQL

    %sql
    events.date = current_date() AND events.country = 'USA'

    将加快查询速度,因为它仅在相关分区中查找匹配项。此外,这还将减少与其他并发操作发生冲突的机会。有关更多详细信息,请参见并发控制。

  • 紧凑文件:如果数据存储在许多小文件中,则读取数据以搜索匹配项可能会变慢。您可以将小文件压缩为更大的文件,以提高读取吞吐量。有关详细信息,请参见压缩文件

  • 控制写入的无序分区:merge操作多次对数据进行随机排列以计算和写入更新的数据。用于随机的任务数由Spark会话配置spark.sql.shuffle.partitions控制。设置此参数不仅可以控制并发度,还可以确定输出文件的数量。增加该值会提高并发度,但也会生成大量较小的数据文件。

  • 启用优化写入:对于分区表,meage可以生成比随机分区数量多得多的小文件。这是因为每个随机任务都可以在多个分区中写入多个文件,并可能成为性能瓶颈。您可以通过启用优化写入来优化这一点。

合并范例

以下是一些有关如何merge在不同情况下使用的示例。

在这个部分:

  • 写入Delta表时的重复数据删除

  • 缓慢将数据(SCD)类型2操作更改为Delta表

  • 将更改数据写入Delta表

  • 使用Upsert 从流式处理查询foreachBatch

写入Delta表时的重复数据删除

一个常见的ETL用例是通过将日志附加到表中来将日志收集到Delta表中。但是,源通常可以生成重复的日志记录,因此需要下游重复数据删除步骤来处理它们。使用Merge,您可以避免插入重复的记录。

SQL

%sql
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
  THEN INSERT *

Python

%pyspark
deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId") \
  .whenNotMatchedInsertAll() \
  .execute()

Scala

%spark
deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute()
重要

包含新日志的数据集需要在其内部进行重复数据删除。通过合并的SQL语义,它将新数据与表中的现有数据进行匹配并删除重复数据,但是如果新数据集中存在重复数据,则将其插入。因此,在合并到表之前,对新数据进行重复数据删除。

如果您知道几天之内可能会得到重复的记录,则可以通过按日期对表进行分区,然后指定要匹配的目标表的日期范围来进一步优化查询。

SQl

%sql
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
  THEN INSERT *

Python

%pyspark
deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId") \
  .whenNotMatchedInsertAll() \
  .execute()

Scala

%pyspark
deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute()、

这比以前的命令效率更高,因为它仅在日志的最后7天而不是整个表中查找重复项。此外,您可以将此仅插入合并与结构化流一起使用,以执行日志的连续重复数据删除。

  • 在流查询中,可以使用foreachBatch中的merge操作将具有重复数据删除功能的所有流数据连续写入Delta表。

  • 在另一个流式查询中,可以连续从该Delta表中读取已删除重复的数据。这是可能的,因为insert-only merge只向Delta表追加新数据。

重要

insert-only merge被优化为仅在Databricks Runtime 6.2及更高版本中追加数据。在Databricks Runtime 6.1及更低版本中,insert-only merge操作的写入不能作为流读取。

缓慢将数据(SCD)Type2操作更改为Delta表

另一个常见的操作是SCD Type2,它维护维度表中每个键所做的所有更改的历史记录。这样的操作需要更新现有行以将key的以前值标记为旧值,并将新行作为最新值插入。给定一个包含更新的源表和包含维度数据的目标表,SCD Type2可以用merge表示。

下面是一个具体的例子,它维护客户的地址历史以及每个地址的活动日期范围。当客户的地址需要更新时,您必须将以前的地址标记为非当前地址,更新其活动日期范围,并将新地址添加为当前地址。

将更改数据写入Delta表

与SCD类似,另一个常见用例,通常称为变更数据捕获(CDC),是将从外部数据库生成的所有数据更改应用到Delta表中。换句话说,应用于外部表的更新、删除和插入集需要应用于Delta表。您可以使用merge执行以下操作。

使用Merge笔记本写入更改数据

Note链接地址:CDC using Merge(Scala)

使用Upsert 从流式处理查询foreachBath

您可以结合使用merge和foreachBatch(有关更多信息,请参阅foreachBatch)将流式处理查询中的复杂更新操作写入Delta表。例如:

  • 在更新模式下写入流式聚合:这比Complete Mode要高效得多。

  • 将数据库更改流写入Delta表:用于写入更改数据的合并查询可用于foreachBatch中,以连续地将更改流应用于Delta表。

  • 使用重复数据删除将流数据写入Delta表:用于重复数据删除的 insert-only merge 查询可以在 foreachBatch 中使用自动重复数据删除功能将数据(带有重复项)连续写入到 Delta 表中。

说明

  • 确保foreachBatch中的merge语句是等幂的,因为流查询的重新启动可以多次对同一批数据应用该操作。

  • 在foreachBatch中使用merge时,流式查询的输入数据速率(通过StreamingQueryProgress报告并在笔记本速率图中可见)可以报告为在源处生成数据的实际速率的倍数。这是因为merge多次读取输入数据,导致输入指标成倍增加。如果这是一个瓶颈,可以在merge之前缓存批处理数据帧,然后在merge后取消缓存。

使用merge和foreachBatch笔记本以更新模式编写流式聚合

Note链接地址:Upsert streaming aggregates using foreachBatch and Merge(Scala)