本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。
Delta Lake支持Apache Spark DataFrame读写API提供的大多数选项,用于对表执行批量读写。
详细内容可参考Databricks官网文章:表批读写
有关演示这些功能的Databricks笔记本,请参阅入门笔记本二。
有关Delta Lake SQL命令的信息,请参见
Databricks Runtime 7.0及更高版本:Databricks Runtime 7.x SQL参考
Databricks Runtime 6.x及以下版本:Databricks Runtime 5.5 LTS和6.x SQL参考
建立表格
Delta Lake支持使用DataFrameWriter(Scala/Java / Python)直接基于路径创建表。Delta Lake还支持使用标准DDL CREATE TABLE在元存储中创建表。 使用Delta Lake在元存储中创建表时,它将表数据的位置存储在元存储中。此方式使其他用户更容易发现和引用数据,而无需担心数据存储的准确位置。但是,元存储不是表中有效内容的真实来源。数据内容仍然由Delta Lake负责存储。
SQL
%sql
-- Create table in the metastore
CREATE TABLE events (
date DATE,
eventId STRING,
eventType STRING,
data STRING)
USING DELTA
Python
%pyspark
df = spark.createDataFrame([("case21", '2020-10-12', 21, 'INFO'),("case22", '2020-10-13', 22, 'INFO')], ['data', 'date', 'eventId', 'eventType'])
df.write.format("delta").saveAsTable("events2") # create table in the metastore
df.write.format("delta").save("/mnt/delta/events3") # create table by path
Scala
%spark
val df = spark.createDataFrame(Seq(("case21", "2020-10-12", 21, "INFO"))).toDF("data", "date", "eventId", "eventType")
df.write.format("delta").saveAsTable("events4") // create table in the metastore
df.write.format("delta").save("/mnt/delta/events5") // create table by path
在Databricks Runtime 7.0及更高版本中,您可以使用DataFrameWriterV2接口创建Delta表。SQL还支持在路径中创建表,而无需在Hive元存储中创建条目。
SQL
%sql
-- Create a table by path
CREATE OR REPLACE TABLE delta.`/mnt/delta/events` (
date DATE,
eventId STRING,
eventType STRING,
data STRING)
USING DELTA
PARTITIONED BY (date);
-- Create a table in the metastore
CREATE OR REPLACE TABLE events (
date DATE,
eventId STRING,
eventType STRING,
data STRING)
USING DELTA
PARTITIONED BY (date);
SCALA
%spark
val df = spark.createDataFrame(Seq(("case21", "2020-10-12", 21, "INFO"))).toDF("data", "date", "eventId", "eventType")
df.writeTo("delta.`/mnt/delta/events`").using("delta").partitionedBy("date").createOrReplace() // create table by path
df.writeTo("events").using("delta").partitionedBy("date").createOrReplace() // create table in the metastore
分区数据
您可以对数据进行分区以加快其谓词及分区列的查询和DML。要在创建Delta表时对数据进行分区,请按列指定分区。常见的模式是按日期划分,例如:
SQL
%sql
-- Create table in the metastore
CREATE TABLE events (
date DATE,
eventId STRING,
eventType STRING,
data STRING)
USING DELTA
PARTITIONED BY (date)
LOCATION '/mnt/delta/events'
Python
%pyspark
df = spark.createDataFrame([("case21", '2020-10-12', 21, 'INFO'),("case22", '2020-10-13', 22, 'INFO')], ['data', 'date', 'eventId', 'eventType'])
df.write.format("delta").partitionBy("date").saveAsTable("events1") # create table in the metastore
df.write.format("delta").partitionBy("date").save("/mnt/delta/events2") # create table by path
Scala
%spark
val df = spark.createDataFrame(Seq(("case21", "2020-10-12", 21, "INFO"))).toDF("data", "date", "eventId", "eventType")
df.write.format("delta").partitionBy("date").saveAsTable("events3") // create table in the metastore
df.write.format("delta").partitionBy("date").save("/mnt/delta/events4") // create table by path
控制数据位置
要控制Delta表文件的位置,可以选择将LOCATION指定为OSS上的路径。与不指定路径的内部表不同,当您使用DROP表时,不会删除外部表的文件
如果运行CREATE TABLE的位置已经包含使用Delta Lake存储的数据,Delta Lake将执行以下操作:
如果只指定表名和位置,例如:
SQL
%sql
CREATE TABLE events
USING DELTA
LOCATION '/mnt/delta/events'
Hive Metastore中的表会自动继承现有数据的Schema,分区和表属性。此功能可用于将数据“导入”到元存储中。
如果指定任何配置(Schema,分区或表属性),则Delta Lake会验证该规范与现有数据的配置是否完全匹。
如果指定的配置与数据的配置不完全匹配,则Delta Lake会抛出描述差异的异常。
读取表
您可以通过指定表名或路径将Delta表加载到DataFrame中:
SQL
%sql
SELECT * FROM events -- query table in the metastore
SELECT * FROM delta.`/mnt/delta/events` -- query table by path
Python
%pyspark
spark.table("events") # query table in the metastore
spark.read.format("delta").load("/mnt/delta/events") # query table by path
Scala
%spark
spark.table("events") // query table in the metastore
spark.read.format("delta").load("/mnt/delta/events") // create table by path
返回的DataFrame会自动读取表中之前查询结果的最新快照;您不需要运行REFRESH TABLE。当查询中有适用的谓词时,Delta-Lake会自动使用分区和统计来读取最小数量的数据。
查询表的旧快照(按时间顺序查看)
Delta Lake按时间顺序查看允许您查询Delta表的旧快照。按时间顺序查看有很多用例,包括:
重新创建分析,报告或输出(例如,机器学习模型的输出)。这对于排查问题或者审计尤其有用,特别是在管控行业中。
编写复杂的时间查询。
修正数据中的错误。
在快速变更的表中,提供一系列的快照查询功能。
本节介绍了旧版本表和数据保存相关的查询方法,并提供了示例。
语法
本节说明如何查询较旧版sql本的Delta表。
SQL AS OF 语法
%sql
SELECT * FROM
events
TIMESTAMP AS OF timestamp_expression
SELECT * FROM events VERSION AS OF version
timestamp_expression为实际的时间,你可以通过DESCRIBE HISTORY events查看表的历史版本
table_identifier
[database_name.] table_name:一个表名,可以选择用数据库名限定。
delta.`<path-to-table>` :现有Delta表的位置。
时间戳表达式可以是以下任一项
'2018-10-18T22:15:12.013z',即可以转换为时间戳的字符串
cast('2018-10-18 13:36:32 CEST' as timestamp)
'2018-10-18',即日期字符串
在Databricks运行时6.6及更高版本中:
current_timestamp() - interval 12 hours
date_sub(current_date(), 1)
任意可以转换为时间戳的其它表达式
version是一个长整型数值,可以从DESCRIBE HISTORY table_spec查询中获取到。
时间戳表达式和版本都不能是子查询。
SQL
%sql
SELECT * FROM events TIMESTAMP AS OF '2018-10-18T22:15:12.013Z'
SELECT * FROM delta.`/mnt/delta/events` VERSION AS OF 123
DataFrameReader选项
您可以使用DataFrameReader从特定版本的Delta表中创建DataFrame
Python
%pyspark
df1 = spark.read.format("delta").option("timestampAsOf", timestamp_string).load("/mnt/delta/events")
df2 = spark.read.format("delta").option("versionAsOf", version).load("/mnt/delta/events")
对于timestamp_string,仅接受日期或时间戳记字符串。例如"2019-01-01"和"2019-01-01T00:00:00.000Z"。
一种常见的模式是在执行Databricks作业期间使用Delta表的最新状态来更新下游应用程序。
由于 Delta 表会自动更新,因此,如果基础数据进行了更新,则在多次调用时,从 Delta 表加载的DataFrame可能返回不同的结果。通过使用按时间顺序查看,您可以修复多次调用的DataFrame返回的数据:
Python
%pyspark
latest_version = spark.sql("SELECT max(version) FROM (DESCRIBE HISTORY delta.`/mnt/delta/events`)").collect()
df = spark.read.format("delta").option("versionAsOf", latest_version[0][0]).load("/mnt/delta/events")
语法
您可能有一个参数化的管道,其中管道的输入路径是作业的参数。在执行作业之后,您可能希望在将来某个时间重新生成输出。在这种情况下,可以使用@语法指定时间戳或版本。时间戳必须为yyyymmddhhmmssss格式。您可以在@之后指定一个版本,方法是在版本前面加一个v。例如,要查询表evnets件的版本123,请指定evnets@v123。
SQL
%sql
SELECT * FROM events@20190101000000000;
SELECT * FROM events@v123
Python
%pyspark
spark.read.format("delta").load("/mnt/delta/events@20190101000000000") # table on 2019-01-01 00:00:00.000
spark.read.format("delta").load("/mnt/delta/events@v123") # table on version 123
数据保存
默认情况下,Delta表将提交历史记录保留30天。这意味着您可以声明一个30天以内的版本。但是,有以下注意事项:
您没有在Delta表上运行VACUUM。如果运行VACUUM,您将无法恢复到默认的7天数据保留期之前的版本。
您可以使用以下表属性来配置保留期:
delta.logRetentionDuration = "interval <interval>":控制表的历史记录保留时间长度。每次写入一个检查点时,Databricks都会自动清除早于保留间隔的日志条目。如果将此配置设置为足够大的值,则会保留许多日志条目。这不会影响性能,因为针对日志的操作恒定时间。历史记录的操作是并行的(但是随着日志大小的增加,它将变得更加昂贵)。默认值为interval 30 days
delta.deletedFileRetentionDuration = "interval <interval>":控制选择的文件必须选择时间段,默认值为间隔7天。若要访问 30 天的历史数据,请设置 delta.deletedFileRetentionDuration = "interval 30 days"。此设置可能会导致您的存储成本上升。
重要VACUUM 不清理日志文件;写入检查点后,日志文件将自动清除。
按时间顺序查看到以前的版本,必须保留日志文件和该版本的数据文件。
案例
修复用户111表的意外删除问题:
SQL
%sql INSERT INTO my_table SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1) WHERE userId = 111
修复对表的意外错误更新:
SQL
%sql MERGE INTO my_table target USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source ON source.userId = target.userId WHEN MATCHED THEN UPDATE SET *
查询过去一周增加的新客户数量。
SQL
%sql SELECT count(distinct userId) - ( SELECT count(distinct userId) FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))
写入表格
追加
使用append模式,可以将新数据以原子的方式添加到现有的Delta表中:
SQL
%sql
INSERT INTO events SELECT * FROM newEvents
Python
%pyspark
df.write.format("delta").mode("append").save("/mnt/delta/events")
df.write.format("delta").mode("append").saveAsTable("events")
Scala
%spark
df.write
.format("delta")
.mode("overwrite")
.option("replaceWhere", "date >= '2017-01-01' AND date <= '2017-01-31'")
.save("/mnt/delta/events")
覆盖
要原子式地替换表中的所有数据,可以使用overwrite模式:
SQL
%sql
INSERT OVERWRITE TABLE events SELECT * FROM newEvents
Python
%pyspark
from pyspark.sql.functions import to_date
df = spark.createDataFrame([("case21", '2020-10-12', 23, 'INFO'),("case22", '2020-10-13', 24, 'INFO')], ['data', 'date', 'eventId', 'eventType'])
df1 = df.select('data', to_date('date', 'yyyy-MM-dd').alias('date'), 'eventId', 'eventType')
df1.write.format("delta").mode("append").save("/mnt/delta/events")
df1.write.format("delta").mode("append").saveAsTable("events")
Scala
%spark
df.write
.format("delta")
.mode("overwrite")
.option("replaceWhere", "date >= '2017-01-01' AND date <= '2017-01-31'")
.save("/mnt/delta/events")
使用DataFrames,还可以选择性地只覆盖与分区列上的谓词匹配的数据。以下命令将原子式地把大于10-4的数据替换为10-12、10-13号的数据
Python
%pyspark
df = spark.createDataFrame([("case21", '2020-10-12', 21, 'INFO'),("case22", '2020-10-13', 22, 'INFO')], ['data', 'date', 'eventId', 'eventType'])
df1 = df.select('data', to_date('date', 'yyyy-MM-dd').alias('date'), 'eventId', 'eventType')
# 可以替换分区部分数据(replaceWhere中字段必须是分区字段);此处替换大于等于10-4的数据,并将10-12、10-13号的数据填入
df1.write.format("delta").mode("overwrite").option("replaceWhere", "date >= '2020-10-4'").saveAsTable("events_partition")
spark.table("events_partition").show(50)
Scala
%spark
df.write
.format("delta")
.mode("overwrite")
.option("replaceWhere", "date >= '2017-01-01' AND date <= '2017-01-31'")
.save("/mnt/delta/events")
此示例代码将数据写入df,验证所有数据均位于指定分区内,并执行atomic替换。
与Apache Spark中的文件API不同,Delta Lake会记住并强制执行表的Schema。这意味着默认情况下,覆盖不会替换现有表的Schema。
有关Delta Lake在更新表方面的支持,请参阅表删除,更新和合并。
设置用户定义的提交元数据
您可以使用DataFrameWriter选项userMetadata或SparkSession配置spark.databricks.delta.commitInfo.userMetadata,将用户定义的字符串指定为这些操作进行的提交中的元数据。如果同时指定了两个参数,则以DataFrameWriter选项userMetadata优先。用户定义的元数据在历史记录操作中是可读的。
SQL
%sql
SET spark.databricks.delta.commitInfo.userMetadata=overwritten-for-fixing-incorrect-data
INSERT OVERWRITE events SELECT * FROM newEvents
Python
%python
df.write.format("delta") \
.mode("overwrite") \
.option("userMetadata", "overwritten-for-fixing-incorrect-data") \
.save("/mnt/delta/events")
Scala
%spark
df.write.format("delta")
.mode("overwrite")
.option("userMetadata", "overwritten-for-fixing-incorrect-data")
.save("/mnt/delta/events")
Schema验证
Delta Lake自动验证正在写入的DataFrame的Schema与表的Schema兼容。Delta Lake使用以下规则来确定从DataFrame到表的写入是否兼容:
所有DataFrame列都必须存在于目标表中。如DataFrame中有表中不存在列,则会抛出异常。表中存在但DataFrame中不存在的列设置为NULL。
DataFrame列数据类型必须与目标表中的列数据类型匹配。如果它们不匹配,则会抛出异常。
DataFrame列名称不能仅通过大小写来区分。这意味着您不能在同一表中定义诸如“ Foo”和“ foo”之类的列。虽然可以在区分大小写或不区分大小写(默认)模式下使用Spark,但在存储和返回列信息时,Parquet区分大小写。在存储Schema时,Delta Lake 保留但不区分大小写,并采用此限制来避免潜在的错误、数据损坏或丢失问题。
Delta Lake支持DDL显式添加新列,并具有自动更新Schema的功能。
如果您指定其他选项(例如partitionBy与附加模式结合使用),则Delta Lake会验证它们是否匹配,并在任何不匹配项时发生错误。如果分区不存在,会在对现有数据分区之后自动进行追加。
在Databricks Runtime 7.0及更高版本中,INSERT语法提供了Schema强制实施并支持Schema演变。如果不能将列的数据类型安全地强制转换为Delta Lake表的数据类型,则将抛出运行时异常。如果启用Schema演化,则新列可以作为Schema的最后一列(或嵌套列)存在,以使Schema得以演化。
更新表Schema
Delta Lake可让您更新表的Schema。支持以下类型的更改:
添加新列(在任意位置)
重新排序现有列
您可以使用DDL显式进行更改,也可以使用DML隐式进行更改。
更新Delta表Schema时,从该表读取的流将终止。如果你希望流继续进行,则必须重新启动它。
有关推荐的方法,请参见生产中的结构化流。
显示更新Schema
您可以使用以下DDL显式更改表的Schema。
添加列
SQL
%sql
ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)
默认情况下,可空性为true。
要将列添加到嵌套字段,请使用:
SQL
%sql
ALTER TABLE table_name ADD COLUMNS (col_name.nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)
例
如果运行之前的Schema为:ALTER TABLE boxes ADD COLUMNS (colB.nested STRING AFTER field1)
| - colA
| - colB
| +-field1
| +-field2
之后的Schema是:
| - colA
| - colB
| +-field1
| +-nested
| +-field2
仅支持为结构添加嵌套列。不支持数组和映射。
更改列注释或顺序
SQL
%sql
ALTER TABLE table_name CHANGE [COLUMN] col_name col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name]
要更改嵌套字段中的列,请使用
SQL
%sql
ALTER TABLE table_name CHANGE [COLUMN] col_name.nested_col_name nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name]
例
如果运行之前的Schema为:ALTER TABLE boxes CHANGE COLUMN colB.field2 field2 STRING FIRST
| - colA
| - colB
| +-field1
| +-field2
之后的模式是:
| - colA
| - colB
| +-field2
| +-field1
更换列
SQL
%sql
ALTER TABLE table_name REPLACE COLUMNS (col_name1 col_type1 [COMMENT col_comment1], ...)
例
运行以下DSL时:
SQL
%sql
ALTER TABLE boxes REPLACE COLUMNS (colC STRING, colB STRUCT<field2:STRING, nested:STRING, field1:STRING>, colA STRING)
如果之前的Schema是:
| - colA
| - colB
| +-field1
| +-field2
之后的模式是:
| - colC
| - colB
| +-field2
| +-nested
| +-field1
| - colA
更改列类型或名称
更改列的类型或名称或删除列需要重写表。为此,请使用以下overwriteSchema选项:
更改列类型
Python
%pyspark
spark.read.table(...)
.withColumn("date", col("date").cast("date"))
.write
.format("delta")
.mode("overwrite")
.option("overwriteSchema", "true")
.saveAsTable(...)
更改列名
Python
%pyspark
spark.read.table(...)
.withColumnRenamed("date", "date_created")
.write
.format("delta")
.mode("overwrite")
.option("overwriteSchema", "true")
.saveAsTable(...)
自动Schema更新
Delta Lake可以作为DML事务的一部分(附加或覆盖)自动更新表的Schema,并使该Schema与正在写入的数据兼容。
添加列
在以下情况下,会自动添加DataFrame中存在但表中缺少的列将作为写事务的一部分:
write或writeStream有.option("mergeSchema", "true")
spark.databricks.delta.schema.autoMerge.enabled是 true
如果同时指定了两个选项,则以该DataFrameWriter的option选项为准。添加的列将追加到它们所在结构的末尾。追加新列时将保留大小写。
当启用了表访问控制时,mergeSchema不受支持(因为它将需要MODIFY的请求为ALL PRIVILEGES的请求) 。
mergeSchema 不能与INSERT INTO或.write.insertInto()一起使用。
NullType 列
由于Parquet不支持Nulltype,因此在写入Delta表时会将Nulltype列从DataFrame中删除,但仍存储在Schema中。当为该列接收到不同的数据类型时,Delta Lake会将Schema合并到新的数据类型。如果Delta Lake收到现有列的Nulltype,则在写入过程中将保留旧Schema,并删除新列。
Nulltype不支持流式传输。由于必须在使用流式传输时设置Schema,因此这种情况很少见。Nulltype也不适用于ArrayType和MapType的复杂类型。
替换表Schema
默认情况下,覆盖表中的数据不会覆盖Schema。在不使用replaceWhere的情况下使用mode(“overwrite”)重写表时,您可能仍然希望覆盖正在写入的数据的Schema。通过将overwriteSchema选项设置为true,可以替换表的Schema和分区:
Python
%pyspark
df.write.option("overwriteSchema", "true")
Table上的视图
就像您可能使用数据源表一样,Delta Lake支持在Delta表之上创建视图。
这些视图与表访问控制集成在一起,以实现列级和行级安全性。
使用视图进行操作时的核心挑战是解析Schema。如果更改Delta表Schema,则必须重新创建派生视图以说明对该Schema的添加任何内容。例如,如果向Delta表中添加一个新列,则必须确保此列在该基于该基表构建的相应视图中可用。
表属性
你可以在CREATE和ALTER时使用TBLPROPERTIES作为表属性来存储你的元数据
TBLPROPERTIES作为Delta表元数据一部分存储。如果给定位置中已经存在Delta表,则不能在CREATE语句中定义新的TBLPROPERTIES 。
此外,为了调整行为和性能,Delta Lake支持某些Delta表属性:
阻止Delta表中删除和更新:delta.appendOnly=true。
配置按时间顺序查看保留属性:delta.logRetentionDuration=<interval-string>和delta.deletedFileRetentionDuration=<interval-string>。
配置要收集其统计信息的列数:delta.dataSkippingNumIndexedCols=<number-of-columns>。此属性仅对写入的新数据有效。
这些是唯一受支持的delta.前缀表属性。
修改Delta表属性是一种写操作,它将与其他并发的写操作发生冲突,从而导致它们失败。我们建议您仅在表上没有并发写操作时才修改表属性。
您还可以使用Spark配置在首次提交delta表时设置delta.-prefixed属性。例如,要使用该属性初始化Delta表delta.appendOnly=true,请将Spark配置spark.databricks.delta.properties.defaults.appendOnly设置为true。例如:
SQL
%sql
spark.sql("SET spark.databricks.delta.properties.defaults.appendOnly = true")
Scala
%spark
spark.conf.set("spark.databricks.delta.properties.defaults.appendOnly", "true")
Python
%pyspark
spark.conf.set("spark.databricks.delta.properties.defaults.appendOnly", "true")
元数据表
Delta Lake具有丰富的功能,可用于浏览元数据表。
它支持SHOW[PARTITIONS | COLUMNS]和DESCRIBE TABLE。查看
Databricks Runtime 7.0及更高版本
Databricks Runtime 6.x及以下版本
它还提供以下独特命令:
DESCRIBE DETAlL
DESCRIBE HISTORY
DESCRIBE DETAlL
提供有关Schema,分区,表大小等的信息。
DESCRIBE HISTORY
提供源信息,包括操作,用户等,以及每次写入表的操作指标。表格历史记录将保留30天。
数据侧栏提供了Delta表的详细表信息和历史的可视化视图。除了表Schema和示例数据外,还可以单击“历史记录”选项卡来查看与“DESCRIBE HISTORY”一起显示的表历史记录。