迁移PostgreSQL数据

通过DTS、阿里云DataWorks、Flink CDCCatalog均可将PostgreSQL(如自建PostgreSQL、RDS PostgreSQLPolarDB PostgreSQL等)的数据迁移至云数据库 SelectDB 版。您可依据迁移的数据量和业务场景,选择合适的方式完成数据迁移。

迁移方式功能对比

DTS、DataWorks、Flink CDC、Catalog均可将PostgreSQL的数据迁移至SelectDB,但不同的方式支持迁移的数据有所不同,您可根据不同的业务场景,选择合适的迁移方式。

迁移方式

历史数据迁移

增量数据同步

表结构迁移

整库迁移

增量同步DDL

数据校验

DTS

✔️

✔️

✔️

✔️

✔️

✔️

DataWorks

✔️

✔️

✔️

✔️

✔️

Flink CDC

✔️

✔️

✔️

✔️

✔️

Catalog

✔️

前提条件

  • 确保PostgreSQL实例和SelectDB实例的网络处于互通状态。

操作步骤

通过DTS迁移

DTS支持迁移PostgreSQL历史数据、同步增量数据到SelectDB,且支持库表迁移、DDL同步、数据校验等能力。

通过DataWorks迁移

步骤一:新增数据源

在进行数据同步任务开发时,您需要在DataWorks上分别创建PostgreSQLSelectDB数据源。

  1. 创建PostgreSQL数据源

  2. 创建SelectDB数据源,详情请参见创建并管理数据源SelectDB数据源的部分配置参数如下所示:

    参数

    说明

    数据源名称

    数据源的名称。

    MySQL连接地址

    JDBC连接串jdbc:mysql://<ip>:<port>/<dbname>

    您可在SelectDB控制台的实例详情 > 网络信息中获取VPC地址(或公网地址)和MySQL协议端口

    示例:jdbc:mysql://selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:9030/test_db

    HTTP连接地址

    HTTP协议访问地址<ip>:<port>

    您在SelectDB控制台的实例详情 > 网络信息中获取VPC地址(或公网地址)和HTTP协议端口

    示例:selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:8080

    用户名

    SelectDB实例的用户名。

    密码

    SelectDB实例对应用户的密码。

步骤二:配置数据同步任务

请参见如下文档,配置数据迁移任务:

通过Flink CDC迁移

Flink提供Flink CDC方式从PostgreSQL迁移数据到SelectDB。Flink CDC支持历史数据迁移、增量数据同步,且具备完备的库表迁移、DDL同步等能力。

准备环境

搭建Flink环境,本示例以Flink 1.16单机环境为例。

  1. 下载flink-1.16.3-bin-scala_2.12.tgz,进行解压,示例如下。如果此版本已过期,请可以下载其他版本。更多版本,请参见Apache Flink

    wget https://archive.apache.org/dist/flink/flink-1.16.3/flink-1.16.3-bin-scala_2.12.tgz
    tar -zxvf flink-1.16.3-bin-scala_2.12.tgz
  2. 进入FLINK_HOME/lib目录中下载flink-sql-connector-mysql-cdc-2.4.2flink-doris-connector-1.16-1.5.2,示例如下。

    说明

    整库同步支持Flink 1.15以上的版本,各个版本Flink Doris Connector的下载请参见Flink Doris Connector

    cd flink-1.16.3
    cd lib/
    wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.4.2/flink-sql-connector-mysql-cdc-2.4.2.jar
    wget https://repo.maven.apache.org/maven2/org/apache/doris/flink-doris-connector-1.16/1.5.2/flink-doris-connector-1.16-1.5.2.jar
  3. 启动Flink Standalone集群,示例如下。

    bin/start-cluster.sh
  4. 创建SelectDB实例。具体操作,请参见创建实例

  5. 通过MySQL协议连接SelectDB实例。具体操作,请参见连接实例

  6. 创建测试数据库和测试表。

    1. 创建测试数据库。

      CREATE DATABASE test_db;
    2. 创建测试表。

      USE test_db;
      CREATE TABLE employees (
          emp_no       int NOT NULL,
          birth_date   date,
          first_name   varchar(20),
          last_name    varchar(20),
          gender       char(2),
          hire_date    date
      )
      UNIQUE KEY(`emp_no`)
      DISTRIBUTED BY HASH(`emp_no`) BUCKETS 1;

提交Flink CDC任务

提交Flink CDC任务的语法如下:

<FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.16-1.5.2.jar \
    postgres-sync-database \
    --database db1\
    --postgres-conf hostname=127.0.0.1 \
    --postgres-conf port=5432 \
    --postgres-conf username=postgres \
    --postgres-conf password="123456" \
    --postgres-conf database-name=postgres \
    --postgres-conf schema-name=public \
    --postgres-conf slot.name=test \
    --postgres-conf decoding.plugin.name=pgoutput \
    --including-tables "tbl1|test.*" \
    --sink-conf fenodes=selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080 \
    --sink-conf username=admin \
    --sink-conf password=****

参数说明

参数

是否必填

说明

execution.checkpointing.interval

Flink checkpoint的时间间隔,影响数据同步的频率,推荐10s。

parallelism.default

设置Flink任务的并行度,适当增加并行度可提高数据同步速度。

database

同步到SelectDB的数据库名。

including-tables

需要同步的PostgreSQL表。

可以使用"|"分隔多个表,并支持正则表达式。

例如--including-tables table1|tbl.*,指同步table1和所有以tbl开头的表。

excluding-tables

不需要同步的表,配置方法与including-tables相同。

postgres-conf

Postgres CDC Source配置。配置详情请参见Postgres CDC Connector,其中hostnameusernamepassworddatabase-nametable-nameschema-name是必选项。

sink-conf

Doris Sink的所有配置。更多详情,请参见通过Flink导入数据

table-conf

SelectDB表的配置项,即创建SelectDB表时properties中的内容。

通过Catalog迁移

SelectDB提供的Catalog能力,支持通过联邦查询方式访问PostgreSQL,可简单快速完成PostgreSQL历史数据迁移到SelectDB。

  1. 连接SelectDB实例。具体操作,请参见连接实例

    说明

    使用DMS登录时,SWITCH指令失效。推荐使用MySQL客户端连接。

  2. 创建PostgreSQL JDBC Catalog。

    CREATE CATALOG jdbc_postgresql PROPERTIES (
        "type"="jdbc",
        "user"="root",
        "password"="123456",
        "jdbc_url" = "jdbc:postgresql://127.0.0.1:5432/demo",
        "driver_url" = "postgresql-42.5.1.jar",
        "driver_class" = "org.postgresql.Driver",
        "checksum" = "20c8228267b6c9ce620fddb39467d3eb"
    );

    参数说明

    参数

    是否必选

    说明

    user

    对应数据库的账号。

    password

    对应数据库的密码。

    jdbc_url

    JDBC连接串。

    driver_url

    JDBC Driver Jar包名称。

    driver_class

    JDBC Driver Class名称。

    lower_case_table_names

    指定是否以小写的形式同步JDBC外部数据源的库名和表名。

    默认值:"false"

    only_specified_database

    指定是否只同步指定的Database。

    默认值:"false"

    include_database_list

    only_specified_database=true时,指定同步多个Database,以英文逗号(,)分隔。DB名称大小写敏感。

    默认值:""

    exclude_database_list

    only_specified_database=true时,指定不需要同步的多个Database,以英文逗号(,)分隔。DB名称大小写敏感。

    默认值:""

    更多详情,请参见JDBC数据源

  3. SelectDB中进行建表后,然后通过库内ETL语法insert into select完成数据同步。更多insert into详情,请参见Insert Into

    # 建表
    CREATE TABLE selectdb_table ...
    
    # 迁移数据
    INSERT INTO selectdb_table SELECT * FROM jdbc_postgresql.pg_database.pg_table;