Hudi与Spark SQL集成

E-MapReduce的Hudi 0.8.0版本支持Spark SQL对Hudi进行读写操作,可以极大的简化Hudi的使用成本。本文为您介绍如何通过Spark SQL对Hudi进行读写操作。

使用限制

EMR-3.36.0及后续版本和EMR-5.2.0及后续版本,支持Spark SQL对Hudi进行读写操作。

启动方式

  • Spark2和Spark3 hudi0.11以下版本
    spark-sql \
    --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
    --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
  • Spark3 hudi0.11及以上版本
    spark-sql \
    --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
    --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
    --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'

使用示例

  • 建表
    create table h0 (
      id bigint,
      name string,
      price double,
      ts long
    ) using hudi
    tblproperties (
      primaryKey="id",
      preCombineField="ts"
    );
  • 查看表详情
    desc formatted h0;
    返回信息如下所示。
    _hoodie_commit_time     string
    _hoodie_commit_seqno    string
    _hoodie_record_key      string
    _hoodie_partition_path    string
    _hoodie_file_name       string
    id                      bigint
    name                    string
    price                   double
    ts                      bigint
    说明 _hoodie_commit_time_hoodie_commit_seqno_hoodie_record_key_hoodie_partition_path_hoodie_file_name为Hudi默认添加的辅助字段。
  • 数据操作
    -- insert
    insert into h0 values (1, 'a1', 10, 1000), (2, 'a2', 11, 1000);
    
    -- update
    update h0 set name = 'a1_new' where id = 1;
    
    -- delete
    delete from h0 where id = 1;
  • 查询
    • 示例1
      select id, name, price, ts from h0;
      查询结果如下所示。
      2    a2    11.0    1000
    • 示例2
      select * from h0;
      查询结果如下所示。
      4.820221130150621338    20221130150621338_0_1    id:2        40d6507e-0579-42ce-a10f-c5e07a3981e5-0_0-29-2007_2022113015062****.parquet    2    a2    11.0    1000
      说明 由于当前为非分区表,所以_hoodie_partition_path为空。前四个字段为Hudi默认添加的辅助字段的值。