变更数据捕获(Change Data Capture, CDC)功能允许您实时捕获数据库中的数据修改(INSERT/UPDATE/DELETE),并将其以事件流的形式同步到下游系统,如数据仓库、分析平台(如Flink)或其他数据库实例。
在PolarDB PostgreSQL分布式版集群中,搭建CDC的关键在于理解数据的来源:
准备工作:检查与配置
在开始之前,请确保数据库集群的相关参数已正确设置。这些参数是开启逻辑复制功能的前提。
默认情况下,PolarDB PostgreSQL分布式版已配置好以下参数。如果检查结果不符,请提交工单联系我们处理。
在主CN节点执行以下 SQL,检查所有节点的
polar_cluster.enable_change_data_capture是否为on。SELECT success, result FROM run_command_on_all_nodes($$ SHOW polar_cluster.enable_change_data_capture $$);预期结果如下:所有节点的
result列都应为on。success | result ---------+-------- t | on t | on t | on t | on在主CN节点上执行以下SQL,检查所有节点的
wal_level是否为logical。SELECT success, result FROM run_command_on_all_nodes($$ SHOW wal_level $$);预期结果如下:所有节点的
result列都应为logical。success | result ---------+--------- t | logical t | logical t | logical t | logical
步骤一:在发布端创建发布和复制槽
发布端(即您的PolarDB PostgreSQL分布式版集群)的配置分为两步:创建发布(Publication)和创建复制槽(Replication Slot)。
创建发布
发布是定义哪些表的变更需要被捕获的集合。此操作只需在主CN上执行一次,系统会自动将其同步到所有节点。
CREATE PUBLICATION <publication_name> FOR TABLE <table_name1>, <table_name2>;请勿使用
FOR ALL TABLES选项创建发布,应明确指定需要发布的表。<publication_name>为发布名,<table_name1>/<table_name2>为要发布的分布表/复制表。发布端仅需发布分布表的逻辑表名,无需发布物理分片的表名。
创建复制槽
复制槽用于为下游订阅者保留WAL日志,防止其在被消费前被清理。由于数据变更来源于主CN和所有DN,您需要在这些节点上都创建复制槽。
在主CN节点上执行以下SQL,即可在所有相关节点(主CN和所有DN)上批量创建同名的复制槽:
WITH nodes AS (
SELECT
nodename,
nodeport,
$$ SELECT pg_create_logical_replication_slot('<publication_slot_name>', 'pgoutput', false) $$ AS cmd
FROM pg_dist_node
WHERE nodeid = 1 OR shouldhaveshards = true
)
SELECT result.*
FROM (
SELECT
array_agg(nodename) as nodenames,
array_agg(nodeport) as nodeports,
array_agg(cmd) as cmds
FROM nodes
) params,
LATERAL master_run_on_worker(nodenames, nodeports, cmds, true) AS result;<publication_slot_name>为复制槽名称。
预期的结果如下,其中success列为t表示复制槽创建成功。否则复制槽创建失败,通过result列可以查看具体原因:
node_name | node_port | success | result
----------------+-----------+---------+------------------------------------
10.xxx.xxx.xxx | 3007 | t | (<publication_slot_name>,0/C024D7D0)
10.xxx.xxx.xxx | 3006 | t | (<publication_slot_name>,0/C33B6668)
10.xxx.xxx.xxx | 3003 | t | (<publication_slot_name>,0/C33949B0)
(3 rows)步骤二:在订阅端创建订阅
订阅端(如Debezium、Flink或另一个PostgreSQL实例)需要为每一个创建了复制槽的节点(主CN和所有DN)分别建立一个订阅连接,以接收完整的变更数据。
每个订阅连接的配置基本相同,只需修改主机地址和端口号。
Debezium示例
如果使用Debezium作为订阅端,那么每个订阅端仅database.hostname和database.port两个配置项需要修改为相应节点的主地址和端口号,其余配置完全相同。以订阅其中一个DN节点的配置为例:
{
"name": "xxx",
"config": {
"connector.class" : "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname" : "<DN1主地址>",
"database.port" : "<DN1端口>",
"database.user" : "<您的用户名>",
"database.password" : "<您的密码>",
"database.dbname" : "postgres",
"slot.name": "<复制槽名称>",
"publication.name": "<发布名称>",
...
}
}PostgreSQL示例
如果使用PostgreSQL作为订阅端,那么每个订阅端仅host和port两个配置项需要修改为相应节点的主地址和端口号,其余配置完全相同。以订阅其中一个DN节点的配置为例:
CREATE SUBSCRIPTION test_subscription
CONNECTION 'dbname=postgres host=<DN1主地址> port=<DN1端口> user=<您的用户名> password=<您的密码>'
PUBLICATION <发布名称>
WITH (create_slot=false, slot_name='<复制槽名称>');步骤三:验证CDC链路
在所有订阅端配置完成后,您可以在主CN上执行以下SQL,检查所有发布节点的复制槽是否都已激活。
WITH nodes AS (
SELECT
nodename,
nodeport,
$$ SELECT active FROM pg_replication_slots WHERE slot_name = '<publication_slot_name>' $$ AS cmd
FROM pg_dist_node
WHERE nodeid = 1 OR shouldhaveshards = true
)
SELECT result.*
FROM (
SELECT
array_agg(nodename) as nodenames,
array_agg(nodeport) as nodeports,
array_agg(cmd) as cmds
FROM nodes
) params,
LATERAL master_run_on_worker(nodenames, nodeports, cmds, true) AS result;
预期得到的结果如下。如果result列值为t则表示相应节点与订阅端的复制关系已经建立。此时,您可以在源端表中插入或修改数据,并观察订阅端是否能接收到变更。
node_name | node_port | success | result
----------------+-----------+---------+--------
10.xxx.xxx.xxx | 3007 | t | t
10.xxx.xxx.xxx | 3006 | t | t
10.xxx.xxx.xxx | 3003 | t | t
(3 rows)维护与清理
如果下游不再需要订阅数据,则应该在下游停止订阅后,及时清理发布端的复制槽,以释放WAL日志空间。
清理复制槽将导致关联的WAL日志被永久删除,未被消费的数据变更将丢失且无法恢复。如果只是临时暂停,请勿执行此操作清理复制槽。
在主CN上执行以下SQL,即可在所有相关节点上批量删除复制槽:
WITH nodes AS (
SELECT
nodename,
nodeport,
$$ SELECT pg_drop_replication_slot('<publication_slot_name>') $$ AS cmd
FROM pg_dist_node
WHERE nodeid = 1 OR shouldhaveshards = true
)
SELECT result.*
FROM (
SELECT
array_agg(nodename) as nodenames,
array_agg(nodeport) as nodeports,
array_agg(cmd) as cmds
FROM nodes
) params,
LATERAL master_run_on_worker(nodenames, nodeports, cmds, true) AS result;