阿里云EMR Delta Lake提供了强大的数据处理能力,可以帮助您管理和操作数据,确保数据的质量和一致性。本文为您介绍EMR Delta Lake如何进行删除、更新与合并数据等操作。
DELETE
该命令用于删除数据。示例如下。
DELETE FROM delta_table [AS t] [WHERE t.date < '2019-11-11'];
import io.delta.tables.
val deltaTable = DeltaTable.forPath(spark, "/tmp/delta_table")
deltaTable.delete("date < '2019-11-11'")
import org.apache.spark.sql.functions.
import spark.implicits.
deltaTable.delete(col("date") < "2019-11-11")
使用DELETE
命令时,如果没有条件限制,则会删除所有数据。
暂不支持带有子查询的WHERE条件。但如果子查询为标量子查询且使用SQL,可以设置
spark.sql.uncorrelated.scalar.subquery.preexecution.enabled
为true
后进行查询,例如:DELETE FROM delta_table WHERE t.date < (SELECT date FROM ref_table WHERE ....)
如果您需要根据另一张表对目标表的匹配行进行删除(例如
DELETE FROM target WHERE target.col = ref.col ...
),请使用Merge语法。
UPDATE
该命令用于更新数据。示例如下。
UPDATE delta_table [AS t] SET t.id = t.id + 1 [WHERE t.date < '2019-11-11'];
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/tmp/delta_table")
deltaTable.updateExpr( //使用SQL字符串。
"name = 'Robet'",
Map("name" -> "'Robert'")
import org.apache.spark.sql.functions._
import spark.implicits._
deltaTable.update( //使用SQL函数和隐式转换。
col("name") === "Robet"),
Map("name" -> lit("Robert"));
暂不支持带有子查询的WHERE条件。但如果子查询为标量子查询且使用SQL,可以设置
spark.sql.uncorrelated.scalar.subquery.preexecution.enabled
为true
后进行查询,例如:UPDATE delta_table SET t.id = t.id + 1 WHERE t.date < (SELECT date FROM ref_table WHERE ....)
如果要根据另一张表对目标表的匹配行进行更新(例如,
UPDATE target SET target.col = ref.col ...
或WHERE target.col = ref.col ...
),请使用Merge语法。
MERGE
该命令用于合并数据。示例如下。
MERGE INTO target AS t
USING source AS s
ON t.date = s.date
WHEN MATCHED [AND t.name = 'should_update'] THEN UPDATE SET target.name = source.name
WHEN MATCHED [AND t.name = 'should_delete'] THEN DELETE
WHEN NOT MATCHED [AND s.name = 'should_insert'] THEN INSERT (t.date, t.name, t.id) VALUES (s.date, s.name.s.id)
import io.delta.tables._
import org.apache.spark.sql.functions._
val updatesDF = ... // define the updates DataFrame[date, id, name]
DeltaTable.forPath(spark, "/tmp/delta_table")
.as("target")
.merge(updatesDF.as("source"), "target.id = source.id")
.whenMatched("target.name = 'should_update'")
.updateExpr(Map("target.name" -> "source.name"))
.whenMatched("target.name = 'should_delete'")
.delete()
.whenNotMatched("source.name = 'shoulde_insert'")
.insertExpr(
Map(
"date" -> "updates.date",
"eventId" -> "updates.eventId",
"data" -> "updates.data"))
.execute()
UPDATE子句和INSERT子句支持
*
语法,如果设置为UPDATE SET *
或者INSERT *
,则会更新或插入所有字段。暂不支持带有子查询的ON条件,但如果子查询为标量子查询的形式且使用SQL,可以设置
spark.sql.uncorrelated.scalar.subquery.preexecution.enabled
为true
后进行查询。
ALTER TABLE
该命令用于更改现有表的结构和属性。支持对表进行以下操作:
ADD COLUMN:可以向表中添加新的列。
RENAME COLUMN(需要开启Column Mapping):可以将表中的列更改为新的名称。
DROP COLUMN(需要开启Column Mapping):可以从表中删除指定的列。
SET/UNSET TBLPROPERTIES:可以设置表级别的属性,如表的描述、表的存储格式等。
RENAME TO:将表的名称更改为新的名称。
对分区表执行ADD COLUMN操作,建议将新增字段追加到分区字段之前,以避免在查询引擎如Hive中查询Delta表时出现数据异常。
例如,在指定位置执行ADD COLUMN操作。
-- 假设delta_tbl表的Schema为(id IN, name STRING, pt STRING),其中pt为分区字段。
-- 新增new_col字段,并将其追加到name字段后,pt字段前。
ALTER TABLE dbName.tableName ADD COLUMN (new_col STRING AFTER name);
DESCRIBE HISTORY
该命令用于显示Delta Lake的详细操作历史。
按照顺序展示版本号、操作时间、用户ID、用户名、操作类型、操作参数、作业信息、Notebook信息、集群、操作基于的前置版本、隔离等级、是否直接追加和操作Metrics等信息。
通常大多数信息显示为Null。
示例如下:
显示所有的操作记录。
DESC HISTORY dbName.tableName;
显示最新一条的操作记录。
DESC HISTORY dbName.tableName limit 1;
CONVERT
该命令用于将Parquet格式的表转成Delta表。
CONVERT遍历指定路径下的Parquet数据文件,推测表的Schema,生成Delta表需要的元数据信息。如果Parquet表本身是分区表,则需要额外指定分区字段和类型。
示例如下:
转换指定路径下的Parquet数据文件。
CONVERT TO DELTA parquet.`oss://region/path/to/tbl_without_partition`;
转换指定路径下的Parquet数据文件,并按照dt和hour进行分区。
CONVERT TO DELTA parquet.`oss://region/path/to/tbl_with_partition` PARTITIONED BY (dt string, hour int);
使用CONVERT后,仅将表路径构建为Delta表所需的格式,尚未将其注册为表,需要继续使用CREATE TABLE命令。此时无需指定建表字段和分区字段。以下是具体示例。
CREATE TABLE tbl_without_partition
USING delta
LOCATION "oss://region/path/to/tbl_without_partition";
OPTIMIZE
该命令通过合并小文件或ZOrder排序优化Delta表的数据布局,提升查询效率。OPTIMIZE命令支持如下操作:
针对分区表,可以通过指定分区来进行优化。
在进行正常Compact优化时,可以通过指定非分区字段进行ZOrder排序,以调整数据布局。
代码示例如下。
set spark.databricks.delta.stats.skipping=true;
set spark.databricks.delta.stats.collect=true;
-- 对dbName.tableName表进行全局优化。
OPTIMIZE dbName.tableName;
-- 对2021-04-01之前的分区进行优化。
OPTIMIZE dbName.tableName WHERE date < '2021-04-01';
-- 对2021-04-01之前的分区进行优化,并使用col2, col3列进行排序。
OPTIMIZE dbName.tableName WHERE date < '2021-04-01' ZORDER BY (col2, col3);
对于Streaming入湖场景,通常每个batch较小,会导致小文件较多,可以定期执行Optimize命令合并小文件。
对于查询模式相对固定的场景,例如,除分区字段外,仅指定几个列作为查询条件时,可以采用Zorder方式优化。
VACUUM
该命令可以删除表路径中不需要的,且超过指定时间的数据文件。
EMR的Delta Lake定义数据文件不需要包含以下两部分:
当前最新版本关联到的数据文件。
执行过Savepoint的特定版本关联到的数据文件。
VACUUM命令可以通过两种方式指定删除多久前的数据文件:
通过参数
delta.deletedFileRetentionDuration
配置表属性,默认值为1周。通过VACUUM命令指定,单位为小时。
语法
VACUUM (path=STRING | table=tableIdentifier) (RETAIN number HOURS)? (DRY RUN)?
示例
-- 删除数据文件。 VACUUM dbName.tableName; -- 删除24小时之前的数据文件。 VACUUM dbName.tableName RETAIN 24 HOURS; -- 显示待删除24小时之前的数据文件。 VACUUM dbName.tableName RETAIN 24 HOURS DRY RUN;
说明根据您创建表的实际情况,可以定期执行VACUUM命令,节省存储空间。
实际执行VACUUM命令前,可以先通过
DRY RUN
命令,确认删除内容。
SAVEPOINT
该命令可以永久保存Delta Lake的历史版本。
Delta Lake会在每次执行CheckPoint(固定版本间隔,由参数delta.checkpointInterval
决定)时清理掉log元数据文件(默认保留30天内的log元数据,由参数delta.logRetentionDuration
决定)。通过VACUUM也会删除历史版本不再需要的数据文件。执行SAVEPOINT命令,可以永久避免log元数据和数据文件被删除,同时配合time-travel的能力,可以读取历史版本数据。
示例如下:
保存ID为0的版本。
CREATE SAVEPOINT delta.`/path/to/delta_tbl` VERSION AS OF 0;
说明/path/to/delta_tbl
为您实际的Delta表文件系统路径保存指定时间之前最近的版本。
CREATE SAVEPOINT dbName.tableName TIMESTAMP AS OF "2021-04-01 10:00:00";
删除或查看SAVEPOINT操作记录。示例如下:
删除记录
--删除特定版本的数据。 DROP SAVEPOINT delta.`/path/to/delta_tbl` VERSION AS OF 0; --删除特定时间戳之前的数据。 DROP SAVEPOINT dbName.tableName TIMESTAMP AS OF "2021-04-01 10:00:00";
查看记录
可以显示SAVEPOINT的版本号、版本提交时间、SAVEPOINT时间及其他信息。
SHOW SAVEPOINT delta.`/path/to/delta_tbl`; SHOW SAVEPOINT dbName.tableName;
ROLLBACK
该命令可以恢复到Delta Lake某个历史版本。
如果指定要恢复到的历史版本不可重建(即缺失log元数据或者对应的数据文件),则抛出异常。示例如下:
回滚到ID为0的版本。
ROLLBACK delta.`/path/to/delta_tbl` VERSION AS OF 0;
回滚到指定时间之前最近的版本。
ROLLBACK dbName.tableName TIMESTAMP AS OF "2021-04-01 10:00:00";