文档

Postgres CDC(公测中)

更新时间:

Postgres CDC可用于依次读取PostgreSQL数据库全量快照数据和变更数据,保证不多读一条也不少读一条数据。即使发生故障,也能采用Exactly Once方式处理。本文为您介绍如何使用Postgres CDC连接器。

背景信息

Postgres CDC连接器支持的信息如下。

类别

详情

支持类型

源表

说明

您可以使用JDBC作为结果表和维表连接器。

运行模式

仅支持流模式

数据格式

暂不适用

特有监控指标

  • currentFetchEventTimeLag:数据产生到拉取到Source Operator的间隔。

    该指标仅在增量阶段有效,全量阶段该值恒为0。

  • currentEmitEventTimeLag:数据产生到离开Source Operator的间隔。

    该指标仅在增量阶段有效,全量阶段该值恒为0。

  • sourceIdleTime:source至今有多久不产生新数据。

说明

指标的含义及如何查看监控指标,请参见自定义监控指标上报渠道

API种类

SQL

是否支持更新或删除结果表数据

不涉及

前提条件

Postgres CDC通过PostgreSQL数据库的逻辑复制读取CDC变更流数据,需要满足以下条件:

  • wal_level参数的值需设置为logical,即在预写式日志WAL(Write-ahead logging)中增加支持逻辑编码所需的信息。

  • 执行ALTER TABLE schema.table REPLICA IDENTITY FULL;命令设置订阅表的REPLICA IDENTITY为FULL,以保障该表数据同步的一致性。

  • 需要确保max_wal_senders和max_replication_slots的参数值均大于当前数据库复制槽已使用数与Flink作业所需要的slot数量。

  • 确保账户系统权限为SUPERUSER或者同时拥有LOGIN和REPLICATION权限,并且有订阅表的SELECT权限用于全量数据查询。

说明

阿里云RDS PostgreSQL、Amazon RDS PostgreSQL或者自建PostgreSQL上相应的配置可能有差异,详情请参见配置Postgres

使用限制

  • 仅Flink计算引擎VVR 2.0及以上版本支持Postgres的CDC连接器。

  • Postgres CDC暂不支持在全表扫描阶段执行Checkpoint。

    如果您的作业在全表扫描阶段触发Checkpoint,则可能由于Checkpoint超时导致作业Failover。因此,建议您在其他配置中配置如下参数,具体操作请参见如何配置作业运行参数?。避免在全量同步阶段由于Checkpoint超时导致Failover。

    execution.checkpointing.interval: 10min
    execution.checkpointing.tolerable-failed-checkpoints: 100
    restart-strategy: fixed-delay
    restart-strategy.fixed-delay.attempts: 2147483647

    相关的参数说明详情如下表所示。

    参数

    说明

    备注

    execution.checkpointing.interval

    Checkpoint的时间间隔。

    单位是Duration类型,例如10min或30s。

    execution.checkpointing.tolerable-failed-checkpoints

    容忍Checkpoint失败的次数。

    该参数的取值与Checkpoint调度间隔时间的乘积就是允许的快照读取时间。

    说明

    如果表特别大,建议将该参数值配置得大一些。

    restart-strategy

    重启策略。

    参数取值如下:

    • fixed-delay:固定延迟重启策略。

    • failure-rate:故障率重启策略。

    • exponential-delay:指数延迟重启策略。

    详情请参见Restart Strategies

    restart-strategy.fixed-delay.attempts

    固定延迟重启策略下,尝试重启的最大次数。

    无。

语法结构

CREATE TABLE postgrescdc_source (
  shipment_id INT,
  order_id INT,
  origin STRING,
  destination STRING,
  is_arrived BOOLEAN
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '<yourHostname>',
  'port' = '5432',
  'username' = '<yourUserName>',
  'password' = '<yourPassWord>',
  'database-name' = '<yourDatabaseName>',
  'schema-name' = '<yourSchemaName>',
  'table-name' = '<yourTableName>'
);

WITH参数

参数

说明

数据类型

是否必填

默认值

备注

connector

表类型。

STRING

固定值为postgres-cdc

hostname

Postgres数据库的IP地址或者Hostname。

STRING

无。

username

Postgres数据库服务的用户名。

STRING

无。

password

Postgres数据库服务的密码。

STRING

无。

database-name

数据库名称。

STRING

数据库名称支持正则表达式以读取多个数据库的数据。

schema-name

Postgres Schema名称。

STRING

Schema名称支持正则表达式以读取多个Schema的数据。

table-name

Postgres表名。

STRING

表名支持正则表达式去读取多个表的数据。

port

Postgres数据库服务的端口号。

INTEGER

5432

无。

decoding.plugin.name

Postgres Logical Decoding插件名称。

STRING

decoderbufs

根据Postgres服务上安装的插件确定。支持的插件列表如下:

  • decoderbufs(默认值)

  • wal2json

  • wal2json_rds

  • wal2json_streaming

  • wal2json_rds_streaming

  • pgoutput

说明

如果您使用的是阿里云RDS PostgreSQL,需要开启逻辑解码(wal2json)功能,详情请参见逻辑解码(wal2json)

debezium.*

Debezium属性参数。

STRING

更细粒度控制Debezium客户端的行为。例如'debezium.snapshot.mode' = 'never',详情请参见配置属性

slot.name

逻辑解码槽的名字。

STRING

8.0.1版本之前为非必填,从8.0.1版本开始为必填

8.0.1版本之前默认值为flink,从8.0.1版本开始无默认值

建议每个表都设置slot.name参数,以避免出现PSQLException: ERROR: replication slot "debezium" is active for PID 974报错。

类型映射

Postgres CDC和Flink字段类型对应关系如下。

Postgres CDC字段类型

Flink字段类型

SMALLINT

SMALLINT

INT2

SMALLSERIAL

SERIAL2

INTEGER

INT

SERIAL

BIGINT

BIGINT

BIGSERIAL

REAL

FLOAT

FLOAT4

FLOAT8

DOUBLE

DOUBLE PRECISION

NUMERIC(p, s)

DECIMAL(p, s)

DECIMAL(p, s)

BOOLEAN

BOOLEAN

DATE

DATE

TIME [(p)] [WITHOUT TIMEZONE]

TIME [(p)] [WITHOUT TIMEZONE]

TIMESTAMP [(p)] [WITHOUT TIMEZONE]

TIMESTAMP [(p)] [WITHOUT TIMEZONE]

CHAR(n)

STRING

CHARACTER(n)

VARCHAR(n)

CHARACTER VARYING(n)

TEXT

BYTEA

BYTES

使用示例

CREATE TABLE source (
  id INT NOT NULL,
  name STRING,
  description STRING,
  weight DECIMAL(10,3)
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '<host name>',
  'port' = '<port>',
  'username' = '<user name>',
  'password' = '<password>',
  'database-name' = '<database name>',
  'schema-name' = '<schema name>',
  'table-name' = '<table name>'
);

SELECT * FROM source;

相关文档

  • 本页导读 (1)
文档反馈