文档

迁移MySQL或Doris数据到StarRocks

更新时间:

迁移Doris数据与迁移MySQL数据的方法一致。本文为您介绍如何迁移RDS MySQL中的数据至EMR Serverless StarRocks。

前提条件

  • 已创建EMR Serverless StarRocks实例,详情请参见创建实例

  • 已在EMR on ECS上创建包含Doris服务的OLAP集群,详情请参见创建集群,或已购买RDS,详情请参见创建RDS MySQL实例

    说明

    迁移Doris数据与迁移MySQL数据的方法一致。本文以迁移RDS MySQL数据为例。

使用限制

EMR Serverless StarRocks实例和RDS MySQL实例需要在同一VPC下。

使用DataX同步数据

DataX介绍

DataX是阿里巴巴集团内被广泛使用的离线数据同步工具,实现了包括MySQL、Oracle、OceanBase、SqlServer、PostgreSQL、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS等各种异构数据源之间高效的数据同步功能。DataX详细信息,请参见DataX

适用场景

适用于熟悉DataX、迁移批量数据的场景,可以一次迁移大批量数据至EMR StarRocks Serverless。

操作步骤

  1. 配置DataX中的mysqlreader。

    "reader": {
      "name": "mysqlreader",
      "parameter": {
        "username": "root",
        "password": "***",
        "column": [ "k1", "k2", "v1", "v2" ],
        "connection": [
          {
            "table": [ "table1", "table2" ],
            "jdbcUrl": [
              "jdbc:mysql://<mysql_host>:<mysql_port>/datax_test1"
            ]
          },
          {
            "table": [ "table3", "table4" ],
            "jdbcUrl": [
              "jdbc:mysql://<mysql_host>:<mysql_port>/datax_test2"
            ]
          }
        ]
      }
    }

    相关参数如下表所示。

    参数

    说明

    是否必选

    username

    MySQL的用户名。

    password

    MySQL用户的密码。

    database

    目标数据库名称。

    table

    待同步的表名。

    jdbcUrl

    JDBC连接信息,填写的格式为jdbc:mysql://<mysql_host>:<mysql_port>/<datax_test1>

    格式中:

    • <mysql_host>:RDS的内网地址。您可以在RDS的数据库连接页面,单击内网地址进行复制。例如,rm-bp1nu0c46fn9k****.mysql.rds.aliyuncs.com。

    • <mysql_port>:固定值3306。

    • <datax_test1>:MySQL数据库的名称。

    column

    待配置的表中需要同步的列名集合,使用JSON的数组描述字段信息。支持列裁剪,即列可以挑选部分列进行导出。

    重要

    如果希望导入所有字列配置,则可以使用[’’]

    splitPk

    如果指定该参数,则表示使用splitPk代表的字段进行数据分片,DataX会启动并发任务进行数据同步,可以大大提高数据同步的效能。

    querySql

    在某些业务场景下,您可以通过该配置型来自定义筛选SQL。如果指定了该参数,则DataX系统就会忽略table、column这些配置型,直接使用该配置项的内容对数据进行筛选。

    where

    筛选条件,mysqlreader根据指定的column、table、where条件拼接SQL,并根据该SQL进行数据抽取。

  2. 配置StarRocks Writer。

    下载DataX插件,然后执行以下命令,解压安装包至datax/plugin/writer路径下。

    tar -xzvf starrockswriter.tar.gz

    StarRocks Writer的配置方式,请参见DataX Writer

  3. 配置DataX的JSON脚本。

    为导入作业新建一个JSON格式的文件job.json,示例中的参数值请根据实际情况替换。

    {
      "job": {
        "setting": {
          "speed": {
            "channel": 1
          },
          "errorLimit": {
            "record": 0,
            "percentage": 0
          }
        },
        "content": [
          {
            "reader": {
              "name": "mysqlreader",
              "parameter": {
                "username": "root",
                "password": "****",
                "column": [ "k1", "k2", "v1", "v2" ],
                "connection": [
                  {
                    "table": [ "table1", "table2" ],
                    "jdbcUrl": [
                      "jdbc:mysql://<mysql_host>:<mysql_port>/datax_test1"
                    ]
                  },
                  {
                    "table": [ "table3", "table4" ],
                    "jdbcUrl": [
                      "jdbc:mysql://<mysql_host>:<mysql_port>/datax_test2"
                    ]
                  }
                ]
              }
            },
            "writer": {
              "name": "starrockswriter",
              "parameter": {
                "username": "admin",
                "password": "***",
                "database": "test",
                "table": "xxxx",
                "column": ["k1", "k2", "v1", "v2"],
                "preSql": [],
                "postSql": [], 
                "jdbcUrl": "jdbc:mysql://<fe_host>:<fe_port>/",
                "loadUrl": ["<fe_host>:<fe_http_port>", "<fe_host>:<fe_http_port>"],
                "loadProps": {}
              }
            }
          }
        ]
      }
    }
  4. 执行同步任务。

    python datax/bin/datax.py --jvm="-Xms6G -Xmx6G" --loglevel=debug datax/job/job.json
    说明
    • 示例代码中的Xms和Xmx ,通常建议将内存设置为4G或者8G,可根据任务机的实际配置,提升-Xms与-Xmx,来防止OOM。

    • 调整JVM Xms和Xmx参数的两种方式:一种是直接更改datax.py;另一种是在启动的时候,加上对应的参数,例如python datax/bin/datax.py --jvm="-Xms8G -Xmx8G"XXX.json

使用StarRocks外部表同步数据

方案介绍

该方案是通过StarRocks的外部表,将源集群数据同步至目标集群。通过MySQL协议的外部表进行同步,在目标集群创建目标表,在源集群创建外部表,将数据插入外部表实现数据迁移。

适用场景

适用于单表数据迁移的场景。如果表数量大,则操作会比较繁琐,耗时较高。

操作步骤

  1. 创建StarRocks外部表。

    CREATE EXTERNAL TABLE uk_price_paid_ex 
    ( 
        price BIGINT(20) NULL COMMENT "",
        date1 DATE NULL COMMENT "",
        postcode1 String NULL COMMENT "",
        postcode2 String NULL COMMENT "",
        type String NULL COMMENT "",
        is_new BIGINT(20) NULL COMMENT "",
        duration String NULL COMMENT "",
        addr1 String NULL COMMENT "",
        addr2 String NULL COMMENT "",
        street String NULL COMMENT "",
        locality String NULL COMMENT "",
        town String NULL COMMENT "" ,
        district String NULL COMMENT "",
        county String NULL COMMENT "" 
    )
    ENGINE=mysql 
    PROPERTIES (
      "host"="rm-bp1nu0c46fn9k****.mysql.rds.aliyuncs.com",
      "port"="3306",
      "user"="root",
      "password"="",
      "database"="test",
      "table"="uk_price_paid"
    );

    各参数含义如下表所示。

    参数

    说明

    host

    RDS MySQL的内网地址。

    您可以在RDS的数据库连接页面,单击内网地址进行复制。例如,rm-bp1nu0c46fn9k****.mysql.rds.aliyuncs.com。

    port

    RDS MySQL的端口。固定值是3306。

    user

    RDS MySQL上创建的账号。

    password

    RDS MySQL上创建账号的密码。

    database

    RDS MySQL上数据库的名称。

    table

    RDS MySQL上表的名称。

  2. 创建StarRocks目标表。

    需选择对应模型,并指定主键。

    CREATE TABLE IF NOT EXISTS uk_price_paid 
    ( 
        price BIGINT(20) NULL COMMENT "",
        date1 DATE NULL COMMENT "",
        postcode1 String NULL COMMENT "",
        postcode2 String NULL COMMENT "",
        type String NULL COMMENT "",
        is_new BIGINT(20) NULL COMMENT "",
        duration String NULL COMMENT "",
        addr1 String NULL COMMENT "",
        addr2 String NULL COMMENT "",
        street String NULL COMMENT "",
        locality String NULL COMMENT "",
        town String NULL COMMENT "" ,
        district String NULL COMMENT "",
        county String NULL COMMENT "" 
    ) 
    DUPLICATE KEY(price) 
    DISTRIBUTED BY HASH(price) 
    BUCKETS 8 PROPERTIES 
    ( "replication_num" = "1" );
  3. 导入数据。

    insert into uk_price_paid select * from uk_price_paid_ex;