通过Flink CDC实时订阅全量和增量数据(邀测中)
AnalyticDB for PostgreSQL提供自研的CDC连接器,基于PostgreSQL的逻辑复制功能实现订阅全量和增量数据,可与Flink无缝集成。该连接器能够高效捕获源表的实时变更数据,支持实时数据同步、流式处理等,助力企业快速响应动态数据需求。本文介绍如何通过阿里云实时计算Flink版CDC实时订阅AnalyticDB for PostgreSQL的全量和增量数据。
使用限制
内核版本为7.2.1.4及以上的AnalyticDB for PostgreSQL7.0版实例。
暂不支持AnalyticDB for PostgreSQL的Serverless模式。
前提条件
AnalyticDB for PostgreSQL实例和Flink全托管工作空间位于同一VPC。
调整AnalyticDB for PostgreSQL实例参数配置:
开启逻辑解析,即设置
wal_level
参数的值为logical。若您的AnalyticDB for PostgreSQL实例系列为高可用版,为保障迁移任务的顺利进行,避免因主备切换而导致逻辑订阅中断,还需将
hot_standby
、hot_standby_feedback
和sync_replication_slots
参数的值均配置为on。
AnalyticDB for PostgreSQL账号是初始账号或高权限用户RDS_SUPERUSER,且需要赋权REPLICATION,
ALTER USER <username> WITH REPLICATION;
。已将Flink工作空间所属的网段添加至AnalyticDB for PostgreSQL的白名单。
下载flink-sql-connector-adbpg-cdc-3.3.jar,并在Flink工作空间内上传CDC连接器。
操作步骤
步骤一:准备测试表和测试数据
登录云原生数据仓库AnalyticDB PostgreSQL版控制台,找到目标实例,单击实例ID。
在基本信息页面的右侧下,单击登录数据库。
创建测试数据库和源表adbpg_source_table,并向源表写入50条数据。
-- 创建测试数据库 CREATE DATABASE testdb; -- 切换至testdb数据库并创建schema CREATE SCHEMA testschema; -- 创建源表adbpg_source_table CREATE TABLE testschema.adbpg_source_table( id int, username text, PRIMARY KEY(id) ); -- 向adbpg_source_table的表中写入50条数据 INSERT INTO testschema.adbpg_source_table(id, username) SELECT i, 'username'||i::text FROM generate_series(1, 50) AS t(i);
创建结果表adbpg_sink_table,用于Flink写入结果数据。
CREATE TABLE testschema.adbpg_sink_table( id int, username text, score int );
步骤二:新建Flink作业
步骤三:编写作业代码并部署
创建数据源datagen_source(用于生成模拟数据)和source_adbpg(用于从 AnalyticDB for PostgreSQL数据库中捕获实时的数据变更),将两个数据源关联,并将结果写入结果表sink_adbpg(将处理后的数据写入AnalyticDB for PostgreSQL)。
将以下作业代码拷贝到作业文本编辑区。
---创建datagen源表,使用datagen连接器生成流式数据 CREATE TEMPORARY TABLE datagen_source ( id INT, score INT ) WITH ( 'connector' = 'datagen', 'fields.id.kind'='sequence', 'fields.id.start'='1', 'fields.id.end'='100', 'fields.score.kind'='random', 'fields.score.min'='70', 'fields.score.max'='100' ); --创建adbpg源表,使用adbpg-cdc连接器,通过slot.name和逻pgoutput捕获adbpg_source_table的数据变更 CREATE TEMPORARY TABLE source_adbpg( id int, username varchar, PRIMARY KEY(id) NOT ENFORCED ) WITH( 'connector' = 'adbpg-cdc', 'hostname' = 'gp-bp16v8cgx46ns****-master.gpdb.rds.aliyuncs.com', 'port' = '5432', 'username' = 'account****', 'password' = 'password****', 'database-name' = 'testdb', 'schema-name' = 'testschema', 'table-name' = 'adbpg_source_table', 'slot.name' = 'flink', 'decoding.plugin.name' = 'pgoutput' ); --创建adbpg结果表,将处理后的结果写入数据库的目标表adbpg_sink_table。 CREATE TEMPORARY TABLE sink_adbpg ( id int, username varchar, score int ) WITH ( 'connector' = 'adbpg', 'url' = 'jdbc:postgresql://gp-bp16v8cgx46ns****-master.gpdb.rds.aliyuncs.com:5432/testdb', 'tablename' = 'testschema.adbpg_sink_table', 'username' = 'account****', 'password' = 'password****', 'maxRetryTimes' = '2', 'batchsize' = '5000', 'conflictMode' = 'ignore', 'writeMode' = 'insert', 'retryWaitTime' = '200' ); -- datagen_source表和source_adbpg源表关联后的结果写入adbpg结果表 INSERT INTO sink_adbpg SELECT ts.id,ts.username,ds.score FROM datagen_source AS ds JOIN source_adbpg AS ts ON ds.id = ts.id;
参数说明
参数名称
是否必填
数据类型
描述
connector
是
STRING
连接类型。源表固定值为
adbpg-cdc
,结果表固定值为adbpg
。hostname
是
STRING
AnalyticDB for PostgreSQL实例的内网地址。您可在目标实例的基本信息页签下获取内网地址。
username
是
STRING
AnalyticDB for PostgreSQL数据库的账号和密码。
password
是
STRING
database-name
是
STRING
数据库名称。
schema-name
是
STRING
Schema名称。该参数支持正则表达式,可以一次订阅多个Schema。
table-name
是
STRING
表名称。该参数支持正则表达式,可以一次订阅多个表。
port
是
INTEGER
AnalyticDB for PostgreSQL端口,固定值为5432。
decoding.plugin.name
是
STRING
Postgres Logical Decoding插件名称。固定值为pgoutput。
slot.name
是
STRING
逻辑解码槽的名字。
对于同一个Flink作业涉及的源表,建议使用相同的
slot.name
。如果不同的Flink作业涉及同一张表,则建议为每个作业分别设置独立的
slot.name
参数,以避免出现以下错误:PSQLException: ERROR: replication slot "debezium" is active for PID 974
。
debezium.*
否
STRING
该参数可以更细粒度地控制Debezium客户端的行为。例如,设置
'debezium.snapshot.mode' = 'never'
可以禁用快照功能。您可以通过配置属性获取更多配置详情。scan.incremental.snapshot.enabled
否
BOOLEAN
是否开启增量快照,取值如下。
false(默认值):不开启增量快照。
true:开启增量快照。
scan.startup.mode
否
STRING
消费数据时的启动模式,取值如下。
initial(默认值):在首次启动时,先扫描历史全量数据,然后读取最新的WAL日志数据,实现全量与增量数据的无缝衔接。
latest-offset:在首次启动时,不扫描历史全量数据,直接从 WAL 日志的末尾(即最新的日志位置)开始读取,仅捕获连接器启动后的最新变更数据。
snapshot:先扫描历史全量数据,同时读取全量阶段新产生的 WAL 日志,最终作业会在完成全量扫描后停止运行。
changelog-mode
否
STRING
用于编码流更改的变更日志(Changelog)模式,取值如下。
ALL(默认值):支持所有操作类型,包括
INSERT
、DELETE
、UPDATE_BEFORE
和UPDATE_AFTER
。UPSERT:仅支持
UPSERT
类型的操作,包括INSERT
、DELETE
和UPDATE_AFTER
。
heartbeat.interval.ms
否
DURATION
发送心跳包的时间间隔,默认值为30秒(单位:毫秒)。
AnalyticDB for PostgreSQLCDC连接器通过主动向数据库发送心跳包,确保Slot的偏移量能够持续推进。在表数据变更不频繁的情况下,合理设置该参数可以及时清理WAL日志,避免浪费磁盘空间。
scan.incremental.snapshot.chunk.key-column
否
STRING
指定某一列作为快照阶段分片的切分列,默认情况下会从主键中选择第一列。
url
是
STRING
格式为
jdbc:postgresql://<Address>:<PortId>/<DatabaseName>
。在作业开发页面顶部,单击深度检查,进行语法检查。
单击部署,单击确定。
单击右上角前往运维,在作业运维页面,单击启动。
步骤四:查看Flink写入数据
在测试数据库中执行如下语句,查看Flink写入的数据。
SELECT * FROM testschema.adbpg_sink_table; SELECT COUNT(*) FROM testschema.adbpg_sink_table;
在源表中新增50条数据,并在结果表中查看Flink写入的增量数据的总数。
-- 源表写入50条增量数据 INSERT INTO testschema.adbpg_source_table(id, username) SELECT i, 'username'||i::text FROM generate_series(51, 100) AS t(i); -- 检查目标表新增数据 SELECT COUNT(*) FROM testschema.adbpg_sink_table where id > 50;
结果如下。
count ------- 50 (1 row)
注意事项
请及时管理Replication Slot,以避免磁盘空间的浪费。
为了防止Flink作业重启过程中因Checkpoint对应的WAL(Write-Ahead Log)被清理而导致数据丢失,Flink不会自动删除Replication Slot。因此,如果确认某个Flink作业不再需要重启,请手动删除其对应的Replication Slot,以释放其占用的资源。此外,如果Replication Slot的确认位点长时间未推进,AnalyticDB for PostgreSQL将无法清理该位点之后的WAL条目,可能会导致未使用的WAL数据持续积累,占用大量磁盘空间。
在AnalyticDB for PostgreSQL实例正常运行期间,能保证Exactly Once数据处理语义。但在故障场景下,仅能提供至少一次(At-Least Once)的语义保障。
订阅表的REPLICA IDENTITY参数会被CDC连接器修改为
FULL
,以保障该表数据同步的一致性,影响如下:磁盘空间占用增加。在频繁更新或删除操作的场景下,该设置将增加WAL日志的大小,导致磁盘空间占用增加。
写操作性能下降。在高并发写入场景下,性能可能会受到明显影响。
检查点压力增大。更大的WAL日志意味着检查点(Checkpoint)需要处理更多的数据,可能导致检查点时间延长。
最佳实践
Flink CDC支持通过Flink SQL API或DataStream API进行作业开发,可实现对源库单表或多表的全量与增量一体化的数据同步;此外,它还支持对异构数据源进行双流Join等计算。在整个数据处理过程中,Flink框架能够确保精确一次(Exactly-once)的事件处理语义。然而,它不适合对PostgreSQL系数据库进行全库同步,一方面是因为它不支持DDL的同步,另一方面需要在Flink SQL中定义每张表的结构,后期维护相对复杂。
本章节将从AnalyticDB for PostgreSQL同步数据到Kafka为例,介绍Flink CDC SQL作业开发的最佳实践。在开发Flink CDC作业前,请确保已完成前提条件中的资源准备与配置。
步骤一:准备测试表
在AnalyticDB for PostgreSQL实例中创建两张源表。
CREATE TABLE products (
product_id SERIAL PRIMARY KEY,
product_name VARCHAR(200) NOT NULL,
sku CHAR(12) NOT NULL,
description TEXT,
price NUMERIC(10,2) NOT NULL,
discount_price DECIMAL(10,2),
stock_quantity INTEGER DEFAULT 0,
weight REAL,
volume DOUBLE PRECISION,
dimensions BOX,
release_date DATE,
is_featured BOOLEAN DEFAULT FALSE,
rating FLOAT,
warranty_period INTERVAL,
metadata JSON,
tags TEXT[]
);
CREATE TABLE documents (
document_id UUID PRIMARY KEY,
title VARCHAR(200) NOT NULL,
content TEXT,
summary TEXT,
publication_date TIMESTAMP WITHOUT TIME ZONE,
last_updated TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
author_id BIGINT,
file_data BYTEA,
xml_content XML,
json_metadata JSON,
reading_time INTERVAL,
is_public BOOLEAN DEFAULT TRUE,
views_count INTEGER DEFAULT 0,
category VARCHAR(50),
tags TEXT[]
);
步骤二:准备Kafka资源
将Flink工作空间所属的网段添加至Kafka的白名单中。
步骤三:新建Flink作业
步骤四:编写作业代码并部署
在Flink工作空间内编写SQL作业。将以下作业代码拷贝到作业文本编辑区,并将相关配置替换为实际值。
-- 使用一个source实现多表采集 CREATE TEMPORARY TABLE ADBPGSource( table_name STRING METADATA FROM 'table_name' VIRTUAL, row_kind STRING METADATA FROM 'row_kind' VIRTUAL, product_id BIGINT, product_name STRING, sku STRING, description STRING, price STRING, discount_price STRING, stock_quantity INT, weight STRING, volume STRING, dimensions STRING, release_date STRING, is_featured BOOLEAN, rating FLOAT, warranty_period STRING, metadata STRING, tags STRING, document_id STRING, title STRING, content STRING, summary STRING, publication_date STRING, last_updated STRING, author_id BIGINT, file_data STRING, xml_content STRING, json_metadata STRING, reading_time STRING, is_public BOOLEAN, views_count INT, category STRING ) WITH ( 'connector' = 'adbpg-cdc', 'hostname' = 'gp-2zev887z58390***-master.gpdb.rds.aliyuncs.com', 'port' = '5432', 'username' = 'account****', 'password' = 'password****', 'database-name' = 'testdb', 'schema-name' = 'public', 'table-name' = '(products|documents)', 'slot.name' = 'flink', 'decoding.plugin.name' = 'pgoutput', 'debezium.snapshot.mode' = 'never' ); CREATE TEMPORARY TABLE KafkaProducts ( product_id BIGINT, product_name STRING, sku STRING, description STRING, price STRING, discount_price STRING, stock_quantity INT, weight STRING, volume STRING, dimensions STRING, release_date STRING, is_featured BOOLEAN, rating FLOAT, warranty_period STRING, metadata STRING, tags STRING, PRIMARY KEY(product_id) NOT ENFORCED ) WITH ( 'connector' = 'upsert-kafka', 'topic' = '****', 'properties.bootstrap.servers' = 'alikafka-post-cn-****-1-vpc.alikafka.aliyuncs.com:9092', 'key.format'='avro', 'value.format'='avro' ); CREATE TEMPORARY TABLE KafkaDocuments ( document_id STRING, title STRING, content STRING, summary STRING, publication_date STRING, last_updated STRING, author_id BIGINT, file_data STRING, xml_content STRING, json_metadata STRING, reading_time STRING, is_public BOOLEAN, views_count INT, category STRING, tags STRING, PRIMARY KEY(document_id) NOT ENFORCED ) WITH ( 'connector' = 'upsert-kafka', 'topic' = '****', 'properties.bootstrap.servers' = 'alikafka-post-cn-****-1-vpc.alikafka.aliyuncs.com:9092', 'key.format'='avro', 'value.format'='avro' ); -- 使用 STATEMENT SET 包装多条语句 BEGIN STATEMENT SET; -- 使用元数据 table_name 路由到目标表 INSERT INTO KafkaProducts SELECT product_id,product_name,sku,description,price,discount_price,stock_quantity,weight,volume,dimensions,release_date,is_featured,rating,warranty_period,metadata,tags FROM ADBPGSource WHERE table_name = 'products'; INSERT INTO KafkaDocuments SELECT document_id,title,content,summary,publication_date,last_updated,author_id,file_data,xml_content,json_metadata,reading_time,is_public,views_count,category,tags FROM ADBPGSource WHERE table_name = 'documents'; END;
对于此SQL作业需要关注以下几点:
对于多表同步的任务,建议参考此SQL使用一个source表实现多表采集,需要在此source表中定义全部源表的列(重名列保留一个);向目标表写入时通过
METADATA
table_name 来路由到指定表。这种做法的优势在于,仅需要在AnalyticDB for PostgreSQL中创建一个Replication Slot,能降低对源库资源的占用,提升同步性能,便于后期维护。使用
table-name
参数来指定多张源表,表名放在括号中,使用|
符号分割, 例如(table1|table2|table3)
。debezium.snapshot.mode
配置为never
表示只同步源表增量数据;若需要同步全量及增量数据,修改配置为initial
。
在作业开发页面顶部,单击深度检查,进行语法检查。
单击部署,单击确定。
单击右上角前往运维,在作业运维页面,单击启动。
步骤五:插入测试数据
在AnalyticDB for PostgreSQL实例中分别对两张源表进行数据更新,并观察Kafka Topic中的消息变化。
您可以参考以下SQL插入测试数据:
INSERT INTO products (
product_name, sku, description, price, discount_price, stock_quantity, weight, volume, dimensions, release_date, is_featured, rating, warranty_period, metadata, tags
) VALUES (
'测试商品', 'Test-2025', '一条测试商品数据', 299.99, 279.99, 150, 50.5, 120.75, '(10,20),(30,40)', '2023-05-01', TRUE, 4.8, INTERVAL '1 year', '{"brand": "TechCo", "model": "X1"}', '{"测试1", "测试2"}'
);
相关文档
Flink实时读写全量数据,订阅AnalyticDB for PostgreSQL的全量数据。