Before you use Data Transmission Service (DTS) to migrate data between PostgreSQL databases, you must create a trigger and a function in the source database. These objects capture DDL information and allow DTS to migrate DDL operations during incremental data migration.
Prerequisites
-
The source database must meet the following requirements:
-
Self-managed PostgreSQL: version 9.4 or later.
-
ApsaraDB RDS for PostgreSQL: version 10 or later.
-
ApsaraDB RDS for PostgreSQL 9.4 does not support event triggers.
-
The minor engine version of ApsaraDB RDS for PostgreSQL 10, 11, and 12 must be 20201130 or later.
-
The minor engine version of ApsaraDB RDS for PostgreSQL 13 must be 20210228 or later.
-
-
-
The data migration task must be created after October 1, 2020.
Overview
During incremental migration between PostgreSQL databases, DTS synchronizes only DML operations (INSERT, DELETE, UPDATE) by default. DDL operations are not synchronized.
By creating a trigger and a function in the source database, you enable DTS to capture and synchronize DDL operations during incremental migration.
Only table-level DDL operations are supported: CREATE TABLE, DROP TABLE, and ALTER TABLE (including RENAME TABLE, ADD COLUMN, and DROP COLUMN).
Procedure
To migrate incremental data from multiple databases, repeat Steps 2 to 5 for each database.
-
Log on to the source PostgreSQL database. Connect to an ApsaraDB RDS for PostgreSQL instance or psql.
-
Switch to the database to be migrated.
NoteIn psql, run
\c <database_name>to switch databases. Example:\c dtststdata. -
Create a table to store DDL information.
CREATE TABLE public.dts_ddl_command ( ddl_text text COLLATE pg_catalog."default", id bigserial primary key, event text COLLATE pg_catalog."default", tag text COLLATE pg_catalog."default", username character varying COLLATE pg_catalog."default", database character varying COLLATE pg_catalog."default", schema character varying COLLATE pg_catalog."default", object_type character varying COLLATE pg_catalog."default", object_name character varying COLLATE pg_catalog."default", client_address character varying COLLATE pg_catalog."default", client_port integer, event_time timestamp with time zone, txid_current character varying(128) COLLATE pg_catalog."default", message text COLLATE pg_catalog."default" ); -
Create a function to capture DDL information.
CREATE FUNCTION public.dts_capture_ddl() RETURNS event_trigger LANGUAGE 'plpgsql' COST 100 VOLATILE NOT LEAKPROOF SECURITY DEFINER AS $BODY$ declare ddl_text text; declare max_rows int := 10000; declare current_rows int; declare pg_version_95 int := 90500; declare pg_version_10 int := 100000; declare current_version int; declare object_id varchar; declare alter_table varchar; declare record_object record; declare message text; declare pub RECORD; begin select current_query() into ddl_text; if TG_TAG = 'CREATE TABLE' then -- ALTER TABLE schema.TABLE REPLICA IDENTITY FULL; show server_version_num into current_version; if current_version >= pg_version_95 then for record_object in (select * from pg_event_trigger_ddl_commands()) loop if record_object.command_tag = 'CREATE TABLE' then object_id := record_object.object_identity; end if; end loop; else select btrim(substring(ddl_text from '[ \t\r\n\v\f]*[c|C][r|R][e|E][a|A][t|T][e|E][ \t\r\n\v\f]*.*[ \t\r\n\v\f]*[t|T][a|A][b|B][l|L][e|E][ \t\r\n\v\f]+(.*)\(.*'),' \t\r\n\v\f') into object_id; end if; if object_id = '' or object_id is null then message := 'CREATE TABLE, but ddl_text=' || ddl_text || ', current_query=' || current_query(); else alter_table := 'ALTER TABLE ' || object_id || ' REPLICA IDENTITY FULL'; message := 'alter_sql=' || alter_table; execute alter_table; end if; if current_version >= pg_version_10 then for pub in (select * from pg_publication where pubname like 'dts_sync_%') loop raise notice 'pubname=%',pub.pubname; BEGIN execute 'alter publication ' || pub.pubname || ' add table ' || object_id; EXCEPTION WHEN OTHERS THEN END; end loop; end if; end if; insert into public.dts_ddl_command(id,event,tag,username,database,schema,object_type,object_name,client_address,client_port,event_time,ddl_text,txid_current,message) values (default,TG_EVENT,TG_TAG,current_user,current_database(),current_schema,'','',inet_client_addr(),inet_client_port(),current_timestamp,ddl_text,cast(TXID_CURRENT() as varchar(16)),message); select count(id) into current_rows from public.dts_ddl_command; if current_rows > max_rows then delete from public.dts_ddl_command where id in (select min(id) from public.dts_ddl_command); end if; end $BODY$; -
Change the function owner to the account DTS uses to connect to the source database. This example uses `postgres`.
ALTER FUNCTION public.dts_capture_ddl() OWNER TO postgres; -
Create a global event trigger.
CREATE EVENT TRIGGER dts_intercept_ddl ON ddl_command_end EXECUTE PROCEDURE public.dts_capture_ddl();
What to do next
Configure an incremental data migration task based on your source database version. Migrate data from a self-managed PostgreSQL database to an ApsaraDB RDS for PostgreSQL instance.
-
For Migration Types, select only Incremental Data Migration.
-
After the data migration task is released, delete the trigger and function from the source database.
drop EVENT trigger dts_intercept_ddl; drop function public.dts_capture_ddl(); drop table public.dts_ddl_command;