本文介绍Delta Lake作为数据源和数据接收端如何流式读写数据。
Delta Table作为数据源(Source)
spark.readStream
.format("delta")
.option("maxFilesPerTrigger", 1000)
.load("/tmp/delta_table")
maxFilesPerTrigger
指定了一个批次最多处理的文件数量,默认值为1000。
通常作为数据源的组件,数据一旦产生就会被下游消费,数据不会发生变化。但是Delta还兼顾了数据湖的角色,数据可能会被删除、更新,或者合并。目前Delta提供了两个选项来应对这种情况:
ignoreDeletes:设置该选项为true后,对分区的删除动作不会有任何影响。
ignoreChanges:设置该选项为true后,删除、更新或合并动作不会被特殊处理,但是这些动作产生的新文件会被当成新数据发送到下游。例如,某一个文件包含10000条数据,更新其中一条数据后,新文件有9999条旧数据和1条新数据。这9999条旧数据和1条新数据会被发送到下游。
Delta Table作为数据接收端(Sink)
Append模式:该模式是Spark Streaming的默认工作模式。
df.writeStream .format("delta") .outputMode("append") .option("checkpointLocation", "/tmp/delta_table/_checkpoints") .start("/tmp/delta_table")
Complete模式:在该模式下每一次batch的执行都会以全表覆盖的形式写入目标表。例如,对于(id LONG, date DATE, name STRING, sales DOUBLE)这张表,您可以统计每个人的总销售额,将统计结果写入目标表,每个批次更新一次。
Spark Structured Streaming读写
spark.readStream .format("delta") .load("/tmp/delta_table") .select("name","sales") .groupBy("name") .agg(sum("sales")) .writeStream .format("delta") .outputMode("complete") .option("checkpointLocation", "/tmp/delta_table_summary/_checkpoints") .start("/tmp/delta_table_summary")
Streaming SQL读写
create table targetTableName (key bigint, value bigint) using delta; create table sourceTableName (key bigint, value bigint) using delta; insert into sourceTableName values(1,238),(238,2388); CREATE SCAN stream_source_1 ON sourceTableName USING STREAM; CREATE STREAM test_delta_stream OPTIONS( checkpointLocation='/tmp/test_delta_stream/targetTableName' ) MERGE INTO targetTableName AS target USING ( select key, value from ( SELECT key, value, row_number() over(partition by key order by value) as rn FROM stream_source_1 ) where rn = 1 ) AS source ON target.key=source.value WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT * ;
文档内容是否对您有帮助?