全部产品

表流读写

说明

详细内容请参考Databricks官网文章:表流读写

有关演示这些功能的Databricks笔记本,请参阅入门笔记本二

Delta Lake通过readStream和writeStream与Spark结构化流式处理深度集成。Delta Lake克服了许多流式处理系统和文件相关的常见限制,例如:

  • 合并低延迟引入产生的小文件

  • 保持多个流(或并发批处理作业)执行“仅一次”处理

  • 使用文件作为流源时,可以有效地发现哪些文件是新文件

Delta表作为流源

当您将Delta表加载为流源并在流式查询中使用它时,该查询将处理表中存在的所有数据以及流启动后到达的所有新数据。

您可以将路径和表都作为流加载。

Scala

%spark
spark.readStream.format("delta").load("/mnt/delta/events")

Scala

%spark
spark.readStream.format("delta").table("events")

你也可以执行以下操作:

  • 通过设置maxFilesPerTrigger选项,控制Delta Lake提供给流的任何微批处理的最大大小。这指定了每个触发器中要考虑的新文件的最大数量。默认值为1000。

  • 通过设置maxBytesPerTrigger选项来限制每个微批处理的数据量的速率。这将设置一个“软最大值”,这意味着批处理大约此数量的数据,并可能处理超过该限制的数据量。如果你使用Trigger。如果Trigger.Once用于流式传输,则忽略此选项。如果将此选项与maxFilesPerTrigger结合使用,则微批处理将处理数据,,直到达到maxFilesPerTrigger或maxBytesPerTrigger限制。

忽略更新和删除

结构化流式处理不处理非追加的输入,如果在用作源的表上进行了任何修改,则引发异常。有两种主要策略可以处理无法自动向下游传播的更改:

  • 您可以删除输出和检查点,并从头开始重新启动流。

  • 您可以设置以下两个选项之一:

    • ignoreDeletes:忽略在分区边界删除数据的事务。

    • ignoreChanges:如果由于更新、合并、删除(在分区内)或覆盖等数据更改操作而必须重写源表中的文件,则重新处理更新。未更改的行可能仍会发出,因此您的下游使用者应该能够处理重复项。删除不会传播到下游。ignoreChanges包含ignoreDeletes。因此,如果使用ignoreChanges,则源表的删除或更新不会中断流。

示例

例如,假设您有一个表user_events,其中包含date、user_email和action列,这些列是按日期分区的。由于GDPR的原因,您需要从user_events表中删除数据。

当您在分区边界处执行删除(即,WHERE在分区列上)操作时,文件已经按值分段,因此删除操作只是从元数据中删除这些文件。因此,如果只想从某些分区删除数据,可以使用:

Scala

%spark
events.readStream
  .format("delta")
  .option("ignoreDeletes", "true")
  .load("/mnt/delta/user_events")

但是,如果您必须基于user_email删除数据,则需要使用:

Scala

%spark
events.readStream
  .format("delta")
  .option("ignoreChanges", "true")
  .load("/mnt/delta/user_events")

如果使用update语句更新user_email,则会重写包含user_email相关的文件。使用ignoreChanges时,新记录将与位于同一文件中的所有其他未更改记录一起传播到下游。逻辑应该能够处理这些传入的重复记录。

指定初始位置

说明

该功能在Databricks Runtime 7.3 LTS及更高版本上可用。

您可以使用以下选项来指定Delta Lake流式处理源的起点,而无需处理整个表。

  • startingVersion:Delta Lake版本开始。从该版本(包括该版本)开始的所有表更改都将由流式处理源读取。您可以从命令“ DESCRIBE HISTORY events”输出的version列中获取提交版本。

    • 要仅返回最新更改,请在Databricks Runtime 7.4及更高版本中指定latest。

  • startingTimestamp:开始的时间戳。流式处理源将读取在时间戳(包括时间戳)或之后提交的所有表更改。可以是以下任何一项:

    • '2018-10-18T22:15:12.013Z',即可以转换为时间戳的字符串

    • cast('2018-10-18 13:36:32 CEST' as timestamp)

    • '2018-10-18',即日期字符串

    • 本身就是时间戳或可强制转换为时间的任何其他表达式,如:current_timestamp() - interval 12 hour sdate_sub(current_date(), 1)

您不能同时设置两个选项。您只需要使用其中之一个选项即可。它们仅在启动新的流查询时才生效。如果流式处理查询已启动并且进度已记录在其检查点中,则将忽略这些选项。

警告

尽管可以从指定的版本或时间戳启动流式处理源,但是流式处理源的架构始终是Delta表的最新架构。您必须确保在指定的版本或时间戳记之后,不对Delta表进行不兼容的架构更改。否则,当使用不正确的架构读取数据时,流式传输源可能会返回不正确的结果。

示例

例如,假设您有一个表格User_events。如果要阅读版本5之后的更改,可以使用:

Scala

%spark
events.readStream
  .format("delta")
  .option("startingVersion", "5")
  .load("/mnt/delta/user_events")

如果您想阅读自2018年10月18日以来的更改,可以使用:

Scala

%spark
events.readStream
  .format("delta")
  .option("startingTimestamp", "2018-10-18")
  .load("/mnt/delta/user_events")

用作接收器的Delta表

您也可以使用结构化流将数据写入Delta表。事务日志使Delta Lake能够保证"仅一次"处理,即使针对该表同时运行其他流或批查询。

追加模式

默认情况下,流以追加模式运行,这会将新记录添加到表中。

您可以使用路径方法:

Python

%pyspark
events.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/delta/events/_checkpoints/etl-from-json")
  .start("/delta/events")

Scala

%spark
events.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/mnt/delta/events/_checkpoints/etl-from-json")
  .start("/mnt/delta/events")

或表格方法:

Python

%pyspark
events.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/delta/events/_checkpoints/etl-from-json")
  .table("events")

Scala

%spark
events.writeStream
  .outputMode("append")
  .option("checkpointLocation", "/mnt/delta/events/_checkpoints/etl-from-json")
  .table("events")

完整模式

您还可以使用结构化流式处理技术将整个批替换为每个批。一个示例用例是使用聚合来计算摘要:

Scala

%spark
spark.readStream
  .format("delta")
  .load("/mnt/delta/events")
  .groupBy("customerId")
  .count()
  .writeStream
  .format("delta")
  .outputMode("complete")
  .option("checkpointLocation", "/mnt/delta/eventsByCustomer/_checkpoints/streaming-agg")
  .start("/mnt/delta/eventsByCustomer")
从结构化的输入流中读取数据,经过处理后结构化流输出到delta文件

前面的示例不断更新包含客户的事件总数的表。

对于延迟要求更宽松的应用程序,您可以使用一次性触发器来节省计算资源。使用这些更新按给定的时间表更新汇总聚合表,仅处理自上次更新以来已到达的新数据。