配置CDC以同步数据变更

更新时间:
复制为 MD 格式

变更数据捕获(Change Data Capture, CDC)功能允许您实时捕获数据库中的数据修改(INSERT/UPDATE/DELETE),并将其以事件流的形式同步到下游系统,如数据仓库、分析平台(如Flink)或其他数据库实例。

PolarDB PostgreSQL分布式版集群中,搭建CDC的关键在于理解数据的来源:

  • 分布表:数据变更发生在各个数据节点(DN)上,因此下游需要分别订阅所有DN节点才能获取完整的变更数据。

  • 复制表:数据变更会同步到所有节点,但为了避免重复,变更流统一由主计算节点(主CN)发布,下游只需订阅主CN即可。

准备工作:检查与配置

在开始之前,请确保数据库集群的相关参数已正确设置。这些参数是开启逻辑复制功能的前提。

说明

默认情况下,PolarDB PostgreSQL分布式版已配置好以下参数。如果检查结果不符,请提交工单联系我们处理。

  • 在主CN节点执行以下 SQL,检查所有节点的polar_cluster.enable_change_data_capture是否为on

    SELECT success, result FROM run_command_on_all_nodes($$ SHOW polar_cluster.enable_change_data_capture $$);

    预期结果如下:所有节点的result列都应为on

     success | result 
    ---------+--------
     t       | on
     t       | on
     t       | on
     t       | on
  • 在主CN节点上执行以下SQL,检查所有节点的wal_level是否为logical

    SELECT success, result FROM run_command_on_all_nodes($$ SHOW wal_level $$);

    预期结果如下:所有节点的result列都应为logical

    success | result  
    ---------+---------
     t       | logical
     t       | logical
     t       | logical
     t       | logical

步骤一:在发布端创建发布和复制槽

发布端(即您的PolarDB PostgreSQL分布式版集群)的配置分为两步:创建发布(Publication)和创建复制槽(Replication Slot)。

创建发布

发布是定义哪些表的变更需要被捕获的集合。此操作只需在主CN上执行一次,系统会自动将其同步到所有节点。

CREATE PUBLICATION <publication_name> FOR TABLE <table_name1>, <table_name2>;
说明
  • 请勿使用FOR ALL TABLES选项创建发布,应明确指定需要发布的表。

  • <publication_name>为发布名,<table_name1>/<table_name2>为要发布的分布表/复制表。发布端仅需发布分布表的逻辑表名,无需发布物理分片的表名。

创建复制槽

复制槽用于为下游订阅者保留WAL日志,防止其在被消费前被清理。由于数据变更来源于主CN和所有DN,您需要在这些节点上都创建复制槽。

在主CN节点上执行以下SQL,即可在所有相关节点(主CN和所有DN)上批量创建同名的复制槽:

WITH nodes AS (
    SELECT
        nodename,
        nodeport,
        $$ SELECT pg_create_logical_replication_slot('<publication_slot_name>', 'pgoutput', false) $$ AS cmd
    FROM pg_dist_node
    WHERE nodeid = 1 OR shouldhaveshards = true
)
SELECT result.*
FROM (
    SELECT
        array_agg(nodename) as nodenames,
        array_agg(nodeport) as nodeports,
        array_agg(cmd) as cmds
    FROM nodes
) params,
LATERAL master_run_on_worker(nodenames, nodeports, cmds, true) AS result;
说明

<publication_slot_name>为复制槽名称。

预期的结果如下,其中success列为t表示复制槽创建成功。否则复制槽创建失败,通过result列可以查看具体原因:

   node_name    | node_port | success |               result               
----------------+-----------+---------+------------------------------------
 10.xxx.xxx.xxx |      3007 | t       | (<publication_slot_name>,0/C024D7D0)
 10.xxx.xxx.xxx |      3006 | t       | (<publication_slot_name>,0/C33B6668)
 10.xxx.xxx.xxx |      3003 | t       | (<publication_slot_name>,0/C33949B0)
(3 rows)

步骤二:在订阅端创建订阅

订阅端(如Debezium、Flink或另一个PostgreSQL实例)需要为每一个创建了复制槽的节点(主CN和所有DN)分别建立一个订阅连接,以接收完整的变更数据。

说明

每个订阅连接的配置基本相同,只需修改主机地址和端口号。

Debezium示例

如果使用Debezium作为订阅端,那么每个订阅端仅database.hostnamedatabase.port两个配置项需要修改为相应节点的主地址和端口号,其余配置完全相同。以订阅其中一个DN节点的配置为例:

{
    "name": "xxx",
    "config": {
        "connector.class" : "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname" : "<DN1主地址>",
        "database.port" : "<DN1端口>",
        "database.user" : "<您的用户名>",
        "database.password" : "<您的密码>",
        "database.dbname" : "postgres",
        "slot.name": "<复制槽名称>",
        "publication.name": "<发布名称>",
        ...
    }
}

PostgreSQL示例

如果使用PostgreSQL作为订阅端,那么每个订阅端仅hostport两个配置项需要修改为相应节点的主地址和端口号,其余配置完全相同。以订阅其中一个DN节点的配置为例:

CREATE SUBSCRIPTION test_subscription
    CONNECTION 'dbname=postgres host=<DN1主地址> port=<DN1端口> user=<您的用户名> password=<您的密码>'
    PUBLICATION <发布名称>
    WITH (create_slot=false, slot_name='<复制槽名称>');

步骤三:验证CDC链路

在所有订阅端配置完成后,您可以在主CN上执行以下SQL,检查所有发布节点的复制槽是否都已激活。

WITH nodes AS (
    SELECT
        nodename,
        nodeport,
        $$ SELECT active FROM pg_replication_slots WHERE slot_name = '<publication_slot_name>' $$ AS cmd
    FROM pg_dist_node
    WHERE nodeid = 1 OR shouldhaveshards = true
)
SELECT result.*
FROM (
    SELECT
        array_agg(nodename) as nodenames,
        array_agg(nodeport) as nodeports,
        array_agg(cmd) as cmds
    FROM nodes
) params,
LATERAL master_run_on_worker(nodenames, nodeports, cmds, true) AS result;

预期得到的结果如下。如果result列值为t则表示相应节点与订阅端的复制关系已经建立。此时,您可以在源端表中插入或修改数据,并观察订阅端是否能接收到变更。

  node_name     | node_port | success | result 
----------------+-----------+---------+--------
 10.xxx.xxx.xxx |      3007 | t       | t
 10.xxx.xxx.xxx |      3006 | t       | t
 10.xxx.xxx.xxx |      3003 | t       | t
(3 rows)

维护与清理

如果下游不再需要订阅数据,则应该在下游停止订阅后,及时清理发布端的复制槽,以释放WAL日志空间。

说明

清理复制槽将导致关联的WAL日志被永久删除,未被消费的数据变更将丢失且无法恢复。如果只是临时暂停,请勿执行此操作清理复制槽。

在主CN上执行以下SQL,即可在所有相关节点上批量删除复制槽:

WITH nodes AS (
    SELECT
        nodename,
        nodeport,
        $$ SELECT pg_drop_replication_slot('<publication_slot_name>') $$ AS cmd
    FROM pg_dist_node
    WHERE nodeid = 1 OR shouldhaveshards = true
)
SELECT result.*
FROM (
    SELECT
        array_agg(nodename) as nodenames,
        array_agg(nodeport) as nodeports,
        array_agg(cmd) as cmds
    FROM nodes
) params,
LATERAL master_run_on_worker(nodenames, nodeports, cmds, true) AS result;