Spark处理Delta Lake和Hudi数据

Delta Lake和Hudi是当前主流的数据湖产品,并且都支持了Spark的读写操作。本文为您介绍Spark如何处理Delta Lake和Hudi数据。

背景信息

Delta Lake和Hudi的更多信息,请参见Delta Lake文档Hudi文档

准备工作

环境

需要在项目中引入Delta Lake或Hudi相关的pom依赖。

参数

  • Delta Lake参数

    spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension

    如果您集群的Spark是Spark3,则额外还需以下参数。

    spark.sql.catalog.spark_catalog org.apache.spark.sql.delta.catalog.DeltaCatalog
  • Hudi参数

    spark.serializer org.apache.spark.serializer.KryoSerializer
    spark.sql.extensions org.apache.spark.sql.hudi.HoodieSparkSessionExtension

    如果您集群的Spark是Spark3,则额外还需以下参数。

    spark.sql.catalog.spark_catalog org.apache.spark.sql.hudi.catalog.HoodieCatalog

Spark读写Delta

Spark SQL语法

详细示例如下。

-- 建表
create table delta_tbl (id int, name string) using delta;

-- 写入数据
insert into delta_tbl values (1, "a1"), (2, "a2");

-- 更新数据
update delta_tbl set name = 'a1_new' where id = 1;

-- 删除数据
delete from delta_tbl where id = 1;

-- 查询数据
select * from delta_tbl;

Spark Dataset语法

详细示例如下。

// 写数据
val df = Seq((1, "a1"), (2, "a2")).toDF("id", name)
df.write.format("delta").save("/tmp/delta_tbl")

// 读数据
spark.read.format("delta").load("/tmp/delta_tbl")

Spark读写Hudi

Spark SQL语法

详细示例如下。

-- 建表
create table hudi_tbl (
  id bigint,
  name string,
  price double,
  ts long
) using hudi
tblproperties (
  primaryKey="id",
  preCombineField="ts"
);

-- 写入数据
insert into hudi_tbl values (1, 'a1', 10.0, 1000), (2, 'a2', 11.0, 1000);

-- 更新数据
update hudi_tbl set name = 'a1_new' where id = 1;

-- 删除数据
delete from hudi_tbl where id = 1;

-- 查询数据
select * from hudi_tbl;

Spark Dataset语法

详细示例如下。

// 写数据
import org.apache.hudi.DataSourceWriteOptions._

val df = Seq((1, "a1", 10.0, 1000), (2, "a2", 11.0, 1000)).toDF("id", "name", "price", "ts")

df.write.format("hudi").
option(PRECOMBINE_FIELD.key(), "ts").
option(RECORDKEY_FIELD.key(), "id").
option(PARTITIONPATH_FIELD.key(), "").
option("hoodie.table.name", "hudi_tbl").
mode("append").
save("/tmp/hudi_tbl")

// 读数据
spark.read.format("hudi").load("/tmp/hudi_tbl")