This topic introduces some basic usage examples of Delta Lake.

Create a table and write data

  • Scala
    val data = spark.range(0, 5)
    data.write.format("delta").save("/tmp/delta_table")
  • SQL
    CREATE TABLE delta_table (id INT) USING delta LOCATION "/tmp/delta_table";
    INSERT INTO delta_table VALUES 0,1,2,3,4;
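
As an alternative to writing to a bare path, the Scala write can also register a metastore table directly with saveAsTable; a minimal sketch, assuming the name delta_table is not already taken by the SQL example above:

    // Sketch: write the same data as a metastore-managed Delta table
    // (assumes no table named delta_table has been created yet)
    val data = spark.range(0, 5)
    data.write.format("delta").saveAsTable("delta_table")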

Read the table

  • Scala
    val df = spark.read.format("delta").load("/tmp/delta_table")
    df.show()
  • SQL
    SELECT * FROM delta_table;
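
If the table is registered in the metastore (as in the SQL variant), it can also be read by name from Scala; a minimal sketch:

    // Sketch: load the metastore-registered table by name instead of by path
    val dfByName = spark.table("delta_table")
    dfByName.show()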

Overwrite data

  • Scala
    val data1 = spark.range(5, 10)
    data1.write.format("delta").mode("overwrite").save("/tmp/delta_table")
    df.show()
  • SQL
    INSERT OVERWRITE TABLE delta_table VALUES 5,6,7,8,9;
    SELECT * FROM delta_table;
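
To add rows without replacing the existing ones, the same write can use append mode instead; a small sketch with hypothetical extra rows (note that running it changes the table contents that the next section starts from):

    // Sketch: append rows instead of overwriting (hypothetical extra data)
    val moreData = spark.range(10, 15)
    moreData.write.format("delta").mode("append").save("/tmp/delta_table")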

DELETE/UPDATE/MERGE

  • Scala
    import io.delta.tables._
    import org.apache.spark.sql.functions._
    
    val deltaTable = DeltaTable.forPath("/tmp/delta_table")
    
    // Update every even value by adding 100 to it
    deltaTable.update(
      condition = expr("id % 2 == 0"),
      set = Map("id" -> expr("id + 100")))
    
    deltaTable.toDF.show()
    
    // Delete every even value
    deltaTable.delete(condition = expr("id % 2 == 0"))
    
    deltaTable.toDF.show()
    
    // Upsert (merge) new data
    val newData = spark.range(0, 10).toDF
    
    deltaTable.as("oldData")
      .merge(
        newData.as("newData"),
        "oldData.id = newData.id")
      .whenMatched
      .updateExpr(Map("id" -> "newData.id + 100"))
      .whenNotMatched
      .insertExpr(Map("id" -> "newData.id"))
      .execute()
    
    deltaTable.toDF.show()
  • SQL
    UPDATE delta_table SET id = id + 100 WHERE mod(id,2) = 0;
    SELECT * FROM delta_table;
    
    DELETE FROM delta_table WHERE mod(id,2) = 0;
    SELECT * FROM delta_table;
    
    CREATE TABLE newData(id INT) USING delta LOCATION "/tmp/newData";
    INSERT INTO newData VALUES 0,1,2,3,4,5,6,7,8,9;
    
    MERGE INTO delta_table AS target
      USING newData AS source
      ON target.id = source.id
      WHEN MATCHED THEN UPDATE SET target.id = source.id + 100
      WHEN NOT MATCHED THEN INSERT *;
    
    SELECT * FROM delta_table;
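
To make the sequence concrete (assuming the table still holds 5 through 9 from the overwrite step): the UPDATE turns the even values 6 and 8 into 106 and 108; the DELETE then removes those two even values, leaving 5, 7, and 9; the MERGE updates the matched ids 5, 7, and 9 to 105, 107, and 109 and inserts the remaining values 0-4, 6, and 8 from newData.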

Streaming read

  • Scala
    val stream1 = spark.readStream.format("delta").load("/tmp/delta_table")
      .writeStream.format("console").start()
  • SQL
    CREATE SCAN stream_delta_table ON delta_table USING STREAM;
    
    CREATE STREAM job
    INSERT INTO stream_debug_table
    SELECT *
    FROM stream_delta_table;
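
The handle returned by start() in the Scala example is a StreamingQuery, so the console stream can be stopped once the output has been inspected; a minimal sketch:

    // Sketch: let the console stream run for a while, then stop it
    stream1.awaitTermination(30000)  // wait up to 30 seconds (hypothetical timeout)
    stream1.stop()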

Streaming write

  • Scala
    val streamingDf = spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "${BOOTSTRAP_SERVERS}")
      .option("subscribe", "${TOPIC_NAME}")
      .load()
    val stream = streamingDf.selectExpr("CAST(value AS STRING) AS id")
      .writeStream.format("delta")
      .option("checkpointLocation", "/tmp/checkpoint")
      .start("/tmp/delta_table")
  • SQL
    CREATE TABLE IF NOT EXISTS kafka_topic
    USING kafka
    OPTIONS (
      kafka.bootstrap.servers = "${BOOTSTRAP_SERVERS}",
      subscribe = "${TOPIC_NAME}"
    );
    
    CREATE SCAN stream_kafka_topic ON kafka_topic USING STREAM;
    
    CREATE STREAM job
    OPTIONS (
      checkpointLocation = '/tmp/'
    )
    INSERT INTO delta_table
    SELECT CAST(value AS STRING) AS id FROM stream_kafka_topic;
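
Once the streaming job is ingesting data, one way to spot-check the result is a plain batch read of the target table; a minimal sketch in Scala:

    // Sketch: batch-read the Delta table to verify the streamed-in rows
    spark.read.format("delta").load("/tmp/delta_table").show()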