本文介绍如何将RDS MySQL数据实时迁移到RDS PostgreSQL,使RDS PostgreSQL成为RDS MySQL的实时分析库。

背景信息

当需要使用RDS PostgreSQL数据库特有的功能来分析MySQL中的数据,使用GIS处理时空数据或进行用户画像分析时,可以使用DTS功能将RDS MySQL中的数据实时迁移到RDS PostgreSQL数据库中,使PostgreSQL数据库作为MySQL的实时分析库。

前提条件

  • 已创建源RDS MySQL实例,创建方法,请参见创建RDS MySQL实例
  • 已创建目标RDS PostgreSQL实例,创建方法,请参见创建RDS PostgreSQL实例
  • 目标RDS PostgreSQL实例存储空间须大于源RDS MySQL实例占用的存储空间。

配置步骤

  1. 准备源数据库测试数据
  2. 创建迁移任务
  3. MySQL数据实时迁移至PostgreSQL

准备源数据库测试数据

  1. 连接源数据库RDS MySQL。
    mysql -h <连接地址> -u <用户名> -P <端口> -p
  2. 创建测试数据库db1。
    CREATE DATABASE db1;
  3. 进入db1数据库。
    USE db1;
  4. 创建测试数据表test_mmtest_innodb。
    CREATE TABLE `test_mm` (
    `id` INT (11) NOT NULL AUTO_INCREMENT,
    `user_id` VARCHAR (20) NOT NULL,
    `group_id` INT (11) NOT NULL,
    `create_time` datetime NOT NULL,
    PRIMARY KEY (`id`),KEY `index_user_id` (`user_id`) USING HASH
    ) ENGINE = innodb AUTO_INCREMENT = 1 
    DEFAULT CHARSET = utf8;
    CREATE TABLE `test_innodb` (
    `id` INT (11) NOT NULL AUTO_INCREMENT,
    `user_id` VARCHAR (20) NOT NULL,
    `group_id` INT (11) NOT NULL,
    `create_time` datetime NOT NULL,
    PRIMARY KEY (`id`),
    KEY `index_user_id` (`user_id`) USING HASH
    ) ENGINE = innodb AUTO_INCREMENT = 1 
    DEFAULT CHARSET = utf8;
  5. 创建随机字符串函数。
    delimiter $$
    CREATE FUNCTION rand_string(n int) RETURNS varchar(255)
    begin
    declare chars_str varchar(100)
    default "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
    declare return_str varchar(255) default "";
    declare i int default 0;
    while i < n do
    set return_str=concat(return_str,substring(chars_str,floor(1+rand()*62),1));
    set i= i+1;
    end while;
    return return_str;
    end $$
    delimiter ;
  6. 创建插入测试数据的存储过程。
    delimiter $$
    CREATE PROCEDURE `insert_data`(IN n int)
    BEGIN
    DECLARE i INT DEFAULT 1;
    WHILE (i <= n ) DO
    INSERT into test_mm (user_id,group_id,create_time ) VALUEs
    (rand_string(20),FLOOR(RAND() * 100) ,now() );
    set i=i+1;
    END WHILE;
    END $$
    delimiter ;
  7. 调用存储过程。
    CALL insert_data(1000000);
    INSERT INTO test_innodb SELECT * FROM test_mm;

创建迁移任务

  1. 创建迁移任务前,需要在RDS PostgreSQL控制台创建数据库,用于从RDS MySQL接收数据。创建方法,请参见创建数据库
    说明 本示例中创建的数据库名为db2。
  2. 登录新版DTS同步任务的列表页面
    说明 您也可以登录DMS数据管理服务。在顶部菜单栏中,选择传输与加工(DTS) > 数据同步
  3. 在左侧导航栏,单击数据迁移
  4. 在页面顶部的迁移任务列表中选择目标实例所属地域。
    创建迁移任务
  5. 在页面右上角,单击创建迁移任务
  6. 配置源库及目标库信息。
    源库及目标库信息
    类别 配置 说明
    任务名称

    DTS会自动生成一个任务名称,建议配置具有业务意义的名称(无唯一性要求),便于后续识别。

    源库信息 数据库类型 选择RDS实例
    实例地区 选择源RDS MySQL实例所属地域。
    实例ID 择源RDS MySQL实例ID。
    数据库账号 填入源RDS MySQL实例的数据库账号。账号需要具备以下权限:
    • 库表结构迁移:SELECT权限。
    • 全量迁移:SELECT权限。
    • 增量迁移:REPLICATION CLIENT、REPLICATION SLAVE、SHOW VIEWSELECT权限。
    数据库密码 填入该数据库账号对应的密码。
    连接方式 根据需求选择非加密连接SSL安全连接。如果设置为SSL安全连接,您需要提前开启RDS MySQL实例的SSL加密功能,详情请参见设置SSL加密
    目标库信息 数据库类型 选择RDS实例
    实例地区 选择目标RDS PostgreSQL实例所属地域。
    RDS实例ID 选择目标RDS PostgreSQL实例ID。
    数据库账号 填入目标RDS PostgreSQL实例的数据库账号,需具备以下权限:
    • LOGIN权限。
    • 目标库的CONNECT、CREATE权限。
    • 目标SchemaCREATE权限。
    数据库密码 填入该数据库账号对应的密码。
  7. 配置完成后,需要分别单击源库和目标库数据库密码参数后的测试连接,测试成功后,再进行下一步。
  8. 单击页面右下角授权白名单并进入下一步
  9. 配置迁移类型及列表。
    迁移类型及列表
    配置 说明
    迁移类型 选择结构迁移、全量数据迁移和增量数据迁移。
    迁移对象 选择待迁移的表。本示例为test_innodbtest_mm。
    已选择对象 此处展示已选择待迁移的表。
    映射名称更改 默认为不进行库表名称批量修改,如果需要对库表名称进行批量修改,可在选择要进行库表名称批量更改后,单击页面右下角高级设置,批量修改数据库表名称。
    源库、目标库无法连接后的重试时间 默认为720分钟,无需修改。
  10. 上述配置完成后, 单击页面下方的预检查并启动
    说明
    • 在迁移作业正式启动之前,会先进行预检查。只有预检查通过后,才能成功启动迁移作业。
    • 如果预检查失败,单击具体检查项后的提示,查看失败详情。
      • 您可以根据提示修复后重新进行预检查。
      • 如无需修复告警检测项,您也可以选择确认屏蔽忽略告警项并重新进行预检查,跳过告警检测项重新进行预检查。
  11. 当预检查页面中显示为预检查通过100%时,单击下一步
    与检查成功
  12. 购买配置确认页面,选择数据迁移实例的链路规格。
    说明 DTS为您提供了不同性能的迁移规格,迁移链路规格的不同会影响迁移速率,您可以根据业务场景进行选择,详情请参见数据迁移链路规格说明
  13. 勾选《数据传输(按量付费)服务条款》
  14. 单击购买并启动,迁移任务正式开始,您可在任务列表查看具体任务进度。

MySQL数据实时迁移至PostgreSQL

  1. 查看当前全量迁移状态。
    1. 连接RDS PostgreSQL数据库。
      psql -h <数据库连接地址> -U <用户名> -p <端口号> -d db2
      说明 由于在创建迁移任务步骤中创建了数据库db2来接收MySQL迁移的数据,因此,连接PostgreSQL数据库时,数据库名直接配置成db2。
    2. 执行\dn命令,查看是否将MySQL数据库db1映射为PostgreSQL中的Schema。
         List of schemas
        Name  |   Owner
      --------+------------
       db1    | test1
       public | pg*******
      (2 rows)
      说明 MySQL中的数据库名称,在迁移到PostgreSQL数据库后,将会映射到PostgreSQL中的Schema。
    3. 执行\dt+ db1.*命令,查看db1中的表状态。
                                  List of relations
       Schema |    Name     | Type  | Owner | Persistence | Size  | Description
      --------+-------------+-------+-------+-------------+-------+-------------
       db1    | test_innodb | table | test1 | permanent   | 65 MB |
       db1    | test_mm     | table | test1 | permanent   | 65 MB |
      (2 rows)
                                      
    4. 执行如下命令,分别查询test_innodbtest_mm数据表记录数。
      • 查询test_innodb记录数。
        SELECT COUNT(*) FROM db1.test_innodb;
          count
        ---------
         1000000
        (1 row)
      • 查询test_mm记录数。
        SELECT COUNT(*) FROM db1.test_mm;
          count
        ---------
         1000000
        (1 row) 
      说明

      由于MySQL数据库在PostgreSQL中被映射为Schema,在PostgreSQL中查询db1中数据时,需要指定Schema。

      如果您不想每次查询db1数据时都指定Schema,可以设置search_path参数。
      1. 查看search_path
        show search_path;
           search_path
        -----------------
         "$user", public
        (1 row)
      2. 设置search_path
        set search_path = db1, "$user", public;
        SET
      3. 查看修改结果。
        show search_path;
             search_path
        ----------------------
         db1, "$user", public
        (1 row)
  2. 新增数据实时迁移测试。
    1. RDS MySQL中新增数据。
      INSERT INTO test_innodb (user_id, group_id, `create_time`) VALUES ('testuser', 1, '2021-07-29 12:00:00');
      执行结果
      SELECT * FROM test_Innodb WHERE user_id = 'testuser';
      +---------+----------+----------+---------------------+
      | id      | user_id  | group_id | create_time         |
      +---------+----------+----------+---------------------+
      | 1000001 | testuser |        1 | 2021-07-29 12:00:00 |
      +---------+----------+----------+---------------------+
      1 row in set (0.03 sec)
    2. RDS PostgreSQL中查看新增的记录是否迁移。
      SELECT * FROM test_Innodb WHERE user_id = 'testuser';
         id    | user_id  | group_id |     create_time
      ---------+----------+----------+---------------------
       1000001 | testuser |        1 | 2021-07-29 12:00:00
      (1 row)
  3. 更新数据实时迁移测试。
    1. RDS MySQL中更新数据。
      UPDATE test_innodb set group_id = 2 WHERE user_id = 'testuser';
      执行结果:
      SELECT * FROM test_innodb WHERE user_id = 'testuser';
      +---------+----------+----------+---------------------+
      | id      | user_id  | group_id | create_time         |
      +---------+----------+----------+---------------------+
      | 1000001 | testuser |        2 | 2021-07-29 12:00:00 |
      +---------+----------+----------+---------------------+
      1 row in set (0.03 sec)
    2. RDS PostgreSQL中查看更新的记录是否迁移。
      SELECT * FROM test_innodb WHERE user_id = 'testuser';
         id    | user_id  | group_id |     create_time
      ---------+----------+----------+---------------------
       1000001 | testuser |        2 | 2021-07-29 12:00:00
      (1 row)
  4. 删除数据实时迁移测试。
    1. RDS MySQL中删除数据。
      DELETE FROM test_innodb WHERE user_id = 'testuser';
      执行结果:
      • 查询user_idtestuser的记录。
        SELECT * FROM test_innodb WHERE user_id = 'testuser';
        Empty set (0.03 sec)
      • 查询MAX(id)。
        SELECT MAX(id) FROM test_innodb;
        +---------+
        | MAX(id) |
        +---------+
        | 1000000 |
        +---------+
        1 row in set (0.03 sec)
      说明 未新增记录时,id最大为1000000,新增数据后,id自增到1000001,删除后,id又减少至1000000。
    2. RDS PostgreSQL中查看删除记录后数据是否迁移。
      SELECT * FROM test_innodb WHERE user_id = 'testuser';
       id | user_id | group_id | create_time
      ----+---------+----------+-------------
      (0 rows)
      SELECT MAX(id) FROM test_innodb;
         max
      ---------
       1000000
      (1 row)