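Delta Lake and Hudi are two widely used data lake products, and both support reads and writes from Spark. This topic describes how to use Spark to process Delta Lake and Hudi data.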

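Background information

This topic covers the following aspects of using Spark with Delta Lake and Hudi data: preparations, write operations, read operations, and update operations.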

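Preparations

Add the Delta Lake or Hudi dependency to the pom.xml file of your project.

For example, with Delta Lake 1.0.0, Hudi 0.9.0, and Scala 2.12, a project built with Maven needs one of the following dependencies:
  • Delta Lake dependency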
    <dependency>
      <groupId>io.delta</groupId>
      <artifactId>delta-core_2.12</artifactId>
      <version>1.0.0</version>
    </dependency>
  • Hudi dependency
    <dependency>
      <groupId>org.apache.hudi</groupId>
      <artifactId>hudi-spark_2.12</artifactId>
      <version>0.9.0</version>
    </dependency>
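
The examples in this topic assume that a SparkSession named spark already exists, as it does in spark-shell. For a standalone application, the session could be created roughly as in the following sketch. The application name and the local master are placeholders rather than values from this topic. The Delta Lake settings and the Kryo serializer follow the quick start guides of the two projects, and only the settings for the library you actually use are needed.

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: "lake-demo" and "local[*]" are placeholder values.
val spark = SparkSession.builder()
  .appName("lake-demo")
  .master("local[*]")
  // Delta Lake: SQL extension and catalog implementation
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  // Hudi: Kryo serialization
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .getOrCreate()
```

When the job is submitted with spark-submit, the same settings can be passed with --conf instead of being set in code.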

Write operations

The following examples show how to write data to Delta Lake or Hudi:
  • Delta Lake
    val data = spark.range(0, 5)
    data.write.format("delta").save("/tmp/delta-table")
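  • Hudi
    // Imports for the Hudi quick start helpers, option keys, and save modes
    import org.apache.hudi.QuickstartUtils._
    import scala.collection.JavaConversions._
    import org.apache.spark.sql.SaveMode._
    import org.apache.hudi.DataSourceWriteOptions._
    import org.apache.hudi.config.HoodieWriteConfig._

    val tableName = "hudi_trips_cow"
    val basePath = "file:///tmp/hudi_trips_cow"
    // Generate 10 sample trip records and load them into a DataFrame
    val dataGen = new DataGenerator
    val inserts = convertToStringList(dataGen.generateInserts(10))
    val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
    // Overwrite creates the table, or replaces it if it already exists
    df.write.format("hudi").
      options(getQuickstartWriteConfigs).
      option(PRECOMBINE_FIELD_OPT_KEY, "ts").
      option(RECORDKEY_FIELD_OPT_KEY, "uuid").
      option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
      option(TABLE_NAME, tableName).
      mode(Overwrite).
      save(basePath)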

Read operations

The following examples show how to read data from Delta Lake or Hudi:
  • Delta Lake
    val df = spark.read.format("delta").load("/tmp/delta-table")
    df.show()
  • Hudi
    // basePath is the table path defined in the write example
    val tripsSnapshotDF = spark.
      read.
      format("hudi").
      load(basePath)
    tripsSnapshotDF.show()
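
Once a table is loaded as a DataFrame, it can also be registered as a temporary view and queried with Spark SQL. The following is a minimal sketch under a few assumptions: the function names, view names, and filter values are made up for illustration, the Delta Lake table is assumed to have the single id column produced by spark.range, and the Hudi table is assumed to contain the fare, begin_lon, begin_lat, and ts columns produced by the quick start DataGenerator.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical helper: returns the ids in a Delta Lake table that are >= minId.
def queryDeltaIds(spark: SparkSession, path: String, minId: Long): DataFrame = {
  spark.read.format("delta").load(path).createOrReplaceTempView("delta_numbers")
  spark.sql(s"SELECT id FROM delta_numbers WHERE id >= $minId ORDER BY id")
}

// Hypothetical helper: returns trips in a Hudi table whose fare exceeds minFare.
def queryHudiTrips(spark: SparkSession, basePath: String, minFare: Double): DataFrame = {
  spark.read.format("hudi").load(basePath).createOrReplaceTempView("hudi_trips_snapshot")
  spark.sql(
    s"SELECT fare, begin_lon, begin_lat, ts FROM hudi_trips_snapshot WHERE fare > $minFare")
}
```

For example, queryDeltaIds(spark, "/tmp/delta-table", 3).show() and queryHudiTrips(spark, "file:///tmp/hudi_trips_cow", 20.0).show() query the tables written in the earlier examples.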

Update operations

The following examples show how to update data in Delta Lake or Hudi:
  • Delta Lake
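    // Overwrite the existing table with new values
    val data = spark.range(5, 10)
    data.write.format("delta").mode("overwrite").save("/tmp/delta-table")
    // Read the table again to view the overwritten data
    spark.read.format("delta").load("/tmp/delta-table").show()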
  • Hudi
    val tableName = "hudi_trips_cow"
    val basePath = "file:///tmp/hudi_trips_cow"
    // Reuse dataGen from the write example instead of creating a new DataGenerator:
    // generateUpdates needs record keys that this generator has already inserted.
    val updates = convertToStringList(dataGen.generateUpdates(10))
    val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
    // Append upserts the records into the existing table
    df.write.format("hudi").
      options(getQuickstartWriteConfigs).
      option(PRECOMBINE_FIELD_OPT_KEY, "ts").
      option(RECORDKEY_FIELD_OPT_KEY, "uuid").
      option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
      option(TABLE_NAME, tableName).
      mode(Append).
      save(basePath)
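
The Delta Lake example above replaces the whole table. If only some rows need to change, the DeltaTable API in delta-core offers a conditional update. The following is a minimal sketch, not part of the original example: the function name and the update rule (add 100 to every even id) are made up for illustration, and the table is assumed to have the id column produced by spark.range.

```scala
import io.delta.tables.DeltaTable
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr

// Hypothetical helper: adds 100 to every even id in the Delta Lake table at `path`.
def addHundredToEvenIds(spark: SparkSession, path: String): Unit = {
  val table = DeltaTable.forPath(spark, path)
  table.update(
    condition = expr("id % 2 == 0"),      // rows to update
    set = Map("id" -> expr("id + 100")))  // new value for each updated column
}
```

For example, addHundredToEvenIds(spark, "/tmp/delta-table") updates the table written earlier in place and leaves rows with odd ids unchanged.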