Postgres CDC可用于依次读取PostgreSQL数据库全量快照数据和变更数据,保证不多读一条也不少读一条数据。即使发生故障,也能采用Exactly Once方式处理。本文为您介绍如何使用Postgres CDC连接器。
背景信息
Postgres CDC连接器支持的信息如下。
类别 | 详情 |
支持类型 | 源表 说明 您可以使用JDBC作为结果表和维表连接器。 |
运行模式 | 仅支持流模式 |
数据格式 | 暂不适用 |
特有监控指标 |
说明
|
API种类 | SQL |
是否支持更新或删除结果表数据 | 不涉及 |
特色功能
Postgres CDC连接器接入CDC增量快照框架(实时计算引擎VVR 8.0.6及以上版本)。Postgres CDC读取历史全量数据后,自动切换到WAL变更日志读取,保证不多读也不少读数据。即使发生故障,也能保证Exactly Once语义处理数据。Postgres CDC源表提供了并发读取全量数据,无锁读取和断点续传的能力。
作为源表,功能与优势详情如下:
流批一体,支持读取全量和增量数据,无需维护两套流程。
支持并发读取全量数据,性能水平扩展。
全量读取无缝切换增量读取,自动缩容,节省计算资源。
全量阶段读取支持断点续传,更稳定。
无锁读取全量数据,不影响线上业务。
前提条件
Postgres CDC连接器通过PostgreSQL数据库的逻辑复制读取CDC变更流数据,需要满足以下条件:
wal_level参数的值需设置为logical,即在预写式日志WAL(Write-ahead logging)中增加支持逻辑编码所需的信息。
执行
ALTER TABLE schema.table REPLICA IDENTITY FULL;
命令设置订阅表的REPLICA IDENTITY为FULL(发出的插入和更新操作事件包含表中所有列的旧值),以保障该表数据同步的一致性。说明REPLICA IDENTITY是PostgreSQL特有的表级设置,它决定了逻辑解码插件在发生(INSERT)和更新(UPDATE)事件时,是否包含涉及的表列的旧值。REPLICA IDENTITY取值含义详情请参见REPLICA IDENTITY。
需要确保max_wal_senders和max_replication_slots的参数值均大于当前数据库复制槽已使用数与Flink作业所需要的slot数量。
确保账户系统权限为SUPERUSER或者同时拥有LOGIN和REPLICATION权限,并且具有订阅表的SELECT权限用于全量数据查询。
阿里云RDS PostgreSQL、Amazon RDS PostgreSQL或者自建PostgreSQL上相应的配置可能有差异,详情请参见配置Postgres。
注意事项
仅实时计算引擎8.0.6及以上版本支持Postgres CDC增量快照功能。
请及时管理Replication Slot,以免出现磁盘空间浪费的问题。
为了防止在Flink作业重启过程中由于Checkpoint对应的WAL(Write-Ahead Log)段被清除而引发数据丢失,Flink作业不会自动移除Replication Slot。因此,如果确认特定的Flink作业不会再次启动,应当手动删除相关的Replication Slot,以释放其占用的资源。另外,如果PostgreSQL的Replication Slot的确认位点长时间不向前推进,PostgreSQL不会清理该槽位点之后的WAL条目,这可能会导致未使用的WAL积累而占用过多的磁盘空间。
开启增量快照时,Postgres CDC连接器必须开启Checkpoint,并且Source表必须声明主键。Source多并发读取全量数据时会创建多个临时的Replication Slot。
不开启增量快照读取的PostgreSQL CDC Source仅支持单一并发,因此只需要一个全局Slot。当开启增量快照时,PostgreSQL CDC Source在全量阶段所需的最大Slot数量为
Source数量 * 并发数 + 1
。进入增量阶段后,系统自动回收在全量阶段创建的Slot,仅保留一个全局Slot。如果Slot数量有限,需要控制全量阶段的并发数量,这样做的缺点是会降低读取速度。如果下游算子或存储支持幂等性,可以启用scan.incremental.snapshot.backfill.skip = true
以跳过全量阶段的日志读取,这样做的缺点是仅能提供至少一次(At-Least Once)的语义保证。如果SQL要做聚合、关联等操作,不建议跳过全量阶段日志的读取。
不开启增量快照时,Postgres CDC连接器不支持在全表扫描阶段执行Checkpoint。
不开启增量快照时,如果您的作业在全表扫描阶段触发Checkpoint,则可能由于Checkpoint超时导致作业Failover。因此,建议您在其他配置中配置如下参数,具体操作请参见如何配置作业运行参数?。避免在全量同步阶段由于Checkpoint超时导致Failover。
execution.checkpointing.interval: 10min execution.checkpointing.tolerable-failed-checkpoints: 100 restart-strategy: fixed-delay restart-strategy.fixed-delay.attempts: 2147483647
相关的参数说明详情如下表所示。
参数
说明
备注
execution.checkpointing.interval
Checkpoint的时间间隔。
单位是Duration类型,例如10min或30s。
execution.checkpointing.tolerable-failed-checkpoints
容忍Checkpoint失败的次数。
该参数的取值与Checkpoint调度间隔时间的乘积就是允许的快照读取时间。
说明如果表特别大,建议将该参数值配置得大一些。
restart-strategy
重启策略。
参数取值如下:
fixed-delay:固定延迟重启策略。
failure-rate:故障率重启策略。
exponential-delay:指数延迟重启策略。
详情请参见Restart Strategies。
restart-strategy.fixed-delay.attempts
固定延迟重启策略下,尝试重启的最大次数。
无。
语法结构
CREATE TABLE postgrescdc_source (
shipment_id INT,
order_id INT,
origin STRING,
destination STRING,
is_arrived BOOLEAN
) WITH (
'connector' = 'postgres-cdc',
'hostname' = '<yourHostname>',
'port' = '5432',
'username' = '<yourUserName>',
'password' = '<yourPassWord>',
'database-name' = '<yourDatabaseName>',
'schema-name' = '<yourSchemaName>',
'table-name' = '<yourTableName>'
);
WITH参数
参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 |
connector | 表类型。 | STRING | 是 | 无 | 固定值为 |
hostname | Postgres数据库的IP地址或者Hostname。 | STRING | 是 | 无 | 无。 |
username | Postgres数据库服务的用户名。 | STRING | 是 | 无 | 无。 |
password | Postgres数据库服务的密码。 | STRING | 是 | 无 | 无。 |
database-name | 数据库名称。 | STRING | 是 | 无 | 数据库名称支持正则表达式以读取多个数据库的数据。 |
schema-name | Postgres Schema名称。 | STRING | 是 | 无 | Schema名称支持正则表达式以读取多个Schema的数据。 |
table-name | Postgres表名。 | STRING | 是 | 无 | 表名支持正则表达式去读取多个表的数据。 |
port | Postgres数据库服务的端口号。 | INTEGER | 否 | 5432 | 无。 |
decoding.plugin.name | Postgres Logical Decoding插件名称。 | STRING | 否 | decoderbufs | 根据Postgres服务上安装的插件确定。支持的插件列表如下:
|
slot.name | 逻辑解码槽的名字。 | STRING | 8.0.1版本之前为非必填,从8.0.1版本开始为必填 | 8.0.1版本之前默认值为flink,从8.0.1版本开始无默认值 | 建议每个表都设置 |
debezium.* | Debezium属性参数。 | STRING | 否 | 无 | 更细粒度控制Debezium客户端的行为。例如 |
scan.incremental.snapshot.enabled | 是否开启增量快照。 | BOOLEAN | 否 | false | 参数取值如下:
|
scan.startup.mode | 消费数据时的启动模式。 | STRING | 否 | initial | 参数取值如下:
|
changelog-mode | 用于编码流更改的变更日志(Changelog)模式。 | String | 否 | all | 支持的Changelog模式包括:
|
heartbeat.interval.ms | 发送心跳包的时间间隔。 | Duration | 否 | 30s | 单位为毫秒。 Postgres CDC连接器主动向数据库发送心跳包来保证推进Slot的偏移量。当表变更不频繁时,设置该值可以及时回收WAL日志。 |
scan.incremental.snapshot.chunk.key-column | 指定某一列作为快照阶段切分分片的切分列。 | STRING | 否 | 无 | 默认从主键中选择第一列。 |
scan.incremental.close-idle-reader.enabled | 是否在快照结束后关闭空闲的Reader。 | Boolean | 否 | false | 该配置生效需要设置 |
scan.incremental.snapshot.backfill.skip | 是否跳过全量阶段的日志读取。 | Boolean | 否 | false | 参数取值如下:
|
类型映射
Postgres CDC和Flink字段类型对应关系如下。
Postgres CDC字段类型 | Flink字段类型 |
SMALLINT | SMALLINT |
INT2 | |
SMALLSERIAL | |
SERIAL2 | |
INTEGER | INT |
SERIAL | |
BIGINT | BIGINT |
BIGSERIAL | |
REAL | FLOAT |
FLOAT4 | |
FLOAT8 | DOUBLE |
DOUBLE PRECISION | |
NUMERIC(p, s) | DECIMAL(p, s) |
DECIMAL(p, s) | |
BOOLEAN | BOOLEAN |
DATE | DATE |
TIME [(p)] [WITHOUT TIMEZONE] | TIME [(p)] [WITHOUT TIMEZONE] |
TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
CHAR(n) | STRING |
CHARACTER(n) | |
VARCHAR(n) | |
CHARACTER VARYING(n) | |
TEXT | |
BYTEA | BYTES |
使用示例
CREATE TABLE source (
id INT NOT NULL,
name STRING,
description STRING,
weight DECIMAL(10,3)
) WITH (
'connector' = 'postgres-cdc',
'hostname' = '<host name>',
'port' = '<port>',
'username' = '<user name>',
'password' = '<password>',
'database-name' = '<database name>',
'schema-name' = '<schema name>',
'table-name' = '<table name>'
);
SELECT * FROM source;
相关文档
实时计算Flink版支持的连接器列表,请参见支持的连接器。
将数据写入PolarDB PostgreSQL版(Oracle语法兼容1.0)结果表,请参见PolarDB PostgreSQL版(Oracle语法兼容1.0)。
如果您需要读写RDS MySQL、PolarDB for MySQL或者自建MySQL数据库,请使用MySQL连接器。