本文为您介绍Postgres的CDC(Change Data Capture)源表DDL定义、WITH参数、类型映射和代码示例。

注意 Postgres的CDC源表正处于公测中,如果您对作业稳定性要求较高时,建议不要使用Postgres的CDC源表。

什么是Postgres的CDC源表

Postgres的CDC源表,即Postgres的流式源表,用于依次读取PostgreSQL数据库全量快照数据和变更数据,保证不多读一条也不少读一条数据。即使发生故障,也能采用Exactly Once方式处理。
说明 如果您使用的是开源的PostgreSQL数据库,Postgres CDC connector支持读取的Postgres版本为9.6及以上版本。

前提条件

  • 已创建Postgres数据库和表,创建RDS Postgres数据库和表的详情请参见创建数据库和账号
  • 已在阿里云RDS PostgreSQL、Amazon RDS PostgreSQL或者自建PostgreSQL上进行了相应的配置,详情请参见配置Postgres

使用限制

  • 仅Flink计算引擎VVR 2.1.2及以上版本支持Postgres的CDC Connector。
  • Postgres CDC在扫描全表数据时,是一次性读取完,没有可用于恢复的位点,所以无法在全表扫描阶段去执行Checkpoint。

    如果不执行Checkpoint,则Postgres CDC的源表会让执行中的Checkpoint一直等待,甚至到Checkpoint超时(如果表超级大,扫描耗时非常长)。超时的Checkpoint会被认为是失败的Checkpoint。而在Flink默认配置下,失败的Checkpoint会引发Flink作业Failover。

    因此,建议在表超大时,为了避免因为Checkpoint超时而导致作业失败,需要在作业开发页面高级配置更多Flink配置中,配置如下作业参数。
    execution.checkpointing.interval: 10min
    execution.checkpointing.tolerable-failed-checkpoints: 100
    restart-strategy: fixed-delay
    restart-strategy.fixed-delay.attempts: 2147483647
    参数 说明
    execution.checkpointing.interval Checkpoint的时间间隔。

    单位是Duration类型,例如10min或30s。

    execution.checkpointing.tolerable-failed-checkpoints 容忍Checkpoint失败的次数。

    该参数的取值与Checkpoint调度间隔时间的乘积就是允许的快照读取时间。如果表特别大,这个可以配置大一些。

    restart-strategy 重启策略,参数取值如下:
    • fixed-delay:固定延迟重启策略。
    • failure-rate:故障率重启策略。
    • exponential-delay:指数延迟重启策略。

    详情请参见Restart Strategies

    restart-strategy.fixed-delay.attempts 固定延迟重启策略下,尝试重启的最大次数。

DDL定义

CREATE TABLE postgrescdc_source (
  shipment_id INT,
  order_id INT,
  origin STRING,
  destination STRING,
  is_arrived BOOLEAN
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '<yourHostname>',
  'port' = '5432',
  'username' = '<yourUserName>',
  'password' = '<yourPassWord>',
  'database-name' = '<yourDatabaseName>',
  'schema-name' = '<yourSchemaName>',
  'table-name' = '<yourTableName>'
);

WITH参数

参数 说明 是否必填 数据类型 备注
connector 源表类型 STRING 固定值为postgres-cdc
hostname Postgres数据库的IP地址或者Hostname。 STRING
username Postgres数据库服务的用户名。 STRING
password Postgres数据库服务的密码 STRING
database-name 数据库名称 STRING 数据库名称支持正则表达式以读取多个数据库的数据。
schema-name Postgres Schema名称 STRING Schema名称支持正则表达式以读取多个Schema的数据。
table-name Postgres表名 STRING 表名支持正则表达式去读取多个表的数据。
port Postgres数据库服务的端口号 INTEGER 默认值为5432。
decoding.plugin.name Postgres Logical Decoding插件名称 STRING 根据Postgres服务上安装的插件确定。支持的插件列表如下:
  • decoderbufs(默认值)
  • wal2json
  • wal2json_rds
  • wal2json_streaming
  • wal2json_rds_streaming
  • pgoutput
说明 如果您使用的是阿里云RDS PostgreSQL,你需要开启逻辑解码(wal2json)功能,详情请参见逻辑解码(wal2json)
debezium.* Debezium属性参数 STRING 更细粒度控制Debezium客户端的行为。例如'debezium.snapshot.mode' = 'never',详情请参见配置属性
说明 建议每个表都设置debezium.slot.name参数,以避免出现PSQLException: ERROR: replication slot "debezium" is active for PID 974报错。

类型映射

Postgres CDC和Flink字段类型对应关系如下。
Postgres CDC字段类型 Flink字段类型
SMALLINT SMALLINT
INT2
SMALLSERIAL
SERIAL2
INTEGER INT
SERIAL
BIGINT BIGINT
BIGSERIAL
BIGINT BIGINT
REAL FLOAT
FLOAT4
FLOAT8 DOUBLE
DOUBLE PRECISION
NUMERIC(p, s) DECIMAL(p, s)
DECIMAL(p, s)
BOOLEAN BOOLEAN
DATE DATE
TIME [(p)] [WITHOUT TIMEZONE] TIME [(p)] [WITHOUT TIMEZONE]
TIMESTAMP [(p)] [WITHOUT TIMEZONE] TIMESTAMP [(p)] [WITHOUT TIMEZONE]
CHAR(n) STRING
CHARACTER(n)
VARCHAR(n)
CHARACTER VARYING(n)
TEXT
BYTEA BYTES