表批读写

重要

本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。

Delta Lake支持Apache Spark DataFrame读写API提供的大多数选项,用于对表执行批量读写。

说明

详细内容可参考Databricks官网文章:表批读写

有关演示这些功能的Databricks笔记本,请参阅入门笔记本二

有关Delta Lake SQL命令的信息,请参见

建立表格

Delta Lake支持使用DataFrameWriter(Scala/Java / Python)直接基于路径创建表。Delta Lake还支持使用标准DDL CREATE TABLE在元存储中创建表。 使用Delta Lake在元存储中创建表时,它将表数据的位置存储在元存储中。此方式使其他用户更容易发现和引用数据,而无需担心数据存储的准确位置。但是,元存储不是表中有效内容的真实来源。数据内容仍然由Delta Lake负责存储。

SQL

%sql
-- Create table in the metastore
CREATE TABLE events (
  date DATE,
  eventId STRING,
  eventType STRING,
  data STRING)
USING DELTA
创建表

Python

%pyspark
df = spark.createDataFrame([("case21", '2020-10-12', 21, 'INFO'),("case22", '2020-10-13', 22, 'INFO')], ['data', 'date', 'eventId', 'eventType'])
df.write.format("delta").saveAsTable("events2")      # create table in the metastore

df.write.format("delta").save("/mnt/delta/events3")  # create table by path
python创建表

Scala

%spark
val df = spark.createDataFrame(Seq(("case21", "2020-10-12", 21, "INFO"))).toDF("data", "date", "eventId", "eventType")
df.write.format("delta").saveAsTable("events4")      // create table in the metastore

df.write.format("delta").save("/mnt/delta/events5")  // create table by path
spark创建表

在Databricks Runtime 7.0及更高版本中,您可以使用DataFrameWriterV2接口创建Delta表。SQL还支持在路径中创建表,而无需在Hive元存储中创建条目。

SQL

%sql
-- Create a table by path
CREATE OR REPLACE TABLE delta.`/mnt/delta/events` (
  date DATE,
  eventId STRING,
  eventType STRING,
  data STRING)
USING DELTA
PARTITIONED BY (date);

-- Create a table in the metastore
CREATE OR REPLACE TABLE events (
  date DATE,
  eventId STRING,
  eventType STRING,
  data STRING)
USING DELTA
PARTITIONED BY (date);

SCALA

%spark
val df = spark.createDataFrame(Seq(("case21", "2020-10-12", 21, "INFO"))).toDF("data", "date", "eventId", "eventType")
df.writeTo("delta.`/mnt/delta/events`").using("delta").partitionedBy("date").createOrReplace() // create table by path

df.writeTo("events").using("delta").partitionedBy("date").createOrReplace()                   // create table in the metastore

分区数据

您可以对数据进行分区以加快其谓词及分区列的查询和DML。要在创建Delta表时对数据进行分区,请按列指定分区。常见的模式是按日期划分,例如:

SQL

%sql
-- Create table in the metastore
CREATE TABLE events (
 date DATE,
 eventId STRING,
 eventType STRING,
 data STRING)
USING DELTA
PARTITIONED BY (date)
LOCATION '/mnt/delta/events'
创建分区表

Python

%pyspark
df = spark.createDataFrame([("case21", '2020-10-12', 21, 'INFO'),("case22", '2020-10-13', 22, 'INFO')], ['data', 'date', 'eventId', 'eventType'])
df.write.format("delta").partitionBy("date").saveAsTable("events1")      # create table in the metastore


df.write.format("delta").partitionBy("date").save("/mnt/delta/events2")  # create table by path
创建分区表

Scala

%spark
val df = spark.createDataFrame(Seq(("case21", "2020-10-12", 21, "INFO"))).toDF("data", "date", "eventId", "eventType")
df.write.format("delta").partitionBy("date").saveAsTable("events3")      // create table in the metastore

df.write.format("delta").partitionBy("date").save("/mnt/delta/events4")  // create table by path
创建分区表

控制数据位置

要控制Delta表文件的位置,可以选择将LOCATION指定为OSS上的路径。与不指定路径的内部表不同,当您使用DROP表时,不会删除外部表的文件

如果运行CREATE TABLE的位置已经包含使用Delta Lake存储的数据,Delta Lake将执行以下操作:

如果只指定表名和位置,例如:

SQL

%sql
CREATE TABLE events
USING DELTA
LOCATION '/mnt/delta/events'

Hive Metastore中的表会自动继承现有数据的Schema,分区和表属性。此功能可用于将数据“导入”到元存储中。

如果指定任何配置(Schema,分区或表属性),则Delta Lake会验证该规范与现有数据的配置是否完全匹。

警告

如果指定的配置与数据的配置不完全匹配,则Delta Lake会抛出描述差异的异常。

读取表

您可以通过指定表名或路径将Delta表加载到DataFrame中:

SQL

%sql
SELECT * FROM events   -- query table in the metastore

SELECT * FROM delta.`/mnt/delta/events`  -- query table by path

Python

%pyspark
spark.table("events")    # query table in the metastore

spark.read.format("delta").load("/mnt/delta/events")   # query table by path

Scala

%spark
spark.table("events")      // query table in the metastore

spark.read.format("delta").load("/mnt/delta/events")  // create table by path

返回的DataFrame会自动读取表中之前查询结果的最新快照;您不需要运行REFRESH TABLE。当查询中有适用的谓词时,Delta-Lake会自动使用分区和统计来读取最小数量的数据。

查询表的旧快照(按时间顺序查看)

Delta Lake按时间顺序查看允许您查询Delta表的旧快照。按时间顺序查看有很多用例,包括:

  • 重新创建分析,报告或输出(例如,机器学习模型的输出)。这对于排查问题或者审计尤其有用,特别是在管控行业中。

  • 编写复杂的时间查询。

  • 修正数据中的错误。

  • 在快速变更的表中,提供一系列的快照查询功能。

本节介绍了旧版本表和数据保存相关的查询方法,并提供了示例。

语法

本节说明如何查询较旧版sql本的Delta表。

SQL AS OF 语法

%sql
SELECT * FROM  
events 
 TIMESTAMP AS OF timestamp_expression
SELECT * FROM events VERSION AS OF version
说明

timestamp_expression为实际的时间,你可以通过DESCRIBE HISTORY events查看表的历史版本

表的历史版本查看

table_identifier

  • [database_name.] table_name:一个表名,可以选择用数据库名限定。

  • delta.`<path-to-table>` :现有Delta表的位置。

时间戳表达式可以是以下任一项

  1. '2018-10-18T22:15:12.013z',即可以转换为时间戳的字符串

  2. cast('2018-10-18 13:36:32 CEST' as timestamp)

  3. '2018-10-18',即日期字符串

  4. 在Databricks运行时6.6及更高版本中:

    1. current_timestamp() - interval 12 hours

    2. date_sub(current_date(), 1)

    3. 任意可以转换为时间戳的其它表达式

version是一个长整型数值,可以从DESCRIBE HISTORY table_spec查询中获取到。

时间戳表达式和版本都不能是子查询。

SQL

%sql
SELECT * FROM events TIMESTAMP AS OF '2018-10-18T22:15:12.013Z'
SELECT * FROM delta.`/mnt/delta/events` VERSION AS OF 123

DataFrameReader选项

您可以使用DataFrameReader从特定版本的Delta表中创建DataFrame

Python

%pyspark
df1 = spark.read.format("delta").option("timestampAsOf", timestamp_string).load("/mnt/delta/events")
df2 = spark.read.format("delta").option("versionAsOf", version).load("/mnt/delta/events")
按时间查询

对于timestamp_string,仅接受日期或时间戳记字符串。例如"2019-01-01"和"2019-01-01T00:00:00.000Z"。

一种常见的模式是在执行Databricks作业期间使用Delta表的最新状态来更新下游应用程序。

由于 Delta 表会自动更新,因此,如果基础数据进行了更新,则在多次调用时,从 Delta 表加载的DataFrame可能返回不同的结果。通过使用按时间顺序查看,您可以修复多次调用的DataFrame返回的数据:

Python

%pyspark
latest_version = spark.sql("SELECT max(version) FROM (DESCRIBE HISTORY delta.`/mnt/delta/events`)").collect()
df = spark.read.format("delta").option("versionAsOf", latest_version[0][0]).load("/mnt/delta/events")
按时间查询

语法

您可能有一个参数化的管道,其中管道的输入路径是作业的参数。在执行作业之后,您可能希望在将来某个时间重新生成输出。在这种情况下,可以使用@语法指定时间戳或版本。时间戳必须为yyyymmddhhmmssss格式。您可以在@之后指定一个版本,方法是在版本前面加一个v。例如,要查询表evnets件的版本123,请指定evnets@v123。

SQL

%sql
SELECT * FROM events@20190101000000000;
SELECT * FROM events@v123
按时间查询和版本查询

Python

%pyspark
spark.read.format("delta").load("/mnt/delta/events@20190101000000000") # table on 2019-01-01 00:00:00.000
spark.read.format("delta").load("/mnt/delta/events@v123")              # table on version 123

数据保存

默认情况下,Delta表将提交历史记录保留30天。这意味着您可以声明一个30天以内的版本。但是,有以下注意事项:

  • 您没有在Delta表上运行VACUUM。如果运行VACUUM,您将无法恢复到默认的7天数据保留期之前的版本。

您可以使用以下表属性来配置保留期:

  • delta.logRetentionDuration = "interval <interval>":控制表的历史记录保留时间长度。每次写入一个检查点时,Databricks都会自动清除早于保留间隔的日志条目。如果将此配置设置为足够大的值,则会保留许多日志条目。这不会影响性能,因为针对日志的操作恒定时间。历史记录的操作是并行的(但是随着日志大小的增加,它将变得更加昂贵)。默认值为interval 30 days

  • delta.deletedFileRetentionDuration = "interval <interval>":控制选择的文件必须选择时间段,默认值为间隔7天。若要访问 30 天的历史数据,请设置 delta.deletedFileRetentionDuration = "interval 30 days"。此设置可能会导致您的存储成本上升。

    重要

    VACUUM 不清理日志文件;写入检查点后,日志文件将自动清除。

    按时间顺序查看到以前的版本,必须保留日志文件和该版本的数据文件。

案例

  • 修复用户111表的意外删除问题:

    SQL

    %sql
    INSERT INTO my_table
      SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1)
      WHERE userId = 111

  • 修复对表的意外错误更新:

    SQL

    %sql
    MERGE INTO my_table target
      USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source
      ON source.userId = target.userId
      WHEN MATCHED THEN UPDATE SET *

  • 查询过去一周增加的新客户数量。

    SQL

    %sql
    SELECT count(distinct userId) - (
      SELECT count(distinct userId)
      FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))

写入表格

追加

使用append模式,可以将新数据以原子的方式添加到现有的Delta表中:

SQL

%sql
INSERT INTO events SELECT * FROM newEvents

Python

%pyspark
df.write.format("delta").mode("append").save("/mnt/delta/events")
df.write.format("delta").mode("append").saveAsTable("events")

Scala

%spark
df.write
  .format("delta")
  .mode("overwrite")
  .option("replaceWhere", "date >= '2017-01-01' AND date <= '2017-01-31'")
  .save("/mnt/delta/events")

覆盖

要原子式地替换表中的所有数据,可以使用overwrite模式:

SQL

%sql
INSERT OVERWRITE TABLE events SELECT * FROM newEvents
插入表

Python

%pyspark
from pyspark.sql.functions import  to_date
df = spark.createDataFrame([("case21", '2020-10-12', 23, 'INFO'),("case22", '2020-10-13', 24, 'INFO')], ['data', 'date', 'eventId', 'eventType'])
df1 = df.select('data', to_date('date', 'yyyy-MM-dd').alias('date'), 'eventId', 'eventType')
df1.write.format("delta").mode("append").save("/mnt/delta/events")
df1.write.format("delta").mode("append").saveAsTable("events")

Scala

%spark
df.write
  .format("delta")
  .mode("overwrite")
  .option("replaceWhere", "date >= '2017-01-01' AND date <= '2017-01-31'")
  .save("/mnt/delta/events")

使用DataFrames,还可以选择性地只覆盖与分区列上的谓词匹配的数据。以下命令将原子式地把大于10-4的数据替换为10-12、10-13号的数据

Python

%pyspark
df = spark.createDataFrame([("case21", '2020-10-12', 21, 'INFO'),("case22", '2020-10-13', 22, 'INFO')], ['data', 'date', 'eventId', 'eventType'])
df1 = df.select('data', to_date('date', 'yyyy-MM-dd').alias('date'), 'eventId', 'eventType')
# 可以替换分区部分数据(replaceWhere中字段必须是分区字段);此处替换大于等于10-4的数据,并将10-12、10-13号的数据填入
df1.write.format("delta").mode("overwrite").option("replaceWhere", "date >= '2020-10-4'").saveAsTable("events_partition")
spark.table("events_partition").show(50)
替换部分数据

Scala

%spark
df.write
  .format("delta")
  .mode("overwrite")
  .option("replaceWhere", "date >= '2017-01-01' AND date <= '2017-01-31'")
  .save("/mnt/delta/events")

此示例代码将数据写入df,验证所有数据均位于指定分区内,并执行atomic替换。

说明

与Apache Spark中的文件API不同,Delta Lake会记住并强制执行表的Schema。这意味着默认情况下,覆盖不会替换现有表的Schema。

有关Delta Lake在更新表方面的支持,请参阅表删除,更新和合并

设置用户定义的提交元数据

您可以使用DataFrameWriter选项userMetadata或SparkSession配置spark.databricks.delta.commitInfo.userMetadata,将用户定义的字符串指定为这些操作进行的提交中的元数据。如果同时指定了两个参数,则以DataFrameWriter选项userMetadata优先。用户定义的元数据在历史记录操作中是可读的。

SQL

%sql
SET spark.databricks.delta.commitInfo.userMetadata=overwritten-for-fixing-incorrect-data
INSERT OVERWRITE events SELECT * FROM newEvents

Python

%python
df.write.format("delta") \
  .mode("overwrite") \
  .option("userMetadata", "overwritten-for-fixing-incorrect-data") \
  .save("/mnt/delta/events")

Scala

%spark
df.write.format("delta")
  .mode("overwrite")
  .option("userMetadata", "overwritten-for-fixing-incorrect-data")
  .save("/mnt/delta/events")

Schema验证

Delta Lake自动验证正在写入的DataFrame的Schema与表的Schema兼容。Delta Lake使用以下规则来确定从DataFrame到表的写入是否兼容:

  • 所有DataFrame列都必须存在于目标表中。如DataFrame中有表中不存在列,则会抛出异常。表中存在但DataFrame中不存在的列设置为NULL。

  • DataFrame列数据类型必须与目标表中的列数据类型匹配。如果它们不匹配,则会抛出异常。

  • DataFrame列名称不能仅通过大小写来区分。这意味着您不能在同一表中定义诸如“ Foo”和“ foo”之类的列。虽然可以在区分大小写或不区分大小写(默认)模式下使用Spark,但在存储和返回列信息时,Parquet区分大小写。在存储Schema时,Delta Lake 保留但不区分大小写,并采用此限制来避免潜在的错误、数据损坏或丢失问题。

Delta Lake支持DDL显式添加新列,并具有自动更新Schema的功能。

如果您指定其他选项(例如partitionBy与附加模式结合使用),则Delta Lake会验证它们是否匹配,并在任何不匹配项时发生错误。如果分区不存在,会在对现有数据分区之后自动进行追加。

重要

在Databricks Runtime 7.0及更高版本中,INSERT语法提供了Schema强制实施并支持Schema演变。如果不能将列的数据类型安全地强制转换为Delta Lake表的数据类型,则将抛出运行时异常。如果启用Schema演化,则新列可以作为Schema的最后一列(或嵌套列)存在,以使Schema得以演化。

更新表Schema

Delta Lake可让您更新表的Schema。支持以下类型的更改:

  • 添加新列(在任意位置)

  • 重新排序现有列

您可以使用DDL显式进行更改,也可以使用DML隐式进行更改。

警告

更新Delta表Schema时,从该表读取的流将终止。如果你希望流继续进行,则必须重新启动它。

有关推荐的方法,请参见生产中的结构化流

显示更新Schema

您可以使用以下DDL显式更改表的Schema。

添加列

SQL

%sql
ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)

默认情况下,可空性为true。

要将列添加到嵌套字段,请使用:

SQL

%sql
ALTER TABLE table_name ADD COLUMNS (col_name.nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)

如果运行之前的Schema为:ALTER TABLE boxes ADD COLUMNS (colB.nested STRING AFTER field1)

| - colA
| - colB
| +-field1
| +-field2

之后的Schema是:

| - colA
| - colB
| +-field1
| +-nested
| +-field2
说明

仅支持为结构添加嵌套列。不支持数组和映射。

更改列注释或顺序

SQL

%sql
ALTER TABLE table_name CHANGE [COLUMN] col_name col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name]

要更改嵌套字段中的列,请使用

SQL

%sql
ALTER TABLE table_name CHANGE [COLUMN] col_name.nested_col_name nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name]

如果运行之前的Schema为:ALTER TABLE boxes CHANGE COLUMN colB.field2 field2 STRING FIRST

| - colA
| - colB
| +-field1
| +-field2

之后的模式是:

| - colA
| - colB
| +-field2
| +-field1

更换列

SQL

%sql
ALTER TABLE table_name REPLACE COLUMNS (col_name1 col_type1 [COMMENT col_comment1], ...)

运行以下DSL时:

SQL

%sql
ALTER TABLE boxes REPLACE COLUMNS (colC STRING, colB STRUCT<field2:STRING, nested:STRING, field1:STRING>, colA STRING)

如果之前的Schema是:

| - colA
| - colB
| +-field1
| +-field2

之后的模式是:

| - colC
| - colB
| +-field2
| +-nested
| +-field1
| - colA

更改列类型或名称

更改列的类型或名称或删除列需要重写表。为此,请使用以下overwriteSchema选项:

更改列类型

Python

%pyspark
spark.read.table(...)
  .withColumn("date", col("date").cast("date"))
  .write
  .format("delta")
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .saveAsTable(...)

更改列名

Python

%pyspark
spark.read.table(...)
  .withColumnRenamed("date", "date_created")
  .write
  .format("delta")
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .saveAsTable(...)

自动Schema更新

Delta Lake可以作为DML事务的一部分(附加或覆盖)自动更新表的Schema,并使该Schema与正在写入的数据兼容。

添加列

在以下情况下,会自动添加DataFrame中存在但表中缺少的列将作为写事务的一部分:

  • write或writeStream有.option("mergeSchema", "true")

  • spark.databricks.delta.schema.autoMerge.enabled是 true

如果同时指定了两个选项,则以该DataFrameWriter的option选项为准。添加的列将追加到它们所在结构的末尾。追加新列时将保留大小写。

说明
  • 当启用了表访问控制时,mergeSchema不受支持(因为它将需要MODIFY的请求为ALL PRIVILEGES的请求) 。

  • mergeSchema 不能与INSERT INTO或.write.insertInto()一起使用。

NullType 列

由于Parquet不支持Nulltype,因此在写入Delta表时会将Nulltype列从DataFrame中删除,但仍存储在Schema中。当为该列接收到不同的数据类型时,Delta Lake会将Schema合并到新的数据类型。如果Delta Lake收到现有列的Nulltype,则在写入过程中将保留旧Schema,并删除新列。

Nulltype不支持流式传输。由于必须在使用流式传输时设置Schema,因此这种情况很少见。Nulltype也不适用于ArrayType和MapType的复杂类型。

替换表Schema

默认情况下,覆盖表中的数据不会覆盖Schema。在不使用replaceWhere的情况下使用mode(“overwrite”)重写表时,您可能仍然希望覆盖正在写入的数据的Schema。通过将overwriteSchema选项设置为true,可以替换表的Schema和分区:

Python

%pyspark
df.write.option("overwriteSchema", "true")

Table上的视图

就像您可能使用数据源表一样,Delta Lake支持在Delta表之上创建视图。

这些视图与表访问控制集成在一起,以实现列级和行级安全性。

使用视图进行操作时的核心挑战是解析Schema。如果更改Delta表Schema,则必须重新创建派生视图以说明对该Schema的添加任何内容。例如,如果向Delta表中添加一个新列,则必须确保此列在该基于该基表构建的相应视图中可用。

表属性

你可以在CREATE和ALTER时使用TBLPROPERTIES作为表属性来存储你的元数据

TBLPROPERTIES作为Delta表元数据一部分存储。如果给定位置中已经存在Delta表,则不能在CREATE语句中定义新的TBLPROPERTIES 。

此外,为了调整行为和性能,Delta Lake支持某些Delta表属性:

  • 阻止Delta表中删除和更新:delta.appendOnly=true。

  • 配置按时间顺序查看保留属性:delta.logRetentionDuration=<interval-string>和delta.deletedFileRetentionDuration=<interval-string>。

  • 配置要收集其统计信息的列数:delta.dataSkippingNumIndexedCols=<number-of-columns>。此属性仅对写入的新数据有效。

说明

  • 这些是唯一受支持的delta.前缀表属性。

  • 修改Delta表属性是一种写操作,它将与其他并发的写操作发生冲突,从而导致它们失败。我们建议您仅在表上没有并发写操作时才修改表属性。

您还可以使用Spark配置在首次提交delta表时设置delta.-prefixed属性。例如,要使用该属性初始化Delta表delta.appendOnly=true,请将Spark配置spark.databricks.delta.properties.defaults.appendOnly设置为true。例如:

SQL

%sql
spark.sql("SET spark.databricks.delta.properties.defaults.appendOnly = true")

Scala

%spark
spark.conf.set("spark.databricks.delta.properties.defaults.appendOnly", "true")

Python

%pyspark
spark.conf.set("spark.databricks.delta.properties.defaults.appendOnly", "true")

元数据表

Delta Lake具有丰富的功能,可用于浏览元数据表。

它支持SHOW[PARTITIONS | COLUMNS]和DESCRIBE TABLE。查看

  • Databricks Runtime 7.0及更高版本

  • Databricks Runtime 6.x及以下版本

它还提供以下独特命令:

  • DESCRIBE DETAlL

  • DESCRIBE HISTORY

DESCRIBE DETAlL

提供有关Schema,分区,表大小等的信息。

DESCRIBE HISTORY

提供源信息,包括操作,用户等,以及每次写入表的操作指标。表格历史记录将保留30天。

数据侧栏提供了Delta表的详细表信息和历史的可视化视图。除了表Schema和示例数据外,还可以单击“历史记录”选项卡来查看与“DESCRIBE HISTORY”一起显示的表历史记录。