兼容PolarDB PostgreSQL版(兼容Oracle)的Flink CDC

更新时间:2024-10-15 08:08:10

兼容PolarDB PostgreSQL版(兼容Oracle)Flink CDC连接器(简称PolarDBO Flink CDC)可用于依次读取PolarDB PostgreSQL版(兼容Oracle)数据库全量快照数据和变更数据,具体功能及用法请参考社区Postgres CDC

由于PolarDB PostgreSQL版(兼容Oracle)与社区PoatgreSQL仅在少量数据类型和内置对象处理存在差异,本文为您介绍如何基于社区Postgres CDC,通过少量代码适配打包出支持PolarDB PostgreSQL版(兼容Oracle)PolarDBO Flink CDC连接器。

说明

PolarDB PostgreSQL版(兼容Oracle)DATE类型是64位,而社区PostgreSQLDATE类型为32位。因此,在PolarDBO Flink CDC中会对DATA类型数据的处理进行适配。

打包PolarDBO Flink CDC连接器

重要

PolarDBO Flink CDC连接器基于社区Postgres CDC适配开发,无论是您自行打包,还是使用本文中提供的JAR包,PolarDBO Flink CDC连接器都不提供SLA保障。

操作前提

  • 确定Flink-CDC版本

    如果您使用的是阿里云实时计算 Flink 版,需要确认与对应Ververica Runtime(简称VVR)版本兼容的社区Flink-CDC版本,具体可以参考CDCVVR版本对应关系

    说明

    Flink-CDC代码仓库请参考Flink-CDC

  • 确定Debezium版本

    在对应版本的Flink-CDCpom.xml中通过查找关键字debezium.version确定Debezium版本。

    说明

    Debezium代码仓库请参考Debezium

  • 确定PgJDBC版本

    在对应版本的Postgres-CDCpom.xml中通过查找关键字org.postgresql确定PgJDBC版本。

    说明
    • release-3.0以下版本文件路径为:flink-connector-postgres-cdc/pom.xml

    • release-3.0及以上版本文件路径为:flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/pom.xml

    • PgJDBC代码仓库请参考PgJDBC

操作步骤

release-3.1打包

社区Flink-CDC release-3.1版本兼容阿里云实时计算 Flink 版vvr-8.0.x-flink-1.17。

打包对应版本的PolarDBO Flink CDC连接器步骤如下:

  1. 克隆对应版本的Flink-CDC、DebeziumPgJDBC的代码文件。

    git clone -b release-3.1 --depth=1 https://github.com/apache/flink-cdc.git
    git clone -b REL42.5.1 --depth=1 https://github.com/pgjdbc/pgjdbc.git
    git clone -b v1.9.8.Final --depth=1 https://github.com/debezium/debezium.git
  2. 复制DebeziumPgJDBC部分文件到Flink-CDC中。

    mkdir -p flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/postgresql/core/v3
    mkdir -p flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/postgresql/jdbc
    cp pgjdbc/pgjdbc/src/main/java/org/postgresql/core/v3/ConnectionFactoryImpl.java flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/postgresql/core/v3
    cp pgjdbc/pgjdbc/src/main/java/org/postgresql/core/v3/QueryExecutorImpl.java flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/postgresql/core/v3
    cp pgjdbc/pgjdbc/src/main/java/org/postgresql/jdbc/PgDatabaseMetaData.java flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/postgresql/jdbc
    cp debezium/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/TypeRegistry.java flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql
  3. 应用适配PolarDB PostgreSQL版(兼容Oracle)patch文件。

    git apply release-3.1_support_polardbo.patch
    说明

    以上使用的PolarDBO Flink CDC兼容patch文件:release-3.1_support_polardbo.patch

  4. 使用Maven打包PolarDBO Flink CDC连接器。

    mvn clean install -DskipTests -Dcheckstyle.skip=true -Dspotless.check.skip -Drat.skip=true
    
    # 打包完成后可以在flink-sql-connector-postgres-cdc的target目录中获取到jar包

按照以上流程基于JDK8打包出PolarDBO Flink CDC连接器的JAR包:flink-sql-connector-postgres-cdc-3.1-SNAPSHOT.jar

release-2.3打包

社区Flink-CDC release-2.3版本兼容阿里云实时计算 Flink 版vvr-4.0.15-flink-1.13 ~ vvr-6.0.2-flink-1.15。

打包对应版本的PolarDBO Flink CDC连接器步骤如下:

  1. 克隆对应版本的Flink-CDC、DebeziumPgJDBC的代码文件。

    git clone -b release-2.3 --depth=1 https://github.com/apache/flink-cdc.git
    git clone -b REL42.2.26 --depth=1 https://github.com/pgjdbc/pgjdbc.git
    git clone -b v1.6.4.Final --depth=1 https://github.com/debezium/debezium.git
  2. 复制DebeziumPgJDBC部分文件到Flink-CDC中。

    mkdir -p flink-cdc/flink-connector-postgres-cdc/src/main/java/org/postgresql/core/v3
    mkdir -p flink-cdc/flink-connector-postgres-cdc/src/main/java/org/postgresql/jdbc
    mkdir -p flink-cdc/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql
    cp pgjdbc/pgjdbc/src/main/java/org/postgresql/core/v3/ConnectionFactoryImpl.java flink-cdc/flink-connector-postgres-cdc/src/main/java/org/postgresql/core/v3
    cp pgjdbc/pgjdbc/src/main/java/org/postgresql/core/v3/QueryExecutorImpl.java flink-cdc/flink-connector-postgres-cdc/src/main/java/org/postgresql/core/v3
    cp pgjdbc/pgjdbc/src/main/java/org/postgresql/jdbc/PgDatabaseMetaData.java flink-cdc/flink-connector-postgres-cdc/src/main/java/org/postgresql/jdbc
    cp debezium/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/TypeRegistry.java flink-cdc/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql
  3. 应用适配PolarDB PostgreSQL版(兼容Oracle)patch文件。

    git apply release-2.3_support_polardbo.patch
    说明

    以上使用的PolarDBO Flink CDC兼容patch文件:release-2.3_support_polardbo.patch

  4. 使用Maven打包PolarDBO Flink CDC连接器。

    mvn clean install -DskipTests -Dcheckstyle.skip=true -Dspotless.check.skip -Drat.skip=true
    
    # 打包完成后可以在flink-sql-connector-postgres-cdc的target目录中获取到jar包

按照以上流程基于JDK8打包出PolarDBO Flink CDC连接器的JAR包:flink-sql-connector-postgres-cdc-2.3-SNAPSHOT.jar

使用说明

PolarDBO Flink CDC连接器通过PolarDB PostgreSQL版(兼容Oracle)数据库的逻辑复制读取CDC变更流数据,需要满足以下条件:

  • wal_level参数的值需设置为logical,即在预写式日志WAL(Write-ahead logging)中增加支持逻辑复制所需的信息。

    说明

    您可以通过控制台设置wal_level参数,详细操作请参考设置集群参数。修改该参数后集群将会重启,请在修改参数前做好业务安排,谨慎操作。

  • 执行ALTER TABLE schema.table REPLICA IDENTITY FULL;命令设置订阅表的REPLICA IDENTITYFULL(发出的插入和更新操作事件包含表中所有列的旧值),以保障该表数据同步的一致性。

    说明
    • REPLICA IDENTITYPostgreSQL特有的表级设置,决定了逻辑解码插件在发生(INSERT)和更新(UPDATE)事件时,是否包含涉及的表列的旧值。REPLICA IDENTITY取值含义详情,请参见REPLICA IDENTITY

    • 设置订阅表的REPLICA IDENTITYFULL时可能需要锁表,可能影响业务,请在修改参数前做好业务安排。您可以通过以下命令查看当前配置是否为FULL

      SELECT relreplident = 'f' FROM pg_class WHERE relname = 'tablename';
  • 需要确保max_wal_sendersmax_replication_slots的参数值均大于当前数据库复制槽已使用数与Flink作业所需要的slot数量。

  • 确保使用的是高权限账号或者同时拥有LOGINREPLICATION权限,并且具有订阅表的SELECT权限用于全量数据查询。

  • 只能连接PolarDB集群的主地址,集群地址不支持逻辑复制。

PolarDBO Flink CDC连接器Postgres CDC区别

PolarDBO Flink CDC连接器基于Postgres CDC打包,具体语法和参数可以参考Postgres CDC。但存在以下主要区别:

  • WITHconnector参数需要设置为固定值:polardbo-cdc

  • PolarDBO Flink CDC同时兼容PolarDB PostgreSQL各版本、PolarDB PostgreSQL版(兼容Oracle) 1.0PolarDB PostgreSQL版(兼容Oracle) 2.0版本。

    说明

    如果您使用的是PolarDB PostgreSQL,推荐您直接使用社区Postgres CDC

  • PolarDB PostgreSQL版(兼容Oracle) 1.0、PolarDB PostgreSQL版(兼容Oracle) 2.0中的DATE类型的列,Flink SQL中的sourcesink表对应类型必须指定为timestamp

  • 建议将decoding.plugin.name参数设置为pgoutput,否则非UTF-8编码的数据库可能会发生增量解析乱码,详细介绍请参考社区文档

类型映射

PolarDB PostgreSQLFlink字段类型映射,除DATE类型外,其他字段类型和社区PostgreSQL完全相同,具体映射关系如下:

PolarDB PostgreSQL字段类型

Flink字段类型

PolarDB PostgreSQL字段类型

Flink字段类型

SMALLINT

SMALLINT

INT2

SMALLSERIAL

SERIAL2

INTEGER

INT

SERIAL

BIGINT

BIGINT

BIGSERIAL

REAL

FLOAT

FLOAT4

FLOAT8

DOUBLE

DOUBLE PRECISION

NUMERIC(p, s)

DECIMAL(p, s)

DECIMAL(p, s)

BOOLEAN

BOOLEAN

DATE

  • PolarDB PostgreSQL版(兼容Oracle) 1.0:TIMESTAMP

  • PolarDB PostgreSQL版(兼容Oracle) 2.0:TIMESTAMP

  • PolarDB PostgreSQL:DATE

TIME [(p)] [WITHOUT TIMEZONE]

TIME [(p)] [WITHOUT TIMEZONE]

TIMESTAMP [(p)] [WITHOUT TIMEZONE]

TIMESTAMP [(p)] [WITHOUT TIMEZONE]

CHAR(n)

STRING

CHARACTER(n)

VARCHAR(n)

CHARACTER VARYING(n)

TEXT

BYTEA

BYTES

示例

以下示例用于说明,如何通过PolarDBO Flink CDC,将PolarDB PostgreSQL版(兼容Oracle) 2.0集群中flink_source库的shipments表,同步到flink_sink库的shipments_sink表中。

说明

以下示例仅用于简单验证打包的PolarDBO Flink CDC能够在PolarDB PostgreSQL版(兼容Oracle)上运行。正式使用时,为满足您的业务需求,请参考社区Postgres CDC配置参数。

  1. 前提准备

    • PolarDB PostgreSQL版(兼容Oracle)准备

      1. PolarDB集群购买页面,购买PolarDB PostgreSQL版(兼容Oracle) 2.0集群。

      2. 创建高权限账户,详细操作请参考创建账号

      3. 获取集群主地址,详细操作请参考查看连接地址。如果PolarDB集群和实时计算 Flink 版在同一可用区,可直接使用私网地址,否则需要申请公网地址。将Flink实例地址添加到PolarDB集群白名单中,请参考设置集群白名单

      4. 在控制台创建源数据库flink_source和目标数据库flink_sink,详细步骤请参考创建数据库

      5. 执行如下语句,在源数据库flink_source中创建shipments表,并写入数据。

        CREATE TABLE public.shipments (
          shipment_id INT,
          order_id INT,
          origin TEXT,
          destination TEXT,
          is_arrived BOOLEAN,
          order_time DATE,
          PRIMARY KEY (shipment_id) 
        );
        ALTER TABLE public.shipments REPLICA IDENTITY FULL;
        INSERT INTO public.shipments SELECT 1, 1, 'test1', 'test1', false, now();
      6. 执行如下语句,在目标数据库flink_sink中创建shipments_sink表。

        CREATE TABLE public.shipments_sink (
           shipment_id INT,
           order_id INT,
           origin TEXT,
           destination TEXT,
           is_arrived BOOLEAN,
           order_time TIMESTAMP,
           PRIMARY KEY (shipment_id)
         );
    • 实时计算 Flink 版准备

      1. 登录实时计算控制台,购买实时计算 Flink 版实例,详细操作请参考开通实时计算Flink

        说明

        建议实时计算 Flink 版地域专有网络PolarDB集群保持一致,连接地址可以直接使用PolarDB集群主地址的私网地址。

      2. 创建自定义连接器,上传打包好的PolarDBO Flink CDCFormats选择debezium-json,详细步骤请参考创建自定义连接器

        image

  2. 创建Flink作业

    1. 登录实时计算控制台,新建一个SQL作业草稿,请参考SQL作业开发。使用以下Flink SQL语句,修改PolarDB集群主地址,端口,账号和密码。

      说明

      PolarDB PostgreSQL版(兼容Oracle)DATE类型是64位,而Flink SQL以及大部分数据库的DATE类型为32位。因此,源表中DATE类型的列,在Flink SQLsourcesink表中都必须要指定为TIMESTAMP类型。否则,作业会因为类型不匹配而报错中断,例如:“java.time.DateTimeException: Invalid value for EpochDay (valid values -365243219162 - 365241780471):1720891573000”

      CREATE TEMPORARY TABLE shipments (
         shipment_id INT,
         order_id INT,
         origin STRING,
         destination STRING,
         is_arrived BOOLEAN,
         order_time TIMESTAMP,
         PRIMARY KEY (shipment_id) NOT ENFORCED
       ) WITH (
         'connector' = 'polardbo-cdc',
         'hostname' = '<yourHostname>',
         'port' = '<yourPort>',
         'username' = '<yourUserName>',
         'password' = '<yourPassWord>',
         'database-name' = 'flink_source',
         'schema-name' = 'public',
         'table-name' = 'shipments',
         'decoding.plugin.name' = 'pgoutput',
         'slot.name' = 'flink'
       );
      
      CREATE TEMPORARY TABLE shipments_sink (
         shipment_id INT,
         order_id INT,
         origin STRING,
         destination STRING,
         is_arrived BOOLEAN,
         order_time TIMESTAMP,
         PRIMARY KEY (shipment_id) NOT ENFORCED
       ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:postgresql://<yourHostname>:<yourPort>/flink_sink',
        'table-name' = 'shipments_sink',
        'username' = '<yourUserName>',
        'password' = '<yourPassWord>'
      );
      
      INSERT INTO shipments_sink SELECT * FROM shipments;
    2. 部署并启动作业。

      image

      image

    3. 测试与验证。

      • 部署作业运行成功后,即状态为运行中,shipments表中的数据已经同步到目标数据库flink_sinkshipments_sink表。

        SELECT * FROM public.shipments_sink;

        返回结果如下:

         shipment_id | order_id | origin | destination | is_arrived |     order_time      
        -------------+----------+--------+-------------+------------+---------------------
                   1 |        1 | test1  | test1       | f          | 2024-09-18 05:45:08
        (1 row)
      • 在源数据库flink_sourceshipments表上执行DML,新增修改也将实时同步。

        INSERT INTO public.shipments SELECT 2, 2, 'test2', 'test2', false, now();
        UPDATE public.shipments SET is_arrived = true WHERE shipment_id = 1;
        DELETE FROM public.shipments WHERE shipment_id = 2;
        INSERT INTO public.shipments SELECT 3, 3, 'test3', 'test3', false, now();
        UPDATE public.shipments SET is_arrived = true WHERE shipment_id = 3;

        shipments表中的数据已经同步更新到目标数据库flink_sinkshipments_sink表。

        SELECT * FROM public.shipments_sink;

        返回结果如下:

         shipment_id | order_id | origin | destination | is_arrived |     order_time      
        -------------+----------+--------+-------------+------------+---------------------
                   1 |        1 | test1  | test1       | t          | 2024-09-18 05:45:08
                   3 |        3 | test3  | test3       | t          | 2024-09-18 07:33:23
        (2 rows)
  • 本页导读 (1)
  • 打包PolarDBO Flink CDC连接器
  • 操作前提
  • 操作步骤
  • 使用说明
  • PolarDBO Flink CDC连接器与Postgres CDC区别
  • 类型映射
  • 示例