本文介绍如何使用EMR-Delta来删除、更新与合并数据。

删除数据

  • Scala
    import io.delta.tables.
    val deltaTable = DeltaTable.forPath(spark, "/tmp/delta_table")
    deltaTable.delete("date < '2019-11-11'")
    import org.apache.spark.sql.functions.
    import spark.implicits.
    deltaTable.delete(col("date") < "2019-11-11")
  • SQL
    DELETE FROM delta_table [AS t] [WHERE t.date < '2019-11-11'];
    • 暂不支持带有子查询的WHERE条件。但如果子查询为标量子查询且使用SQL,可以设置spark.sql.uncorrelated.scalar.subquery.preexecution.enabledtrue后进行查询,例如:
      DELETE FROM delta_table WHERE t.date < (SELECT date FROM ref_table WHERE ....)
    • 如果您需要根据另一张表对目标表的匹配行进行删除(例如DELETE FROM target WHERE target.col = ref.col ...),请使用Merge语法。
    说明 使用DELETE命令,如果没有条件限制,则会删除所有数据。

更新数据

  • Scala
    import io.delta.tables._
    
    val deltaTable = DeltaTable.forPath(spark, "/tmp/delta_table")
    
    deltaTable.updateExpr(            //使用SQL字符串。
      "name = 'Robet'",
      Map("name" -> "'Robert'")
    
    import org.apache.spark.sql.functions._
    import spark.implicits._
    
    deltaTable.update(                //使用SQL函数和隐式转换。
      col("name") === "Robet"),
      Map("name" -> lit("Robert"));
  • SQL
    UPDATE delta_table [AS t] SET t.id = t.id + 1 [WHERE t.date < '2019-11-11'];
    • 暂不支持带有子查询的WHERE条件。但如果子查询为标量子查询且使用SQL,可以设置spark.sql.uncorrelated.scalar.subquery.preexecution.enabledtrue后进行查询,例如:
      UPDATE delta_table SET t.id = t.id + 1 WHERE t.date < (SELECT date FROM ref_table WHERE ....)
    • 如果要根据另一张表对目标表的匹配行进行更新(例如UPDATE target SET target.col = ref.col ... 或 WHERE target.col = ref.col ...),请使用Merge语法。

合并数据

  • Scala
    import io.delta.tables._
    import org.apache.spark.sql.functions._
    
    val updatesDF = ...  // define the updates DataFrame[date, id, name]
    
    DeltaTable.forPath(spark, "/tmp/delta_table")
      .as("target")
      .merge(updatesDF.as("source"), "target.id = source.id")
      .whenMatched("target.name = 'should_update'")
      .updateExpr(Map("target.name" -> "source.name"))
      .whenMatched("target.name = 'should_delete'")
      .delete()
      .whenNotMatched("source.name = 'shoulde_insert'")
      .insertExpr(
        Map(
          "date" -> "updates.date",
          "eventId" -> "updates.eventId",
          "data" -> "updates.data"))
      .execute()
  • SQL
    MERGE INTO target AS t
    USING source AS s
    ON t.date = s.date
    WHEN MATCHED [AND t.name = 'should_update'] THEN UPDATE SET target.name = source.name
    WHEN MATCHED [AND t.name = 'should_delete'] THEN DELETE
    WHEN NOT MATCHED [AND s.name = 'should_insert'] THEN INSERT (t.date, t.name, t.id) VALUES (s.date, s.name.s.id)
    • UPDATE子句和INSERT子句支持*语法:如果设置为UPDATE SET *或者INSERT *,则会更新或插入所有字段。
    • 暂不支持带有子查询的ON条件,但如果子查询为标量子查询的形式且使用SQL,可以设置spark.sql.uncorrelated.scalar.subquery.preexecution.enabledtrue后进行查询。