本文为您介绍Delta Lake CDC功能的相关参数、Schema和使用示例。
背景信息
CDC(Change Data Capture)定义了一种场景,即识别并捕获数据库表中数据的变更,并交付给下游进一步处理。Delta Lake CDC能够将Delta Lake表作为Source,直接获取变更的数据信息。
Delta Lake CDC是通过Change Data Feed(CDF)来实现的。CDF允许Delta Lake表能够追溯行级的变更信息。开启CDF后,Delta Lake将在必要的情况下持久化变更的数据信息,并写入到特定的表下的目录文件中。应用CDC,可以方便的构建增量数仓。
使用限制
仅EMR-3.41.0及后续版本(Delta Lake 0.6.1)和EMR-5.9.0及后续版本(Delta Lake 2.0)的集群,支持使用Delta Lake CDC功能。
相关参数
SparkConf参数
参数 | 说明 |
spark.sql.externalTableValuedFunctions | EMR自定义Spark Config,用于拓展Spark 2.4.x的Table Valued Function。使用Spark SQL执行CDF查询时需要配置为table_changes。 |
spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled | 取值如下:
说明 该参数仅在Delta 2.x版本生效。 |
CDC写参数
参数 | 说明 |
delta.enableChangeDataFeed | 是否开启CDF,取值如下:
|
CDC读参数
仅DataFram和Spark Streaming模式需要设置以下参数。
参数 | 说明 |
readChangeFeed | 如果设置为true,则返回表的Change Data,且必须同时指定startingVersion或startingTimestamp任意一个参数搭配使用。 |
startingVersion | readChangeFeed为true时设置有效,表示从指定版本开始读取表的Change Data。 |
endingVersion | readChangeFeed为true时设置有效,表示读取表的Change Data的最后版本。 |
startingTimestamp | readChangeFeed为true时设置有效,表示从指定的时间戳开始读取表的Change Data。 |
endingTimestamp | readChangeFeed为true时设置有效,表示读取表的Change Data的最后时间戳。 |
Schema
Delta Lake CDF查询返回的Schema是在原表的Schema基础上追加以下三个额外字段:
_change_type:引起变更的操作,取值如下:
insert:标识数据为新插入的。
delete:标识数据为刚删除的。
update_preimage和update_postimage:标识数据为更新,分别记录其变更前的记录和变更后的记录。
_commit_version:变更对应的Delta表版本。
_commit_timestamp:变更对应的Delta表版本提交的时间。
使用示例
Spark SQL示例
仅在EMR Spark2,Delta 0.6.1版本支持使用Spark SQL语法。
EMR Spark2上使用Spark SQL语法需要额外配置以下参数,代码如下所示。
spark-sql --conf spark.sql.externalTableValuedFunctions=table_changes
SQL语法如下所示。
-- Create Delta CDF-enabled Table
CREATE TABLE cdf_tbl (id int, name string, age int) USING delta
TBLPROPERTIES ("delta.enableChangeDataFeed" = "true");
-- Insert Into
INSERT INTO cdf_tbl VALUES (1, 'XUN', 32), (2, 'JING', 30);
-- Insert Overwrite
INSERT OVERWRITE TABLE cdf_tbl VALUES (1, 'a1', 30), (2, 'a2', 32), (3, "a3", 32);
-- Update
UPDATE cdf_tbl set age = age + 1;
-- Merge Into
CREATE TABLE merge_source (id int, name string, age int) USING delta;
INSERT INTO merge_source VALUES (1, "a1", 31), (2, "a2_new", 33), (4, "a4", 30);
MERGE INTO cdf_tbl target USING merge_source source
ON target.id = source.id
WHEN MATCHED AND target.id % 2 == 0 THEN UPDATE SET name = source.name
WHEN MATCHED THEN DELETE
WHEN NOT MATCHED THEN INSERT *;
-- Delete
DELETE FROM cdf_tbl WHERE age >= 32;
-- CDF Query
-- 查询从版本0开始的所有的Change Data。
select * from table_changes("cdf_tbl", 0);
select * from table_changes("cdf_tbl", '2023-02-03 15:33:34'); --2023-02-03 15:33:34为commit0的提交时间戳。
-- 查询版本4对应的Change Data。
select * from table_changes("cdf_tbl", 4, 4);
select * from table_changes("cdf_tbl", '2023-02-03 15:34:06', '2023-02-03 15:34:06'); --2023-02-03 15:34:06为commit4的提交时间戳。
两次查询返回信息如下所示。
DataFrame示例
// Create and Write to Delta CDF-enabled Table
val df = Seq((1, "XUN", 32), (2, "JING", 30)).toDF("id", "name", "age")
df.write.format("delta").mode("append")
.option("delta.enableChangeDataFeed", "true") //首次写入delta数据时开启CDF,后续写入无需设置。
.saveAsTable("cdf_table")
// CDF Query Using DataFrame
spark.read.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.option("endingVersion", 4) //endingVersion可选。
.table("cdf_table")
Spark Streaming示例
// Streaming CDF Query Using Dats
spark.read.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.option("endingVersion", 4) //endingVersion可选。
.table("cdf_table")