本文为您介绍如何在E-MapReduce Hudi中写数据以及查询数据。
写数据
环境配置
EMR-3.32.0以及后续版本中,已经将Hudi相关依赖集成到各个开源组件中,包括Spark、Hive和Presto,因此运行时不需要引入额外的Hudi依赖,只需要在pom文件中添加Hudi依赖即可。不同的EMR版本使用的Hudi版本不同,详细信息请参见下表。
| Hudi版本 | EMR版本 | 
| 0.6.0 | 
 | 
| 0.8.0 | 
 | 
| 0.9.0 | 
 | 
| 0.10.0 | 
 | 
| 0.11.0 | EMR 3.42.0,EMR 5.8.0 | 
| 0.12.0 | 
 | 
| 0.12.2 | 
 | 
| 0.13.1 | 
 | 
<dependency>
   <groupId>org.apache.hudi</groupId>
   <artifactId>hudi-spark_2.11</artifactId>
   <!-- for spark3 <artifactId>hudi-spark_2.12</artifactId> -->
   <version>${hudi_version}</version>
  <scope>provided</scope>
</dependency>Insert和Update
示例如下。
 val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName("hudi test")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()
import spark.implicits._
    val df = (for (i <- 0 until 10) yield (i, s"a$i", 30 + i * 0.2, 100 * i + 10000, s"p${i % 5}"))
      .toDF("id", "name", "price", "version", "dt")
    df.write.format("hudi")
      .option(TABLE_NAME, "hudi_test_0")
      // .option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL) for update
      .option(OPERATION_OPT_KEY, INSERT_OPERATION_OPT_VAL) // for insert
      .option(RECORDKEY_FIELD_OPT_KEY, "id")
      .option(PRECOMBINE_FIELD_OPT_KEY, "version")
      .option(KEYGENERATOR_CLASS_OPT_KEY, classOf[SimpleKeyGenerator].getName)
      .option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getCanonicalName)
      .option(PARTITIONPATH_FIELD_OPT_KEY, "dt")
      .option(HIVE_PARTITION_FIELDS_OPT_KEY, "ds")
      .option(META_SYNC_ENABLED_OPT_KEY, "true")
      .option(HIVE_USE_JDBC_OPT_KEY, "false")
      .option(HIVE_DATABASE_OPT_KEY, "default")
      .option(HIVE_TABLE_OPT_KEY, "hudi_test_0")
      .option(INSERT_PARALLELISM, "8")
      .option(UPSERT_PARALLELISM, "8")
      .mode(Overwrite)
      .save("/tmp/hudi/h0")Delete
示例如下。
df.write.format("hudi")
      .option(TABLE_NAME, "hudi_test_0")
      .option(OPERATION_OPT_KEY, DELETE_OPERATION_OPT_VAL) // for delete
      .option(RECORDKEY_FIELD_OPT_KEY, "id")
      .option(PRECOMBINE_FIELD_OPT_KEY, "version")
      .option(KEYGENERATOR_CLASS_OPT_KEY, classOf[SimpleKeyGenerator].getName)
      .option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getCanonicalName)
      .option(PARTITIONPATH_FIELD_OPT_KEY, "dt")
      .option(DELETE_PARALLELISM, "8")
      .mode(Append)
      .save("/tmp/hudi/h0")查询数据
EMR引擎环境中已集成Hudi相关的软件包,您无需在Spark、Presto和Hive查询引擎中额外引入相关依赖。
Hive和Presto查询Hudi表,需要在写入阶段开启元数据同步功能,即设置META_SYNC_ENABLED_OPT_KEY为true。
对于社区版Hudi,COW和MOR表需要设置hive.input.format为org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat。EMR版本对于COW类型表,可以不用设置input format,支持自动适配Hudi的input format功能。