CDC(邀测)

更新时间:2025-04-09 07:02:55

CDC(Change Data Capture)定义了识别并捕获数据库表中数据的变更场景,用于记录Delta Table增量表行级别的插入、更新和删除等操作,从而可以有效捕捉该表的数据变化事件流,后续可以通过事件驱动辅助增量计算、数据同步、数仓分层等业务需求。

功能介绍

  • 增量计算:可增量读取CDC变更记录进行增量计算,如增量物化视图更新等。

  • 流式计算:可流式读取CDC变更记录进行流式计算,如Flink计算任务消费。

  • 多引擎间的数据同步:不同引擎间的增量数据同步和计算。

  • 日志审计:详细记录所有的数据操作记录,用于操作日志审计。

该功能目前处于邀测中,邀测功能使用方法请参见使用说明

建表DDL

建表DDL分为两种:同步CDC和异步CDC。

  • 同步CDC:仅支持SQL DML操作生成增量表,不支持实时数据写入。SQL操作完成,CDC数据即生成。

  • 异步CDC:支持SQL DML操作生成增量表,支持实时数据写入。但CDC数据生成属于异步行为,非立即生效。

同步CDC

创建Delta Table时添加"acid.cdc.mode.enable"="true"属性, 可以选择性添加"cdc.insert.into.passthrough.enable"="true""cdc.data.retain.hours"="24"属性。

SQL示例如下:

CREATE TABLE acid_with_cdc_tbl (pk BIGINT NOT NULL PRIMARY KEY, val BIGINT) 
tblproperties ("transactional" = "true", "acid.cdc.mode.enable"="true");
说明
  • acid.cdc.mode.enable:开启Delta Table CDC功能,默认为同步方式,支持在SQL DML时同步生成CDC数据,不支持Tunnel实时写入。

  • cdc.insert.into.passthrough.enable:对于开启CDC功能的表默认不支持INSERT INTO,添加该属性时,可支持执行INSERT INTO语句,写入的数据在CDC中仅代表INSERT类型,如果存在相同PK的数据行,后续查询会因为PK冲突导致查询失败,需要用户确保PK数据唯一。

  • cdc.data.retain.hours:CDC数据的保留时间(1~168小时),默认24小时。

异步CDC

创建Delta Table时添加"acid.cdc.mode.enable"="true""acid.cdc.build.async"="true""acid.cdc.build.interval"="300"属性。

SQL示例如下:

CREATE TABLE acid_with_cdc_build_tbl (pk BIGINT NOT NULL PRIMARY KEY, val BIGINT) 
tblproperties ("transactional" = "true", 
               "acid.cdc.mode.enable"="true",
               "acid.cdc.build.async"="true",
               "acid.cdc.build.interval"="300");                                                                           
说明
  • acid.cdc.mode.enable:开启Delta Table CDC功能,默认为同步方式,支持在SQL DML时同步生成CDC数据,不支持Tunnel实时写入。

  • acid.cdc.build.async:开启异步构建CDC的功能,支持Tunnel实时写入该表,同时对于SQLDML也会异步生成。

  • acid.cdc.build.interval:异步构建的周期配置[60-3540],单位秒,结合业务或增量场景配置该参数。

  • 其他可选参数(Project级 / Session级):

    "odps.storage.orc.enable.memcmp.sort.key="true",建议Project级别开启,对于CDC的异步构建以及查询都会有性能帮助。

CDC查询

table_changes

  • 语法

    SELECT * FROM table_changes('<table_name>', <start> [, <end>]);
  • 参数说明

    参数名称

    是否必填

    参数说明

    参数名称

    是否必填

    参数说明

    table_name

    指定查询的Delta Table。

    start

    BIGINT或者TIMESTAMP类型,常量。指定对该表CDC数据查询的起始version,table version可以通过SHOW HISTORY FOR TABLE <table_name>;命令查询,详见SHOW

    end

    BIGINT或者TIMESTAMP类型,常量。指定对该表CDC数据查询的截止version,若不填,则会查询到最新version。

  • 返回值说明

    除数据列外,会额外输出如下三个系统列:

    • __meta_timestamp:代表数据写入系统时间。

    • __meta_op_type:包含1(INSERT)和0(DELETE)。

    • __meta_is_update:包含1(TRUE)和0(FALSE)。

    __meta_op_type__meta_is_update可组合成如下四种情况:

    __meta_op_type

    __meta_is_update

    说明

    1

    0

    代表INSERT的新记录。

    1

    1

    代表UPDATE后的值。

    0

    1

    代表UPDATE前的值。

    0

    0

    代表删除。

  • 示例

    1. 创建acid_cdc_table表 。

      CREATE TABLE acid_cdc_table(id1 STRING NOT NULL, id2 STRING NOT NULL, key1 BIGINT, key2 BIGINT, PRIMARY KEY(id1, id2))
      tblproperties("transactional" = "true", "acid.cdc.mode.enable"="true");
    2. 表中插入数据。

      -- 插入数据时间2025-04-07 11:56:57
      INSERT INTO acid_cdc_table VALUES ('1', '1006', 1006, 1006);
      -- 插入数据时间2025-04-07 12:15:00
      INSERT INTO acid_cdc_table VALUES ('1', '1008', 1008, 1008);
      -- 插入数据时间2025-04-07 13:24:00
      INSERT INTO acid_cdc_table VALUES ('1', '1032', 1032, 1032);
      -- 插入数据时间2025-04-07 14:00:00
      INSERT INTO acid_cdc_table VALUES ('1', '1045', 1045, 1045);
      -- 插入数据时间2025-04-07 14:47:00
      INSERT INTO acid_cdc_table VALUES ('1', '1045', 1045, 1045);
    3. 查询table version

      SHOW HISTORY FOR TABLE acid_cdc_table;

      返回结果如下。

      ObjectType      ObjectId                                ObjectName              VERSION(LSN)            Time                    Operation       
      TABLE           a4a78d3f6af04d85a57a90deff884021        acid_cdc_table          0000000000000001        2025-04-07 11:55:59     CREATE          
      TABLE           a4a78d3f6af04d85a57a90deff884021        acid_cdc_table          0000000000000002        2025-04-07 11:56:57     APPEND          
      TABLE           a4a78d3f6af04d85a57a90deff884021        acid_cdc_table          0000000000000003        2025-04-07 12:00:13     MINOR_COMPACT   
      TABLE           a4a78d3f6af04d85a57a90deff884021        acid_cdc_table          0000000000000004        2025-04-07 12:15:32     APPEND          
      TABLE           a4a78d3f6af04d85a57a90deff884021        acid_cdc_table          0000000000000005        2025-04-07 12:30:02     MINOR_COMPACT   
      TABLE           a4a78d3f6af04d85a57a90deff884021        acid_cdc_table          0000000000000006        2025-04-07 13:24:47     APPEND          
      TABLE           a4a78d3f6af04d85a57a90deff884021        acid_cdc_table          0000000000000007        2025-04-07 13:30:02     MINOR_COMPACT   
      TABLE           a4a78d3f6af04d85a57a90deff884021        acid_cdc_table          0000000000000008        2025-04-07 14:00:41     APPEND          
      TABLE           a4a78d3f6af04d85a57a90deff884021        acid_cdc_table          0000000000000009        2025-04-07 14:15:15     MINOR_COMPACT   
      TABLE           a4a78d3f6af04d85a57a90deff884021        acid_cdc_table          0000000000000010        2025-04-07 14:47:46     APPEND          
      TABLE           a4a78d3f6af04d85a57a90deff884021        acid_cdc_table          0000000000000011        2025-04-07 15:00:11     MINOR_COMPACT  
    4. 查询CDC记录。

      • 查询2025-04-07 12:00:00以来记录。

        SELECT * FROM table_changes('acid_cdc_table', '2025-04-07 12:00:00');
        -- 等同于 
        SELECT * FROM table_changes('acid_cdc_table', 3);

        返回结果如下。

        +------------+------------+------------+------------+------------------+----------------+------------------+
        | id1        | id2        | key1       | key2       | __meta_timestamp | __meta_op_type | __meta_is_update |
        +------------+------------+------------+------------+------------------+----------------+------------------+
        | 1          | 1045       | 1045       | 1045       | 2025-04-07 14:00:34 | 1              | 0                |
        | 1          | 1008       | 1008       | 1008       | 2025-04-07 12:15:28 | 1              | 0                |
        | 1          | 1032       | 1032       | 1032       | 2025-04-07 13:24:43 | 1              | 0                |
        | 2          | 1045       | 1045       | 1045       | 2025-04-07 14:47:41 | 1              | 0                |
        +------------+------------+------------+------------+------------------+----------------+------------------+
      • 查询指定区间2025-04-07 12:00:0013:30:00的记录。

        SELECT * FROM table_changes('acid_cdc_table', '2025-04-07 12:00:00', '2025-04-07 13:30:00');
        -- 等同于 
        SELECT * FROM table_changes('acid_cdc_table', 3, 6);

        返回结果如下。

        +------------+------------+------------+------------+------------------+----------------+------------------+
        | id1        | id2        | key1       | key2       | __meta_timestamp | __meta_op_type | __meta_is_update |
        +------------+------------+------------+------------+------------------+----------------+------------------+
        | 1          | 1008       | 1008       | 1008       | 2025-04-07 12:15:28 | 1              | 0                |
        | 1          | 1032       | 1032       | 1032       | 2025-04-07 13:24:43 | 1              | 0                |
        +------------+------------+------------+------------+------------------+----------------+------------------+

Stream

指定对Delta Table CDC数据配合Stream使用,详细使用方式请参见流对象(Stream),此处仅提供基本语法和示例。

  • 语法

    CREATE STREAM [IF NOT EXISTS] <stream_name> 
    ON TABLE <delta_table_name> <VERSION as of v>
    strmproperties ("read_mode"="cdc") 
    说明

    "read_mode"指定对Delta Table的消费模式,"cdc"表示该Stream会根据查询范围查询CDC数据。

  • 示例

    -- 创建源表acid_with_cdc_stream
    CREATE TABLE acid_with_cdc_stream (id1 BIGINT NOT NULL PRIMARY KEY, id2 BIGINT) 
    tblproperties ("transactional" = "true", "acid.cdc.mode.enable"="true","cdc.insert.into.passthrough.enable"="true");
    
    -- 插入数据
    INSERT INTO acid_with_cdc_stream VALUES (1, 1006), (2, 1008), (3, 1032);
    
    -- 创建一个Stream关联acid_with_cdc_stream表
    CREATE STREAM delta_table_stream ON TABLE acid_with_cdc_stream version AS OF 1 strmproperties ("read_mode"="cdc");
    
    -- 查询Stream对象delta_table_stream
    DESC STREAM delta_table_stream;

    返回结果示例如下:

    Name                                    delta_table_stream                      
    Project                                 yunqi_y****                             
    Schema                                  default                                 
    Create Time                             2024-12-03 11:13:12                     Last Modified Time                      2024-12-03 11:13:12                     
    Offset Version                          1                                       
    Reference Table Project                 yunqi_y****                             
    Reference Table Schema                  default                                 
    Reference Table Name                    acid_with_cdc_stream                       
    Reference Table Id                      b89ec113f50944d5b8e52ce6a00c****        
    Reference Table Version                 2                                       
    Parameters                              {"read_mode": "cdc"}

  • 本页导读 (1)
  • 功能介绍
  • 建表DDL
  • 同步CDC
  • 异步CDC
  • CDC查询
  • table_changes
  • Stream
AI助理

点击开启售前

在线咨询服务

你好,我是AI助理

可以解答问题、推荐解决方案等