
Build an incremental data warehouse with Hudi CDC


This topic describes the parameters of the Hudi CDC feature and provides usage examples.

Background information

CDC (Change Data Capture) refers to identifying and capturing changes to the data in database tables, and delivering the changes to downstream systems for further processing. With Hudi CDC, a Hudi table can be used as a source from which the changed data is obtained directly.

Limits

The Hudi CDC feature is supported only by clusters of EMR-3.45.0 or a later minor version and clusters of EMR-5.11.0 or a later minor version, and only when the Hudi version is 0.12.2.

Parameters

CDC write parameters

hoodie.table.cdc.enabled

  Specifies whether to enable CDC. Valid values:

    • true: enables CDC.

    • false (default): disables CDC.

hoodie.table.cdc.supplemental.logging.mode

  The storage mode of CDC files. Three levels are available:

    • op_key_only: stores the primary key and the operation type of each record.

    • data_before: stores the primary key, the operation type, and the value of each record before the change.

    • data_before_after (default): stores the primary key, the operation type, and the values of each record before and after the change.

CDC read parameters

hoodie.datasource.query.type

  The query type. To use the CDC feature, set this parameter to incremental. Default value: snapshot.

hoodie.datasource.query.incremental.format

  The incremental query format. To use the CDC feature, set this parameter to cdc. Default value: latest_state.

hoodie.datasource.read.begin.instanttime

  The start time of the incremental query.

hoodie.datasource.read.end.instanttime

  The end time of the incremental query. This parameter is optional.
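The read parameters above are passed as options of a Spark DataFrame read. The following is a minimal sketch; the table path and the instant timestamps are placeholders, not values from a real table.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// Placeholder path and timestamps: replace them with the actual table path
// and with instants taken from the table's timeline.
val cdcDF = spark.read.format("hudi")
  .option("hoodie.datasource.query.type", "incremental")
  .option("hoodie.datasource.query.incremental.format", "cdc")
  .option("hoodie.datasource.read.begin.instanttime", "20230129220605214")
  // Optional upper bound; omit it to read up to the latest commit.
  .option("hoodie.datasource.read.end.instanttime", "20230129221304930")
  .load("/tmp/test/hudi_cdc_test")
```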

Examples

Spark SQL

  1. On the Spark service configuration page, on the spark-defaults.conf tab, add a configuration item whose key is spark.serializer and whose value is org.apache.spark.serializer.KryoSerializer. For more information about how to add a configuration item, see Add configuration items.

  2. Run the following command to create a table.

    create table hudi_cdc_test (
      id bigint,
      name string,
      ts bigint
    ) using hudi
    tblproperties (
      type = 'cow',
      primaryKey = 'id',
      preCombineField = 'ts',
      'hoodie.table.cdc.enabled' = 'true',
      'hoodie.table.cdc.supplemental.logging.mode' = 'data_before_after'
    );
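To confirm that CDC is enabled on the new table, the table property can be checked. This is an optional verification step that is not part of the original walkthrough; the statement below is standard Spark SQL.

```sql
show tblproperties hudi_cdc_test ('hoodie.table.cdc.enabled');
```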
  3. Run the following commands to write data to the table and query the table.

    insert into hudi_cdc_test values (1, 'a1', 1000), (2, 'a2', 1001);
    select * from hudi_cdc_test;

    The following information is returned.

    20230129220605215    20230129220605215_0_0    1        0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-14-17_20230129220605215.parquet    1    a1    1000
    20230129220605215    20230129220605215_0_1    2        0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-14-17_20230129220605215.parquet    2    a2    1001
  4. Obtain the timestamp of the last commit from the .hoodie directory and perform a CDC query.

    1. Obtain the timestamp of the last commit, for example by listing the files in the table's .hoodie directory.

      -rw-r--r--  1 zxy  staff   1.2K  1 29 22:06 20230129220605215.commit
      -rw-r--r--  1 zxy  staff     0B  1 29 22:06 20230129220605215.commit.requested
      -rw-r--r--  1 zxy  staff   798B  1 29 22:06 20230129220605215.inflight
    2. Run the following command to perform a CDC query.

      The query range is a left-open, right-closed interval, so subtract 1 from the timestamp and use the result as the start time.

      select * from hudi_table_changes("hudi_cdc_test", "20230129220605214");

      The following information is returned.

      i    20230129220605215    NULL    {"_hoodie_record_key":"1","_hoodie_partition_path":"","_hoodie_file_name":"0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-14-17_20230129220605215.parquet","_hoodie_commit_seqno":"20230129220605215_0_0","name":"a1","_hoodie_commit_time":"20230129220605215","ts":1000,"id":1}
      i    20230129220605215    NULL    {"_hoodie_record_key":"2","_hoodie_partition_path":"","_hoodie_file_name":"0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-14-17_20230129220605215.parquet","_hoodie_commit_seqno":"20230129220605215_0_1","name":"a2","_hoodie_commit_time":"20230129220605215","ts":1001,"id":2}
  5. Run the following commands to write data again and query the table.

    insert into hudi_cdc_test values (2, 'a2', 1002);
    select * from hudi_cdc_test;

    The following information is returned.

    20230129220605215    20230129220605215_0_0    1        0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-40-38_20230129221304930.parquet    1    a1    1000
    20230129221304930    20230129221304930_0_1    2        0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-40-38_20230129221304930.parquet    2    a2    1002
  6. Refer to Step 4 to obtain the timestamp of the last commit, subtract 1, and perform a CDC query.

    For example, the obtained timestamp is 20230129221304930. Run the following command to perform a CDC query.

    select * from hudi_table_changes("hudi_cdc_test", "20230129221304929");

    The following information is returned.

    u    20230129221304930    {"_hoodie_commit_time": "20230129220605215", "_hoodie_commit_seqno": "20230129220605215_0_1", "_hoodie_record_key": "2", "_hoodie_partition_path": "", "_hoodie_file_name": "0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-14-17_20230129220605215.parquet", "id": 2, "name": "a2", "ts": 1001}{"_hoodie_record_key":"2","_hoodie_partition_path":"","_hoodie_file_name":"0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-40-38_20230129221304930.parquet","_hoodie_commit_seqno":"20230129221304930_0_1","name":"a2","_hoodie_commit_time":"20230129221304930","ts":1002,"id":2}
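Delete operations are captured as well. As an additional illustration that is not part of the original walkthrough, deleting a record and then querying the changes of the delete commit should return a row whose op column is d. The start timestamp below is a placeholder: replace it with the timestamp of the delete commit minus 1, obtained as in Step 4.

```sql
delete from hudi_cdc_test where id = 1;
-- Placeholder start time: use (timestamp of the delete commit - 1).
select * from hudi_table_changes("hudi_cdc_test", "20230129221500000");
```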

DataFrame

  1. Preparations.

    import org.apache.hudi.common.table.HoodieTableMetaClient
    import org.apache.spark.sql.{SaveMode, SparkSession}
    import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
    
    val spark: SparkSession = SparkSession.builder()
      .master("local[4]")
      .withExtensions(new HoodieSparkSessionExtension)
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()
    import spark.implicits._
    
    val basePath = "/tmp/test/hudi_cdc_test"
    
    val writeOpts = Map(
      "hoodie.table.name" -> "hudi_cdc_test",
      "hoodie.datasource.write.recordkey.field" -> "id",
      "hoodie.datasource.write.precombine.field" -> "ts",
      "hoodie.table.cdc.enabled" -> "true",
      "hoodie.table.cdc.supplemental.logging.mode" -> "op_key_only"
    )
    
    val readOpts = Map(
      "hoodie.datasource.query.type" -> "incremental",
      "hoodie.datasource.query.incremental.format" -> "cdc"
    )
  2. Write data by using df1.

    val df1 = Seq((1, "a1", 1000), (2, "a2", 1001)).toDF("id", "name", "ts")
    df1.write.format("hudi")
      .options(writeOpts)
      .mode(SaveMode.Append)
      .save(basePath)
    df1.show(false)

    The following information is returned.

    +---+----+----+
    |id |name|ts  |
    +---+----+----+
    |1  |a1  |1000|
    |2  |a2  |1001|
    +---+----+----+
  3. Read the change data as cdc1.

    val metaClient = HoodieTableMetaClient.builder()
      .setBasePath(basePath)
      .setConf(spark.sessionState.newHadoopConf())
      .build()
    
    val timestamp1 = metaClient.reloadActiveTimeline.lastInstant().get().getTimestamp
    val cdc1 = spark.read.format("hudi")
      .options(readOpts)
      .option("hoodie.datasource.read.begin.instanttime", (timestamp1.toLong - 1).toString)
      .load(basePath)
    cdc1.show(false)

    The following information is returned.

    +---+-----------------+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    |op |ts_ms            |before|after                                                                                                                                                                                                                                                                      |
    +---+-----------------+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    |i  |20230128030951890|null  |{"_hoodie_record_key":"1","_hoodie_partition_path":"","_hoodie_file_name":"6b253d50-1efb-400d-95e6-b67380219441-0_0-27-28_20230128030951890.parquet","_hoodie_commit_seqno":"20230128030951890_0_0","name":"a1","_hoodie_commit_time":"20230128030951890","ts":1000,"id":1}|
    |i  |20230128030951890|null  |{"_hoodie_record_key":"2","_hoodie_partition_path":"","_hoodie_file_name":"6b253d50-1efb-400d-95e6-b67380219441-0_0-27-28_20230128030951890.parquet","_hoodie_commit_seqno":"20230128030951890_0_1","name":"a2","_hoodie_commit_time":"20230128030951890","ts":1001,"id":2}|
    +---+-----------------+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
  4. Write data by using df2.

    val df2 = Seq((2, "a2", 1002)).toDF("id", "name", "ts")
    df2.write.format("hudi")
      .options(writeOpts)
      .mode(SaveMode.Append)
      .save(basePath)
    df2.show(false)

    The following information is returned.

    +---+----+----+
    |id |name|ts  |
    +---+----+----+
    |2  |a2  |1002|
    +---+----+----+
  5. Read the change data as cdc2.

    val timestamp2 = metaClient.reloadActiveTimeline.lastInstant().get().getTimestamp
    val cdc2 = spark.read.format("hudi")
      .options(readOpts)
      .option("hoodie.datasource.read.begin.instanttime", (timestamp2.toLong - 1).toString)
      .load(basePath)
    cdc2.show(false)

    The following information is returned.

    +---+-----------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    |op |ts_ms            |before                                                                                                                                                                                                                                                                                    |after                                                                                                                                                                                                                                                                      |
    +---+-----------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    |u  |20230128031235363|{"_hoodie_commit_time": "20230128030951890", "_hoodie_commit_seqno": "20230128030951890_0_1", "_hoodie_record_key": "2", "_hoodie_partition_path": "", "_hoodie_file_name": "6b253d50-1efb-400d-95e6-b67380219441-0_0-27-28_20230128030951890.parquet", "id": 2, "name": "a2", "ts": 1001}|{"_hoodie_record_key":"2","_hoodie_partition_path":"","_hoodie_file_name":"6b253d50-1efb-400d-95e6-b67380219441-0_0-60-52_20230128031235363.parquet","_hoodie_commit_seqno":"20230128031235363_0_1","name":"a2","_hoodie_commit_time":"20230128031235363","ts":1002,"id":2}|
    +---+-----------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
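The optional hoodie.datasource.read.end.instanttime parameter bounds a CDC read. Continuing the example, the following sketch reads only the changes of the first commit by using timestamp1 as the upper bound of the interval (timestamp1 - 1, timestamp1]; it assumes that spark, readOpts, basePath, and timestamp1 from the previous steps are still in scope.

```scala
// Bound the CDC query to the first commit only.
val cdcFirstCommit = spark.read.format("hudi")
  .options(readOpts)
  .option("hoodie.datasource.read.begin.instanttime", (timestamp1.toLong - 1).toString)
  .option("hoodie.datasource.read.end.instanttime", timestamp1)
  .load(basePath)
cdcFirstCommit.show(false)
```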