This topic walks through some basic usage examples of Delta Lake.
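The examples below assume a Spark session with Delta Lake already on the classpath. As a minimal sketch (assuming Delta Lake 0.7+ on Spark 3.x; the app name is arbitrary), such a session can be created like this:

```scala
import org.apache.spark.sql.SparkSession

// Enable the Delta SQL extensions and catalog so that both the
// DataFrame API and the SQL statements below work against Delta tables.
val spark = SparkSession.builder()
  .appName("delta-quickstart")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()
```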
Create a table and write data
- Scala
```scala
val data = spark.range(0, 5)
data.write.format("delta").save("/tmp/delta_table")
```
- SQL
```sql
CREATE TABLE delta_table (id INT) USING delta LOCATION "/tmp/delta_table";
INSERT INTO delta_table VALUES 0,1,2,3,4;
```
Read the table
- Scala
```scala
val df = spark.read.format("delta").load("/tmp/delta_table")
df.show()
```
- SQL
```sql
SELECT * FROM delta_table;
```
Overwrite data
- Scala
```scala
val data1 = spark.range(5, 10)
data1.write.format("delta").mode("overwrite").save("/tmp/delta_table")
// df is lazily evaluated, so calling show() again reflects the overwritten table.
df.show()
```
- SQL
```sql
INSERT OVERWRITE TABLE delta_table VALUES 5,6,7,8,9;
SELECT * FROM delta_table;
```
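Overwriting replaces the data visible in the latest snapshot, but earlier versions remain readable through the transaction log. A minimal sketch (assuming the table still has version 0 from the first write above):

```scala
// Read the table as of its first version to see the pre-overwrite rows 0-4.
val dfV0 = spark.read.format("delta")
  .option("versionAsOf", 0)
  .load("/tmp/delta_table")
dfV0.show()
```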
DELETE/UPDATE/MERGE
- Scala
```scala
import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTable = DeltaTable.forPath("/tmp/delta_table")

// Update every even value by adding 100 to it
deltaTable.update(
  condition = expr("id % 2 == 0"),
  set = Map("id" -> expr("id + 100")))
deltaTable.toDF.show()

// Delete every even value
deltaTable.delete(condition = expr("id % 2 == 0"))
deltaTable.toDF.show()

// Upsert (merge) new data
val newData = spark.range(0, 10).toDF

deltaTable.as("oldData")
  .merge(
    newData.as("newData"),
    "oldData.id = newData.id")
  .whenMatched
  .updateExpr(Map("id" -> "newData.id + 100"))
  .whenNotMatched
  .insertExpr(Map("id" -> "newData.id"))
  .execute()

deltaTable.toDF.show()
```
- SQL
```sql
UPDATE delta_table SET id = id + 100 WHERE mod(id, 2) = 0;
SELECT * FROM delta_table;

DELETE FROM delta_table WHERE mod(id, 2) = 0;
SELECT * FROM delta_table;

CREATE TABLE newData (id INT) USING delta LOCATION "/tmp/newData";
INSERT INTO newData VALUES 0,1,2,3,4,5,6,7,8,9;

MERGE INTO delta_table AS target
USING newData AS source
ON target.id = source.id
WHEN MATCHED THEN UPDATE SET target.id = source.id + 100
WHEN NOT MATCHED THEN INSERT *;
SELECT * FROM delta_table;
```
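Each UPDATE, DELETE, and MERGE above lands as a separate commit on the table, which can be inspected through the table history. A minimal sketch, reusing the DeltaTable handle from the Scala example:

```scala
// Show one row per commit: version, timestamp, operation (UPDATE, DELETE, MERGE, ...).
deltaTable.history().show()
```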
Streaming read
- Scala
```scala
val stream1 = spark.readStream.format("delta")
  .load("/tmp/delta_table2")
  .writeStream.format("console")
  .start()
```
- SQL
```sql
CREATE SCAN stream_delta_table ON delta_table USING STREAM;

CREATE STREAM job
INSERT INTO stream_debug_table
SELECT * FROM stream_delta_table;
```
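The console query started above keeps printing micro-batches until it is stopped explicitly. A short sketch using the stream1 handle from the Scala example (the 30-second timeout is an arbitrary choice for illustration):

```scala
// Let the query run for a while, then shut it down cleanly.
stream1.awaitTermination(30000)
stream1.stop()
```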
Streaming write
- Scala
```scala
val streamingDf = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "${BOOTSTRAP_SERVERS}")
  .option("subscribe", "${TOPIC_NAME}")
  .load()

val stream = streamingDf
  .selectExpr("CAST(value AS STRING) AS id")
  .writeStream.format("delta")
  .option("checkpointLocation", "/tmp/checkpoint")
  .start("/tmp/delta_table")
```
- SQL
```sql
CREATE TABLE IF NOT EXISTS kafka_topic
USING kafka
OPTIONS (
  kafka.bootstrap.servers = "${BOOTSTRAP_SERVERS}",
  subscribe = "${TOPIC_NAME}"
);

CREATE SCAN stream_kafka_topic ON kafka_topic USING STREAM;

CREATE STREAM job
OPTIONS (
  checkpointLocation = '/tmp/'
)
INSERT INTO delta_table
SELECT CAST(value AS STRING) AS id FROM stream_kafka_topic;
```
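Because the streaming job continuously appends to the same Delta table, a plain batch read of the path is an easy way to check that rows are arriving. A sketch, reusing the path from the Scala example:

```scala
// A batch read always sees the latest committed snapshot of the table.
spark.read.format("delta").load("/tmp/delta_table").show()
```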