Use triggers and functions to implement incremental DDL migration for PostgreSQL

Before you use Data Transmission Service (DTS) to migrate data between PostgreSQL databases, you must create a table, a function, and an event trigger in the source database. Together, these objects capture DDL statements so that DTS can migrate DDL operations during incremental data migration.

Prerequisites

  • The source database must meet the following requirements:

    • Self-managed PostgreSQL: version 9.4 or later.

    • ApsaraDB RDS for PostgreSQL: version 10 or later.

      • ApsaraDB RDS for PostgreSQL 9.4 does not support event triggers.

      • The minor engine version of ApsaraDB RDS for PostgreSQL 10, 11, and 12 must be 20201130 or later.

      • The minor engine version of ApsaraDB RDS for PostgreSQL 13 must be 20210228 or later.

  • The data migration task must be created after October 1, 2020.
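To check the version requirement, you can read the server version number from any SQL client. This is a minimal sketch that checks only the major version. For ApsaraDB RDS for PostgreSQL, you still need to confirm the minor engine version separately. The column aliases are illustrative.

```sql
-- server_version_num is an integer, for example 90400 for 9.4.0 or 130002 for 13.2.
SHOW server_version_num;

-- Compare against the minimum major versions listed above:
-- 9.4 for self-managed PostgreSQL, 10 for ApsaraDB RDS for PostgreSQL.
SELECT current_setting('server_version_num')::int >= 90400  AS meets_self_managed_minimum,
       current_setting('server_version_num')::int >= 100000 AS meets_rds_minimum;
```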

Overview

During incremental migration between PostgreSQL databases, DTS synchronizes only DML operations (INSERT, DELETE, UPDATE) by default. DDL operations are not synchronized.

By creating a trigger and a function in the source database, you enable DTS to capture and synchronize DDL operations during incremental migration.

Note

Only table-level DDL operations are supported: CREATE TABLE, DROP TABLE, and ALTER TABLE (including renaming a table with RENAME TO, ADD COLUMN, and DROP COLUMN).
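For illustration, the following statements fall within the supported scope described in the note. The table and index names are hypothetical, so run them only in a test database. If the table, function, and event trigger from the procedure are installed there, the function also logs each statement to public.dts_ddl_command.

```sql
-- Hypothetical table, used only to illustrate the supported DDL types.
CREATE TABLE public.dts_demo_orders (id bigint PRIMARY KEY, amount numeric(10, 2));

-- ALTER TABLE variants covered by the note: add a column, drop a column, rename the table.
ALTER TABLE public.dts_demo_orders ADD COLUMN note text;
ALTER TABLE public.dts_demo_orders DROP COLUMN note;
ALTER TABLE public.dts_demo_orders RENAME TO dts_demo_orders_v2;

-- CREATE INDEX is not in the list above, so it is outside the supported scope.
CREATE INDEX dts_demo_orders_amount_idx ON public.dts_demo_orders_v2 (amount);

-- DROP TABLE (the index is dropped together with the table).
DROP TABLE public.dts_demo_orders_v2;
```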

Procedure

Warning

To migrate incremental data from multiple databases, repeat Steps 2 to 6 for each database.

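  1. Log on to the source PostgreSQL database, for example by using psql or another client connected to the ApsaraDB RDS for PostgreSQL instance. Use an account that is allowed to create event triggers (in community PostgreSQL, only superusers can create them).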

  2. Switch to the database to be migrated.

    Note

    In psql, run \c <database_name> to switch databases. Example: \c dtststdata.

  3. Create a table to store DDL information.

    -- Log table written by public.dts_capture_ddl(). The function keeps at most 10,000 rows.
    CREATE TABLE public.dts_ddl_command
    (
        ddl_text text COLLATE pg_catalog."default",
        id bigserial primary key,
        event text COLLATE pg_catalog."default",
        tag text COLLATE pg_catalog."default",
        username character varying COLLATE pg_catalog."default",
        database character varying COLLATE pg_catalog."default",
        schema character varying COLLATE pg_catalog."default",
        object_type character varying COLLATE pg_catalog."default",
        object_name character varying COLLATE pg_catalog."default",
        client_address character varying COLLATE pg_catalog."default",
        client_port integer,
        event_time timestamp with time zone,
        txid_current character varying(128) COLLATE pg_catalog."default",
        message text COLLATE pg_catalog."default"
    );

  4. Create a function to capture DDL information.

    CREATE FUNCTION public.dts_capture_ddl()
        RETURNS event_trigger
        LANGUAGE plpgsql
        COST 100
        VOLATILE NOT LEAKPROOF SECURITY DEFINER
    AS $BODY$
      declare ddl_text text;
      declare max_rows int := 10000;  -- maximum number of rows kept in dts_ddl_command
      declare current_rows int;
      declare pg_version_95 int := 90500;
      declare pg_version_10 int := 100000;
      declare current_version int;
      declare object_id varchar;
      declare alter_table varchar;
      declare record_object record;
      declare message text;
      declare pub RECORD;
    begin

      -- Full text of the statement that fired the trigger.
      select current_query() into ddl_text;

      if TG_TAG = 'CREATE TABLE' then
        -- Set REPLICA IDENTITY FULL on the new table so that WAL records for
        -- UPDATE and DELETE contain the old values of all columns.
        show server_version_num into current_version;
        if current_version >= pg_version_95 then
          -- PostgreSQL 9.5 and later: read the qualified table name from the trigger context.
          for record_object in (select * from pg_event_trigger_ddl_commands()) loop
            if record_object.command_tag = 'CREATE TABLE' then
              object_id := record_object.object_identity;
            end if;
          end loop;
        else
          -- PostgreSQL 9.4: parse the table name out of the statement text
          -- and trim surrounding whitespace.
          select regexp_replace(
                   substring(ddl_text from '[ \t\r\n\v\f]*[c|C][r|R][e|E][a|A][t|T][e|E][ \t\r\n\v\f]*.*[ \t\r\n\v\f]*[t|T][a|A][b|B][l|L][e|E][ \t\r\n\v\f]+(.*)\(.*'),
                   '^[[:space:]]+|[[:space:]]+$', '', 'g')
            into object_id;
        end if;
        if object_id = '' or object_id is null then
          -- The table name could not be determined; record the statement for troubleshooting.
          message := 'CREATE TABLE, but ddl_text=' || ddl_text || ', current_query=' || current_query();
        else
          alter_table := 'ALTER TABLE ' || object_id || ' REPLICA IDENTITY FULL';
          message := 'alter_sql=' || alter_table;
          execute alter_table;
        end if;
        -- PostgreSQL 10 and later: add the new table to publications whose names match 'dts_sync_%'.
        if current_version >= pg_version_10 then
          for pub in (select * from pg_publication where pubname like 'dts_sync_%') loop
            raise notice 'pubname=%',pub.pubname;
            BEGIN
              execute 'alter publication ' || pub.pubname || ' add table ' || object_id;
            EXCEPTION WHEN OTHERS THEN
              -- Ignore errors (for example, a NULL object_id) so that the user's DDL still succeeds.
            END;
          end loop;
        end if;
      end if;

      -- Log every DDL command that fires the trigger.
      insert into public.dts_ddl_command(id,event,tag,username,database,schema,object_type,object_name,client_address,client_port,event_time,ddl_text,txid_current,message)
      values (default,TG_EVENT,TG_TAG,current_user,current_database(),current_schema,'','',inet_client_addr(),inet_client_port(),current_timestamp,ddl_text,cast(TXID_CURRENT() as varchar(16)),message);

      -- Cap the log size: once it exceeds max_rows, delete the oldest row.
      select count(id) into current_rows from public.dts_ddl_command;
      if current_rows > max_rows then
        delete from public.dts_ddl_command where id in (select min(id) from public.dts_ddl_command);
      end if;
    end
    $BODY$;

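  5. Change the function owner to the account that DTS uses to connect to the source database. Because the function is declared SECURITY DEFINER, it runs with the privileges of its owner. This example uses `postgres`; replace it with your account.

    ALTER FUNCTION public.dts_capture_ddl()
        OWNER TO postgres;
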
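  6. Create an event trigger that runs the function at the end of each DDL command (the ddl_command_end event). Event triggers apply only to the database in which they are created.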

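    -- EXECUTE PROCEDURE works on all supported versions; PostgreSQL 11 and later also accept EXECUTE FUNCTION.
    CREATE EVENT TRIGGER dts_intercept_ddl ON ddl_command_end
    EXECUTE PROCEDURE public.dts_capture_ddl();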
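After Step 6, you can confirm the setup by querying the system catalogs. This is a minimal sketch that assumes the object names used in this procedure. An evtenabled value of O means that the trigger fires in the default session_replication_role modes.

```sql
-- The event trigger should exist, fire on ddl_command_end, and call the capture function.
SELECT evtname, evtevent, evtenabled, evtfoid::regproc AS function_name
FROM pg_event_trigger
WHERE evtname = 'dts_intercept_ddl';

-- The function owner should be the DTS account, and security_definer should be true.
SELECT p.proname,
       pg_get_userbyid(p.proowner) AS owner,
       p.prosecdef AS security_definer
FROM pg_proc p
JOIN pg_namespace n ON n.oid = p.pronamespace
WHERE n.nspname = 'public'
  AND p.proname = 'dts_capture_ddl';
```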
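Because DDL in PostgreSQL is transactional, you can also exercise the trigger inside a transaction and roll it back so that no test objects remain. This sketch assumes PostgreSQL 9.5 or later, and the table name public.dts_ddl_smoke_test is hypothetical. The sequence behind the id column of public.dts_ddl_command is not rolled back, so later IDs may have gaps.

```sql
BEGIN;

-- Hypothetical test table; the event trigger fires when this command ends.
CREATE TABLE public.dts_ddl_smoke_test (id int PRIMARY KEY);

-- Most recent rows written by public.dts_capture_ddl().
SELECT id, tag, ddl_text, message
FROM public.dts_ddl_command
ORDER BY id DESC
LIMIT 5;

-- The function sets REPLICA IDENTITY FULL on new tables; 'f' means FULL.
SELECT relreplident
FROM pg_class
WHERE oid = 'public.dts_ddl_smoke_test'::regclass;

-- Undo the test table, the log rows, and the REPLICA IDENTITY change.
ROLLBACK;
```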

What to do next

Configure an incremental data migration task based on your source database version. For more information, see Migrate data from a self-managed PostgreSQL database to an ApsaraDB RDS for PostgreSQL instance.

Note
  • For Migration Types, select only Incremental Data Migration.

  • After the data migration task is released, delete the event trigger, the function, and the table from each source database in which you created them.

    -- Drop the event trigger first, because it depends on the function.
    DROP EVENT TRIGGER IF EXISTS dts_intercept_ddl;
    DROP FUNCTION IF EXISTS public.dts_capture_ddl();
    DROP TABLE IF EXISTS public.dts_ddl_command;
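To confirm the cleanup in a database, you can query the catalogs again. The first query should return no rows, and both columns of the second query should be NULL. Dropping these objects does not undo the REPLICA IDENTITY FULL setting that the function applied to tables created while the trigger was active. The last query lists tables whose replica identity is currently FULL, which may also include tables configured that way for other reasons.

```sql
-- Expect no rows: the event trigger is gone.
SELECT evtname FROM pg_event_trigger WHERE evtname = 'dts_intercept_ddl';

-- Expect NULL in both columns: the function and the log table are gone.
SELECT to_regprocedure('public.dts_capture_ddl()') AS capture_function,
       to_regclass('public.dts_ddl_command')       AS log_table;

-- Ordinary tables whose replica identity is FULL ('f').
SELECT n.nspname AS schema_name, c.relname AS table_name
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE c.relkind = 'r'
  AND c.relreplident = 'f';
```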