在使用DTS执行PostgreSQL数据库间的数据迁移前,可通过本文介绍的方法在源库创建触发器和函数获取源库的DDL信息,然后再由DTS执行数据迁移,在增量数据迁移阶段即可实现DDL操作的增量迁移。

前提条件

PostgreSQL 9.4及以上版本。

背景信息

通过DTS执行PostgreSQL数据库间的数据迁移时,在增量数据迁移阶段,DTS仅支持DML操作(INSERT、DELETE、UPDATE)的同步,不支持DDL操作的同步。

通过本文的方法先在源库中创建触发器和函数来捕获DDL信息,再由DTS执行数据迁移,即可实现DDL操作的同步。

说明 仅支持表级别DDL操作的同步:CREATE TABLE、DROP TABLE、ALTER TABLE(包含RENAME TABLE、ADD COLUMN、DROP COLUMN)。

操作步骤

警告 如果源库中有多个数据库需要执行增量数据迁移,您需要重复执行步骤2到步骤5。
  1. 登录源PostgreSQL数据库,相关方法请参见连接PostgreSQL实例psql工具介绍
  2. 切换至待迁移的数据库。
    说明 本案例以pgql工具为例介绍,您可以使用\c <数据库名>命令来切换数据库,例如\c dtststdata
  3. 执行下述命令在创建存放DDL信息的表。
    CREATE TABLE public.dts_ddl_command
    (
        ddl_text text COLLATE pg_catalog."default",
       id bigserial primary key,
       event text COLLATE pg_catalog."default",
       tag text COLLATE pg_catalog."default",
       username character varying COLLATE pg_catalog."default",
       database character varying COLLATE pg_catalog."default",
       schema character varying COLLATE pg_catalog."default",
       object_type character varying COLLATE pg_catalog."default",
       object_name character varying COLLATE pg_catalog."default",
       client_address character varying COLLATE pg_catalog."default",
       client_port integer,
       event_time timestamp with time zone,
       txid_current character varying(128) COLLATE pg_catalog."default",
       message text COLLATE pg_catalog."default"
    )
  4. 执行下述命令创建捕获DDL信息的函数。
    CREATE FUNCTION public.dts_capture_ddl()
       RETURNS event_trigger
       LANGUAGE 'plpgsql'
       COST 100
       VOLATILE NOT LEAKPROOF SECURITY DEFINER
    AS $BODY$
        declare ddl_text text;
        declare max_rows int := 10;
        declare current_rows int;
        declare pg_version_95 int := 90500;
        declare current_version int;
        declare object_id varchar;
        declare alter_table varchar;
        declare record_object record;
        declare message text;
    begin
        select query into ddl_text from pg_stat_activity where pid=pg_backend_pid();
        insert into public.dts_ddl_command(id,event,tag,username,database,schema,object_type,object_name,client_address,client_port,event_time,ddl_text,txid_current,message)
        values (default,TG_EVENT,TG_TAG,current_user,current_database(),current_schema,'','',inet_client_addr(),inet_client_port(),current_timestamp,ddl_text,cast(TXID_CURRENT() as varchar(16)),message);
        select count(id) into current_rows from public.dts_ddl_command;
        if current_rows > max_rows then
     delete from public.dts_ddl_command where id in (select min(id) from public.dts_ddl_command);
        end if;
    end
    $BODY$;
  5. 执行下述命令创建全局事件触发器。
    CREATE EVENT TRIGGER dts_intercept_ddl ON ddl_command_end
    EXECUTE PROCEDURE public.dts_capture_ddl();

后续步骤

根据源库的版本,选择下述步骤配置数据迁移任务:
说明 RDS PostgreSQL间迁移的配置方法类似。