通过Flink CDC实时订阅全量和增量数据(邀测中)
AnalyticDB for PostgreSQL提供自研的CDC连接器,基于PostgreSQL的逻辑复制功能实现订阅全量和增量数据,可与Flink无缝集成。该连接器能够高效捕获源表的实时变更数据,支持实时数据同步、流式处理等,助力企业快速响应动态数据需求。本文介绍如何通过阿里云实时计算Flink版CDC实时订阅AnalyticDB for PostgreSQL的全量和增量数据。
使用限制
内核版本为7.1.1.4及以上的AnalyticDB for PostgreSQL7.0版实例。
暂不支持AnalyticDB for PostgreSQL的Serverless模式。
支持订阅Heap表和AO列存表的全量和增量数据。如果不是Heap表或AO列存表,您可通过Flink实时订阅全量数据订阅表的全量数据。
前提条件
AnalyticDB for PostgreSQL实例和Flink全托管工作空间位于同一VPC。
已提交工单为AnalyticDB for PostgreSQL实例开通CDC功能并获取CDC连接器jar包。
AnalyticDB for PostgreSQL账号是初始账号或高权限用户RDS_SUPERUSER。
已将Flink工作空间所属的网段添加至AnalyticDB for PostgreSQL的白名单。
已设置订阅表的REPLICA IDENTITY参数为
FULL
,保障该表数据同步的一致性。
操作步骤
步骤一:准备测试表和测试数据
登录云原生数据仓库AnalyticDB PostgreSQL版控制台,找到目标实例,单击实例ID。
在基本信息页面的右侧下,单击登录数据库。
创建测试数据库和源表adbpg_source_table,并向源表写入50条数据。
-- 创建测试数据库 CREATE DATABASE testdb; -- 切换至testdb数据库并创建schema CREATE SCHEMA testschema; -- 创建源表adbpg_source_table CREATE TABLE testschema.adbpg_source_table( id int, username text, PRIMARY KEY(id) ); -- 向adbpg_source_table的表中写入50条数据 INSERT INTO testschema.adbpg_source_table(id, username) SELECT i, 'username'||i::text FROM generate_series(1, 50) AS t(i);
创建结果表adbpg_sink_table,用于Flink写入结果数据。
CREATE TABLE testschema.adbpg_sink_table( id int, username text, score int );
步骤二:创建CDC连接器和Flink作业
登录实时计算控制台,在Flink全托管页签,单击目标工作空间操作列下的控制台。
在左侧导航栏,单击连接器,创建并上传CDC连接器。
在左侧导航栏,单击
。单击顶部菜单栏新建,选择空白的流作业草稿,单击下一步,在新建文件草稿对话框,填写作业配置信息。
作业参数
说明
示例
文件名称
作业的名称。
说明作业名称在当前项目中必须保持唯一。
adbpg-test
存储位置
指定该作业的代码文件所属的文件夹。
您还可以在现有文件夹右侧,单击
图标,新建子文件夹。
作业草稿
引擎版本
当前作业使用的Flink的引擎版本。引擎版本号含义、版本对应关系和生命周期重要时间点详情请参见引擎版本介绍。
vvr-6.0.7-flink-1.15
单击创建。
步骤三:编写作业代码并部署
创建数据源datagen_source(用于生成模拟数据)和source_adbpg(用于从 AnalyticDB for PostgreSQL数据库中捕获实时的数据变更),将两个数据源关联,并将结果写入结果表sink_adbpg(将处理后的数据写入AnalyticDB for PostgreSQL)。
将以下作业代码拷贝到作业文本编辑区。
---创建datagen源表,使用datagen连接器生成流式数据 CREATE TEMPORARY TABLE datagen_source ( id INT, score INT ) WITH ( 'connector' = 'datagen', 'fields.id.kind'='sequence', 'fields.id.start'='1', 'fields.id.end'='100', 'fields.score.kind'='random', 'fields.score.min'='70', 'fields.score.max'='100' ); --创建adbpg源表,使用adbpg-cdc连接器,通过slot.name和逻pgoutput捕获adbpg_source_table的数据变更 CREATE TEMPORARY TABLE source_adbpg( id int, username varchar, PRIMARY KEY(id) NOT ENFORCED ) WITH( 'connector' = 'adbpg-cdc', 'hostname' = 'gp-bp16v8cgx46ns****-master.gpdb.rds.aliyuncs.com', 'port' = '5432', 'username' = 'account****', 'password' = 'password****', 'database-name' = 'testdb', 'schema-name' = 'testschema', 'table-name' = 'adbpg_source_table', 'slot.name' = 'flink', 'decoding.plugin.name' = 'pgoutput' ); --创建adbpg结果表,将处理后的结果写入数据库的目标表adbpg_sink_table。 CREATE TEMPORARY TABLE sink_adbpg ( id int, username varchar, score int ) WITH ( 'connector' = 'adbpg', 'url' = 'jdbc:postgresql://gp-bp16v8cgx46ns****-master.gpdb.rds.aliyuncs.com:5432/testdb', 'tablename' = 'testschema.adbpg_sink_table', 'username' = 'account****', 'password' = 'password****', 'maxRetryTimes' = '2', 'batchsize' = '5000', 'conflictMode' = 'ignore', 'writeMode' = 'insert', 'retryWaitTime' = '200' ); -- datagen_source表和source_adbpg源表关联后的结果写入adbpg结果表 INSERT INTO sink_adbpg SELECT ts.id,ts.username,ds.score FROM datagen_source AS ds JOIN source_adbpg AS ts ON ds.id = ts.id;
参数说明
参数名称
是否必填
数据类型
描述
connector
是
String
连接类型。源表固定值为
adbpg-cdc
,结果表固定值为adbpg
。hostname
是
STRING
AnalyticDB for PostgreSQL实例的内网地址。您可在目标实例的基本信息页签下获取内网地址。
userName
是
String
AnalyticDB for PostgreSQL数据库的账号和密码。
password
是
STRING
database-name
是
STRING
数据库名称。
schema-name
是
STRING
Schema名称。该参数支持正则表达式,可以一次订阅多个Schema。
table-name
是
STRING
表名称。该参数支持正则表达式,可以一次订阅多个表。
port
是
INTEGER
AnalyticDB for PostgreSQL端口,固定值为5432。
decoding.plugin.name
是
STRING
Postgres Logical Decoding插件名称。固定值为pgoutput。
slot.name
是
STRING
逻辑解码槽的名字。
对于同一个Flink作业涉及的源表,建议使用相同的
slot.name
。如果不同的Flink作业涉及同一张表,则建议为每个作业分别设置独立的
slot.name
参数,以避免出现以下错误:PSQLException: ERROR: replication slot "debezium" is active for PID 974
。
debezium.*
否
STRING
该参数可以更细粒度地控制Debezium客户端的行为。例如,设置
'debezium.snapshot.mode' = 'never'
可以禁用快照功能。您可以通过配置属性获取更多配置详情。scan.incremental.snapshot.enabled
否
BOOLEAN
是否开启增量快照,取值如下。
false(默认值):不开启增量快照。
true:开启增量快照。
scan.startup.mode
否
STRING
消费数据时的启动模式,取值如下。
initial(默认值):在首次启动时,先扫描历史全量数据,然后读取最新的WAL日志数据,实现全量与增量数据的无缝衔接。
latest-offset:在首次启动时,不扫描历史全量数据,直接从 WAL 日志的末尾(即最新的日志位置)开始读取,仅捕获连接器启动后的最新变更数据。
snapshot:先扫描历史全量数据,同时读取全量阶段新产生的 WAL 日志,最终作业会在完成全量扫描后停止运行。
changelog-mode
否
STRING
用于编码流更改的变更日志(Changelog)模式,取值如下。
ALL(默认值):支持所有操作类型,包括
INSERT
、DELETE
、UPDATE_BEFORE
和UPDATE_AFTER
。UPSERT:仅支持
UPSERT
类型的操作,包括INSERT
、DELETE
和UPDATE_AFTER
。
heartbeat.interval.ms
否
DURATION
发送心跳包的时间间隔,默认值为30秒(单位:毫秒)。
AnalyticDB for PostgreSQLCDC连接器通过主动向数据库发送心跳包,确保Slot的偏移量能够持续推进。在表数据变更不频繁的情况下,合理设置该参数可以及时清理WAL日志,避免浪费磁盘空间。
scan.incremental.snapshot.chunk.key-column
否
STRING
指定某一列作为快照阶段分片的切分列,默认情况下会从主键中选择第一列。
url
是
String
格式为
jdbc:postgresql://<Address>:<PortId>/<DatabaseName>
。在作业开发页面顶部,单击深度检查,进行语法检查。
单击部署,单击确定。
单击右上角前往运维,在作业运维页面,单击启动。
步骤四:查看Flink写入数据
在测试数据库中执行如下语句,查看Flink写入的数据。
SELECT * FROM testschema.adbpg_sink_table; SELECT COUNT(*) FROM testschema.adbpg_sink_table;
在源表中新增50条数据,并在结果表中查看Flink写入的增量数据的总数。
-- 源表写入50条增量数据 INSERT INTO testschema.adbpg_source_table(id, username) SELECT i, 'username'||i::text FROM generate_series(51, 100) AS t(i); -- 检查目标表新增数据 SELECT COUNT(*) FROM testschema.adbpg_sink_table where id > 50;
结果如下。
count ------- 50 (1 row)
注意事项
请及时管理Replication Slot,以避免磁盘空间的浪费。
为了防止Flink作业重启过程中因Checkpoint对应的WAL(Write-Ahead Log)被清理而导致数据丢失,Flink不会自动删除Replication Slot。因此,如果确认某个Flink作业不再需要重启,请手动删除其对应的Replication Slot,以释放其占用的资源。此外,如果Replication Slot的确认位点长时间未推进,AnalyticDB for PostgreSQL将无法清理该位点之后的WAL条目,可能会导致未使用的WAL数据持续积累,占用大量磁盘空间。
在AnalyticDB for PostgreSQL实例正常运行期间,能保证Exactly Once数据处理语义。但在故障场景下,仅能提供至少一次(At-Least Once)的语义保障。
相关文档
Flink实时订阅全量数据,订阅AnalyticDB for PostgreSQL的全量数据。