使用DataX同步MySQL数据

更新时间:2025-02-12 01:41:48

本文介绍如何使用 DataX 工具将 MySQL 数据库中的数据同步到表格存储(Tablestore)。

背景信息

DataX 是阿里云的离线数据同步工具,它通过 JDBC 连接 MySQL 数据库,发送 SQL 语句获取数据缓存在本地 JVM中,然后通过 Writer 线程将数据写入到表格存储的数据表中。如果想了解更多关于DataX的介绍,请参见DataX

准备工作

  • 准备需要同步的 MySQL 信息,包括用户名、密码、JDBC 连接信息等。

  • 开通表格存储服务,创建实例和数据表,用于存放同步的数据。具体操作,请参见开通服务并创建实例创建数据表

    重要

    创建数据表时,建议使用MySQL原主键或唯一索引作为表格存储数据表的主键。本文使用的MySQL和表格存储数据表样例,请参见附录-样例数据

  • 获取表格存储的服务地址(Endpoint)和实例名称。

    1. 登录表格存储控制台

    2. 在页面上方,选择资源组和地域。

    3. 概览页面,单击实例别名或在操作列单击实例管理

    4. 实例详情页签,查看实例的名称和服务地址。

  • 获取 AccessKey 信息。请使用阿里云账号或RAM用户的 AccessKey 进行配置。获取AccessKey的具体操作,请参见如何获取AccessKey

    重要

    出于安全考虑,强烈建议您通过RAM用户使用表格存储功能。您可以创建RAM用户、授予该用户管理表格存储权限( AliyunOTSFullAccess )并为该RAM用户创建AccessKey。具体操作,请参见使用RAM用户访问密钥访问表格存储

操作步骤

本文操作使用的服务器为云服务器ECS,操作系统为Alibaba Cloud Linux 3.2104 LTS 64位和Ubuntu 22.04 64位。

一、安装依赖

  1. 安装Python(Python 2Python 3都可以)。

    ECSAlibaba Cloud LinuxUbuntu系统已自带Python 3,如果您使用的是其他服务器,请自行进行安装。

  2. 安装JDK(1.8及以上,推荐1.8)。

    本文介绍在ECSAlibaba Cloud LinuxUbuntu系统中安装JDK 1.8的方法,如果您使用的是其他服务器,请自行进行安装。

    Alibaba Cloud Linux
    Ubuntu
    yum -y install java-1.8.0-openjdk-devel.x86_64
    apt update && apt upgrade
    apt install openjdk-8-jdk

二、安装DataX

  1. 下载DataX工具包。

    wget https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202309/datax.tar.gz
  2. 解压工具包。

    tar -zxvf datax.tar.gz

如果有自行编译DataX的需要,请参见DataX安装指引

三、编写配置文件

  1. 进入DataXbin 目录。

    cd datax/bin
  2. 创建配置文件。如果您使用的是 vim,请自行替换命令。

    vi mysql_to_ots.json

    配置文件参考如下,MySQL读取数据有两种模式,请按需选择。

    • querySQL模式:通过SQL语句查询需要导出的数据,支持联表查询。

    • table模式:通过指定表名、列名和where条件确定需要导出的数据,DataX会根据上述信息自动拼接SQL语句并抽取数据。该模式还可以通过数据分片实现并发同步。

    querySQL模式
    table模式

    请根据您的同步信息和需求替换配置文件内的参数信息。

    {
      "job": {
        "setting": {
          "speed": {
            "channel": 1
          },
          "errorLimit": {
            "record": 0,
            "percentage": 0
          }
        },
        "content": [
          {
            "reader": {
              "name": "mysqlreader",
              "parameter": {
                "username": "mysql_username",
                "password": "mysql_password",
                "connection": [
                  {
                    "querySql": [
                      "select * from table_name"
                    ],
                    "jdbcUrl": [
                      "jdbc:mysql://server_ip:3306/database_name?useSSL=false"
                    ]
                  }
                ]
              }
            },
            "writer": {
              "name": "otswriter",
              "parameter": {
                "endpoint":"endpoint",
                "accessId":"accesskey_id",
                "accessKey":"accesskey_secret",
                "instanceName":"instance_name",
                "table":"table_name",
                "primaryKey":[
                  {"name":"order_id", "type":"string"}
                ],
                "column":[
                  {"name":"user_id","type":"string"},
                  {"name":"sku_id","type":"string"},
                  {"name":"price","type":"double"},
                  {"name":"num","type":"int"},
                  {"name":"total_price","type":"double"},
                  {"name":"order_status","type":"string"},
                  {"name":"create_time","type":"string"},
                  {"name":"modified_time","type":"string"}
                ],
                "writeMode":"UpdateRow"
              }
            }
          }
        ]
      }
    }

    MySQL Reader需要替换的参数说明如下:

    参数名称

    说明

    username

    MySQL 连接用户名。

    password

    MySQL 连接用户密码。

    querySql

    查询SQL,用于确定需要同步的数据范围。

    jdbcUrl

    JDBC连接信息。

    关于MySQL Reader的更多信息,请参见MySQL Reader介绍

    表格存储Writer需要替换的参数说明如下:

    参数名称

    说明

    endpoint

    实例服务地址。

    accessId

    阿里云账号或RAM用户的AccessKey ID。

    accessKey

    阿里云账号或RAM用户的AccessKey Secret。

    instanceName

    实例名称。

    table

    目标表名称。

    primaryKey

    目标表的主键列列表。

    column

    需要写入的属性列。

    关于表格存储Writer的更多信息,请参见表格存储Writer介绍

    请根据您的同步信息和需求替换配置文件内的参数信息。

    {
      "job": {
        "setting": {
          "speed": {
            "channel": 1
          },
          "errorLimit": {
            "record": 0,
            "percentage": 0.02
          }
        },
        "content": [
          {
            "reader": {
              "name": "mysqlreader",
              "parameter": {
                "username": "mysql_username",
                "password": "mysql_password",
                "column": [
                  "order_id",
                  "user_id" ,
                  "sku_id" ,
                  "price",
                  "num",
                  "total_price",
                  "order_status",
                  "create_time",
                  "modified_time"
                ],
                "splitPk": "num",
                "connection": [
                  {
                    "table": [
                      "table_name"
                    ],
                    "jdbcUrl": [
                      "jdbc:mysql://server_ip:3306/database_name?useSSL=false"
                    ]
                  }
                ]
              }
            },
            "writer": {
              "name": "otswriter",
              "parameter": {
                "endpoint":"endpoint",
                "accessId":"accesskey_id",
                "accessKey":"accesskey_secret",
                "instanceName":"instance_name",
                "table":"table_name",
                "primaryKey":[
                  {"name":"order_id", "type":"string"}
                ],
                "column":[
                  {"name":"user_id","type":"string"},
                  {"name":"sku_id","type":"string"},
                  {"name":"price","type":"double"},
                  {"name":"num","type":"int"},
                  {"name":"total_price","type":"double"},
                  {"name":"order_status","type":"string"},
                  {"name":"create_time","type":"string"},
                  {"name":"modified_time","type":"string"}
                ],
                "writeMode":"UpdateRow"
              }
            }
          }
        ]
      }
    }

    MySQL Reader需要替换的参数说明如下:

    参数名称

    说明

    username

    MySQL 连接用户名。

    password

    MySQL 连接用户密码。

    column

    需要同步的源表字段。

    splitPk

    数据分片的字段,只在channel数不为1时按指定的字段进行数据分片查询。

    说明

    DataX数据分片支持整型和字符串类型的字段,仅推荐按整型字段进行数据分片。

    table

    MySQL源表表名。

    jdbcUrl

    JDBC连接信息。

    关于MySQL Reader的更多信息,请参见MySQL Reader介绍

    表格存储Writer需要替换的参数说明如下:

    参数名称

    说明

    endpoint

    实例服务地址。

    accessId

    阿里云账号或RAM用户的AccessKey ID。

    accessKey

    阿里云账号或RAM用户的AccessKey Secret。

    instanceName

    实例名称。

    table

    目标表名称。

    primaryKey

    目标表的主键列列表。

    column

    需要写入的属性列。

    关于表格存储Writer的更多信息,请参见表格存储Writer介绍

四、执行同步命令

  1. 执行以下命令开始同步数据。

    python3 datax.py mysql_to_ots.json

    同步任务结束后,将打印整体运行情况。

    2025-02-10 18:02:17.355 [job-0] INFO  JobContainer -
    任务启动时刻                    : 2025-02-10 18:02:06
    任务结束时刻                    : 2025-02-10 18:02:17
    任务总计耗时                    :                 11s
    任务平均流量                    :          871.56KB/s
    记录写入速度                    :          10000rec/s
    读出记录总数                    :              100000
    读写失败总数                    :                   0
  2. 验证同步结果。

    您可以前往表格存储控制台查看已导入的数据。

相关文档

  • 表格存储数据列支持的数据类型,请参见数据类型

附录-样例数据

MySQL源表

MySQL数据样例源表建表语句如下:

create table orders (
  order_id varchar(50) primary key comment '订单ID',
  user_id varchar(10) not null comment '用户ID',
  sku_id varchar(10) not null comment '商品ID',
  price decimal(12, 2) not null comment '商品购买单价',
  num int not null comment '商品购买数量',
  total_price decimal(12, 2) not null comment '订单总价',
  order_status varchar(2) not null comment '订单状态',
  create_time timestamp not null default current_timestamp comment '订单创建时间',
  modified_time timestamp not null default current_timestamp on update current_timestamp comment '最后修改时间'
);

表格存储目标表

由于表格存储是schema-free的,创建目标表时仅需指定主键 order_id 即可。目标表基本详情如下:

image

  • 本页导读 (1)
  • 背景信息
  • 准备工作
  • 操作步骤
  • 一、安装依赖
  • 二、安装DataX
  • 三、编写配置文件
  • 四、执行同步命令
  • 相关文档
  • 附录-样例数据
  • MySQL源表
  • 表格存储目标表