数据管理

阿里云EMR Delta Lake提供了强大的数据处理能力,可以帮助您管理和操作数据,确保数据的质量和一致性。本文为您介绍EMR Delta Lake如何进行删除、更新与合并数据等操作。

DELETE

该命令用于删除数据。示例如下。

DELETE FROM delta_table [AS t] [WHERE t.date < '2019-11-11'];
import io.delta.tables.
val deltaTable = DeltaTable.forPath(spark, "/tmp/delta_table")
deltaTable.delete("date < '2019-11-11'")
import org.apache.spark.sql.functions.
import spark.implicits.
deltaTable.delete(col("date") < "2019-11-11")
说明

使用DELETE命令时,如果没有条件限制,则会删除所有数据。

  • 暂不支持带有子查询的WHERE条件。但如果子查询为标量子查询且使用SQL,可以设置spark.sql.uncorrelated.scalar.subquery.preexecution.enabledtrue后进行查询,例如:

    DELETE FROM delta_table WHERE t.date < (SELECT date FROM ref_table WHERE ....)
  • 如果您需要根据另一张表对目标表的匹配行进行删除(例如DELETE FROM target WHERE target.col = ref.col ...),请使用Merge语法。

UPDATE

该命令用于更新数据。示例如下。

UPDATE delta_table [AS t] SET t.id = t.id + 1 [WHERE t.date < '2019-11-11'];
import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta_table")

deltaTable.updateExpr(            //使用SQL字符串。
  "name = 'Robet'",
  Map("name" -> "'Robert'")

import org.apache.spark.sql.functions._
import spark.implicits._

deltaTable.update(                //使用SQL函数和隐式转换。
  col("name") === "Robet"),
  Map("name" -> lit("Robert"));
  • 暂不支持带有子查询的WHERE条件。但如果子查询为标量子查询且使用SQL,可以设置spark.sql.uncorrelated.scalar.subquery.preexecution.enabledtrue后进行查询,例如:

    UPDATE delta_table SET t.id = t.id + 1 WHERE t.date < (SELECT date FROM ref_table WHERE ....)
  • 如果要根据另一张表对目标表的匹配行进行更新(例如,UPDATE target SET target.col = ref.col ... WHERE target.col = ref.col ...),请使用Merge语法。

MERGE

该命令用于合并数据。示例如下。

MERGE INTO target AS t
USING source AS s
ON t.date = s.date
WHEN MATCHED [AND t.name = 'should_update'] THEN UPDATE SET target.name = source.name
WHEN MATCHED [AND t.name = 'should_delete'] THEN DELETE
WHEN NOT MATCHED [AND s.name = 'should_insert'] THEN INSERT (t.date, t.name, t.id) VALUES (s.date, s.name.s.id)
import io.delta.tables._
import org.apache.spark.sql.functions._

val updatesDF = ...  // define the updates DataFrame[date, id, name]

DeltaTable.forPath(spark, "/tmp/delta_table")
  .as("target")
  .merge(updatesDF.as("source"), "target.id = source.id")
  .whenMatched("target.name = 'should_update'")
  .updateExpr(Map("target.name" -> "source.name"))
  .whenMatched("target.name = 'should_delete'")
  .delete()
  .whenNotMatched("source.name = 'shoulde_insert'")
  .insertExpr(
    Map(
      "date" -> "updates.date",
      "eventId" -> "updates.eventId",
      "data" -> "updates.data"))
  .execute()
  • UPDATE子句和INSERT子句支持*语法,如果设置为UPDATE SET *或者INSERT *,则会更新或插入所有字段。

  • 暂不支持带有子查询的ON条件,但如果子查询为标量子查询的形式且使用SQL,可以设置spark.sql.uncorrelated.scalar.subquery.preexecution.enabledtrue后进行查询。

ALTER TABLE

该命令用于更改现有表的结构和属性。支持对表进行以下操作:

  • ADD COLUMN:可以向表中添加新的列。

  • RENAME COLUMN(需要开启Column Mapping):可以将表中的列更改为新的名称。

  • DROP COLUMN(需要开启Column Mapping):可以从表中删除指定的列。

  • SET/UNSET TBLPROPERTIES:可以设置表级别的属性,如表的描述、表的存储格式等。

  • RENAME TO:将表的名称更改为新的名称。

重要

对分区表执行ADD COLUMN操作,建议将新增字段追加到分区字段之前,以避免在查询引擎如Hive中查询Delta表时出现数据异常。

例如,在指定位置执行ADD COLUMN操作。

-- 假设delta_tbl表的Schema为(id IN, name STRING, pt STRING),其中pt为分区字段。
-- 新增new_col字段,并将其追加到name字段后,pt字段前。
ALTER TABLE dbName.tableName ADD COLUMN (new_col STRING AFTER name);

DESCRIBE HISTORY

该命令用于显示Delta Lake的详细操作历史。

按照顺序展示版本号、操作时间、用户ID、用户名、操作类型、操作参数、作业信息、Notebook信息、集群、操作基于的前置版本、隔离等级、是否直接追加和操作Metrics等信息。

说明

通常大多数信息显示为Null。

示例如下:

  • 显示所有的操作记录。

    DESC HISTORY dbName.tableName;
  • 显示最新一条的操作记录。

    DESC HISTORY dbName.tableName limit 1;

CONVERT

该命令用于将Parquet格式的表转成Delta表。

CONVERT遍历指定路径下的Parquet数据文件,推测表的Schema,生成Delta表需要的元数据信息。如果Parquet表本身是分区表,则需要额外指定分区字段和类型。

示例如下:

  • 转换指定路径下的Parquet数据文件。

    CONVERT TO DELTA parquet.`oss://region/path/to/tbl_without_partition`;
  • 转换指定路径下的Parquet数据文件,并按照dt和hour进行分区。

    CONVERT TO DELTA parquet.`oss://region/path/to/tbl_with_partition` PARTITIONED BY (dt string, hour int);

使用CONVERT后,仅将表路径构建为Delta表所需的格式,尚未将其注册为表,需要继续使用CREATE TABLE命令。此时无需指定建表字段和分区字段。以下是具体示例。

CREATE TABLE tbl_without_partition
USING delta
LOCATION "oss://region/path/to/tbl_without_partition";

OPTIMIZE

该命令通过合并小文件或ZOrder排序优化Delta表的数据布局,提升查询效率。OPTIMIZE命令支持如下操作:

  • 针对分区表,可以通过指定分区来进行优化。

  • 在进行正常Compact优化时,可以通过指定非分区字段进行ZOrder排序,以调整数据布局。

代码示例如下。

set spark.databricks.delta.stats.skipping=true;
set spark.databricks.delta.stats.collect=true;

-- 对dbName.tableName表进行全局优化。
OPTIMIZE dbName.tableName;

-- 对2021-04-01之前的分区进行优化。
OPTIMIZE dbName.tableName WHERE date < '2021-04-01';

-- 对2021-04-01之前的分区进行优化,并使用col2, col3列进行排序。
OPTIMIZE dbName.tableName WHERE date < '2021-04-01' ZORDER BY (col2, col3);
说明
  • 对于Streaming入湖场景,通常每个batch较小,会导致小文件较多,可以定期执行Optimize命令合并小文件。

  • 对于查询模式相对固定的场景,例如,除分区字段外,仅指定几个列作为查询条件时,可以采用Zorder方式优化。

VACUUM

该命令可以删除表路径中不需要的,且超过指定时间的数据文件。

EMR的Delta Lake定义数据文件不需要包含以下两部分:

  • 当前最新版本关联到的数据文件。

  • 执行过Savepoint的特定版本关联到的数据文件。

VACUUM命令可以通过两种方式指定删除多久前的数据文件:

  • 通过参数delta.deletedFileRetentionDuration配置表属性,默认值为1周。

  • 通过VACUUM命令指定,单位为小时。

  • 语法

    VACUUM (path=STRING | table=tableIdentifier) (RETAIN number HOURS)? (DRY RUN)?
  • 示例

    -- 删除数据文件。
    VACUUM dbName.tableName; 
    -- 删除24小时之前的数据文件。
    VACUUM dbName.tableName RETAIN 24 HOURS; 
    -- 显示待删除24小时之前的数据文件。
    VACUUM dbName.tableName RETAIN 24 HOURS DRY RUN; 
    说明
    • 根据您创建表的实际情况,可以定期执行VACUUM命令,节省存储空间。

    • 实际执行VACUUM命令前,可以先通过DRY RUN命令,确认删除内容。

SAVEPOINT

该命令可以永久保存Delta Lake的历史版本。

Delta Lake会在每次执行CheckPoint(固定版本间隔,由参数delta.checkpointInterval决定)时清理掉log元数据文件(默认保留30天内的log元数据,由参数delta.logRetentionDuration决定)。通过VACUUM也会删除历史版本不再需要的数据文件。执行SAVEPOINT命令,可以永久避免log元数据和数据文件被删除,同时配合time-travel的能力,可以读取历史版本数据。

示例如下:

  • 保存ID为0的版本。

    CREATE SAVEPOINT delta.`/path/to/delta_tbl` VERSION AS OF 0;
    说明

    /path/to/delta_tbl 为您实际的Delta表文件系统路径

  • 保存指定时间之前最近的版本。

    CREATE SAVEPOINT dbName.tableName TIMESTAMP AS OF "2021-04-01 10:00:00";

删除或查看SAVEPOINT操作记录。示例如下:

  • 删除记录

    --删除特定版本的数据。
    DROP SAVEPOINT delta.`/path/to/delta_tbl` VERSION AS OF 0;
    --删除特定时间戳之前的数据。
    DROP SAVEPOINT dbName.tableName TIMESTAMP AS OF "2021-04-01 10:00:00";
  • 查看记录

    可以显示SAVEPOINT的版本号、版本提交时间、SAVEPOINT时间及其他信息。

    SHOW SAVEPOINT delta.`/path/to/delta_tbl`;
    SHOW SAVEPOINT dbName.tableName;

ROLLBACK

该命令可以恢复到Delta Lake某个历史版本。

如果指定要恢复到的历史版本不可重建(即缺失log元数据或者对应的数据文件),则抛出异常。示例如下:

  • 回滚到ID为0的版本。

    ROLLBACK delta.`/path/to/delta_tbl` VERSION AS OF 0;
  • 回滚到指定时间之前最近的版本。

    ROLLBACK dbName.tableName TIMESTAMP AS OF "2021-04-01 10:00:00";