本文介绍Delta Lake作为数据源和数据接收端如何流式读写数据。

Delta Table作为数据源(Source)

spark.readStream
  .format("delta")
  .option("maxFilesPerTrigger", 1000)
  .load("/tmp/delta_table")

maxFilesPerTrigger指定了一个批次最多处理的文件数量,默认值为1000。

通常作为数据源的组件,数据一旦产生就会被下游消费,数据不会发生变化。但是Delta还兼顾了数据湖的角色,数据可能会被删除、更新,或者合并。目前Delta提供了两个选项来应对这种情况:
  • ignoreDeletes:设置该选项为true后,对分区的删除动作不会有任何影响。
  • ignoreChanges:设置该选项为true后,删除、更新或合并动作不会被特殊处理,但是这些动作产生的新文件会被当成新数据发送到下游。例如,某一个文件包含10000条数据,更新其中一条数据后,新文件有9999条旧数据和1条新数据。这9999条旧数据和1条新数据会被发送到下游。

Delta Table作为数据接收端(Sink)

  • Append模式:该模式是Spark Streaming的默认工作模式。
    df.writeStream
      .format("delta")
      .outputMode("append")
      .option("checkpointLocation", "/tmp/delta_table/_checkpoints")
      .start("/tmp/delta_table")
  • Complete模式:在该模式下每一次batch的执行都会以全表覆盖的形式写目标表。例如,对于(id LONG, date DATE, name STRING, sales DOUBLE)这张表,您可以统计每个人的总销售额,将统计结果写入目标表,每个批次更新一次。
    spark.readStream
      .format("delta")
      .load("/tmp/delta_table")
      .select("name","sales")
      .groupBy("name")
      .agg(sum("sales"))
      .writeStream
      .format("delta")
      .outputMode("complete")
      .option("checkpointLocation", "/tmp/delta_table_summary/_checkpoints")
      .start("/tmp/delta_table_summary")