详情可参考Databricks官网文章:迁移指南
将工作负载迁移到Delta Lake
当您将工作负载迁移到Delta-Lake时,您应该注意到以下简化和与apachespark和apachehive提供的数据源相比的区别。
Delta Lake自动处理以下操作,您永远不要手动执行这些操作:
REFRESH TABLE:Delta表始终返回最新信息,因此在更改之后不需要手动调用REFRESH TABLE。
添加和删除分区:Delta lake自动跟踪表中的分区集,并在添加或删除数据时更新列表。因此,不需要运行ALTER TABLE[ADD | DROP]PARTITION或MSCK。
加载单个分区:作为一种优化,有时您可能会直接加载您感兴趣的数据分区。例如,spark.read.parquet("/data/date=2017-01-01")。Delta Lake不需要这样做,因为它可以从事务日志中快速读取文件列表以找到相关文件。如果您对单个分区感兴趣,请使用WHERE子句指定它。例如:spark.read.delta("/data").where("date = '2017-01-01'"). 对于分区中有许多文件的大型表,这可能比从Parquet表加载单个分区(使用直接分区路径或WHERE)要快得多,因为在目录中列出文件通常比从事务日志中读取文件列表慢。
将现有应用程序移植到Delta Lake时,应避免执行以下操作,这些操作会绕过事务日志:
手动修改数据:Delta Lake使用事务日志将更改自动提交到表中。因为日志是事实的来源,所以Spark不会读取已写出但未添加到事务日志中的文件。同样,即使您手动删除文件,事务日志中仍然存在指向该文件的指针。始终使用本指南中描述的命令来代替手动修改存储在Delta表中的文件。
外部读取器:直接读取存储在Delta Lake中的数据。有关如何读取Delta表的信息,请参阅Integrations。
示例 假设您已将Parquet数据存储在directory/data-pipeline 中,并希望创建一个名为events的表。您始终可以读入DataFrame并另存为Delta表。这种方法复制数据,并让Spark管理表。另外,您可以转换到较快的Delta Lake,但会导致表格不受管理。
另存为Delta表
将数据读入DataFrame并将其保存为以下delta格式的新目录:
Python
%pyspark data = spark.read.parquet("/data-pipeline") data.write.format("delta").save("/mnt/delta/data-pipeline/")
创建一个Delta表events,该表引用Delta Lake目录中的文件:
Python
%pyspark spark.sql("CREATE TABLE events USING DELTA LOCATION '/mnt/delta/data-pipeline/'")
转换为增量表
您有两种选择将Parquet表转换为Delta表:
将文件转换为Delta Lake格式并创建Delta表:
SQL
%sql CONVERT TO DELTA parquet.`/data-pipeline/` CREATE TABLE events USING DELTA LOCATION '/data-pipeline/'
创建Parquet表并转换为Delta表:
SQL
%sql CREATE TABLE events USING PARQUET OPTIONS (path '/data-pipeline/') CONVERT TO DELTA events