通过Flink CDC实时订阅全量和增量数据(邀测中)

更新时间:

AnalyticDB for PostgreSQL提供自研的CDC连接器,基于PostgreSQL的逻辑复制功能实现订阅全量和增量数据,可与Flink无缝集成。该连接器能够高效捕获源表的实时变更数据,支持实时数据同步、流式处理等,助力企业快速响应动态数据需求。本文介绍如何通过阿里云实时计算FlinkCDC实时订阅AnalyticDB for PostgreSQL的全量和增量数据。

使用限制

  • 内核版本为7.2.1.4及以上的AnalyticDB for PostgreSQL7.0版实例。

    说明

    您可以在控制台实例的基本信息页查看内核小版本。如不满足上述版本要求,需要您升级内核小版本

  • 暂不支持AnalyticDB for PostgreSQLServerless模式。

前提条件

  • AnalyticDB for PostgreSQL实例和Flink全托管工作空间位于同一VPC。

  • 调整AnalyticDB for PostgreSQL实例参数配置

    • 开启逻辑解析,即设置wal_level参数的值为logical。

    • 若您的AnalyticDB for PostgreSQL实例系列为高可用版,为保障迁移任务的顺利进行,避免因主备切换而导致逻辑订阅中断,还需将hot_standbyhot_standby_feedbacksync_replication_slots参数的值均配置为on。

  • AnalyticDB for PostgreSQL账号是初始账号高权限用户RDS_SUPERUSER,且需要赋权REPLICATION,ALTER USER <username> WITH REPLICATION;

  • 已将Flink工作空间所属的网段添加至AnalyticDB for PostgreSQL白名单

  • 下载flink-sql-connector-adbpg-cdc-3.3.jar,并在Flink工作空间内上传CDC连接器

操作步骤

步骤一:准备测试表和测试数据

  1. 登录云原生数据仓库AnalyticDB PostgreSQL版控制台,找到目标实例,单击实例ID。

  2. 基本信息页面的右侧下,单击登录数据库

  3. 创建测试数据库和源表adbpg_source_table,并向源表写入50条数据。

    -- 创建测试数据库
    CREATE DATABASE testdb;
    -- 切换至testdb数据库并创建schema
    CREATE SCHEMA testschema;
    -- 创建源表adbpg_source_table
    CREATE TABLE testschema.adbpg_source_table(
      id int,
      username text,
      PRIMARY KEY(id)
    );
    -- 向adbpg_source_table的表中写入50条数据
    INSERT INTO testschema.adbpg_source_table(id, username)
    SELECT i, 'username'||i::text
    FROM generate_series(1, 50) AS t(i);
  4. 创建结果表adbpg_sink_table,用于Flink写入结果数据。

    CREATE TABLE testschema.adbpg_sink_table(
      id int,
      username text,
      score int
    );

步骤二:新建Flink作业

  1. 登录实时计算控制台,在Flink全托管页签,单击目标工作空间操作列下的控制台

  2. 在左侧导航栏,单击数据开发 > ETL

  3. 单击顶部菜单栏新建,选择空白的流作业草稿,单击下一步,在新建文件草稿对话框,填写作业配置信息。

    作业参数

    说明

    示例

    文件名称

    作业的名称。

    说明

    作业名称在当前项目中必须保持唯一。

    adbpg-test

    存储位置

    指定该作业的代码文件所属的文件夹。

    您还可以在现有文件夹右侧,单击新建文件夹图标,新建子文件夹。

    作业草稿

    引擎版本

    当前作业使用的Flink的引擎版本。引擎版本号含义、版本对应关系和生命周期重要时间点详情请参见引擎版本介绍

    vvr-6.0.7-flink-1.15

  4. 单击创建

步骤三:编写作业代码并部署

  1. 创建数据源datagen_source(用于生成模拟数据)和source_adbpg(用于从 AnalyticDB for PostgreSQL数据库中捕获实时的数据变更),将两个数据源关联,并将结果写入结果表sink_adbpg(将处理后的数据写入AnalyticDB for PostgreSQL)。

    将以下作业代码拷贝到作业文本编辑区。

    ---创建datagen源表,使用datagen连接器生成流式数据
    CREATE TEMPORARY TABLE datagen_source (
     id INT,
     score INT
    ) WITH (
     'connector' = 'datagen',
     'fields.id.kind'='sequence',
     'fields.id.start'='1',
     'fields.id.end'='100',
     'fields.score.kind'='random',
     'fields.score.min'='70',
     'fields.score.max'='100'
    );
    
    --创建adbpg源表,使用adbpg-cdc连接器,通过slot.name和逻pgoutput捕获adbpg_source_table的数据变更
    CREATE TEMPORARY TABLE source_adbpg(
     id int,
     username varchar,
     PRIMARY KEY(id) NOT ENFORCED
    ) WITH(
      'connector' = 'adbpg-cdc', 
      'hostname' = 'gp-bp16v8cgx46ns****-master.gpdb.rds.aliyuncs.com',
      'port' = '5432',
      'username' = 'account****',
      'password' = 'password****',
      'database-name' = 'testdb',
      'schema-name' = 'testschema',
      'table-name' = 'adbpg_source_table',
      'slot.name' = 'flink',
      'decoding.plugin.name' = 'pgoutput'
    );
    
    --创建adbpg结果表,将处理后的结果写入数据库的目标表adbpg_sink_table。
    CREATE TEMPORARY TABLE sink_adbpg (
      id int,
      username varchar,
      score int
    ) WITH (
      'connector' = 'adbpg', 
      'url' = 'jdbc:postgresql://gp-bp16v8cgx46ns****-master.gpdb.rds.aliyuncs.com:5432/testdb',
      'tablename' = 'testschema.adbpg_sink_table',  
      'username' = 'account****',
      'password' = 'password****',
      'maxRetryTimes' = '2',
      'batchsize' = '5000',
      'conflictMode' = 'ignore',
      'writeMode' = 'insert',
      'retryWaitTime' = '200'
    );
    
    -- datagen_source表和source_adbpg源表关联后的结果写入adbpg结果表
    INSERT INTO sink_adbpg
    SELECT ts.id,ts.username,ds.score FROM datagen_source AS ds
    JOIN source_adbpg AS ts ON ds.id = ts.id;

    参数说明

    参数名称

    是否必填

    数据类型

    描述

    connector

    STRING

    连接类型。源表固定值为adbpg-cdc,结果表固定值为adbpg

    hostname

    STRING

    AnalyticDB for PostgreSQL实例的内网地址。您可在目标实例的基本信息页签下获取内网地址

    username

    STRING

    AnalyticDB for PostgreSQL数据库的账号和密码。

    password

    STRING

    database-name

    STRING

    数据库名称。

    schema-name

    STRING

    Schema名称。该参数支持正则表达式,可以一次订阅多个Schema。

    table-name

    STRING

    表名称。该参数支持正则表达式,可以一次订阅多个表。

    port

    INTEGER

    AnalyticDB for PostgreSQL端口,固定值为5432。

    decoding.plugin.name

    STRING

    Postgres Logical Decoding插件名称。固定值为pgoutput。

    slot.name

    STRING

    逻辑解码槽的名字。

    • 对于同一个Flink作业涉及的源表,建议使用相同的slot.name

    • 如果不同的Flink作业涉及同一张表,则建议为每个作业分别设置独立的slot.name参数,以避免出现以下错误:PSQLException: ERROR: replication slot "debezium" is active for PID 974

    debezium.*

    STRING

    该参数可以更细粒度地控制Debezium客户端的行为。例如,设置 'debezium.snapshot.mode' = 'never' 可以禁用快照功能。您可以通过配置属性获取更多配置详情。

    scan.incremental.snapshot.enabled

    BOOLEAN

    是否开启增量快照,取值如下。

    • false(默认值):不开启增量快照。

    • true:开启增量快照。

    scan.startup.mode

    STRING

    消费数据时的启动模式,取值如下。

    • initial(默认值):在首次启动时,先扫描历史全量数据,然后读取最新的WAL日志数据,实现全量与增量数据的无缝衔接。

    • latest-offset:在首次启动时,不扫描历史全量数据,直接从 WAL 日志的末尾(即最新的日志位置)开始读取,仅捕获连接器启动后的最新变更数据。

    • snapshot:先扫描历史全量数据,同时读取全量阶段新产生的 WAL 日志,最终作业会在完成全量扫描后停止运行。

    changelog-mode

    STRING

    用于编码流更改的变更日志(Changelog)模式,取值如下。

    • ALL(默认值):支持所有操作类型,包括 INSERTDELETEUPDATE_BEFORE 和 UPDATE_AFTER

    • UPSERT:仅支持 UPSERT 类型的操作,包括 INSERTDELETE 和 UPDATE_AFTER

    heartbeat.interval.ms

    DURATION

    发送心跳包的时间间隔,默认值为30秒(单位:毫秒)。

    AnalyticDB for PostgreSQLCDC连接器通过主动向数据库发送心跳包,确保Slot的偏移量能够持续推进。在表数据变更不频繁的情况下,合理设置该参数可以及时清理WAL日志,避免浪费磁盘空间。

    scan.incremental.snapshot.chunk.key-column

    STRING

    指定某一列作为快照阶段分片的切分列,默认情况下会从主键中选择第一列。

    url

    STRING

    格式为jdbc:postgresql://<Address>:<PortId>/<DatabaseName>

  2. 在作业开发页面顶部,单击深度检查,进行语法检查。

  3. 单击部署,单击确定

  4. 单击右上角前往运维,在作业运维页面,单击启动

步骤四:查看Flink写入数据

  1. 在测试数据库中执行如下语句,查看Flink写入的数据。

    SELECT * FROM testschema.adbpg_sink_table;
    SELECT COUNT(*) FROM testschema.adbpg_sink_table; 
  2. 在源表中新增50条数据,并在结果表中查看Flink写入的增量数据的总数。

    -- 源表写入50条增量数据
    INSERT INTO testschema.adbpg_source_table(id, username)
    SELECT i, 'username'||i::text
    FROM generate_series(51, 100) AS t(i);
    
    -- 检查目标表新增数据
    SELECT COUNT(*) FROM testschema.adbpg_sink_table where id > 50;

    结果如下。

     count 
    -------
        50
    (1 row)

注意事项

  • 请及时管理Replication Slot,以避免磁盘空间的浪费。

    为了防止Flink作业重启过程中因Checkpoint对应的WAL(Write-Ahead Log)被清理而导致数据丢失,Flink不会自动删除Replication Slot。因此,如果确认某个Flink作业不再需要重启,请手动删除其对应的Replication Slot,以释放其占用的资源。此外,如果Replication Slot的确认位点长时间未推进,AnalyticDB for PostgreSQL将无法清理该位点之后的WAL条目,可能会导致未使用的WAL数据持续积累,占用大量磁盘空间。

  • AnalyticDB for PostgreSQL实例正常运行期间,能保证Exactly Once数据处理语义。但在故障场景下,仅能提供至少一次(At-Least Once)的语义保障。

  • 订阅表的REPLICA IDENTITY参数会被CDC连接器修改为FULL,以保障该表数据同步的一致性,影响如下:

    • 磁盘空间占用增加。在频繁更新或删除操作的场景下,该设置将增加WAL日志的大小,导致磁盘空间占用增加。

    • 写操作性能下降。在高并发写入场景下,性能可能会受到明显影响。

    • 检查点压力增大。更大的WAL日志意味着检查点(Checkpoint)需要处理更多的数据,可能导致检查点时间延长。

最佳实践

Flink CDC支持通过Flink SQL APIDataStream API进行作业开发,可实现对源库单表或多表的全量与增量一体化的数据同步;此外,它还支持对异构数据源进行双流Join等计算。在整个数据处理过程中,Flink框架能够确保精确一次(Exactly-once)的事件处理语义。然而,它不适合对PostgreSQL系数据库进行全库同步,一方面是因为它不支持DDL的同步,另一方面需要在Flink SQL中定义每张表的结构,后期维护相对复杂。

本章节将从AnalyticDB for PostgreSQL同步数据到Kafka为例,介绍Flink CDC SQL作业开发的最佳实践。在开发Flink CDC作业前,请确保已完成前提条件中的资源准备与配置。

步骤一:准备测试表

AnalyticDB for PostgreSQL实例中创建两张源表。

CREATE TABLE products (
    product_id SERIAL PRIMARY KEY,
    product_name VARCHAR(200) NOT NULL,
    sku CHAR(12) NOT NULL,
    description TEXT,
    price NUMERIC(10,2) NOT NULL,
    discount_price DECIMAL(10,2),
    stock_quantity INTEGER DEFAULT 0,
    weight REAL,
    volume DOUBLE PRECISION,
    dimensions BOX,
    release_date DATE,
    is_featured BOOLEAN DEFAULT FALSE,
    rating FLOAT,
    warranty_period INTERVAL,
    metadata JSON,
    tags TEXT[]
);

CREATE TABLE documents (
    document_id UUID PRIMARY KEY,
    title VARCHAR(200) NOT NULL,
    content TEXT,
    summary TEXT,
    publication_date TIMESTAMP WITHOUT TIME ZONE,
    last_updated TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    author_id BIGINT,
    file_data BYTEA,
    xml_content XML,
    json_metadata JSON,
    reading_time INTERVAL,
    is_public BOOLEAN DEFAULT TRUE,
    views_count INTEGER DEFAULT 0,
    category VARCHAR(50),
    tags TEXT[]
);

步骤二:准备Kafka资源

  1. 购买和部署Kafka实例

  2. Flink工作空间所属的网段添加至Kafka的白名单中。

  3. Kafka实例中创建资源

步骤三:新建Flink作业

  1. 登录实时计算控制台,在Flink全托管页签,单击目标工作空间操作列下的控制台

  2. 在左侧导航栏,单击数据开发 > ETL

  3. 单击顶部菜单栏新建,选择空白的流作业草稿,单击下一步,在新建文件草稿对话框,填写作业配置信息。

    作业参数

    说明

    示例

    文件名称

    作业的名称。

    说明

    作业名称在当前项目中必须保持唯一。

    adbpg-test

    存储位置

    指定该作业的代码文件所属的文件夹。

    您还可以在现有文件夹右侧,单击新建文件夹图标,新建子文件夹。

    作业草稿

    引擎版本

    当前作业使用的Flink的引擎版本。引擎版本号含义、版本对应关系和生命周期重要时间点详情请参见引擎版本介绍

    vvr-6.0.7-flink-1.15

  4. 单击创建

步骤四:编写作业代码并部署

  1. Flink工作空间内编写SQL作业。将以下作业代码拷贝到作业文本编辑区,并将相关配置替换为实际值。

    -- 使用一个source实现多表采集
    CREATE TEMPORARY TABLE ADBPGSource(
        table_name STRING METADATA FROM 'table_name' VIRTUAL,
        row_kind STRING METADATA FROM 'row_kind' VIRTUAL,
        product_id BIGINT,
        product_name STRING,
        sku STRING,
        description STRING,
        price STRING,
        discount_price STRING,
        stock_quantity INT,
        weight STRING,
        volume STRING,
        dimensions STRING,
        release_date STRING,
        is_featured BOOLEAN,
        rating FLOAT,
        warranty_period STRING,
        metadata STRING,
        tags STRING,
        document_id STRING,
        title STRING,
        content STRING,
        summary STRING,
        publication_date STRING,
        last_updated STRING,
        author_id BIGINT,
        file_data STRING,
        xml_content STRING,
        json_metadata STRING,
        reading_time STRING,
        is_public BOOLEAN,
        views_count INT,
        category STRING
    ) WITH (
      'connector' = 'adbpg-cdc',
      'hostname' = 'gp-2zev887z58390***-master.gpdb.rds.aliyuncs.com',
      'port' = '5432',
      'username' = 'account****',
      'password' = 'password****',
      'database-name' = 'testdb',
      'schema-name' = 'public',
      'table-name' = '(products|documents)',
      'slot.name' = 'flink',
      'decoding.plugin.name' = 'pgoutput',
      'debezium.snapshot.mode' = 'never'
    );
    
    CREATE TEMPORARY TABLE KafkaProducts (
        product_id BIGINT,
        product_name STRING,
        sku STRING,
        description STRING,
        price STRING,
        discount_price STRING,
        stock_quantity INT,
        weight STRING,
        volume STRING,
        dimensions STRING,
        release_date STRING,
        is_featured BOOLEAN,
        rating FLOAT,
        warranty_period STRING,
        metadata STRING,
        tags STRING,
        PRIMARY KEY(product_id) NOT ENFORCED
    ) WITH (
      'connector' = 'upsert-kafka',
      'topic' = '****',
      'properties.bootstrap.servers' = 'alikafka-post-cn-****-1-vpc.alikafka.aliyuncs.com:9092',
      'key.format'='avro',
      'value.format'='avro'
    );
    
    CREATE TEMPORARY TABLE KafkaDocuments (
        document_id STRING,
        title STRING,
        content STRING,
        summary STRING,
        publication_date STRING,
        last_updated STRING,
        author_id BIGINT,
        file_data STRING,
        xml_content STRING,
        json_metadata STRING,
        reading_time STRING,
        is_public BOOLEAN,
        views_count INT,
        category STRING,
        tags STRING,
        PRIMARY KEY(document_id) NOT ENFORCED
    ) WITH (
      'connector' = 'upsert-kafka',
      'topic' = '****',
      'properties.bootstrap.servers' = 'alikafka-post-cn-****-1-vpc.alikafka.aliyuncs.com:9092',
      'key.format'='avro',
      'value.format'='avro'
    );
    
    -- 使用 STATEMENT SET 包装多条语句
    BEGIN STATEMENT SET;
    -- 使用元数据 table_name 路由到目标表
    INSERT INTO KafkaProducts
    SELECT product_id,product_name,sku,description,price,discount_price,stock_quantity,weight,volume,dimensions,release_date,is_featured,rating,warranty_period,metadata,tags
    FROM ADBPGSource
    WHERE table_name = 'products';
    
    INSERT INTO KafkaDocuments
    SELECT document_id,title,content,summary,publication_date,last_updated,author_id,file_data,xml_content,json_metadata,reading_time,is_public,views_count,category,tags
    FROM ADBPGSource
    WHERE table_name = 'documents';
    
    END;

    对于此SQL作业需要关注以下几点:

    • 对于多表同步的任务,建议参考此SQL使用一个source表实现多表采集,需要在此source表中定义全部源表的列(重名列保留一个);向目标表写入时通过METADATAtable_name 来路由到指定表。这种做法的优势在于,仅需要在AnalyticDB for PostgreSQL中创建一个Replication Slot,能降低对源库资源的占用,提升同步性能,便于后期维护。

    • 使用table-name参数来指定多张源表,表名放在括号中,使用|符号分割, 例如(table1|table2|table3)

    • debezium.snapshot.mode配置为never表示只同步源表增量数据;若需要同步全量及增量数据,修改配置为initial

  2. 在作业开发页面顶部,单击深度检查,进行语法检查。

  3. 单击部署,单击确定

  4. 单击右上角前往运维,在作业运维页面,单击启动

步骤五:插入测试数据

AnalyticDB for PostgreSQL实例中分别对两张源表进行数据更新,并观察Kafka Topic中的消息变化

您可以参考以下SQL插入测试数据:

INSERT INTO products (
    product_name, sku, description, price, discount_price, stock_quantity, weight, volume, dimensions, release_date, is_featured, rating, warranty_period, metadata, tags
) VALUES (
    '测试商品', 'Test-2025', '一条测试商品数据', 299.99, 279.99, 150, 50.5, 120.75, '(10,20),(30,40)', '2023-05-01', TRUE, 4.8, INTERVAL '1 year', '{"brand": "TechCo", "model": "X1"}', '{"测试1", "测试2"}'
);

相关文档

Flink实时读写全量数据,订阅AnalyticDB for PostgreSQL的全量数据。