Postgres CDC可用于依次读取PostgreSQL数据库全量快照数据和变更数据,保证不多读一条也不少读一条数据。即使发生故障,也能采用Exactly Once方式处理。本文为您介绍如何使用Postgres CDC连接器。
背景信息
Postgres CDC连接器支持的信息如下。
类别 | 详情 |
支持类型 | 源表 说明 您可以使用JDBC作为结果表和维表连接器。 |
运行模式 | 仅支持流模式 |
数据格式 | 暂不适用 |
特有监控指标 |
说明
|
API种类 | SQL |
是否支持更新或删除结果表数据 | 不涉及 |
特色功能
Postgres CDC连接器接入CDC增量快照框架(实时计算引擎VVR 8.0.6及以上版本)。Postgres CDC读取历史全量数据后,自动切换到WAL变更日志读取,保证不多读也不少读数据。即使发生故障,也能保证Exactly Once语义处理数据。Postgres CDC源表提供了并发读取全量数据,无锁读取和断点续传的能力。
作为源表,功能与优势详情如下:
流批一体,支持读取全量和增量数据,无需维护两套流程。
支持并发读取全量数据,性能水平扩展。
全量读取无缝切换增量读取,自动缩容,节省计算资源。
全量阶段读取支持断点续传,更稳定。
无锁读取全量数据,不影响线上业务。
前提条件
Postgres CDC连接器通过PostgreSQL数据库的逻辑复制读取CDC变更流数据,支持阿里云RDS PostgreSQL、Amazon RDS PostgreSQL和自建PostgreSQL。
阿里云RDS PostgreSQL、Amazon RDS PostgreSQL或者自建PostgreSQL上相应的配置可能有差异,请您在使用之前详细阅读配置Postgres文档进行相关配置。
完成配置后确保有下列的条件:
wal_level参数的值需设置为logical,即在预写式日志WAL(Write-ahead logging)中增加支持逻辑编码所需的信息。
订阅表的REPLICA IDENTITY为FULL(发出的插入和更新操作事件包含表中所有列的旧值),以保障该表数据同步的一致性。
说明REPLICA IDENTITY是PostgreSQL特有的表级设置,它决定了逻辑解码插件在发生(INSERT)和更新(UPDATE)事件时,是否包含涉及的表列的旧值。REPLICA IDENTITY取值含义详情请参见REPLICA IDENTITY。
需要确保max_wal_senders和max_replication_slots的参数值均大于当前数据库复制槽已使用数与Flink作业所需要的slot数量。
确保账户系统权限为SUPERUSER或者同时拥有LOGIN和REPLICATION权限,并且具有订阅表的SELECT权限用于全量数据查询。
注意事项
仅实时计算引擎8.0.6及以上版本支持Postgres CDC增量快照功能。
Flink PostgreSQL CDC 作业依赖 Replication Slot 来确保 WAL(Write-Ahead Log)不被过早清理,从而保障数据一致性。但若管理不当,可能引发磁盘空间浪费或数据读取延迟等问题。请遵循以下建议:
请及时清理不再使用的 Slot
Flink 不会自动删除 Replication Slot,即使作业已停止(尤其无状态重启场景),以防止因 WAL 被清除而导致数据丢失。
若确认某作业不再启动,请手动删除其关联的 Replication Slot,释放磁盘空间。
重要生命周期管理:将 Replication Slot 视为作业资源的一部分,随作业启停同步管理。
避免复用旧 Slot
新作业应使用新的 Slot Name,而非复用旧 Slot。复用可能导致作业启动后需回溯大量历史 WAL,延迟读取最新数据。
PostgreSQL的逻辑复制要求一个 Slot 仅能被一个连接使用,不同作业必须使用不同的 Slot 名称。
重要命名规范:自定义slot.name时,避免使用带数字后缀的名称(如 my_slot_1),以防与临时 Slot 冲突。
启用增量快照下的Slot行为
前提条件:必须启用checkpoint,且Source 表必须声明主键。
Slot创建规则:
未开启增量快照:仅支持单并发,使用 1 个全局 Slot。
开启增量快照:
全量阶段:每个 Source 并发子任务会创建一个临时 Slot,命名格式为
${slot.name}_${task_id}。增量阶段:自动回收所有临时 Slot,仅保留 1 个全局 Slot。
最大Slot数量:Source 并发数 + 1(全量阶段)
资源与性能
若 PostgreSQL 的 Slot 数量或磁盘空间受限,应适当降低全量阶段的并发度(减少临时 Slot 数量),但会牺牲全量读取速度。
若下游支持幂等写入,可设置:
scan.incremental.snapshot.backfill.skip = true,跳过全量阶段的 Binlog 回溯,加快启动速度。此配置仅提供 At-Least-Once 语义。不适用于含聚合、维表 Join 等状态计算的作业(可能丢失中间状态所需的历史变更)。
不开启增量快照时,Postgres CDC连接器不支持在全表扫描阶段执行Checkpoint。
不开启增量快照时,如果您的作业在全表扫描阶段触发Checkpoint,则可能由于Checkpoint超时导致作业Failover。因此,建议您在其他配置中配置如下参数,具体操作请参见如何配置自定义的作业运行参数?。避免在全量同步阶段由于Checkpoint超时导致Failover。
execution.checkpointing.interval: 10min execution.checkpointing.tolerable-failed-checkpoints: 100 restart-strategy: fixed-delay restart-strategy.fixed-delay.attempts: 2147483647相关的参数说明详情如下表所示。
参数
说明
备注
execution.checkpointing.interval
Checkpoint的时间间隔。
单位是Duration类型,例如10min或30s。
execution.checkpointing.tolerable-failed-checkpoints
容忍Checkpoint失败的次数。
该参数的取值与Checkpoint调度间隔时间的乘积就是允许的快照读取时间。
说明如果表特别大,建议将该参数值配置得大一些。
restart-strategy
重启策略。
参数取值如下:
fixed-delay:固定延迟重启策略。
failure-rate:故障率重启策略。
exponential-delay:指数延迟重启策略。
详情请参见Restart Strategies。
restart-strategy.fixed-delay.attempts
固定延迟重启策略下,尝试重启的最大次数。
无。
语法结构
CREATE TABLE postgrescdc_source (
shipment_id INT,
order_id INT,
origin STRING,
destination STRING,
is_arrived BOOLEAN
) WITH (
'connector' = 'postgres-cdc',
'hostname' = '<yourHostname>',
'port' = '5432',
'username' = '<yourUserName>',
'password' = '<yourPassWord>',
'database-name' = '<yourDatabaseName>',
'schema-name' = '<yourSchemaName>',
'table-name' = '<yourTableName>'
);WITH参数
参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 |
connector | connector类型。 | STRING | 是 | 无 | 固定值为 |
hostname | Postgres数据库的IP地址或者Hostname。 | STRING | 是 | 无 | 无。 |
username | Postgres数据库服务的用户名。 | STRING | 是 | 无 | 无。 |
password | Postgres数据库服务的密码。 | STRING | 是 | 无 | 无。 |
database-name | 数据库名称。 | STRING | 是 | 无 | 数据库名称。 |
schema-name | Postgres Schema名称。 | STRING | 是 | 无 | Schema名称支持正则表达式以读取多个Schema的数据。 |
table-name | Postgres表名。 | STRING | 是 | 无 | 表名支持正则表达式以读取多个表的数据。 |
port | Postgres数据库服务的端口号。 | INTEGER | 否 | 5432 | 无。 |
decoding.plugin.name | Postgres Logical Decoding插件名称。 | STRING | 否 | decoderbufs | 根据Postgres服务上安装的插件确定。支持的插件列表如下:
|
slot.name | 逻辑解码槽的名字。 | STRING | 8.0.1版本之前为非必填,从8.0.1版本开始为必填 | 8.0.1版本之前默认值为flink,从8.0.1版本开始无默认值 | 建议每个表都设置 |
debezium.* | Debezium属性参数。 | STRING | 否 | 无 | 更细粒度控制Debezium客户端的行为。例如 |
scan.incremental.snapshot.enabled | 是否开启增量快照。 | BOOLEAN | 否 | false | 参数取值如下:
|
scan.startup.mode | 消费数据时的启动模式。 | STRING | 否 | initial | 参数取值如下:
|
changelog-mode | 用于编码流更改的变更日志(Changelog)模式。 | String | 否 | all | 支持的Changelog模式包括:
|
heartbeat.interval.ms | 发送心跳包的时间间隔。 | Duration | 否 | 30s | 单位为毫秒。 Postgres CDC连接器主动向数据库发送心跳包来保证推进Slot的偏移量。当表变更不频繁时,设置该值可以及时回收WAL日志。 |
scan.incremental.snapshot.chunk.key-column | 指定某一列作为快照阶段切分分片的切分列。 | STRING | 否 | 无 | 默认从主键中选择第一列。 |
scan.incremental.close-idle-reader.enabled | 是否在快照结束后关闭空闲的Reader。 | Boolean | 否 | false | 该配置生效需要设置 |
scan.incremental.snapshot.backfill.skip | 是否跳过全量阶段的日志读取。 | Boolean | 否 | false | 参数取值如下:
|
类型映射
PostgreSQL和Flink字段类型对应关系如下。
PostgreSQL字段类型 | Flink字段类型 |
SMALLINT | SMALLINT |
INT2 | |
SMALLSERIAL | |
SERIAL2 | |
INTEGER | INT |
SERIAL | |
BIGINT | BIGINT |
BIGSERIAL | |
REAL | FLOAT |
FLOAT4 | |
FLOAT8 | DOUBLE |
DOUBLE PRECISION | |
NUMERIC(p, s) | DECIMAL(p, s) |
DECIMAL(p, s) | |
BOOLEAN | BOOLEAN |
DATE | DATE |
TIME [(p)] [WITHOUT TIMEZONE] | TIME [(p)] [WITHOUT TIMEZONE] |
TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
CHAR(n) | STRING |
CHARACTER(n) | |
VARCHAR(n) | |
CHARACTER VARYING(n) | |
TEXT | |
BYTEA | BYTES |
使用示例
CREATE TABLE source (
id INT NOT NULL,
name STRING,
description STRING,
weight DECIMAL(10,3)
) WITH (
'connector' = 'postgres-cdc',
'hostname' = '<host name>',
'port' = '<port>',
'username' = '<user name>',
'password' = '<password>',
'database-name' = '<database name>',
'schema-name' = '<schema name>',
'table-name' = '<table name>'
);
SELECT * FROM source;相关文档
实时计算Flink版支持的连接器列表,请参见支持的连接器。
将数据写入PolarDB PostgreSQL版(Oracle语法兼容1.0)结果表,请参见PolarDB PostgreSQL版(Oracle语法兼容1.0)。
如果您需要读写RDS MySQL、PolarDB for MySQL或者自建MySQL数据库,请使用MySQL连接器。