本文介绍Delta Lake流式读写数据的一些使用示例。

Delta Table 作为数据源(Source)

Scala

spark.readStream
  .format("delta")
  .option("maxFilesPerTrigger", 1000)
  .load("/tmp/delta_table")

作为数据源的组件一般数据一旦产生就会被下游消费,数据不会发生变化。但是Delta还兼顾了数据湖的角色,数据可能会被删除、更新,或者合并。目前Delta提供了两个选项来应对这种情况:

  • ignoreDeletes:设置该选项为true后,一切**对分区的删除**动作不会有任何影响。注意,非分区列的删除不包含在此。
  • ignoreChanges:设置该选项为true后,删除、更新或合并动作不会被特殊处理,但是这些动作产生的新文件会被当成新数据发送到下游。例如,某一个文件包含10000条数据,更新其中一条数据后,新文件有9999条旧数据和1条新数据。这9999条旧数据和1条新数据会被发送到下游。
    说明
    • maxFilesPerTrigger指定了一个批次最多处理的文件数量,默认值为1000。
    • ignoreDeletes的行为比较符合预期,但仅仅局限于对分区的删除。ignoreChanges的行为可能有些奇怪,这主要是由于Delta的工作原理决定的。当设置ignoreChanges可能会有重复数据发送到下游,因此下游应当注意处理这些重复数据。

Delta Table 作为数据接收端(Sink)

  • Append模式:该模式是Spark Streaming的默认工作模式。

    Scala

    df.writeStream
      .format("delta")
      .outputMode("append")
      .option("checkpointLocation", "/tmp/delta_table/_checkpoints)
      .start("/tmp/delta_table")
  • Complete模式:在该模式下每一次batch的执行都会以全表覆盖的形式写目标表。例如,对于 (id LONG, date DATE, name STRING, sales DOUBLE)这张表,我们统计每个人的总销售额,将统计结果写入目标表,每个批次更新一次。

    Scala

    spark.readStream
      .format("delta")
      .load("/tmp/delta_table")
      .select("name","sales")
      .groupBy("name")
      .agg(sum("sales"))
      .writeStream
      .format("delta")
      .outputMode("complete")
      .option("checkpointLocation", "/tmp/delta_table_summary/_checkpoints")
      .start("/tmp/delta_table_summary")