Delta Lake CDC构建增量数仓

本文为您介绍Delta Lake CDC功能的相关参数、Schema和使用示例。

背景信息

CDC(Change Data Capture)定义了一种场景,即识别并捕获数据库表中数据的变更,并交付给下游进一步处理。Delta Lake CDC能够将Delta Lake表作为Source,直接获取变更的数据信息。

Delta Lake CDC是通过Change Data Feed(CDF)来实现的。CDF允许Delta Lake表能够追溯行级的变更信息。开启CDF后,Delta Lake将在必要的情况下持久化变更的数据信息,并写入到特定的表下的目录文件中。应用CDC,可以方便的构建增量数仓。

使用限制

仅EMR-3.41.0及后续版本(Delta Lake 0.6.1)和EMR-5.9.0及后续版本(Delta Lake 2.0)的集群,支持使用Delta Lake CDC功能。

相关参数

SparkConf参数

参数

说明

spark.sql.externalTableValuedFunctions

EMR自定义Spark Config,用于拓展Spark 2.4.x的Table Valued Function。使用Spark SQL执行CDF查询时需要配置为table_changes。

spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled

取值如下:

  • false(默认值):CDF查询会强制要求指定的startingTimestamp或endingTimestamp是有效的,否则报错。

  • true:如果startingTimestamp无效,则直接返回空数据;如果endingTimestamp无效,则直接返回当前快照的数据。

说明

该参数仅在Delta 2.x版本生效。

CDC写参数

参数

说明

delta.enableChangeDataFeed

是否开启CDF,取值如下:

  • false(默认值):不开启CDF。

  • true:开启CDF。

CDC读参数

仅DataFram和Spark Streaming模式需要设置以下参数。

参数

说明

readChangeFeed

如果设置为true,则返回表的Change Data,且必须同时指定startingVersion或startingTimestamp任意一个参数搭配使用。

startingVersion

readChangeFeed为true时设置有效,表示从指定版本开始读取表的Change Data。

endingVersion

readChangeFeed为true时设置有效,表示读取表的Change Data的最后版本。

startingTimestamp

readChangeFeed为true时设置有效,表示从指定的时间戳开始读取表的Change Data。

endingTimestamp

readChangeFeed为true时设置有效,表示读取表的Change Data的最后时间戳。

Schema

Delta Lake CDF查询返回的Schema是在原表的Schema基础上追加以下三个额外字段:

  • _change_type:引起变更的操作,取值如下:

    • insert:标识数据为新插入的。

    • delete:标识数据为刚删除的。

    • update_preimage和update_postimage:标识数据为更新,分别记录其变更前的记录和变更后的记录。

  • _commit_version:变更对应的Delta表版本。

  • _commit_timestamp:变更对应的Delta表版本提交的时间。

使用示例

Spark SQL示例

重要

仅在EMR Spark2,Delta 0.6.1版本支持使用Spark SQL语法。

EMR Spark2上使用Spark SQL语法需要额外配置以下参数,代码如下所示。

spark-sql --conf spark.sql.externalTableValuedFunctions=table_changes

SQL语法如下所示。

-- Create Delta CDF-enabled Table
CREATE TABLE cdf_tbl (id int, name string, age int) USING delta
TBLPROPERTIES ("delta.enableChangeDataFeed" = "true");

-- Insert Into
INSERT INTO cdf_tbl VALUES (1, 'XUN', 32), (2, 'JING', 30);

-- Insert Overwrite
INSERT OVERWRITE TABLE cdf_tbl VALUES (1, 'a1', 30), (2, 'a2', 32), (3, "a3", 32);

-- Update
UPDATE cdf_tbl set age = age + 1;

-- Merge Into
CREATE TABLE merge_source (id int, name string, age int) USING delta;
INSERT INTO merge_source VALUES (1, "a1", 31), (2, "a2_new", 33), (4, "a4", 30);

MERGE INTO cdf_tbl target USING merge_source source
ON target.id = source.id
WHEN MATCHED AND target.id % 2 == 0 THEN UPDATE SET name = source.name
WHEN MATCHED THEN DELETE
WHEN NOT MATCHED THEN INSERT *;

-- Delete
DELETE FROM cdf_tbl WHERE age >= 32;

-- CDF Query
-- 查询从版本0开始的所有的Change Data。
select * from table_changes("cdf_tbl", 0);
select * from table_changes("cdf_tbl", '2023-02-03 15:33:34'); --2023-02-03 15:33:34为commit0的提交时间戳。


-- 查询版本4对应的Change Data。
select * from table_changes("cdf_tbl", 4, 4);
select * from table_changes("cdf_tbl", '2023-02-03 15:34:06', '2023-02-03 15:34:06'); --2023-02-03 15:34:06为commit4的提交时间戳。

两次查询返回信息如下所示。

图 1. 查询1结果fig1

图 2. 查询2结果fig2

DataFrame示例

// Create and Write to Delta CDF-enabled Table
val df = Seq((1, "XUN", 32), (2, "JING", 30)).toDF("id", "name", "age")
df.write.format("delta").mode("append")
  .option("delta.enableChangeDataFeed", "true") //首次写入delta数据时开启CDF,后续写入无需设置。
  .saveAsTable("cdf_table")

// CDF Query Using DataFrame
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 4) //endingVersion可选。
  .table("cdf_table")

Spark Streaming示例

// Streaming CDF Query Using Dats
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 4) //endingVersion可选。
  .table("cdf_table")