通过Flink CDC实时订阅全量和增量数据(邀测中)

更新时间:

AnalyticDB for PostgreSQL提供自研的CDC连接器,基于PostgreSQL的逻辑复制功能实现订阅全量和增量数据,可与Flink无缝集成。该连接器能够高效捕获源表的实时变更数据,支持实时数据同步、流式处理等,助力企业快速响应动态数据需求。本文介绍如何通过阿里云实时计算FlinkCDC实时订阅AnalyticDB for PostgreSQL的全量和增量数据。

使用限制

  • 内核版本为7.1.1.4及以上的AnalyticDB for PostgreSQL7.0版实例。

    说明

    您可以在控制台实例的基本信息页查看内核小版本。如不满足上述版本要求,需要您升级内核小版本

  • 暂不支持AnalyticDB for PostgreSQLServerless模式。

  • 支持订阅Heap表和AO列存表的全量和增量数据。如果不是Heap表或AO列存表,您可通过Flink实时订阅全量数据订阅表的全量数据。

    查看是否和修改Heap表或AO列存表

    您可以通过以下SQL查询表是否为Heap表或AO列存表。

    • 若是Heap表,查询结果将返回heap

    • 若是AO列存表,查询结果将返回ao_column

    SELECT amname FROM pg_class, pg_am WHERE pg_class.oid = '<schema_name>.<table_name>'::regclass AND pg_am.oid = relam ;

    您也可以变更目标表为Heap表或AO列存表。

    ALTER TABLE <table_name> SET ACCESS METHOD heap;
    ALTER TABLE <table_name> SET ACCESS METHOD ao_column;

前提条件

  • AnalyticDB for PostgreSQL实例和Flink全托管工作空间位于同一VPC。

  • 提交工单AnalyticDB for PostgreSQL实例开通CDC功能并获取CDC连接器jar包。

  • AnalyticDB for PostgreSQL账号是初始账号高权限用户RDS_SUPERUSER

  • 已将Flink工作空间所属的网段添加至AnalyticDB for PostgreSQL白名单

  • 已设置订阅表的REPLICA IDENTITY参数为FULL,保障该表数据同步的一致性。

    查看和修改REPLICA IDENTITY参数

    REPLICA IDENTITYPostgreSQL中用于逻辑复制的参数,控制DELETEUPDATE时如何记录行的旧值和新值。

    您可使用以下SQL查询该参数的取值详情。

    SELECT relname, relreplident FROM pg_class WHERE relkind = 'r' 
    AND relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = '<schema_name>');

    您可以使用以下SQL修改该参数的取值为FULL。该设置影响如下:

    • 磁盘空间占用。尤其是在频繁更新或删除操作的场景下,该设置将增加WAL日志的大小,导致磁盘空间占用增加。

    • 写操作性能下降。在高并发写入场景下,性能可能会受到明显影响。

    • 检查点压力增大。更大的WAL日志意味着检查点(Checkpoint)需要处理更多的数据,可能导致检查点时间延长。

    ALTER TABLE <schema_name>.<table_name> REPLICA IDENTITY FULL; --修改该参数为full

操作步骤

步骤一:准备测试表和测试数据

  1. 登录云原生数据仓库AnalyticDB PostgreSQL版控制台,找到目标实例,单击实例ID。

  2. 基本信息页面的右侧下,单击登录数据库

  3. 创建测试数据库和源表adbpg_source_table,并向源表写入50条数据。

    -- 创建测试数据库
    CREATE DATABASE testdb;
    -- 切换至testdb数据库并创建schema
    CREATE SCHEMA testschema;
    -- 创建源表adbpg_source_table
    CREATE TABLE testschema.adbpg_source_table(
      id int,
      username text,
      PRIMARY KEY(id)
    );
    -- 向adbpg_source_table的表中写入50条数据
    INSERT INTO testschema.adbpg_source_table(id, username)
    SELECT i, 'username'||i::text
    FROM generate_series(1, 50) AS t(i);
  4. 创建结果表adbpg_sink_table,用于Flink写入结果数据。

    CREATE TABLE testschema.adbpg_sink_table(
      id int,
      username text,
      score int
    );

步骤二:创建CDC连接器和Flink作业

  1. 登录实时计算控制台,在Flink全托管页签,单击目标工作空间操作列下的控制台

  2. 在左侧导航栏,单击连接器创建并上传CDC连接器

  3. 在左侧导航栏,单击数据开发 > ETL

  4. 单击顶部菜单栏新建,选择空白的流作业草稿,单击下一步,在新建文件草稿对话框,填写作业配置信息。

    作业参数

    说明

    示例

    文件名称

    作业的名称。

    说明

    作业名称在当前项目中必须保持唯一。

    adbpg-test

    存储位置

    指定该作业的代码文件所属的文件夹。

    您还可以在现有文件夹右侧,单击新建文件夹图标,新建子文件夹。

    作业草稿

    引擎版本

    当前作业使用的Flink的引擎版本。引擎版本号含义、版本对应关系和生命周期重要时间点详情请参见引擎版本介绍

    vvr-6.0.7-flink-1.15

  5. 单击创建

步骤三:编写作业代码并部署

  1. 创建数据源datagen_source(用于生成模拟数据)和source_adbpg(用于从 AnalyticDB for PostgreSQL数据库中捕获实时的数据变更),将两个数据源关联,并将结果写入结果表sink_adbpg(将处理后的数据写入AnalyticDB for PostgreSQL)。

    将以下作业代码拷贝到作业文本编辑区。

    ---创建datagen源表,使用datagen连接器生成流式数据
    CREATE TEMPORARY TABLE datagen_source (
     id INT,
     score INT
    ) WITH (
     'connector' = 'datagen',
     'fields.id.kind'='sequence',
     'fields.id.start'='1',
     'fields.id.end'='100',
     'fields.score.kind'='random',
     'fields.score.min'='70',
     'fields.score.max'='100'
    );
    
    --创建adbpg源表,使用adbpg-cdc连接器,通过slot.name和逻pgoutput捕获adbpg_source_table的数据变更
    CREATE TEMPORARY TABLE source_adbpg(
     id int,
     username varchar,
     PRIMARY KEY(id) NOT ENFORCED
    ) WITH(
      'connector' = 'adbpg-cdc', 
      'hostname' = 'gp-bp16v8cgx46ns****-master.gpdb.rds.aliyuncs.com',
      'port' = '5432',
      'username' = 'account****',
      'password' = 'password****',
      'database-name' = 'testdb',
      'schema-name' = 'testschema',
      'table-name' = 'adbpg_source_table',
      'slot.name' = 'flink',
      'decoding.plugin.name' = 'pgoutput'
    );
    
    --创建adbpg结果表,将处理后的结果写入数据库的目标表adbpg_sink_table。
    CREATE TEMPORARY TABLE sink_adbpg (
      id int,
      username varchar,
      score int
    ) WITH (
      'connector' = 'adbpg', 
      'url' = 'jdbc:postgresql://gp-bp16v8cgx46ns****-master.gpdb.rds.aliyuncs.com:5432/testdb',
      'tablename' = 'testschema.adbpg_sink_table',  
      'username' = 'account****',
      'password' = 'password****',
      'maxRetryTimes' = '2',
      'batchsize' = '5000',
      'conflictMode' = 'ignore',
      'writeMode' = 'insert',
      'retryWaitTime' = '200'
    );
    
    -- datagen_source表和source_adbpg源表关联后的结果写入adbpg结果表
    INSERT INTO sink_adbpg
    SELECT ts.id,ts.username,ds.score FROM datagen_source AS ds
    JOIN source_adbpg AS ts ON ds.id = ts.id;

    参数说明

    参数名称

    是否必填

    数据类型

    描述

    connector

    String

    连接类型。源表固定值为adbpg-cdc,结果表固定值为adbpg

    hostname

    STRING

    AnalyticDB for PostgreSQL实例的内网地址。您可在目标实例的基本信息页签下获取内网地址

    userName

    String

    AnalyticDB for PostgreSQL数据库的账号和密码。

    password

    STRING

    database-name

    STRING

    数据库名称。

    schema-name

    STRING

    Schema名称。该参数支持正则表达式,可以一次订阅多个Schema。

    table-name

    STRING

    表名称。该参数支持正则表达式,可以一次订阅多个表。

    port

    INTEGER

    AnalyticDB for PostgreSQL端口,固定值为5432。

    decoding.plugin.name

    STRING

    Postgres Logical Decoding插件名称。固定值为pgoutput。

    slot.name

    STRING

    逻辑解码槽的名字。

    • 对于同一个Flink作业涉及的源表,建议使用相同的slot.name

    • 如果不同的Flink作业涉及同一张表,则建议为每个作业分别设置独立的slot.name参数,以避免出现以下错误:PSQLException: ERROR: replication slot "debezium" is active for PID 974

    debezium.*

    STRING

    该参数可以更细粒度地控制Debezium客户端的行为。例如,设置 'debezium.snapshot.mode' = 'never' 可以禁用快照功能。您可以通过配置属性获取更多配置详情。

    scan.incremental.snapshot.enabled

    BOOLEAN

    是否开启增量快照,取值如下。

    • false(默认值):不开启增量快照。

    • true:开启增量快照。

    scan.startup.mode

    STRING

    消费数据时的启动模式,取值如下。

    • initial(默认值):在首次启动时,先扫描历史全量数据,然后读取最新的WAL日志数据,实现全量与增量数据的无缝衔接。

    • latest-offset:在首次启动时,不扫描历史全量数据,直接从 WAL 日志的末尾(即最新的日志位置)开始读取,仅捕获连接器启动后的最新变更数据。

    • snapshot:先扫描历史全量数据,同时读取全量阶段新产生的 WAL 日志,最终作业会在完成全量扫描后停止运行。

    changelog-mode

    STRING

    用于编码流更改的变更日志(Changelog)模式,取值如下。

    • ALL(默认值):支持所有操作类型,包括 INSERTDELETEUPDATE_BEFORE 和 UPDATE_AFTER

    • UPSERT:仅支持 UPSERT 类型的操作,包括 INSERTDELETE 和 UPDATE_AFTER

    heartbeat.interval.ms

    DURATION

    发送心跳包的时间间隔,默认值为30秒(单位:毫秒)。

    AnalyticDB for PostgreSQLCDC连接器通过主动向数据库发送心跳包,确保Slot的偏移量能够持续推进。在表数据变更不频繁的情况下,合理设置该参数可以及时清理WAL日志,避免浪费磁盘空间。

    scan.incremental.snapshot.chunk.key-column

    STRING

    指定某一列作为快照阶段分片的切分列,默认情况下会从主键中选择第一列。

    url

    String

    格式为jdbc:postgresql://<Address>:<PortId>/<DatabaseName>

  2. 在作业开发页面顶部,单击深度检查,进行语法检查。

  3. 单击部署,单击确定

  4. 单击右上角前往运维,在作业运维页面,单击启动

步骤四:查看Flink写入数据

  1. 在测试数据库中执行如下语句,查看Flink写入的数据。

    SELECT * FROM testschema.adbpg_sink_table;
    SELECT COUNT(*) FROM testschema.adbpg_sink_table; 
  2. 在源表中新增50条数据,并在结果表中查看Flink写入的增量数据的总数。

    -- 源表写入50条增量数据
    INSERT INTO testschema.adbpg_source_table(id, username)
    SELECT i, 'username'||i::text
    FROM generate_series(51, 100) AS t(i);
    
    -- 检查目标表新增数据
    SELECT COUNT(*) FROM testschema.adbpg_sink_table where id > 50;

    结果如下。

     count 
    -------
        50
    (1 row)

注意事项

  • 请及时管理Replication Slot,以避免磁盘空间的浪费。

    为了防止Flink作业重启过程中因Checkpoint对应的WAL(Write-Ahead Log)被清理而导致数据丢失,Flink不会自动删除Replication Slot。因此,如果确认某个Flink作业不再需要重启,请手动删除其对应的Replication Slot,以释放其占用的资源。此外,如果Replication Slot的确认位点长时间未推进,AnalyticDB for PostgreSQL将无法清理该位点之后的WAL条目,可能会导致未使用的WAL数据持续积累,占用大量磁盘空间。

  • AnalyticDB for PostgreSQL实例正常运行期间,能保证Exactly Once数据处理语义。但在故障场景下,仅能提供至少一次(At-Least Once)的语义保障。

相关文档

Flink实时订阅全量数据,订阅AnalyticDB for PostgreSQL的全量数据。