本文介绍Delta Lake作为数据源和数据接收端如何流式读写数据。

Delta Table作为数据源(Source)

spark.readStream
  .format("delta")
  .option("maxFilesPerTrigger", 1000)
  .load("/tmp/delta_table")

maxFilesPerTrigger指定了一个批次最多处理的文件数量,默认值为1000。

通常作为数据源的组件,数据一旦产生就会被下游消费,数据不会发生变化。但是Delta还兼顾了数据湖的角色,数据可能会被删除、更新,或者合并。目前Delta提供了两个选项来应对这种情况:
  • ignoreDeletes:设置该选项为true后,对分区的删除动作不会有任何影响。
  • ignoreChanges:设置该选项为true后,删除、更新或合并动作不会被特殊处理,但是这些动作产生的新文件会被当成新数据发送到下游。例如,某一个文件包含10000条数据,更新其中一条数据后,新文件有9999条旧数据和1条新数据。这9999条旧数据和1条新数据会被发送到下游。

Delta Table作为数据接收端(Sink)

  • Append模式:该模式是Spark Streaming的默认工作模式。
    df.writeStream
      .format("delta")
      .outputMode("append")
      .option("checkpointLocation", "/tmp/delta_table/_checkpoints")
      .start("/tmp/delta_table")
  • Complete模式:在该模式下每一次batch的执行都会以全表覆盖的形式写目标表。例如,对于(id LONG, date DATE, name STRING, sales DOUBLE)这张表,您可以统计每个人的总销售额,将统计结果写入目标表,每个批次更新一次。
    • Spark Structured Streaming读写
      spark.readStream
        .format("delta")
        .load("/tmp/delta_table")
        .select("name","sales")
        .groupBy("name")
        .agg(sum("sales"))
        .writeStream
        .format("delta")
        .outputMode("complete")
        .option("checkpointLocation", "/tmp/delta_table_summary/_checkpoints")
        .start("/tmp/delta_table_summary")
    • Streaming SQL读写
      create table targetTableName
              (key bigint, value bigint)
              using delta;
      
      create table sourceTableName
              (key bigint, value bigint)
              using delta;
      
      insert into sourceTableName values(1,238),(238,2388);
      
      CREATE SCAN stream_source_1 ON sourceTableName USING STREAM;
      
      CREATE STREAM test_delta_stream
              OPTIONS(
                checkpointLocation='/tmp/test_delta_stream/targetTableName'
              )
              MERGE INTO targetTableName AS target
              USING (
              select key, value
              from (
               SELECT key, value, row_number() over(partition by key order by value) as rn
               FROM stream_source_1 ) where rn = 1
              ) AS source
              ON target.key=source.value
              WHEN MATCHED THEN UPDATE SET *
              WHEN NOT MATCHED THEN INSERT *
       ;