使用DataX同步MySQL数据

本文将通过示例介绍在 Alibaba Cloud Linux 中如何使用 DataX 工具将 MySQL 数据库中的数据同步到表格存储(Tablestore)中,并简单介绍如何实现数据同步加速。

背景信息

DataX 是阿里云的离线数据同步工具,它通过 JDBC 连接 MySQL 数据库,发送 SQL 语句获取数据缓存在本地 JVM中,然后通过 Writer 线程将数据写入到表格存储的数据表中。如果想了解更多关于DataX的介绍,请参见DataX

准备工作

  • 准备需要同步的 MySQL 信息,包括用户名、密码、JDBC 连接信息等。

  • 在表格存储中创建实例和数据表,用于存放同步的数据。创建实例和数据表的具体操作,请参见创建实例和数据表

    说明

    创建数据表时,建议使用MySQL原主键或唯一索引作为表格存储数据表的主键。

    本文示例使用的MySQLTablestore表结构对比如下。

    image

    Snipaste_2024-11-08_11-03-53

  • 获取表格存储的 Endpoint。

    • 前往表格存储控制台

    • 概览页的实例列表中点击实例名称进入实例详情,根据实际情况选择实例访问地址

  • 获取 AccessKey 信息。请使用阿里云账号的 AccessKey 进行配置。获取AccessKey的具体操作,请参见创建AccessKey

    重要

    出于安全考虑,强烈建议您通过RAM用户使用表格存储功能。您可以创建RAM用户、授予该用户管理表格存储权限(AliyunOTSFullAccess)并为该RAM用户创建AccessKey。具体操作,请参见创建RAM用户RAM用户授权创建AccessKey

操作步骤

一、安装依赖

  1. 安装Python(Python 2Python 3都可以)。

    Alibaba Cloud Linux已自动安装Python,如果您使用的是其它发行版的Linux,请自行安装并通过python --version命令验证Python版本。

  2. 安装JDK(1.8及以上,推荐1.8)。

    yum -y install java-1.8.0-openjdk-devel.x86_64

二、安装DataX

  1. 下载DataX工具包。

    wget https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202309/datax.tar.gz
  2. 解压工具包。

    tar -zxvf datax.tar.gz

如果有自行编译DataX的需要,请参见DataX安装指引

三、编写配置文件

  1. 进入DataXbin目录。

    cd datax/bin
  2. 创建配置文件。如果您使用的是vim,请自行替换命令。

    vi mysql_to_ots.json

    配置文件参考如下,MySQL读取数据有两种模式,请按需选择。

    • querySQL模式:通过SQL语句查询需要导出的数据,支持联表查询。

    • table模式:通过指定表名、列名和where条件确定需要导出的数据,DataX会根据上述信息自动拼接SQL语句并抽取数据。该模式还可以通过数据分片实现并发同步。

    querySQL模式

    请根据您的数据库和表信息替换配置文件中的内容。

    {
      "job": {
        "setting": {
          "speed": {
            "channel": 1
          },
          "errorLimit": {
            "record": 0,
            "percentage": 0
          }
        },
        "content": [
          {
            "reader": {
              "name": "mysqlreader",
              "parameter": {
                "username": "your_username",
                "password": "your_password",
                "connection": [
                  {
                    "querySql": [
                      "select * from your_tablename"
                    ],
                    "jdbcUrl": [
                      "jdbc:mysql://your_serverip:3306/your_dbname?useSSL=false"
                    ]
                  }
                ]
              }
            },
    
            "writer": {
              "name": "otswriter",
              "parameter": {
                "endpoint":"your_endpoint",
                "accessId":"your_accesskeyid",
                "accessKey":"your_accesskeysecret",
                "instanceName":"your_instancename",
                "table":"your_tablename",
                "primaryKey":[
                  {"name":"column1", "type":"int"}
                ],
                "column":[
                  {"name":"column1","type":"string"},
                  {"name":"column2","type":"string"},
                  {"name":"column3","type":"string"},
                  {"name":"column4","type":"bool"}
                ],
                "writeMode":"UpdateRow"
              }
            }
    
          }
        ]
      }
    }

    table模式

    请根据您的数据库和表信息替换配置文件中的内容。

    {
      "job": {
        "setting": {
          "speed": {
            "channel": 3
          },
          "errorLimit": {
            "record": 0,
            "percentage": 0.02
          }
        },
        "content": [
          {
            "reader": {
              "name": "mysqlreader",
              "parameter": {
                "username": "your_username",
                "password": "your_password",
                "column": [
                  "column1",
                  "column2" ,
                  "column3" ,
                  "column4",
                  "column5"
                ],
                "splitPk": "column1",
                "connection": [
                  {
    
                    "table": [
                      "your_tablename"
                    ],
                    "jdbcUrl": [
                      "jdbc:mysql://your_serverip:3306/your_dbname?useSSL=false"
                    ]
                  }
                ]
              }
            },
    
            "writer": {
              "name": "otswriter",
              "parameter": {
                "endpoint":"your_endpoint",
                "accessId":"your_accesskeyid",
                "accessKey":"your_accesskeysecret",
                "instanceName":"your_instancename",
                "table":"your_tablename",
                "primaryKey":[
                  {"name":"column1", "type":"int"}
                ],
                "column":[
                  {"name":"column1","type":"string"},
                  {"name":"column2","type":"string"},
                  {"name":"column3","type":"string"},
                  {"name":"column4","type":"bool"}
                ],
                "writeMode":"UpdateRow"
              }
            }
    
          }
        ]
      }
    }

四、执行同步命令

  1. 执行以下命令开始同步数据。

    python datax.py mysql_to_ots.json

    同步任务结束后,将打印整体运行情况。

    image

  2. 验证同步结果。

    您可以前往表格存储控制台查看已导入的数据。

同步加速

通过DataX将数据从MySQL同步到表格存储可以分为数据读取数据交换数据写入三个部分,本文将简单介绍如何在数据读取和数据交换阶段进行同步加速。

一、数据读取

table模式下,可以通过设置splitPkchannel来进行数据分片,通过并发的方式实现同步加速。

  • splitPk

    进行数据分片的字段,仅支持整型和字符串类型。推荐选择数据分布均匀的字段,比如主键、自增ID等。

  • channel

    同步通道数,决定ReaderWriter的个数上限以及数据分片的数量。

DataX会按照设置的splitPkchannel进行数据分片,如下图所示。

image

说明

如果不填写splitPksplitPk填写为空,DataX将使用单通道同步数据。

如果指定了splitPk,但channel设置为1,DataX也将使用单通道同步数据。

二、数据交换

可以通过调整JVM内存和channel的流控参数提高数据同步效率。

  • JVM内存

    DataX通过SQL查询到的数据会缓存在内存中。此外每个channel也会维护自己的record队列,channel数越多,则需要更多的内存。您可以在执行同步命令时使用-j参数来指定JVM的内存大小。例如:

    python datax.py -j "-Xms4g -Xmx4g" mysql_to_ots.json
  • channel流控参数

    DataX的配置文件conf/core.json中,channel的关键参数如下所示。

    image

    详细参数说明请参见下表。

    参数

    描述

    capacity

    限制channel中队列的大小,即最多缓存的record个数。

    byteCapacity

    限制record占用的内存大小,单位为字节。默认值为64 MB。

    byte

    控流参数,限制通道的默认传输速率,-1表示不限制。

    record

    控流参数,限制通道的传输记录个数,-1表示不限制。

    说明

    您可以调高capacitybyteCapacity来提高DataX缓存,也可以适当提高channel个数以及每个channelbyterecord来提高DataX的吞吐量。但请综合考虑channel的个数和流控参数,保证同步时不会对服务器产生过高的压力。

相关文档