本文介绍当数据传输的源端 PostgreSQL 数据库发生 DDL 变更(例如新增表字段)时,如何通过事件触发器捕获 DDL 相关事件信息,以确保 DML 变更操作能够正常同步。
背景信息
使用数据传输执行源端 PostgreSQL 数据库增量同步时,默认仅支持 DML 语句(包括 INSERT、DELETE 和 UPDATE)的同步。如果源端发生了 DDL 变更(例如新增表字段),而数据传输无法获取新的结构信息,将导致后续和该对象相关的 DML 同步失败,造成任务中断。因此,您需要先在源端 PostgreSQL 数据库中创建 DDL 记录表和事件触发器来捕获 DDL 信息,以确保 DML 的正常同步。
使用限制
源端 PostgreSQL 数据库必须为 V10.x 及以上版本。
操作步骤
登录源端数据库后,切换至待迁移数据库。
以 PSQL 工具为例,执行 \c <数据库名称> 命令,切换至待迁移数据库。
创建下述两张表,用于记录 DDL 事件变更。
非 DROP DDL 事件信息表
-- PostgreSQL DDL event table.
-- Holds one row per object reported for a captured non-DROP DDL command.
-- The header comment sits on its own line so that it cannot comment out the
-- CREATE TABLE statement when the script is pasted into psql.
CREATE TABLE public.oms_non_dropped_ddl_command (
    id                  bigserial PRIMARY KEY,
    ddl_text            text COLLATE pg_catalog."default",              -- text of the captured statement
    tag                 text COLLATE pg_catalog."default",              -- command tag, e.g. 'ALTER TABLE'
    database            character varying COLLATE pg_catalog."default",
    schema              character varying COLLATE pg_catalog."default",
    object_type         character varying COLLATE pg_catalog."default",
    objid               integer,                                        -- OID of the affected object
    top_objid           integer,                                        -- OID of the top-most inheritance ancestor
    object_identity     text COLLATE pg_catalog."default",
    top_object_identity text COLLATE pg_catalog."default",
    top_schema          text COLLATE pg_catalog."default",
    all_children_table  text COLLATE pg_catalog."default"               -- comma-separated child table OIDs
);

-- NOTE(review): presumably FULL is chosen so that logical decoding emits
-- complete row images for this table; confirm against the consumer.
ALTER TABLE public.oms_non_dropped_ddl_command REPLICA IDENTITY FULL;
DROP DDL 事件信息表
-- PostgreSQL DROP DDL event table.
-- Holds one row per object reported as dropped by a captured DDL command.
-- The header comment sits on its own line so that it cannot comment out the
-- CREATE TABLE statement when the script is pasted into psql.
CREATE TABLE public.oms_dropped_ddl_command (
    id          bigserial PRIMARY KEY,
    ddl_text    text COLLATE pg_catalog."default",              -- text of the captured statement
    tag         text COLLATE pg_catalog."default",              -- command tag, e.g. 'DROP TABLE'
    database    character varying COLLATE pg_catalog."default",
    schema      character varying COLLATE pg_catalog."default",
    objid       integer,                                        -- OID of the dropped object
    object_type text COLLATE pg_catalog."default",
    object_name text COLLATE pg_catalog."default"
);

-- NOTE(review): presumably FULL is chosen so that logical decoding emits
-- complete row images for this table; confirm against the consumer.
ALTER TABLE public.oms_dropped_ddl_command REPLICA IDENTITY FULL;
创建下述两种事件触发器函数。
非 DROP DDL 事件触发器函数
-- Event trigger function for non-DROP DDL commands.
--
-- For every object reported by pg_event_trigger_ddl_commands() it resolves
-- the top-most inheritance ancestor, optionally collects the OIDs of all
-- descendant tables (only for ALTER TABLE ... RENAME TO ...), switches newly
-- created tables to REPLICA IDENTITY FULL, and records one row in
-- public.oms_non_dropped_ddl_command.
--
-- Any error is logged with RAISE LOG and swallowed, so a failure in the
-- capture logic does not abort the user's DDL statement.
CREATE OR REPLACE FUNCTION public.oms_capture_ddl_for_non_dropped()
    RETURNS event_trigger
    LANGUAGE 'plpgsql'
    COST 100
    VOLATILE NOT LEAKPROOF SECURITY DEFINER
AS $BODY$
DECLARE
    ddl_text            text;
    record_object       record;
    top_objid           integer;
    parent_oid          integer;
    top_object_identity text;
    top_schema          text;
    all_children_table  text;
BEGIN
    SELECT current_query() INTO ddl_text;

    FOR record_object IN (
        SELECT objid, object_type, object_identity
        FROM pg_event_trigger_ddl_commands()
        WHERE TG_TAG IN ('CREATE TABLE', 'ALTER TABLE', 'CREATE INDEX', 'CREATE SCHEMA', 'COMMENT')
    ) LOOP
        -- Reset per-object state. Without this, an object that has no
        -- inheritance parent would keep the top_objid computed for the
        -- previous object of the same command (e.g. the sequence created
        -- for a serial column ahead of its table).
        top_objid := NULL;

        -- Direct parent of the object, if any. SELECT ... INTO leaves the
        -- target NULL when no row is found.
        -- NOTE(review): '_' is a single-character wildcard in LIKE, so
        -- 'pg_%' also excludes parents whose name merely starts with "pg"
        -- plus one more character (e.g. "pgbench_accounts"); confirm whether
        -- 'pg\_%' was intended.
        SELECT inhparent
        INTO parent_oid
        FROM pg_inherits
        WHERE inhrelid = record_object.objid
          AND inhparent::regclass::text NOT LIKE 'pg_%'
        ORDER BY inhseqno DESC
        LIMIT 1;

        -- Walk up the inheritance chain until there is no further parent.
        WHILE parent_oid IS NOT NULL LOOP
            SELECT inhparent
            INTO top_objid
            FROM pg_inherits
            WHERE inhrelid = parent_oid
              AND inhparent::regclass::text NOT LIKE 'pg_%'
            ORDER BY inhseqno DESC
            LIMIT 1;

            IF top_objid IS NULL THEN
                top_objid := parent_oid;
                EXIT;
            END IF;

            parent_oid := top_objid;
        END LOOP;

        -- No parent at all: the object is its own top-level object.
        IF top_objid IS NULL THEN
            top_objid := record_object.objid;
        END IF;

        -- For table renames, collect the OIDs of every descendant of the
        -- top-level table. The scalar ~* operator replaces the set-returning
        -- regexp_matches() call in the condition; both are case-insensitive.
        IF ddl_text ~* 'ALTER\s+TABLE\s+.+\s+RENAME\s+TO\s+.+' THEN
            WITH RECURSIVE child_tables AS (
                SELECT oid AS table_objid
                FROM pg_class
                WHERE oid = top_objid
                UNION
                SELECT c.oid AS table_objid
                FROM pg_class c
                JOIN pg_inherits i ON c.oid = i.inhrelid
                JOIN child_tables ct ON i.inhparent = ct.table_objid
            )
            SELECT string_agg(table_objid::text, ', ')
            INTO all_children_table
            FROM child_tables
            WHERE table_objid <> top_objid;
        END IF;

        -- Name and schema of the top-level object; both stay NULL when
        -- top_objid is not a pg_class entry.
        SELECT pn.nspname, ss.relname
        INTO top_schema, top_object_identity
        FROM pg_catalog.pg_class ss
        JOIN pg_catalog.pg_namespace pn ON ss.relnamespace = pn.oid
        WHERE ss.oid = top_objid;

        -- Newly created tables are switched to REPLICA IDENTITY FULL.
        IF TG_TAG = 'CREATE TABLE' AND record_object.object_type = 'table' THEN
            EXECUTE 'ALTER TABLE ' || record_object.object_identity || ' REPLICA IDENTITY FULL';
        END IF;

        INSERT INTO public.oms_non_dropped_ddl_command (
            id, ddl_text, tag, database, schema, object_type,
            objid, top_objid, object_identity, top_object_identity,
            top_schema, all_children_table
        )
        VALUES (
            DEFAULT, ddl_text, TG_TAG, current_database(), current_schema, record_object.object_type,
            record_object.objid, top_objid, record_object.object_identity, top_object_identity,
            top_schema, all_children_table
        );
    END LOOP;
EXCEPTION
    WHEN OTHERS THEN
        RAISE LOG 'OMS ddl trigger error during command process: %', SQLERRM;
END
$BODY$;
DROP DDL 事件触发器函数
-- Event trigger function for dropped objects.
--
-- Records one row in public.oms_dropped_ddl_command for every object
-- reported by pg_event_trigger_dropped_objects(), except objects of type
-- 'type'. Nothing is recorded when the command tag is 'ALTER TABLE',
-- 'DROP PUBLICATION' or 'ALTER PUBLICATION'.
--
-- Any error is logged with RAISE LOG and swallowed, so a failure in the
-- capture logic does not abort the user's DDL statement.
CREATE OR REPLACE FUNCTION public.oms_capture_ddl_for_dropped()
    RETURNS event_trigger
    LANGUAGE 'plpgsql'
    COST 100
    VOLATILE NOT LEAKPROOF SECURITY DEFINER
AS $BODY$
DECLARE
    statement_text text;
    dropped        record;
BEGIN
    statement_text := current_query();

    FOR dropped IN
        SELECT d.objid, d.object_type, d.object_name
        FROM pg_event_trigger_dropped_objects() AS d
        WHERE TG_TAG NOT IN ('ALTER TABLE', 'DROP PUBLICATION', 'ALTER PUBLICATION')
    LOOP
        -- Skip objects of type 'type'; everything else is recorded.
        IF dropped.object_type IS DISTINCT FROM 'type' THEN
            INSERT INTO public.oms_dropped_ddl_command (
                id, ddl_text, tag, database, schema,
                objid, object_type, object_name
            )
            VALUES (
                DEFAULT, statement_text, TG_TAG, current_database(), current_schema,
                dropped.objid, dropped.object_type, dropped.object_name
            );
        END IF;
    END LOOP;
EXCEPTION
    WHEN OTHERS THEN
        RAISE LOG 'OMS drop ddl trigger error during command process: %', SQLERRM;
END
$BODY$;
将上述两个函数的所有者修改为数据传输连接源端数据库所使用的账号。此处以 postgres 为例:
ALTER FUNCTION public.oms_capture_ddl_for_non_dropped() OWNER TO postgres;
ALTER FUNCTION public.oms_capture_ddl_for_dropped() OWNER TO postgres;
执行下述命令,创建全局事件触发器。
-- Fires at the end of the listed non-DROP DDL commands and records them.
-- EXECUTE PROCEDURE is used because the documented minimum version is
-- PostgreSQL 10, which does not accept EXECUTE FUNCTION here.
CREATE EVENT TRIGGER oms_capture_ddl_for_non_dropped
    ON ddl_command_end
    WHEN TAG IN (
        'CREATE TABLE',
        'ALTER TABLE',
        'CREATE INDEX',
        'CREATE SCHEMA',
        'COMMENT'
    )
    EXECUTE PROCEDURE public.oms_capture_ddl_for_non_dropped();

-- Fires for every command that drops objects and records the dropped objects.
CREATE EVENT TRIGGER oms_capture_ddl_for_dropped
    ON sql_drop
    EXECUTE PROCEDURE public.oms_capture_ddl_for_dropped();
后续操作
释放数据迁移任务后,您需要登录源端数据库,执行下述命令删除事件触发器、触发器函数和对应的 DDL 记录表。
-- Remove the event triggers first: they reference the functions below.
DROP EVENT TRIGGER oms_capture_ddl_for_dropped;
DROP EVENT TRIGGER oms_capture_ddl_for_non_dropped;

-- Remove the event trigger functions.
DROP FUNCTION public.oms_capture_ddl_for_non_dropped();
DROP FUNCTION public.oms_capture_ddl_for_dropped();

-- Remove the DDL record tables.
DROP TABLE public.oms_non_dropped_ddl_command;
DROP TABLE public.oms_dropped_ddl_command;
该文章对您有帮助吗?