通过DTS、阿里云DataWorks、Flink CDC和Catalog均可将PostgreSQL(如自建PostgreSQL、RDS PostgreSQL和PolarDB PostgreSQL等)的数据迁移至云数据库 SelectDB 版。您可依据迁移的数据量和业务场景,选择合适的方式完成数据迁移。
迁移方式功能对比
DTS、DataWorks、Flink CDC、Catalog均可将PostgreSQL的数据迁移至SelectDB,但不同的方式支持迁移的数据有所不同,您可根据不同的业务场景,选择合适的迁移方式。
迁移方式 | 历史数据迁移 | 增量数据同步 | 表结构迁移 | 整库迁移 | 增量同步DDL | 数据校验 |
✔️ | ✔️ | ✔️ | ✔️ | ✔️ | ✔️ | |
✔️ | ✔️ | ✔️ | ✔️ | ✔️ | ❌ | |
✔️ | ✔️ | ✔️ | ✔️ | ✔️ | ❌ | |
✔️ | ❌ | ❌ | ❌ | ❌ | ❌ |
前提条件
确保PostgreSQL实例和SelectDB实例的网络处于互通状态。
PostgreSQL实例和SelectDB实例处于同一VPC下。如果不在同一VPC下,请先解决网络互通问题。如何操作,请参见如何解决SelectDB实例与数据源网络互通问题?
已将PostgreSQL实例IP添加至SelectDB的白名单。具体操作,请参见设置白名单。
若PostgreSQL实例存在白名单机制,已将SelectDB实例所在网段IP添加至PostgreSQL实例的白名单中。
获取SelectDB实例VPC地址的IP,请参见如何查看云数据库 SelectDB 版实例所属VPC的IP网段?
获取SelectDB实例公网的IP地址,通过
ping
命令访问SelectDB实例的公网地址,获取其对应的 IP 地址。
操作步骤
通过DTS迁移
DTS支持迁移PostgreSQL历史数据、同步增量数据到SelectDB,且支持库表迁移、DDL同步、数据校验等能力。
通过DataWorks迁移
步骤一:新增数据源
在进行数据同步任务开发时,您需要在DataWorks上分别创建PostgreSQL和SelectDB数据源。
创建SelectDB数据源,详情请参见创建并管理数据源。SelectDB数据源的部分配置参数如下所示:
参数
说明
数据源名称
数据源的名称。
MySQL连接地址
JDBC连接串
jdbc:mysql://<ip>:<port>/<dbname>
。您可在SelectDB控制台的实例详情 > 网络信息中获取VPC地址(或公网地址)和MySQL协议端口。
示例:
jdbc:mysql://selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:9030/test_db
HTTP连接地址
HTTP协议访问地址
<ip>:<port>
。您在SelectDB控制台的实例详情 > 网络信息中获取VPC地址(或公网地址)和HTTP协议端口。
示例:
selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:8080
用户名
SelectDB实例的用户名。
密码
SelectDB实例对应用户的密码。
步骤二:配置数据同步任务
请参见如下文档,配置数据迁移任务:
通过Flink CDC迁移
Flink提供Flink CDC方式从PostgreSQL迁移数据到SelectDB。Flink CDC支持历史数据迁移、增量数据同步,且具备完备的库表迁移、DDL同步等能力。
准备环境
搭建Flink环境,本示例以Flink 1.16单机环境为例。
下载flink-1.16.3-bin-scala_2.12.tgz,进行解压,示例如下。如果此版本已过期,请可以下载其他版本。更多版本,请参见Apache Flink。
wget https://archive.apache.org/dist/flink/flink-1.16.3/flink-1.16.3-bin-scala_2.12.tgz tar -zxvf flink-1.16.3-bin-scala_2.12.tgz
进入FLINK_HOME/lib目录中下载flink-sql-connector-mysql-cdc-2.4.2和flink-doris-connector-1.16-1.5.2,示例如下。
说明整库同步支持Flink 1.15以上的版本,各个版本Flink Doris Connector的下载请参见Flink Doris Connector。
cd flink-1.16.3 cd lib/ wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.4.2/flink-sql-connector-mysql-cdc-2.4.2.jar wget https://repo.maven.apache.org/maven2/org/apache/doris/flink-doris-connector-1.16/1.5.2/flink-doris-connector-1.16-1.5.2.jar
启动Flink Standalone集群,示例如下。
bin/start-cluster.sh
创建SelectDB实例。具体操作,请参见创建实例。
通过MySQL协议连接SelectDB实例。具体操作,请参见连接实例。
创建测试数据库和测试表。
创建测试数据库。
CREATE DATABASE test_db;
创建测试表。
USE test_db; CREATE TABLE employees ( emp_no int NOT NULL, birth_date date, first_name varchar(20), last_name varchar(20), gender char(2), hire_date date ) UNIQUE KEY(`emp_no`) DISTRIBUTED BY HASH(`emp_no`) BUCKETS 1;
提交Flink CDC任务
提交Flink CDC任务的语法如下:
<FLINK_HOME>/bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.16-1.5.2.jar \
postgres-sync-database \
--database db1\
--postgres-conf hostname=127.0.0.1 \
--postgres-conf port=5432 \
--postgres-conf username=postgres \
--postgres-conf password="123456" \
--postgres-conf database-name=postgres \
--postgres-conf schema-name=public \
--postgres-conf slot.name=test \
--postgres-conf decoding.plugin.name=pgoutput \
--including-tables "tbl1|test.*" \
--sink-conf fenodes=selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080 \
--sink-conf username=admin \
--sink-conf password=****
参数说明
参数 | 是否必填 | 说明 |
execution.checkpointing.interval | 是 | Flink checkpoint的时间间隔,影响数据同步的频率,推荐10s。 |
parallelism.default | 否 | 设置Flink任务的并行度,适当增加并行度可提高数据同步速度。 |
database | 是 | 同步到SelectDB的数据库名。 |
including-tables | 否 | 需要同步的PostgreSQL表。 可以使用"|"分隔多个表,并支持正则表达式。 例如 |
excluding-tables | 否 | 不需要同步的表,配置方法与including-tables相同。 |
postgres-conf | 是 | Postgres CDC Source配置。配置详情请参见Postgres CDC Connector,其中 |
sink-conf | 是 | Doris Sink的所有配置。更多详情,请参见通过Flink导入数据。 |
table-conf | 否 | SelectDB表的配置项,即创建SelectDB表时properties中的内容。 |
通过Catalog迁移
SelectDB提供的Catalog能力,支持通过联邦查询方式访问PostgreSQL,可简单快速完成PostgreSQL历史数据迁移到SelectDB。
连接SelectDB实例。具体操作,请参见连接实例。
说明使用DMS登录时,
SWITCH
指令失效。推荐使用MySQL客户端连接。创建PostgreSQL JDBC Catalog。
CREATE CATALOG jdbc_postgresql PROPERTIES ( "type"="jdbc", "user"="root", "password"="123456", "jdbc_url" = "jdbc:postgresql://127.0.0.1:5432/demo", "driver_url" = "postgresql-42.5.1.jar", "driver_class" = "org.postgresql.Driver", "checksum" = "20c8228267b6c9ce620fddb39467d3eb" );
参数说明
参数
是否必选
说明
user
是
对应数据库的账号。
password
是
对应数据库的密码。
jdbc_url
是
JDBC连接串。
driver_url
是
JDBC Driver Jar包名称。
driver_class
是
JDBC Driver Class名称。
lower_case_table_names
否
指定是否以小写的形式同步JDBC外部数据源的库名和表名。
默认值:
"false"
only_specified_database
否
指定是否只同步指定的Database。
默认值:
"false"
include_database_list
否
当
only_specified_database=true
时,指定同步多个Database,以英文逗号(,)分隔。DB名称大小写敏感。默认值:
""
exclude_database_list
否
当
only_specified_database=true
时,指定不需要同步的多个Database,以英文逗号(,)分隔。DB名称大小写敏感。默认值:
""
更多详情,请参见JDBC数据源。
在SelectDB中进行建表后,然后通过库内ETL语法
insert into select
完成数据同步。更多insert into
详情,请参见Insert Into。# 建表 CREATE TABLE selectdb_table ... # 迁移数据 INSERT INTO selectdb_table SELECT * FROM jdbc_postgresql.pg_database.pg_table;