文档

基于 DataX 完成数据访问代理数据迁移

更新时间:

数据访问代理(Open Database Proxy,简称 ODP)通过集成 DataX,支持全量离线静态的数据迁移功能。DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现各种异构数据源之间高效的数据同步。目前,支持的源端数据源类型依赖于 DataX 支持的类型,而目标端仅支持 MySQL 和 OceanBase。

本文将引导您快速完成以下迁移任务:

单库单表 MySQL 迁移至 ODP

前置条件

您已经开通数据访问代理产品并购买了数据访问代理实例,详见 创建实例

建库建表

  1. 登录数据访问代理控制台页面,选择 数据库,单击 创建数据库

  2. 在弹出的 创建数据库 窗口中,选择实例,单击 创建

  3. 进入 选择数据节点 页面后,根据需要选择 MySQL 或 OceanBase 的节点,单击 下一步

  4. 根据提示,填写或选择数据库的基本信息。详细参数信息,请参见 创建数据访问代理数据库

  5. 数据库创建完成后,进入数据库详情页,单击右侧页面下方的 新增数据表

  6. 新增数据表 页面,输入 数据表名,如 test_migration_user

  7. 根据需要选择表类型(单表或拆分表),此处以 拆分表 为例,并设置 分表总数 为 2。

  8. 配置分表规则,详细配置方法可参见 创建数据访问代理数据表自定义分表规则

  9. 勾选 现在创建物理表,单击 下一步

  10. DDL 语句 中,输入建表语句,示例如下:

    CREATE TABLE `test_migration_user`(
         `user_id` varchar(16) NOT NULL,
         `username` varchar(64) NOT NULL,
         `id` bigint(20) NOT NULL AUTO_INCREMENT,
         PRIMARY KEY (`id`),
         UNIQUE KEY `uk_user_id`(`user_id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=102 DEFAULT CHARSET=utf8mb4;
  11. 单击 执行,完成建表。

创建迁移账户

  1. 在数据访问控制台页面,左侧导航栏选择 实例

  2. 在实例列表中,找到刚刚创建的数据库所在的实例,单击实例名称,进入实例详情页。

  3. 切换至 账号管理 页签,单击 创建账号

  4. 在弹出的新窗口中,输入数据库账号(必须是 odp_migrator)和账号密码,选择相应的授权数据库。

  5. 单击 确定,完成创建迁移账户。

配置连接参数

  1. 进入数据库详情页,选择页面下方的 连接参数 页签。

  2. 切换至 其他参数 后,在 connectionProperties 属性栏,输入以下属性值:

    rewriteBatchedStatements=true
    clobberStreamingResults=true
  3. 单击底部 保存配置 按钮。

准备 DataX 环境

DataX 是一个异构数据源间数据迁移的中间件,支持各种类型的 reader 和 writer。要实现 ODP 数据迁移,您需要准备 DataX 环境。

  1. 单击此处 下载 DataX,并将其解压至本地目录。

  2. 单击此处 下载 odpwriter 插件,并将其解压至 $datax_dir/plugin/writer 目录下。

有关 DataX 的更多信息,参见 DataX 官方文档

编辑并执行迁移 Job

  1. 进入 $datax_dir/bin 目录,编写一份迁移 Job 描述文件。文件示例如下:

    {
     "job": {
         "setting": {
             "speed": {
                     "channel": 3
             },
             "errorLimit": {
                 "record": 0,
                 "percentage": 0.02
             }
         },
         "content": [
             {
                 "reader": {
                     "name": "mysqlreader",
                     "parameter": {
                         "username": "$mysql_usr",
                         "password": "$mysql_pwd",
                         "column": [
                             "user_id",
                             "username"
                         ],
                         "splitPk": "id",
                         "connection": [
                             {
                                 "table": [
                                     "user"
                                 ],
                                 "jdbcUrl": [
                                     "jdbc:mysql://127.0.0.1:3306/test_migration"
                                 ]
                             }
                         ]
                     }
                 },
                 "writer": {
                     "name": "odpwriter",
                     "parameter": {
                         "writeMode": "replace",
                         "username": "odp_migrator",
                         "password": "$odp_pwd",
                         "column": [
                             "user_id",
                             "username"
                         ],
                         "connection": [
                             {
                                 "jdbcUrl": "jdbc:mysql://$dbp_url/test_migration?useUnicode=true&characterEncoding=utf8",
                                 "table": [
                                     "test_migration_user"
                                 ]
                             }
                         ]
                     }
                 }
             }
         ]
     }
    }

    根据以上 Job 描述文件,该数据迁移任务是将单库单表的 MySQL(127.0.0.1:3306)中的 test_migration 库里的 user 表的数据(列 user_idusername)迁移至 $dbp_url 中的 test_migration 库里的 test_migration_user 表(列 user_idusername)。

  2. $datax_dir/bin 目录下,执行以下命令,运行数据迁移 Job:

    python datax.py $yourjob.json
  3. 命令执行成功,即数据迁移完成。

ODP 迁移至其他数据库

从 ODP 迁移至其他数据库,操作步骤与 从 MySQL 迁移至 ODP 相似。

  1. 参考上文,完成 建库建表创建迁移账户配置连接参数操作。

  2. 按照以下步骤,完成 DataX 环境准备、Job 描述文件的编写与执行。

准备 DataX 环境

  1. 单击此处 下载 DataX,并将其解压至本地目录。

  2. 单击此处 下载 odpreader 插件,并将其解压至 $datax_dir/plugin/reader/ 目录下。

执行 Job 描述文件

  1. 进入 $datax_dir/bin 目录,编写一份迁移 Job 描述文件。文件示例如下:

    说明

    • Reader 迁移账号也必须是 odp_migrator。

    • 对 Job 描述文件添加一个 connection 的校验,仅允许配置一个 connection,且仅允许配置一张表。

    {
     "job": {
         "setting": {
             "speed": {
                 "channel": 4
                 },
             "errorLimit": {
                 "record": 0,
                 "percentage": 0.02
                 }
         },
         "content": [
             {
                 "reader": {
                     "name": "odpreader",
                     "parameter": {
                         "username": "odp_migrator",
                         "password": "$odp_pwd",
                         "column": [
                             "user_id",
                             "gmt_create",
                             "gmt_modified",
                             "current_date",
                             "current_time",
                             "current_timestamp",
                             "name",
                             "is_ok",
                             "age",
                             "salary",
                             "height",
                             "memo",
                             "character_stream",
                             "binary_stream",
                         ],
                         "connection": [
                             {
                                 "table": [
                                     "test_migration_user_large"
                                 ],
                                 "jdbcUrl": [
                                     "jdbc:mysql://$odp_url:8066/test_join?useUnicode=true&characterEncoding=utf8"
                                 ]
                             }
                         ]
                     }
                 },
                 "writer": {
                     "name": "odpwriter",
                     "parameter": {
                         "writeMode": "replace",
                         "username": "odp_migrator",
                         "password": "$odp_pwd",
                         "column": [
                             "`user_id`",
                             "`gmt_create`",
                             "`gmt_modified`",
                             "`current_date`",
                             "`current_time`",
                             "`current_timestamp`",
                             "`name`",
                             "`is_ok`",
                             "`age`",
                             "`salary`",
                             "`height`",
                             "`memo`",
                             "`character_stream`",
                             "`binary_stream`",
                         ],
                         "batchSize": 2048,
                         "connection": [
                             {
                                 "jdbcUrl": "jdbc:mysql://$odp_url:8066/test_join?useUnicode=true&characterEncoding=utf8",
                                 "table": [
                                     "test_migration_user_large_target"
                                 ]
                             }
                         ]
                     }
                 }
             }
         ]
     }
    }
  2. $datax_dir/bin 目录下,执行如下命令,运行数据迁移 Job。

    python datax.py $yourjob.json
  3. 命令执行成功,即数据迁移完成。

注意事项

不推荐使用 SCAN_ALL hint + mysql reader 的方式进行数据读取。虽然这种方式也可以对分库分表的数据表进行数据抽取。但其数据拆分方式为 splitPK,即全表扫描之后,根据 ID 进行拆分,拆分之后的每个子任务 $from ~ $to 同样需要全表扫描才能取出。这就意味着,任务拆分之后,每个子任务的查询都是一次全表扫描。因此,这种方式性能差、容易超时,而且会对数据访问代理实例产生较大的负载压力,不建议使用。

相比之下,odpreader 的拆分方式是基于逻辑表的分表来实现的,如百库百表下会拆分为 100 个子任务,每个子任务的数据抽取仅会查询单个分表,并不会产生全表扫描。因此,建议您使用 odpreader。

常见问题

DataX 支持的数据类型

DataX 的 MysqlReader 针对 MySQL 类型转换列表如下:

DataX 内部类型

MySQL 数据类型

long

int, tinyint, smallint, mediumint, int, bigint

Double

float, double, decimal

String

varchar, char, tinytext, text, mediumtext, longtext, year

Date

date, datetime, timestamp, time

Boolean

bit, bool

Bytes

tinyblob, mediumblob, blob, longblob, varbinary

说明

  • 除上述字段类型外,其他类型均不支持。

  • 对于 tinyint(1),DataX 视为整型。

  • 对于 year,DataX 视为字符串类型。

  • 对于 bit,DataX 视为未定义行为。

事务与出错重跑

在数据迁移过程中,可能存在部分数据写入失败的情况,此时需要进行重试。由于此时可能已经有部分数据已经完成迁移,重试的时候回出现 Duplicate Key 问题。此时,可以使用 DataX 的 REPLACE/INSERT IGNORE writeMode。

需要注意,REPLACE 语句的行为是如果发现主键或唯一键冲突,会将原来这条数据删除,然后重新插入,这样既可实现比较简单的出错重跑机制。REPLACE 语句可能会导致新的数据记录的自增 id 有变化,如果使用自增 id 进行关联,会出现问题。而 INSERT IGNORE 语句则不会有这个问题,如果发现主键或唯一键冲突,会忽略当前这条 SQL 的插入。

超时问题

SocketTimeout 错误

现象

出现 SocketTimeout 报错,类似的错误信息如下所示:

Thelast packet successfully received from the server was 5,004 milliseconds ago.Thelast packet sent successfully to the server was 5,004 milliseconds ago.

解决方案

  1. 登录数据访问代理控制台,进行目标数据库的详情页。

  2. 选择页面下方的 连接参数 页签,在 其他参数 > connectionProperties 属性栏,添加如下属性:

    socketTimeout=5000(调整为更大的数值,默认为5秒,单位为毫秒)
  3. 单击 保存配置

Timeout 错误

现象

出现 Timeout 报错,对应的错误码为 4012。

原因

该错误是 OceanBase 查询超时问题。

解决方案

您可以通过以下方法处理该错误:

  • 在任意一个 column 中加入 hint:/*+空格QUERY_TIMEOUT(111111111)空格*/ $columnName,示例如下:

    "column":[
    "/*+ QUERY_TIMEOUT(100) */ user_id",
    "username",
    "id",
    "gmt_create",
    ],
    说明

    该超时时间仅影响当前SQL,并不会影响其他SQL。

  • 在 newConnectionSql 中添加 ob_query_timeout 参数,单位为微秒。

    set@@ob_query_timeout=比较大的值
    说明

    该参数配置将影响整个逻辑库。在离线与在线业务混用 ODP 实例的场景中,不推荐使用这种参数配置。可以在直连物理库时执行这条 SQL,检查该超时参数设置为多大值时不会超时,然后在此基础上再加一些 buffer。

Communications link failure 错误

现象

出现 Communications link failure 报错,错误信息如下所示:

Communications link failure

Thelast packet successfully received from the server was 0 milliseconds ago.Thelast packet sent successfully to the server was 605,920 milliseconds ago.

com.mysql.jdbc.exceptions.jdbc4.CommunicationsException:Communications link failure

Causedby: java.io.EOFException:Cannot read response from server.Expected to read 425 bytes, read 156 bytes before connection was unexpectedly lost.

原因

该错误是因为数据库达到了 net_write_timeout 而主动断连导致的,即数据库回写超时。

解决方案

数据迁移(流式读取)时默认值为 600 秒,您可以通过设置以下 ODP 连接参数,解决该问题。

netTimeoutForStreamingResults=大于600的一个数值,单位是秒