CDC(Change Data Capture)定义了识别并捕获数据库表中数据的变更场景,用于记录Delta Table增量表行级别的插入、更新和删除等操作,从而可以有效捕捉该表的数据变化事件流,后续可以通过事件驱动辅助增量计算、数据同步、数仓分层等业务需求。

功能介绍

  • 增量计算:可增量读取CDC变更记录进行增量计算,如增量物化视图更新等。

  • 流式计算:可流式读取CDC变更记录进行流式计算,如Flink计算任务消费。

  • 多引擎间的数据同步:不同引擎间的增量数据同步和计算。

  • 日志审计: 详细记录所有的数据操作记录,用于操作日志审计。

基本操作

建表DDL

建表DDL分为两种:同步CDC和异步CDC。

  • 同步CDC:只支持SQL DML操作生成增量表,不支持实时数据写入。SQL操作完成,CDC数据即生成。

  • 异步CDC:支持SQL DML操作生成增量表,支持实时数据写入。但CDC数据生成属于异步行为,非立即生效。

同步CDC

创建Delta Table时添加"acid.cdc.mode.enable"="true"属性, 可以选择性添加"cdc.insert.into.passthrough.enable"="true""cdc.data.retain.hours"="24"属性。

SQL示例如下:

CREATE TABLE acid_with_cdc_tbl (pk bigint NOT NULL PRIMARY KEY, val bigint) 
tblproperties ("transactional" = "true", "acid.cdc.mode.enable"="true");
说明
  • acid.cdc.mode.enable:开启Delta Table CDC功能,默认为同步方式,支持在SQL DML时同步生成CDC数据,不支持Tunnel实时写入。

  • cdc.insert.into.passthrough.enable:对于开启CDC功能的表默认不支持INSERT INTO,当添加这个属性,可支持执行INSERT INTO语句,写入的数据在CDC中仅代表INSERT类型,如果存在相同PK的数据行,后面查询会因为PK冲突导致查询失败,需要用户数据保障PK数据唯一。

  • cdc.data.retain.hours:CDC数据的保留时间(1-168小时),默认24小时。

异步CDC

创建Delta Table时添加"acid.cdc.mode.enable"="true""acid.cdc.build.async"="true""acid.cdc.build.interval"="300"属性。

SQL示例如下:

CREATE TABLE acid_with_cdc_build_tbl (pk bigint NOT NULL PRIMARY KEY, val bigint) 
tblproperties ("transactional" = "true", 
               "acid.cdc.mode.enable"="true",
               "acid.cdc.build.async"="true",
               "acid.cdc.build.interval"="300");                                                                           
说明
  • acid.cdc.mode.enable:开启Delta Table CDC功能,默认为同步方式,支持在SQL DML时同步生成CDC数据,不支持Tunnel 实时写入。

  • acid.cdc.build.async:开启异步构建CDC的功能,支持Tunnel实时写入该表,同时对于SQL的DML也会异步生成。

  • acid.cdc.build.interval:异步构建的周期配置[60-3540],单位秒,结合业务或增量场景配置该参数。

  • 其他可选参数(Project级 / Session级):

    "odps.storage.orc.enable.memcmp.sort.key="true",建议Project级别开启,对于CDC的异步构建以及查询都会有性能帮助。

CDC查询

CDC查询分两种:table_changes和Stream。

table_changes

语法:

select * from table_changes('table_name', version_start, [version_end]);

参数说明:

  • table_name:必填,指定查询的Delta Table。

  • version_start:必填,是对该表CDC数据查询的起始version,table version可以通过show history for table table_name查询,详见SHOW

  • version_end:选填,指定对该表CDC数据查询的截止version,不填的话,会查询到最新version。

  • 结果说明:

    除数据列外,会额外输出三个系统列:

    __meta_timestamp:代表数据写入系统时间。

    __meta_op_type:包含INSERT和DELETE。

    __meta_is_update:包含true和false。

    __meta_op_type__meta_is_update可组合成如下四种情况:

    __meta_op_type

    __meta_is_update

    说明

    INSERT

    FALSE

    代表Insert的新记录。

    INSERT

    TRUE

    代表Update后的值。

    DELETE

    TRUE

    代表Update前的值。

    DELETE

    FALSE

    代表删除。

示例:

select *  from table_changes('acid_with_cdc_build_tbl', 1);
// NOTE:__operation(storage operation type) vs table_changes
//    +--------------------+--------------+
//    |__meta_op_type|__meta_is_update |
//    +--------------------+--------------+
//    |    INSERT(1) |      FALSE(0)   |
//    |    DELETE(0) |      FALSE(0)   |
//    |    DELETE(0) |      TRUE(1)    |
//    |    INSERT(1) |      TRUE(1)    |
//    +--------------------+--------------+

--如查询结果
+------------+------------+------------+------------+------------------+----------------+------------------+
| id1        | id2        | key1       | key2       | __meta_timestamp | __meta_op_type | __meta_is_update |
+------------+------------+------------+------------+------------------+----------------+------------------+
| 1          | 1006       | 1006       | 1006       | 2024-09-07 16:48:17 | 1              | 0             |
| 1          | 1008       | 1008       | 1008       | 2024-09-07 16:48:17 | 1              | 0             |
| 1          | 1032       | 1032       | 1032       | 2024-09-07 16:48:17 | 1              | 0             |
...

Stream

指定对Delta Table CDC数据配合Stream使用,详见流对象(Stream),这里只给出基本使用方式和示例。

create stream stream_cdc_tbl on table acid_with_cdc_tbl stmproperties ("read_mode"="cdc");
说明
  • "read_mode"指定对Delta Table的消费模式,"cdc"该Stream会根据查询的范围查询CDC数据。

  • Offset Version(DESC STREAM可查)查询时采用左开右闭区间。

  • 返回结果同table_changes。