本文为您介绍Delta Lake CDC功能的相关参数、Schema和使用示例。
背景信息
CDC(Change Data Capture)定义了一种场景,即识别并捕获数据库表中数据的变更,并交付给下游进一步处理。Delta Lake CDC能够将Delta Lake表作为Source,直接获取变更的数据信息。
Delta Lake CDC是通过Change Data Feed(CDF)来实现的。CDF允许Delta Lake表能够追溯行级的变更信息。开启CDF后,Delta Lake将在必要的情况下持久化变更的数据信息,并写入到特定的表下的目录文件中。应用CDC,可以方便的构建增量数仓。
使用限制
仅EMR-3.41.0及后续版本(Delta Lake 0.6.1)和EMR-5.9.0及后续版本(Delta Lake 2.0)的集群,支持使用Delta Lake CDC功能。
相关参数
SparkConf参数
参数 | 说明 |
---|---|
spark.sql.externalTableValuedFunctions | EMR自定义Spark Config,用于拓展Spark 2.4.x的Table Valued Function。使用Spark SQL执行CDF查询时需要配置为table_changes。 |
spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled | 取值如下:
说明 该参数仅在Delta 2.x版本生效。 |
CDC写参数
参数 | 说明 |
---|---|
delta.enableChangeDataFeed | 是否开启CDF,取值如下:
|
CDC读参数
仅DataFram和Spark Streaming模式需要设置以下参数。
参数 | 说明 |
---|---|
readChangeFeed | 如果设置为true,则返回表的Change Data,且必须同时指定startingVersion或startingTimestamp任意一个参数搭配使用。 |
startingVersion | readChangeFeed为true时设置有效,表示从指定版本开始读取表的Change Data。 |
endingVersion | readChangeFeed为true时设置有效,表示读取表的Change Data的最后版本。 |
startingTimestamp | readChangeFeed为true时设置有效,表示从指定的时间戳开始读取表的Change Data。 |
endingTimestamp | readChangeFeed为true时设置有效,表示读取表的Change Data的最后时间戳。 |
Schema
Delta Lake CDF查询返回的Schema是在原表的Schema基础上追加以下三个额外字段:
- _change_type:引起变更的操作,取值如下:
- insert:标识数据为新插入的。
- delete:标识数据为刚删除的。
- update_preimage和update_postimage:标识数据为更新,分别记录其变更前的记录和变更后的记录。
- _commit_version:变更对应的Delta表版本。
- _commit_timestamp:变更对应的Delta表版本提交的时间。
使用示例
Spark SQL示例
重要 仅在EMR Spark2,Delta 0.6.1版本支持使用Spark SQL语法。
EMR Spark2上使用Spark SQL语法需要额外配置以下参数,代码如下所示。
spark-sql --conf spark.sql.externalTableValuedFunctions=table_changes
SQL语法如下所示。
-- Create Delta CDF-enabled Table
CREATE TABLE cdf_tbl (id int, name string, age int) USING delta
TBLPROPERTIES ("delta.enableChangeDataFeed" = "true");
-- Insert Into
INSERT INTO cdf_tbl VALUES (1, 'XUN', 32), (2, 'JING', 30);
-- Insert Overwrite
INSERT OVERWRITE TABLE cdf_tbl VALUES (1, 'a1', 30), (2, 'a2', 32), (3, "a3", 32);
-- Update
UPDATE cdf_tbl set age = age + 1;
-- Merge Into
CREATE TABLE merge_source (id int, name string, age int) USING delta;
INSERT INTO merge_source VALUES (1, "a1", 31), (2, "a2_new", 33), (4, "a4", 30);
MERGE INTO cdf_tbl target USING merge_source source
ON target.id = source.id
WHEN MATCHED AND target.id % 2 == 0 THEN UPDATE SET name = source.name
WHEN MATCHED THEN DELETE
WHEN NOT MATCHED THEN INSERT *;
-- Delete
DELETE FROM cdf_tbl WHERE age >= 32;
-- CDF Query
-- 查询从版本0开始的所有的Change Data。
select * from table_changes("cdf_tbl", 0);
select * from table_changes("cdf_tbl", '2023-02-03 15:33:34'); --2023-02-03 15:33:34为commit0的提交时间戳。
-- 查询版本4对应的Change Data。
select * from table_changes("cdf_tbl", 4, 4);
select * from table_changes("cdf_tbl", '2023-02-03 15:34:06', '2023-02-03 15:34:06'); --2023-02-03 15:34:06为commit4的提交时间戳。
两次查询返回信息如下所示。
Streaming SQL示例
EMR Spark支持Spark Streaming SQL,您可以使用SQL语法开发Streaming的任务。Delta Lake CDF也集成了Streaming SQL,可以更方便的实现增量更新的业务场景。
SQL语法如下所示。
-- 通过设置readChangeFeed='true'来创建基于Delta表的Streaming Source。
CREATE SCAN stream_cdf_source ON delta_table USING STERAM
OPTIONS(readChangeFeed='true', startingVersion='0');
-- 创建并执行Streaming任务。
CREATE STREAM job_name_x
OPTIONS (checkpointLocation="/tmp/delta/job_name_x")
INSERT INTO target_table SELECT * FROM stream_cdf_source;
创建SCAN时可以选择指定startingVersion,创建STREAM时需要指定checkpoint路径来保存流式的offset信息,两者都会作用于流式读取的起始点。
- 不指定startingVersion:
- checkpointLocation为空:返回当前Delta Lake的快照数据。
- checkpointLocation有历史offset:以checkpoint的offset为起始,startingVersion不生效。
- 指定startingVersion:
- checkpointLocation为空:以startingVersion为起始。
- checkpointLocation有历史offset:以checkpoint的offset为起始,startingVersion不生效。
DataFrame示例
// Create and Write to Delta CDF-enabled Table
val df = Seq((1, "XUN", 32), (2, "JING", 30)).toDF("id", "name", "age")
df.write.format("delta").mode("append")
.option("delta.enableChangeDataFeed", "true") //首次写入delta数据时开启CDF,后续写入无需设置。
.saveAsTable("cdf_table")
// CDF Query Using DataFrame
spark.read.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.option("endingVersion", 4) //endingVersion可选。
.table("cdf_table")
Spark Streaming示例
// Streaming CDF Query Using Dats
spark.read.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.option("endingVersion", 4) //endingVersion可选。
.table("cdf_table")