Delta Lake和Hudi是当前主流的数据湖产品,并且都支持了Spark的读写操作。本文为您介绍Spark如何处理Delta Lake和Hudi数据。
背景信息
Delta Lake和Hudi的更多信息,请参见Delta Lake文档和Hudi文档。
准备工作
环境
需要在项目中引入Delta Lake或Hudi相关的pom依赖。
参数
Delta Lake参数
spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension
如果您集群的Spark是Spark3,则额外还需以下参数。
spark.sql.catalog.spark_catalog org.apache.spark.sql.delta.catalog.DeltaCatalog
Hudi参数
spark.serializer org.apache.spark.serializer.KryoSerializer spark.sql.extensions org.apache.spark.sql.hudi.HoodieSparkSessionExtension
如果您集群的Spark是Spark3,则额外还需以下参数。
spark.sql.catalog.spark_catalog org.apache.spark.sql.hudi.catalog.HoodieCatalog
Spark读写Delta
Spark SQL语法
详细示例如下。
-- 建表
create table delta_tbl (id int, name string) using delta;
-- 写入数据
insert into delta_tbl values (1, "a1"), (2, "a2");
-- 更新数据
update delta_tbl set name = 'a1_new' where id = 1;
-- 删除数据
delete from delta_tbl where id = 1;
-- 查询数据
select * from delta_tbl;
Spark Dataset语法
详细示例如下。
// 写数据
val df = Seq((1, "a1"), (2, "a2")).toDF("id", name)
df.write.format("delta").save("/tmp/delta_tbl")
// 读数据
spark.read.format("delta").load("/tmp/delta_tbl")
Spark读写Hudi
Spark SQL语法
详细示例如下。
-- 建表
create table hudi_tbl (
id bigint,
name string,
price double,
ts long
) using hudi
tblproperties (
primaryKey="id",
preCombineField="ts"
);
-- 写入数据
insert into hudi_tbl values (1, 'a1', 10.0, 1000), (2, 'a2', 11.0, 1000);
-- 更新数据
update hudi_tbl set name = 'a1_new' where id = 1;
-- 删除数据
delete from hudi_tbl where id = 1;
-- 查询数据
select * from hudi_tbl;
Spark Dataset语法
详细示例如下。
// 写数据
import org.apache.hudi.DataSourceWriteOptions._
val df = Seq((1, "a1", 10.0, 1000), (2, "a2", 11.0, 1000)).toDF("id", "name", "price", "ts")
df.write.format("hudi").
option(PRECOMBINE_FIELD.key(), "ts").
option(RECORDKEY_FIELD.key(), "id").
option(PARTITIONPATH_FIELD.key(), "").
option("hoodie.table.name", "hudi_tbl").
mode("append").
save("/tmp/hudi_tbl")
// 读数据
spark.read.format("hudi").load("/tmp/hudi_tbl")
文档内容是否对您有帮助?