Postgres CDC(公测中)
本文为您介绍如何使用Postgres CDC连接器。
背景信息
Postgres CDC可用于依次读取PostgreSQL数据库全量快照数据和变更数据,保证不多读一条也不少读一条数据。即使发生故障,也能采用Exactly Once方式处理。
Postgres CDC连接器支持的信息如下。
类别 | 详情 |
支持类型 | 源表 |
运行模式 | 仅支持流模式 |
数据格式 | 暂不适用 |
特有监控指标 |
说明 指标的含义及如何查看监控指标,请参见查看监控指标。 |
API种类 | SQL |
是否支持更新或删除结果表数据 | 不涉及 |
前提条件
已创建Postgres数据库和表,创建RDS Postgres数据库和表的详情请参见创建账号和数据库。
已在阿里云RDS PostgreSQL、Amazon RDS PostgreSQL或者自建PostgreSQL上进行了相应的配置,详情请参见配置Postgres。
使用限制
仅Flink计算引擎VVR 2.0及以上版本支持Postgres的CDC连接器。
Postgres CDC暂不支持在全表扫描阶段执行Checkpoint。
如果您的作业在全表扫描阶段触发Checkpoint,则可能由于Checkpoint超时导致作业Failover。因此,建议您在作业开发页面高级配置的更多Flink配置中配置如下参数,避免在全量同步阶段由于Checkpoint超时导致Failover。
execution.checkpointing.interval: 10min execution.checkpointing.tolerable-failed-checkpoints: 100 restart-strategy: fixed-delay restart-strategy.fixed-delay.attempts: 2147483647
相关的参数说明详情如下表所示。
参数
说明
备注
execution.checkpointing.interval
Checkpoint的时间间隔。
单位是Duration类型,例如10min或30s。
execution.checkpointing.tolerable-failed-checkpoints
容忍Checkpoint失败的次数。
该参数的取值与Checkpoint调度间隔时间的乘积就是允许的快照读取时间。
说明如果表特别大,建议将该参数值配置得大一些。
restart-strategy
重启策略。
参数取值如下:
fixed-delay:固定延迟重启策略。
failure-rate:故障率重启策略。
exponential-delay:指数延迟重启策略。
详情请参见Restart Strategies。
restart-strategy.fixed-delay.attempts
固定延迟重启策略下,尝试重启的最大次数。
无。
语法结构
CREATE TABLE postgrescdc_source (
shipment_id INT,
order_id INT,
origin STRING,
destination STRING,
is_arrived BOOLEAN
) WITH (
'connector' = 'postgres-cdc',
'hostname' = '<yourHostname>',
'port' = '5432',
'username' = '<yourUserName>',
'password' = '<yourPassWord>',
'database-name' = '<yourDatabaseName>',
'schema-name' = '<yourSchemaName>',
'table-name' = '<yourTableName>'
);
WITH参数
参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 |
connector | 表类型。 | STRING | 是 | 无 | 固定值为 |
hostname | Postgres数据库的IP地址或者Hostname。 | STRING | 是 | 无 | 无。 |
username | Postgres数据库服务的用户名。 | STRING | 是 | STRING | 无。 |
password | Postgres数据库服务的密码。 | STRING | 是 | 无 | 无。 |
database-name | 数据库名称。 | STRING | 是 | 无 | 数据库名称支持正则表达式以读取多个数据库的数据。 |
schema-name | Postgres Schema名称。 | STRING | 是 | 无 | Schema名称支持正则表达式以读取多个Schema的数据。 |
table-name | Postgres表名。 | STRING | 是 | 无 | 表名支持正则表达式去读取多个表的数据。 |
port | Postgres数据库服务的端口号。 | INTEGER | 否 | 5432 | 无。 |
decoding.plugin.name | Postgres Logical Decoding插件名称。 | STRING | 否 | decoderbufs | 根据Postgres服务上安装的插件确定。支持的插件列表如下:
说明 如果您使用的是阿里云RDS PostgreSQL,需要开启逻辑解码(wal2json)功能,详情请参见逻辑解码(wal2json)。 |
debezium.* | Debezium属性参数。 | STRING | 否 | 无 | 更细粒度控制Debezium客户端的行为。例如 |
slot.name | 逻辑解码槽的名字。 | STRING | 否 | flink | 建议每个表都设置 |
类型映射
Postgres CDC和Flink字段类型对应关系如下。
Postgres CDC字段类型 | Flink字段类型 |
SMALLINT | SMALLINT |
INT2 | |
SMALLSERIAL | |
SERIAL2 | |
INTEGER | INT |
SERIAL | |
BIGINT | BIGINT |
BIGSERIAL | |
REAL | FLOAT |
FLOAT4 | |
FLOAT8 | DOUBLE |
DOUBLE PRECISION | |
NUMERIC(p, s) | DECIMAL(p, s) |
DECIMAL(p, s) | |
BOOLEAN | BOOLEAN |
DATE | DATE |
TIME [(p)] [WITHOUT TIMEZONE] | TIME [(p)] [WITHOUT TIMEZONE] |
TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
CHAR(n) | STRING |
CHARACTER(n) | |
VARCHAR(n) | |
CHARACTER VARYING(n) | |
TEXT | |
BYTEA | BYTES |
使用示例
CREATE TABLE source (
id INT NOT NULL,
name STRING,
description STRING,
weight DECIMAL(10,3)
) WITH (
'connector' = 'postgres-cdc',
'hostname' = '<host name>',
'port' = '<port>',
'username' = '<user name>',
'password' = '<password>',
'database-name' = '<database name>',
'schema-name' = '<schema name>',
'table-name' = '<table name>'
);
SELECT * FROM source;