本文为您介绍E-MapReduce(简称EMR)上的DeltaLake如何管理数据,包括删除、更新与合并数据等操作。

背景信息

EMR的DeltaLake支持以下操作管理数据。
功能 描述
删除数据 删除数据。
更新数据 更新数据。
合并数据 合并数据。
DESCRIBE HISTORY

该命令用于显示DeltaLake的详细操作历史。

CONVERT

该命令用于将Parquet格式的表转成Delta表。

OPTIMIZE 该命令通过合并小文件或ZOrder排序优化Delta表的数据布局,提升查询效率。
VACUUM

该命令可以删除表路径中不需要的,且超过指定时间的数据文件。

SAVEPOINT

该命令可以永久保存DeltaLake的历史版本。

ROLLBACK

该命令可以恢复到DeltaLake某个历史版本。

删除数据

  • Scala
    import io.delta.tables.
    val deltaTable = DeltaTable.forPath(spark, "/tmp/delta_table")
    deltaTable.delete("date < '2019-11-11'")
    import org.apache.spark.sql.functions.
    import spark.implicits.
    deltaTable.delete(col("date") < "2019-11-11")
  • SQL
    DELETE FROM delta_table [AS t] [WHERE t.date < '2019-11-11'];
    • 暂不支持带有子查询的WHERE条件。但如果子查询为标量子查询且使用SQL,可以设置spark.sql.uncorrelated.scalar.subquery.preexecution.enabledtrue后进行查询,例如:
      DELETE FROM delta_table WHERE t.date < (SELECT date FROM ref_table WHERE ....)
    • 如果您需要根据另一张表对目标表的匹配行进行删除(例如DELETE FROM target WHERE target.col = ref.col ...),请使用Merge语法。
    说明 使用DELETE命令,如果没有条件限制,则会删除所有数据。

更新数据

  • Scala
    import io.delta.tables._
    
    val deltaTable = DeltaTable.forPath(spark, "/tmp/delta_table")
    
    deltaTable.updateExpr(            //使用SQL字符串。
      "name = 'Robet'",
      Map("name" -> "'Robert'")
    
    import org.apache.spark.sql.functions._
    import spark.implicits._
    
    deltaTable.update(                //使用SQL函数和隐式转换。
      col("name") === "Robet"),
      Map("name" -> lit("Robert"));
  • SQL
    UPDATE delta_table [AS t] SET t.id = t.id + 1 [WHERE t.date < '2019-11-11'];
    • 暂不支持带有子查询的WHERE条件。但如果子查询为标量子查询且使用SQL,可以设置spark.sql.uncorrelated.scalar.subquery.preexecution.enabledtrue后进行查询,例如:
      UPDATE delta_table SET t.id = t.id + 1 WHERE t.date < (SELECT date FROM ref_table WHERE ....)
    • 如果要根据另一张表对目标表的匹配行进行更新(例如,UPDATE target SET target.col = ref.col ... WHERE target.col = ref.col ...),请使用Merge语法。

合并数据

  • Scala
    import io.delta.tables._
    import org.apache.spark.sql.functions._
    
    val updatesDF = ...  // define the updates DataFrame[date, id, name]
    
    DeltaTable.forPath(spark, "/tmp/delta_table")
      .as("target")
      .merge(updatesDF.as("source"), "target.id = source.id")
      .whenMatched("target.name = 'should_update'")
      .updateExpr(Map("target.name" -> "source.name"))
      .whenMatched("target.name = 'should_delete'")
      .delete()
      .whenNotMatched("source.name = 'shoulde_insert'")
      .insertExpr(
        Map(
          "date" -> "updates.date",
          "eventId" -> "updates.eventId",
          "data" -> "updates.data"))
      .execute()
  • SQL
    MERGE INTO target AS t
    USING source AS s
    ON t.date = s.date
    WHEN MATCHED [AND t.name = 'should_update'] THEN UPDATE SET target.name = source.name
    WHEN MATCHED [AND t.name = 'should_delete'] THEN DELETE
    WHEN NOT MATCHED [AND s.name = 'should_insert'] THEN INSERT (t.date, t.name, t.id) VALUES (s.date, s.name.s.id)
    • UPDATE子句和INSERT子句支持*语法,如果设置为UPDATE SET *或者INSERT *,则会更新或插入所有字段。
    • 暂不支持带有子查询的ON条件,但如果子查询为标量子查询的形式且使用SQL,可以设置spark.sql.uncorrelated.scalar.subquery.preexecution.enabledtrue后进行查询。

DESCRIBE HISTORY

该命令用于显示DeltaLake的详细操作历史。

按照顺序展示版本号、操作时间、用户ID、用户名、操作类型、操作参数、作业信息、Notebook信息、集群、操作基于的前置版本、隔离等级、是否直接追加和操作Metrics等信息。
说明 通常大多数信息显示为Null。
示例如下:
  • 显示所有的操作记录。
    DESC HISTORY dbName.tableName;
  • 显示最新一条的操作记录。
    DESC HISTORY dbName.tableName limit 1;

CONVERT

该命令用于将Parquet格式的表转成Delta表。

CONVERT遍历指定路径下的Parquet数据文件,推测表的Schema,生成Delta表需要的元数据信息。如果Parquet表本身是分区表,则需要额外指定分区字段和类型。

示例如下:
  • 转换指定路径下的Parquet数据文件。
    CONVERT TO DELTA parquet.`oss://region/path/to/tbl_without_partition`;
  • 转换指定路径下的Parquet数据文件,并按照dt和hour进行分区。
    CONVERT TO DELTA parquet.`oss://region/path/to/tbl_with_partition` PARTITIONED BY (dt string, hour int);
使用CONVERT后,仅将表路径构建为Delta表需要的格式,还没有将其注册成表,需要继续使用CREATE TABLE命令,且不需要指定建表字段和分区字段,具体示例如下。
CREATE TABLE tbl_without_partition
USING delta
LOCATION "oss://region/path/to/tbl_without_partition";

OPTIMIZE

该命令通过合并小文件或ZOrder排序优化Delta表的数据布局,提升查询效率。OPTIMIZ命令支持如下操作:
  • 针对分区表,可以指定分区来进行优化。
  • 可以指定非分区字段进行ZOrder排序,在进行正常Compact优化时调整数据布局。
示例如下:
  • 进行全局优化。
    OPTIMIZE dbName.tableName;
  • 对2021-04-01之前的分区进行优化。
    OPTIMIZE dbName.tableName WHERE date < '2021-04-01';
  • 对2021-04-01之前的分区使用col2和col3列进行优化。
    OPTIMIZE dbName.tableName WHERE date < '2021-04-01' ZORDER BY (col2, col3);
说明
  • 对于Streaming入湖场景,通常每个batch较小,会导致小文件较多,可以定期执行Optimize命令合并小文件。
  • 对于查询模式相对固定的场景,例如,除分区字段外,仅指定几个列作为查询条件时,可以采用Zorder方式优化。

VACUUM

该命令可以删除表路径中不需要的,且超过指定时间的数据文件。

EMR的DeltaLake定义数据文件不需要包含以下两部分:
  • 当前最新版本关联到的数据文件。
  • 执行过Savepoint的特定版本关联到的数据文件。

    关于Savepoint,详见SAVEPOINT

VACUUM命令可以通过两种方式指定删除多久前的数据文件:
  • 通过参数delta.deletedFileRetentionDuration配置表属性,默认值为1周。
  • 通过VACUUM命令指定,单位为小时。
  • 语法
    VACUUM (path=STRING | table=tableIdentifier) (RETAIN number HOURS)? (DRY RUN)?
  • 示例如下:
    • 删除7天之前的数据文件。
      VACUUM dbName.tableName;
    • 删除24小时之前的数据文件。
      VACUUM dbName.tableName RETAIN 24 HOURS; 
    • 显示待删除24小时之前的文件列表。
      VACUUM dbName.tableName RETAIN 24 HOURS DRY RUN;
      说明
      • 根据您创建表的实际情况,可以定期执行VACUUM命令,节省存储空间。
      • 实际执行VACUUM命令前,可以先通过DRY RUN命令,确认删除内容。

SAVEPOINT

该命令可以永久保存DeltaLake的历史版本。

DeltaLake会在每次执行CheckPoint(固定版本间隔,由参数delta.checkpointInterval决定)时清理掉log元数据文件(默认保留30天内的log元数据,由参数delta.logRetentionDuration决定)。通过VACUUM也会删除历史版本不再需要的数据文件。执行SAVEPOINT命令,可以永久避免log元数据和数据文件被删除,同时配合time-travel的能力,可以读取历史版本数据。

示例如下:
  • 保存ID为0的版本。
    CREATE SAVEPOINT delta.`/path/to/delta_tbl` VERSION AS OF 0;
  • 保存指定时间之前最近的版本。
    CREATE SAVEPOINT dbName.tableName TIMESTAMP AS OF "2021-04-01 10:00:00";
删除或查看SAVEPOINT操作记录的示例如下:
  • 删除记录
    DROP SAVEPOINT dbName.tableName TIMESTAMP AS OF "2021-04-01 10:00:00";
  • 查看记录

    可以显示SAVEPOINT的版本号、版本提交时间、SAVEPOINT时间及其他信息。

    SHOW SAVEPOINT dbName.tableName;

ROLLBACK

该命令可以恢复到DeltaLake某个历史版本。

如果指定要恢复到的历史版本不可重建(即缺失log元数据或者对应的数据文件),则抛出异常。

  • 回滚到ID为0的版本。
    ROLLBACK delta.`/path/to/delta_tbl` VERSION AS OF 0;
  • 回滚到指定时间之前最近的版本。
    ROLLBACK dbName.tableName TIMESTAMP AS OF "2021-04-01 10:00:00";