CDC(Change Data Capture)定义了识别并捕获数据库表中数据的变更场景,用于记录Delta Table增量表行级别的插入、更新和删除等操作,从而可以有效捕捉该表的数据变化事件流,后续可以通过事件驱动辅助增量计算、数据同步、数仓分层等业务需求。
功能介绍
增量计算:可增量读取CDC变更记录进行增量计算,如增量物化视图更新等。
流式计算:可流式读取CDC变更记录进行流式计算,如Flink计算任务消费。
多引擎间的数据同步:不同引擎间的增量数据同步和计算。
日志审计:详细记录所有的数据操作记录,用于操作日志审计。
该功能目前处于邀测中,邀测功能使用方法请参见使用说明。
建表DDL
建表DDL分为两种:同步CDC和异步CDC。
同步CDC:仅支持SQL DML操作生成增量表,不支持实时数据写入。SQL操作完成,CDC数据即生成。
异步CDC:支持SQL DML操作生成增量表,支持实时数据写入。但CDC数据生成属于异步行为,非立即生效。
同步CDC
创建Delta Table时添加"acid.cdc.mode.enable"="true"
属性, 可以选择性添加"cdc.insert.into.passthrough.enable"="true"
和"cdc.data.retain.hours"="24"
属性。
SQL示例如下:
CREATE TABLE acid_with_cdc_tbl (pk BIGINT NOT NULL PRIMARY KEY, val BIGINT)
tblproperties ("transactional" = "true", "acid.cdc.mode.enable"="true");
acid.cdc.mode.enable
:开启Delta Table CDC功能,默认为同步方式,支持在SQL DML时同步生成CDC数据,不支持Tunnel实时写入。cdc.insert.into.passthrough.enable
:对于开启CDC功能的表默认不支持INSERT INTO,添加该属性时,可支持执行INSERT INTO语句,写入的数据在CDC中仅代表INSERT类型,如果存在相同PK的数据行,后续查询会因为PK冲突导致查询失败,需要用户确保PK数据唯一。cdc.data.retain.hours
:CDC数据的保留时间(1~168小时),默认24小时。
异步CDC
创建Delta Table时添加"acid.cdc.mode.enable"="true"
、"acid.cdc.build.async"="true"
、"acid.cdc.build.interval"="300"
属性。
SQL示例如下:
CREATE TABLE acid_with_cdc_build_tbl (pk BIGINT NOT NULL PRIMARY KEY, val BIGINT)
tblproperties ("transactional" = "true",
"acid.cdc.mode.enable"="true",
"acid.cdc.build.async"="true",
"acid.cdc.build.interval"="300");
acid.cdc.mode.enable
:开启Delta Table CDC功能,默认为同步方式,支持在SQL DML时同步生成CDC数据,不支持Tunnel实时写入。acid.cdc.build.async
:开启异步构建CDC的功能,支持Tunnel实时写入该表,同时对于SQL的DML也会异步生成。acid.cdc.build.interval
:异步构建的周期配置[60-3540],单位秒,结合业务或增量场景配置该参数。其他可选参数(Project级 / Session级):
"odps.storage.orc.enable.memcmp.sort.key="true"
,建议Project级别开启,对于CDC的异步构建以及查询都会有性能帮助。
CDC查询
table_changes
语法
SELECT * FROM table_changes('<table_name>', <start> [, <end>]);
参数说明
参数名称
是否必填
参数说明
参数名称
是否必填
参数说明
table_name
是
指定查询的Delta Table。
start
是
BIGINT或者TIMESTAMP类型,常量。指定对该表CDC数据查询的起始version,table version可以通过
SHOW HISTORY FOR TABLE <table_name>;
命令查询,详见SHOW。end
否
BIGINT或者TIMESTAMP类型,常量。指定对该表CDC数据查询的截止version,若不填,则会查询到最新version。
返回值说明
除数据列外,会额外输出如下三个系统列:
__meta_timestamp
:代表数据写入系统时间。__meta_op_type
:包含1(INSERT)和0(DELETE)。__meta_is_update
:包含1(TRUE)和0(FALSE)。
__meta_op_type
和__meta_is_update
可组合成如下四种情况:__meta_op_type
__meta_is_update
说明
1
0
代表INSERT的新记录。
1
1
代表UPDATE后的值。
0
1
代表UPDATE前的值。
0
0
代表删除。
示例
创建
acid_cdc_table
表 。CREATE TABLE acid_cdc_table(id1 STRING NOT NULL, id2 STRING NOT NULL, key1 BIGINT, key2 BIGINT, PRIMARY KEY(id1, id2)) tblproperties("transactional" = "true", "acid.cdc.mode.enable"="true");
表中插入数据。
-- 插入数据时间2025-04-07 11:56:57 INSERT INTO acid_cdc_table VALUES ('1', '1006', 1006, 1006); -- 插入数据时间2025-04-07 12:15:00 INSERT INTO acid_cdc_table VALUES ('1', '1008', 1008, 1008); -- 插入数据时间2025-04-07 13:24:00 INSERT INTO acid_cdc_table VALUES ('1', '1032', 1032, 1032); -- 插入数据时间2025-04-07 14:00:00 INSERT INTO acid_cdc_table VALUES ('1', '1045', 1045, 1045); -- 插入数据时间2025-04-07 14:47:00 INSERT INTO acid_cdc_table VALUES ('1', '1045', 1045, 1045);
查询
table version
。SHOW HISTORY FOR TABLE acid_cdc_table;
返回结果如下。
ObjectType ObjectId ObjectName VERSION(LSN) Time Operation TABLE a4a78d3f6af04d85a57a90deff884021 acid_cdc_table 0000000000000001 2025-04-07 11:55:59 CREATE TABLE a4a78d3f6af04d85a57a90deff884021 acid_cdc_table 0000000000000002 2025-04-07 11:56:57 APPEND TABLE a4a78d3f6af04d85a57a90deff884021 acid_cdc_table 0000000000000003 2025-04-07 12:00:13 MINOR_COMPACT TABLE a4a78d3f6af04d85a57a90deff884021 acid_cdc_table 0000000000000004 2025-04-07 12:15:32 APPEND TABLE a4a78d3f6af04d85a57a90deff884021 acid_cdc_table 0000000000000005 2025-04-07 12:30:02 MINOR_COMPACT TABLE a4a78d3f6af04d85a57a90deff884021 acid_cdc_table 0000000000000006 2025-04-07 13:24:47 APPEND TABLE a4a78d3f6af04d85a57a90deff884021 acid_cdc_table 0000000000000007 2025-04-07 13:30:02 MINOR_COMPACT TABLE a4a78d3f6af04d85a57a90deff884021 acid_cdc_table 0000000000000008 2025-04-07 14:00:41 APPEND TABLE a4a78d3f6af04d85a57a90deff884021 acid_cdc_table 0000000000000009 2025-04-07 14:15:15 MINOR_COMPACT TABLE a4a78d3f6af04d85a57a90deff884021 acid_cdc_table 0000000000000010 2025-04-07 14:47:46 APPEND TABLE a4a78d3f6af04d85a57a90deff884021 acid_cdc_table 0000000000000011 2025-04-07 15:00:11 MINOR_COMPACT
查询CDC记录。
查询2025-04-07 12:00:00以来记录。
SELECT * FROM table_changes('acid_cdc_table', '2025-04-07 12:00:00'); -- 等同于 SELECT * FROM table_changes('acid_cdc_table', 3);
返回结果如下。
+------------+------------+------------+------------+------------------+----------------+------------------+ | id1 | id2 | key1 | key2 | __meta_timestamp | __meta_op_type | __meta_is_update | +------------+------------+------------+------------+------------------+----------------+------------------+ | 1 | 1045 | 1045 | 1045 | 2025-04-07 14:00:34 | 1 | 0 | | 1 | 1008 | 1008 | 1008 | 2025-04-07 12:15:28 | 1 | 0 | | 1 | 1032 | 1032 | 1032 | 2025-04-07 13:24:43 | 1 | 0 | | 2 | 1045 | 1045 | 1045 | 2025-04-07 14:47:41 | 1 | 0 | +------------+------------+------------+------------+------------------+----------------+------------------+
查询指定区间
2025-04-07 12:00:00
至13:30:00
的记录。SELECT * FROM table_changes('acid_cdc_table', '2025-04-07 12:00:00', '2025-04-07 13:30:00'); -- 等同于 SELECT * FROM table_changes('acid_cdc_table', 3, 6);
返回结果如下。
+------------+------------+------------+------------+------------------+----------------+------------------+ | id1 | id2 | key1 | key2 | __meta_timestamp | __meta_op_type | __meta_is_update | +------------+------------+------------+------------+------------------+----------------+------------------+ | 1 | 1008 | 1008 | 1008 | 2025-04-07 12:15:28 | 1 | 0 | | 1 | 1032 | 1032 | 1032 | 2025-04-07 13:24:43 | 1 | 0 | +------------+------------+------------+------------+------------------+----------------+------------------+
Stream
指定对Delta Table CDC数据配合Stream使用,详细使用方式请参见流对象(Stream),此处仅提供基本语法和示例。
语法
CREATE STREAM [IF NOT EXISTS] <stream_name> ON TABLE <delta_table_name> <VERSION as of v> strmproperties ("read_mode"="cdc")
"read_mode"
指定对Delta Table的消费模式,"cdc"
表示该Stream会根据查询范围查询CDC数据。示例
-- 创建源表acid_with_cdc_stream CREATE TABLE acid_with_cdc_stream (id1 BIGINT NOT NULL PRIMARY KEY, id2 BIGINT) tblproperties ("transactional" = "true", "acid.cdc.mode.enable"="true","cdc.insert.into.passthrough.enable"="true"); -- 插入数据 INSERT INTO acid_with_cdc_stream VALUES (1, 1006), (2, 1008), (3, 1032); -- 创建一个Stream关联acid_with_cdc_stream表 CREATE STREAM delta_table_stream ON TABLE acid_with_cdc_stream version AS OF 1 strmproperties ("read_mode"="cdc"); -- 查询Stream对象delta_table_stream DESC STREAM delta_table_stream;
返回结果示例如下:
Name delta_table_stream Project yunqi_y**** Schema default Create Time 2024-12-03 11:13:12 Last Modified Time 2024-12-03 11:13:12 Offset Version 1 Reference Table Project yunqi_y**** Reference Table Schema default Reference Table Name acid_with_cdc_stream Reference Table Id b89ec113f50944d5b8e52ce6a00c**** Reference Table Version 2 Parameters {"read_mode": "cdc"}
- 本页导读 (1)
- 功能介绍
- 建表DDL
- 同步CDC
- 异步CDC
- CDC查询
- table_changes
- Stream