兼容PolarDB PostgreSQL版(兼容Oracle)的Debezium connector

兼容PolarDB PostgreSQL版(兼容Oracle)Debezium connector(简称Debezium PolarDBO connector),可用于捕获PolarDB PostgreSQL版(兼容Oracle)数据库中的行级别更改,生成数据更改事件记录,并将它们流式传输到Kafka Topic中。具体功能及用法请参考社区Debezium PostgreSQL connector

由于PolarDB PostgreSQL版(兼容Oracle)与社区PostgreSQL仅在少量数据类型和内置对象处理存在差异,本文为您介绍如何基于社区Debezium PostgreSQL connector,通过少量代码适配打包出支持PolarDB PostgreSQL版(兼容Oracle)Debezium connector。

打包Debezium PolarDBO connector

重要

Debezium PolarDBO connector基于社区Debezium PostgreSQL connector适配开发,无论是您自行打包,还是使用本文中提供的JAR包,Debezium PolarDBO connector都不提供SLA保障。

操作前提

  • 配置Java环境

    目前Debezium各版本均要求Java11及以上版本,请在打包和正式运行时提前配置Java11环境。

  • 确定Debezium版本

    根据您使用的Kafka/Kafka ConnectPolarDB PostgreSQL版(兼容Oracle)版本,确定Debezium版本。具体的版本兼容信息,请参考Debezium发布概览

    说明
    • Debezium代码仓库请参考Debezium

    • 对于PolarDB PostgreSQL版(兼容Oracle)匹配的社区版本如下:

      • Oracle语法兼容 2.0对应社区PostgreSQL 14。

      • Oracle语法兼容 1.0对应社区PostgreSQL 11。

  • 确定PgJDBC版本

    在对应版本的Debeziumpom.xml中通过查找关键字version.postgresql.driver确定PgJDBC版本。

    说明

    PgJDBC代码仓库请参考PgJDBC

操作步骤

社区Debezium 2.6.2.Final支持Kafka Connect 2.x、3.x版本,支持PostgreSQL 10、11、12、13、14、15、16版本。

接下来以社区Debezium 2.6.2.Final版本为例,为您介绍具体的打包步骤:

  1. 克隆对应版本的DebeziumPgJDBC的代码文件。

    git clone -b v2.6.2.Final --depth=1 https://github.com/debezium/debezium.git
    git clone -b REL42.6.1 --depth=1 https://github.com/pgjdbc/pgjdbc.git
  2. 复制PgJDBC部分文件到Debezium中。

    mkdir -p debezium/debezium-connector-postgres/src/main/java/org/postgresql/core/v3       
    mkdir -p debezium/debezium-connector-postgres/src/main/java/org/postgresql/jdbc
    cp pgjdbc/pgjdbc/src/main/java/org/postgresql/core/v3/ConnectionFactoryImpl.java debezium/debezium-connector-postgres/src/main/java/org/postgresql/core/v3 
    cp pgjdbc/pgjdbc/src/main/java/org/postgresql/core/v3/QueryExecutorImpl.java debezium/debezium-connector-postgres/src/main/java/org/postgresql/core/v3 
    cp pgjdbc/pgjdbc/src/main/java/org/postgresql/jdbc/PgDatabaseMetaData.java debezium/debezium-connector-postgres/src/main/java/org/postgresql/jdbc
  3. 应用适配PolarDB PostgreSQL版(兼容Oracle)patch文件。

    git apply v2.6.2.Final-support-polardbo-v1.patch
    说明
    • 以上使用的Debezium PolarDBO connector兼容patch文件:v2.6.2.Final-support-polardbo-v1.patch

    • patch文档默认将依赖debezium-api、debezium-core、PgJDBCprotobuf-java打包至JAR包,如不需要可以从pom.xml中移除。

  4. 使用Maven打包Debezium PolarDBO connector。

    mvn clean package -pl :debezium-connector-postgres -DskipITs -Dquick
    # 打包完成后可以在debezium-connector-postgres/的target目录中获取到jar

    按照以上流程基于JDK11打包出Debezium PolarDBO connectorJAR包:debezium-connector-postgres-polardbo-v1.0-2.6.2.Final.jar

使用说明

Debezium PolarDBO connector是通过PolarDB PostgreSQL版(兼容Oracle)数据库的逻辑复制读取增量变更,使用时需要满足以下条件:

  • wal_level参数的值需设置为logical,即在预写式日志WAL(Write-ahead logging)中增加支持逻辑复制所需的信息。

    说明

    您可以通过控制台设置wal_level参数,详细操作请参考设置集群参数。修改该参数后集群将会重启,请在修改参数前做好业务安排,谨慎操作。

  • 执行ALTER TABLE schema.table REPLICA IDENTITY FULL;命令设置订阅表的REPLICA IDENTITYFULL(发出的插入和更新操作事件包含表中所有列的旧值),以保障该表数据同步的一致性。

    说明
    • REPLICA IDENTITYPostgreSQL特有的表级设置,决定了逻辑解码插件在发生(INSERT)和更新(UPDATE)事件时,是否包含涉及的表列的旧值。REPLICA IDENTITY取值含义详情,请参见REPLICA IDENTITY

    • 设置订阅表的REPLICA IDENTITYFULL时可能需要锁表,进而影响业务,请在修改参数前做好业务安排。您可以通过以下命令查看当前配置是否为FULL

      SELECT relreplident = 'f' FROM pg_class WHERE relname = 'tablename';
  • 需要确保max_wal_sendersmax_replication_slots的参数值均大于当前数据库复制槽已使用数和Kafka作业所需要的slot数量。

  • 确保使用的是高权限账号或者同时拥有LOGINREPLICATION权限的普通账号,并且具有订阅表的SELECT权限用于全量数据查询。

  • 只能连接PolarDB集群的主地址,集群地址不支持逻辑复制。

  • connector.class参数指定为io.debezium.connector.postgresql.PolarDBOConnector

  • 建议将plugin.name参数设置为pgoutput,否则非UTF-8编码的数据库可能会发生增量解析乱码,详细介绍请参考社区文档

示例

以下示例用于说明,如何通过Debezium PolarDBO connector,将PolarDB PostgreSQLOracle语法兼容 2.0集群中dbz_db库的t1t2表,同步到Kafka消息队列中。

前提准备

  1. Kafka准备

    1. 部署Kafka实例,并确保在Kafka Connect的主机上能够成功访问。您也可以直接使用云消息队列 Kafka 版,详情请参考快速入门

    2. Kafka实例中创建一个名为pg_dbz_eventTopic,用于接收消息。

      说明

      测试场景为便于查看可以创建单分区Topic,对于生产环境请创建多分区Topic。

  2. 在本地以distributed模式启动Kafka Connect,端口为8083。

    • 将上文打包的Debezium PolarDBO connectorJAR包拷贝到Kafka Connectplugin.path目录中。

      # ${plugin.path} 请替换为具体的路径
      mkdir ${plugin.path}/debezium-connector-polardbo
      cp debezium-connector-postgres-polardbo-v1.0-2.6.2.Final.jar ${plugin.path}/debezium-connector-polardbo
  3. PolarDB PostgreSQL版(兼容Oracle)准备

    1. PolarDB集群购买页面,购买PolarDB PostgreSQL版(兼容Oracle) 2.0集群。

    2. 按照使用说明,完成PolarDB集群配置,满足Debezium PolarDBO connector使用前提。

    3. 创建高权限账户,详细操作请参考创建账号

    4. 获取集群主地址,详细操作请参考查看连接地址。如果PolarDB集群和Kafka Connect在同一可用区,可直接使用私网地址,否则需要申请公网地址。将Kafka Connect实例地址添加到PolarDB集群白名单中,请参考设置集群白名单

    5. 在控制台创建数据库dbz_db,详细步骤请参考创建数据库

    6. 执行如下语句,在数据库dbz_db中创建表t1、t2,并写入数据。

      CREATE TABLE public.t1 (a int PRIMARY KEY, b text, c TIMESTAMP);
      ALTER TABLE public.t1 REPLICA IDENTITY FULL;
      INSERT INTO public.t1(a, b, c) VALUES(1, 'a', now());
      CREATE TABLE public.t2 (a int PRIMARY KEY, b text, c DATE);
      ALTER TABLE public.t2 REPLICA IDENTITY FULL;
      INSERT INTO public.t2(a, b, c) VALUES(1, 'a', now());

测试

  1. 创建配置文件config/postgresql-connector.json,配置说明请参考社区文档

    {
      "name": "dbz-polardb",
      "config": {
        "connector.class": "io.debezium.connector.postgresql.PolarDBOConnector",
        "database.hostname": "<yourHostname>", 
        "database.port": "<yourPort>", 
        "database.user": "<yourUserName>", 
        "database.password": "<yourPassWord>", 
        "database.dbname" : "dbz_db",
        "plugin.name": "pgoutput",
        "slot.name": "dbz_polardb",
        "table.include.list": "public.t1,public.t2",
        "topic.prefix": "polardb"
        "transforms": "Combine",
        "transforms.Combine.type": "io.debezium.transforms.ByLogicalTableRouter",
        "transforms.Combine.topic.regex": "(.*)",
        "transforms.Combine.topic.replacement": "pg_dbz_event"
      }
    }
    说明

    默认需要为每个表创建一个Topic,以上配置对Topic做了聚合。

  2. 添加connector。

    curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 'http://localhost:8083/connectors' -d @config/postgresql-connector.json

    成功添加后,能够在KafkaTopic中查询到全量数据。

    image.png

  3. PolarDB集群的dbz_db库中执行以下DML语句:

    INSERT INTO public.t1(a, b, c) VALUES(2, 'b', now());
    UPDATE public.t1 SET b = 'c' WHERE a = 1;
    DELETE FROM public.t1 WHERE a = 2;
    INSERT INTO public.t1(a, b, c) VALUES(4, 'd', now());
    
    INSERT INTO public.t2(a, b, c) VALUES(2, 'b', now());
    UPDATE public.t2 SET b = 'c' WHERE a = 1;
    DELETE FROM public.t2 WHERE a = 2;
    INSERT INTO public.t2(a, b, c) VALUES(4, 'd', now());

    能够在KafkaTopic中查询到增量数据。

    image.png