本文为您介绍Delta Lake CDC功能的相关参数、Schema和使用示例。

背景信息

CDC(Change Data Capture)定义了一种场景,即识别并捕获数据库表中数据的变更,并交付给下游进一步处理。Delta Lake CDC能够将Delta Lake表作为Source,直接获取变更的数据信息。

Delta Lake CDC是通过Change Data Feed(CDF)来实现的。CDF允许Delta Lake表能够追溯行级的变更信息。开启CDF后,Delta Lake将在必要的情况下持久化变更的数据信息,并写入到特定的表下的目录文件中。应用CDC,可以方便的构建增量数仓。

使用限制

仅EMR-3.41.0及后续版本(Delta Lake 0.6.1)和EMR-5.9.0及后续版本(Delta Lake 2.0)的集群,支持使用Delta Lake CDC功能。

相关参数

SparkConf参数

参数说明
spark.sql.externalTableValuedFunctionsEMR自定义Spark Config,用于拓展Spark 2.4.x的Table Valued Function。使用Spark SQL执行CDF查询时需要配置为table_changes。
spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled取值如下:
  • false(默认值):CDF查询会强制要求指定的startingTimestamp或endingTimestamp是有效的,否则报错。
  • true:如果startingTimestamp无效,则直接返回空数据;如果endingTimestamp无效,则直接返回当前快照的数据。
说明 该参数仅在Delta 2.x版本生效。

CDC写参数

参数说明
delta.enableChangeDataFeed是否开启CDF,取值如下:
  • false(默认值):不开启CDF。
  • true:开启CDF。

CDC读参数

仅DataFram和Spark Streaming模式需要设置以下参数。

参数说明
readChangeFeed如果设置为true,则返回表的Change Data,且必须同时指定startingVersion或startingTimestamp任意一个参数搭配使用。
startingVersionreadChangeFeed为true时设置有效,表示从指定版本开始读取表的Change Data。
endingVersionreadChangeFeed为true时设置有效,表示读取表的Change Data的最后版本。
startingTimestampreadChangeFeed为true时设置有效,表示从指定的时间戳开始读取表的Change Data。
endingTimestampreadChangeFeed为true时设置有效,表示读取表的Change Data的最后时间戳。

Schema

Delta Lake CDF查询返回的Schema是在原表的Schema基础上追加以下三个额外字段:
  • _change_type:引起变更的操作,取值如下:
    • insert:标识数据为新插入的。
    • delete:标识数据为刚删除的。
    • update_preimage和update_postimage:标识数据为更新,分别记录其变更前的记录和变更后的记录。
  • _commit_version:变更对应的Delta表版本。
  • _commit_timestamp:变更对应的Delta表版本提交的时间。

使用示例

Spark SQL示例

重要 仅在EMR Spark2,Delta 0.6.1版本支持使用Spark SQL语法。
EMR Spark2上使用Spark SQL语法需要额外配置以下参数,代码如下所示。
spark-sql --conf spark.sql.externalTableValuedFunctions=table_changes
SQL语法如下所示。
-- Create Delta CDF-enabled Table
CREATE TABLE cdf_tbl (id int, name string, age int) USING delta
TBLPROPERTIES ("delta.enableChangeDataFeed" = "true");

-- Insert Into
INSERT INTO cdf_tbl VALUES (1, 'XUN', 32), (2, 'JING', 30);

-- Insert Overwrite
INSERT OVERWRITE TABLE cdf_tbl VALUES (1, 'a1', 30), (2, 'a2', 32), (3, "a3", 32);

-- Update
UPDATE cdf_tbl set age = age + 1;

-- Merge Into
CREATE TABLE merge_source (id int, name string, age int) USING delta;
INSERT INTO merge_source VALUES (1, "a1", 31), (2, "a2_new", 33), (4, "a4", 30);

MERGE INTO cdf_tbl target USING merge_source source
ON target.id = source.id
WHEN MATCHED AND target.id % 2 == 0 THEN UPDATE SET name = source.name
WHEN MATCHED THEN DELETE
WHEN NOT MATCHED THEN INSERT *;

-- Delete
DELETE FROM cdf_tbl WHERE age >= 32;

-- CDF Query
-- 查询从版本0开始的所有的Change Data。
select * from table_changes("cdf_tbl", 0);
select * from table_changes("cdf_tbl", '2023-02-03 15:33:34'); --2023-02-03 15:33:34为commit0的提交时间戳。


-- 查询版本4对应的Change Data。
select * from table_changes("cdf_tbl", 4, 4);
select * from table_changes("cdf_tbl", '2023-02-03 15:34:06', '2023-02-03 15:34:06'); --2023-02-03 15:34:06为commit4的提交时间戳。

两次查询返回信息如下所示。

图 1. 查询1结果
fig1
图 2. 查询2结果
fig2

Streaming SQL示例

EMR Spark支持Spark Streaming SQL,您可以使用SQL语法开发Streaming的任务。Delta Lake CDF也集成了Streaming SQL,可以更方便的实现增量更新的业务场景。

SQL语法如下所示。
-- 通过设置readChangeFeed='true'来创建基于Delta表的Streaming Source。
CREATE SCAN stream_cdf_source ON delta_table USING STERAM
OPTIONS(readChangeFeed='true', startingVersion='0');

-- 创建并执行Streaming任务。
CREATE STREAM job_name_x
OPTIONS (checkpointLocation="/tmp/delta/job_name_x")
INSERT INTO target_table SELECT * FROM stream_cdf_source;
创建SCAN时可以选择指定startingVersion,创建STREAM时需要指定checkpoint路径来保存流式的offset信息,两者都会作用于流式读取的起始点。
  • 不指定startingVersion:
    • checkpointLocation为空:返回当前Delta Lake的快照数据。
    • checkpointLocation有历史offset:以checkpoint的offset为起始,startingVersion不生效。
  • 指定startingVersion:
    • checkpointLocation为空:以startingVersion为起始。
    • checkpointLocation有历史offset:以checkpoint的offset为起始,startingVersion不生效。

DataFrame示例

// Create and Write to Delta CDF-enabled Table
val df = Seq((1, "XUN", 32), (2, "JING", 30)).toDF("id", "name", "age")
df.write.format("delta").mode("append")
  .option("delta.enableChangeDataFeed", "true") //首次写入delta数据时开启CDF,后续写入无需设置。
  .saveAsTable("cdf_table")

// CDF Query Using DataFrame
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 4) //endingVersion可选。
  .table("cdf_table")

Spark Streaming示例

// Streaming CDF Query Using Dats
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 4) //endingVersion可选。
  .table("cdf_table")