通过DataX,您可以将MySQL数据库中的全量数据同步到表格存储(Tablestore)的数据表中。DataX只支持同步全量数据,不支持同步增量数据。

前提条件

  • 已创建表格存储实例并在实例详情页面获取实例的服务地址(Endpoint)。具体操作,请参见创建实例
  • 已创建表格存储数据表,用于存放迁移数据。具体操作,请参见创建数据表
    说明 创建数据表时,建议使用MySQL原主键或唯一索引作为表格存储数据表的主键。
  • 已获取AccessKey(包括AccessKey ID和AccessKey Secret),用于进行签名认证。具体操作,请参见获取AccessKey

背景信息

DataX通过MySQL驱动使用Reader中的MySQL连接串配置,直接发送SQL语句获取到查询数据,这些数据会缓存在本地JVM中,然后Writer线程将这些数据写入到表格存储的表中。更多信息,请参见DataX

步骤一:下载DataX

您可以选择下载DataX的源代码进行本地编译或者直接下载编译好的压缩包。

  • 下载DataX的源代码并编译。
    1. 通过Git工具执行以下命令下载DataX源代码。
      git clone https://github.com/alibaba/DataX.git
    2. 进入到下载的源代码目录后,执行以下命令进行Maven打包。
      说明 此步骤会在本地编译各种数据源的Writer和Reader,会花费较长的时间,需要耐心等待。
      mvn -U clean package assembly:assembly -Dmaven.test.skip=true
      编译完成后,进入/target/datax/datax目录,查看相应的目录。各目录说明请参见下表。
      目录 说明
      bin 存放可执行的datax.py文件,是DataX工具的入口。
      plugin 存放支持各种类型数据源的Reader和Writer。
      conf 存放core.json文件,文件中定义了一些默认参数值,例如channel流控、buffer大小等参数,建议使用默认值。
  • 下载编译好的压缩包DataX压缩包

步骤二:准备全量导出的JSON文件

在DataX中mysqlreader配置有querySQL模式和table模式两种模式,请根据实际选择。

  • querySQL模式(单task)

    一般用于有条件的数据导出。在此模式下,DataX不会按照指定的column、table参数进行SQL的拼接,而是会略过这些配置(如果有)直接执行querySQL语句。task数量固定为1,因此在此模式下channel的配置无多线程效果。

    querySQL模式的数据导出示例如下:
    {
        "job": {
    
            "content": [
                {
                    "reader": {
                        "name": "mysqlreader", #指定使用mysqlreader读取数据。
                        "parameter": {
                            "username": "username",#MySQL用户名。
                            "password": "password",#MySQL密码。
                            "connection": [
                                {
                                    "querySql": [ #指定执行的SQL语句。
                                        "select bucket_name, delta , timestamp ,cdn_in, cdn_out ,total_request from vip_quota where bucket_name='xxx' "
                                    ],
                                    "jdbcUrl": ["jdbc:mysql://192.168.0.8:3306/db1?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true" #jdbc连接串
                                    ]
                                }
                            ]
                        }
                    },
    
                    "writer": {
                        "name": "otswriter",#指定使用otswriter进行数据写入。
                        "parameter": {#数据源配置。
                            "endpoint":"https://smoke-test.xxxx.ots.aliyuncs.com",#表格存储实例的服务地址(Endpoint)。
                            "accessId":"xxxx",
                            "accessKey":"xxxx",
                            "instanceName":"smoke-test",#实例名。
                            "table":"vip_quota",#写入数据的目标table名称。
                            #以下为otswriter的限制项配置,默认可以不填,如果数据不符合以下规则,则数据会被当成脏数据过滤掉。
                            "requestTotalSizeLimitation": 1048576, #单行数据大小限制,默认配置为1 MB,可不配置。
                            "attributeColumnSizeLimitation": 2097152, #单个属性列大小限制,默认配置为2 MB,可不配置。
                            "primaryKeyColumnSizeLimitation": 1024, #单个主键列大小限制,默认配置为1 KB,可不配置。
                            "attributeColumnMaxCount": 1024, #属性列个数限制,默认配置为1024,可不配置。
    
                            "primaryKey":[# 主键名称和类型。
                                {"name":"bucket_name", "type":"string"},
                                {"name":"delta", "type":"int"},
                                {"name":"timestamp", "type":"int"}
                            ],
                            "column":[#其它column的名称和类型。
                                {"name":"cdn_in","type":"int"},
                                {"name":"cdn_out","type":"int"},
                                {"name":"total_request","type":"int"}
                            ],
                            "writeMode":"UpdateRow" #写入模式。
                        }
                    }
    
                }
            ]
        }
    }
  • table模式(多task)
    在此模式下无需手动编写select语句,而是由DataX根据JSON中的column、table、splitPk配置项自行拼接SQL语句。观察执行日志如下:fig_mysqltablemodel
    table模式的数据导出示例代码如下:
    {
        "job": {
            "setting": {
                "speed": {
                     "channel": 3  #指定channel个数,该参数与并发数密切相关。
                },
                "errorLimit": {#容错限制。
                    "record": 0,
                    "percentage": 0.02
                }
            },
            "content": [
                {
                    "reader": {
                        "name": "mysqlreader",#指定使用mysqlreader读取。
                        "parameter": {
                            "username": "username",#MySQL用户名。
                            "password": "password",#MySQL密码。
                            "column": [  #table模式下可以指定需要查询的列。
                                "bucket_name",      
                                "timestamp" ,
                                "delta" , 
                                "cdn_in", 
                                "cdn_out" ,
                                "total_request"
                            ],
                            "splitPk": "timestamp",#指定split字段。
                            "connection": [
                                {
    
                                    "table": [#导出的表名。
                                        "vip_quota"
                                     ],
                                    "jdbcUrl": ["jdbc:mysql://192.168.1.7:3306/db1"#jdbc连接串。
                                    ]
                                }
                            ]
                        }
                    },
    
                    "writer": {
                        "name": "otswriter",#指定使用otswriter进行写入。
                        "parameter": {#数据源配置。
                            "endpoint":"https://smoke-test.xxxx.ots.aliyuncs.com",#表格存储实例的Endpoint。
                            "accessId":"xxx",
                            "accessKey":"xxx",
                            "instanceName":"smoke-test",#实例名。
                            "table":"vip_quota",#写入目标的table名称。
                            "primaryKey":[#主键名称和类型。
                                {"name":"bucket_name", "type":"string"},
                                {"name":"delta", "type":"int"},
                                {"name":"timestamp", "type":"int"}
                            ],
                            "column":[#其它column的名称和类型。
                                {"name":"haha","type":"int"},
                                {"name":"hahah","type":"int"},
                                {"name":"kengdie","type":"int"}
                            ],
                            "writeMode":"UpdateRow"#写入模式。
                        }
                    }
    
                }
            ]
        }
    }
    上述JSON文件中定义了一次数据导出导入的数据源信息和部分系统配置。配置主要包括如下两部分:
    • setting:主要是speed(与速率、并发相关)和errorLimit(容错限制)。
      • channel:个数决定了reader和writer的个数上限。
      • splitPk:指定了splitPk字段,DataX会将MySQL表中数据按照splitPk切分成n段。splitPk的字段必须是整型或者字符串类型。

        由于DataX的实现方式是按照splitPk字段分段查询数据库表,那么splitPk字段的选取应该尽可能选择分布均匀且有索引的字段,例如主键ID、唯一键等字段。如果不指定splitPk字段,则DataX将不会进行数据的切分,并行度会变为1。

        说明 为了保证同步数据的一致性,要么不配置splitPk字段使用单线程迁移数据,要么确保数据迁移期间停止该MySQL数据库的服务。
    • content:主要是数据源信息,包含reader和writer两部分。
    说明 配置中的MySQL应该确保执行DataX任务的机器能够正常访问。

步骤三:执行同步命令

执行以下命令同步数据。

python datax.py  -j"-Xms4g -Xmx4g" mysql_to_ots.json

其中-j"-Xms4g -Xmx4g"可以限制占用JVM内存的大小。如果不指定,则DataX将使用conf/core.json中的配置,默认为1 GB。

步骤四:全量同步加速

DataX的数据同步涉及数据读取、数据交换和数据写入三部分,您可以对每个部分进行优化加速。
  • 数据读取

    数据源读取有table模式和querySQL模式两种模式,选择table模式可以实现加速读取。具体操作,请参见步骤二:准备全量导出的JSON文件

  • 数据交换
    在数据交换部分,您可以通过以下方面进行同步优化。
    • JVM的内存

      发送给MySQL数据库SQL语句后会得到查询的数据集,并缓存在DataX的buffer中。除此之外,每个channel也维护了自己的record队列。如果存在并发,则channel的个数越多,也会需要更多的内存。因此您可以考虑指定JVM的内存大小参数,即在步骤三:执行同步命令中通过-j参数来指定JVM的内存大小。

    • channel的个数和流控参数
      conf/core.json中,控制channel的关键参数如下图所示。
      说明
      • 一般情况下,channel队列本身配置的调整并不常见,但是在使用DataX时应该注意byte和record流控参数。这两个参数都是在flowControlInterval间隔中采样后根据采样值来决定是否进行流控。
      • 为了提升同步效率,您可以适当提高channel的个数来提高并发数,以及调高每个channel的byte和record限制来提高DataX的吞吐量。
      • 请综合考虑channel的个数和流控参数,保证理论峰值不会对服务器产生过高的压力。
      channel的个数和流控参数
      详细参数说明请参见下表。
      参数 描述
      capacity 限制channel中队列的大小,即最多缓存的record个数。
      byteCapacity 限制record占用的内存大小,单位为字节。默认值为64 MB,如果不指定此参数,则占用内存大小会被配置为8 MB。
      byte 控流参数,限制通道的默认传输速率,-1表示不限制。
      record 控流参数,限制通道的传输记录个数,-1表示不限制。

      capacity和byteCapacity两个参数决定了每个channel能缓存的记录数量和内存占用情况。如果要调整相应参数,您需要按照DataX实际的运行环境进行配置。例如MySQL中每个record都比较大,那么可以考虑适当调高byteCapacity,调整该参数时还请同时考虑机器的内存情况。

      {
          "core": {  #定义了全局的系统参数,如果不指定会使用默认值。
              "transport": {
                  "channel": {
                      "speed": {
                          "record": 5000,
                          "byte": 102400
                      }
                  }
              }
          },
      
      
          "job": {
              "setting": {
                  "speed": {  #定义了单个channel的控制参数。
                      "record": 10000,
                  },
                  "errorLimit": {
                      "record": 0,
                      "percentage": 0.02
                  }
              },
              "content": [
                  {
                      "reader": {
                            .....#省略
                      },
      
                      "writer": {
                          .....#省略
                      }
      
                  }
              ]
          }
      }
  • 数据写入

    Tablestore是基于LSM设计的高性能高吞吐的分布式数据库产品,每一张表都会被切分为很多数据分区,数据分区分布在不同的服务器上,拥有极强的吞吐能力。如果写入能够分散在所有服务器上,则能够利用所有服务器的服务能力,更高速地写入数据,即表分区数量和吞吐能力正相关。

    新建的表默认分区数量都是1,分区数量会随着表不断写入数据而自动分裂不断增长,但是自动分裂的周期较长。对于新建表,如果需要马上进行数据导入,由于单分区很可能不够用导致数据导入不够顺畅,因此建议在新建表时对表进行预分区,在开始导入数据时即可获得极好的性能,而不用等待自动分裂。

    {
        "job": {
            "setting": {
               ....#省略
            },
            "content": [
                {
                    "reader": {
                          .....#省略
                    },
    
                    "writer": {
                        "name": "otswriter",
                        "parameter": {
                                .......
                            "writeMode":"UpdateRow",
                            "batchWriteCount":100
                        }
                    }
    
                }
            ]
        }
    }