本文介绍如何使用EMR-Delta来删除、更新与合并数据。
删除数据
- Scala
import io.delta.tables. val deltaTable = DeltaTable.forPath(spark, "/tmp/delta_table") deltaTable.delete("date < '2019-11-11'") import org.apache.spark.sql.functions. import spark.implicits. deltaTable.delete(col("date") < "2019-11-11")
- SQL
DELETE FROM delta_table [AS t] [WHERE t.date < '2019-11-11'];
- 暂不支持带有子查询的WHERE条件。但如果子查询为标量子查询且使用SQL,可以设置
spark.sql.uncorrelated.scalar.subquery.preexecution.enabled
为true
后进行查询,例如:DELETE FROM delta_table WHERE t.date < (SELECT date FROM ref_table WHERE ....)
- 如果您需要根据另一张表对目标表的匹配行进行删除(例如
DELETE FROM target WHERE target.col = ref.col ...
),请使用Merge语法。
说明 使用DELETE
命令,如果没有条件限制,则会删除所有数据。 - 暂不支持带有子查询的WHERE条件。但如果子查询为标量子查询且使用SQL,可以设置
更新数据
- Scala
import io.delta.tables._ val deltaTable = DeltaTable.forPath(spark, "/tmp/delta_table") deltaTable.updateExpr( //使用SQL字符串。 "name = 'Robet'", Map("name" -> "'Robert'") import org.apache.spark.sql.functions._ import spark.implicits._ deltaTable.update( //使用SQL函数和隐式转换。 col("name") === "Robet"), Map("name" -> lit("Robert"));
- SQL
UPDATE delta_table [AS t] SET t.id = t.id + 1 [WHERE t.date < '2019-11-11'];
- 暂不支持带有子查询的WHERE条件。但如果子查询为标量子查询且使用SQL,可以设置
spark.sql.uncorrelated.scalar.subquery.preexecution.enabled
为true
后进行查询,例如:UPDATE delta_table SET t.id = t.id + 1 WHERE t.date < (SELECT date FROM ref_table WHERE ....)
- 如果要根据另一张表对目标表的匹配行进行更新(例如
UPDATE target SET target.col = ref.col ... 或 WHERE target.col = ref.col ...
),请使用Merge语法。
- 暂不支持带有子查询的WHERE条件。但如果子查询为标量子查询且使用SQL,可以设置
合并数据
- Scala
import io.delta.tables._ import org.apache.spark.sql.functions._ val updatesDF = ... // define the updates DataFrame[date, id, name] DeltaTable.forPath(spark, "/tmp/delta_table") .as("target") .merge(updatesDF.as("source"), "target.id = source.id") .whenMatched("target.name = 'should_update'") .updateExpr(Map("target.name" -> "source.name")) .whenMatched("target.name = 'should_delete'") .delete() .whenNotMatched("source.name = 'shoulde_insert'") .insertExpr( Map( "date" -> "updates.date", "eventId" -> "updates.eventId", "data" -> "updates.data")) .execute()
- SQL
MERGE INTO target AS t USING source AS s ON t.date = s.date WHEN MATCHED [AND t.name = 'should_update'] THEN UPDATE SET target.name = source.name WHEN MATCHED [AND t.name = 'should_delete'] THEN DELETE WHEN NOT MATCHED [AND s.name = 'should_insert'] THEN INSERT (t.date, t.name, t.id) VALUES (s.date, s.name.s.id)
- UPDATE子句和INSERT子句支持
*
语法:如果设置为UPDATE SET *
或者INSERT *
,则会更新或插入所有字段。 - 暂不支持带有子查询的ON条件,但如果子查询为标量子查询的形式且使用SQL,可以设置
spark.sql.uncorrelated.scalar.subquery.preexecution.enabled
为true
后进行查询。
- UPDATE子句和INSERT子句支持
在文档使用中是否遇到以下问题
更多建议
匿名提交