通过DataX导入数据

更新时间: 2024-06-21 16:34:17

本文介绍使用DataX Doris Writer同步数据至云数据库 SelectDB 版

概述

DataX是阿里巴巴开源的一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。您可以通过DataX服务读取上游数据,然后由DataX Doris Writer将数据写入到云数据库 SelectDB 版

前提条件

  • 已安装Maven环境。

  • 已安装Python3.6及以上版本。

使用示例

如下以MySQL数据源为例,介绍在Linux环境下如何通过DataX将MySQL数据导入至云数据库 SelectDB 版

步骤一:配置DataX环境

  1. 下载DataX程序包代码,插件代码下载请访问Doris社区

  2. 运行DataX程序包中的init-env.sh脚本,构建DataX开发环境。

    sh init-env.sh
  3. 编译mysqlreader和doriswriter。

    1. 编译整个DataX项目。

      cd DataX/
      mvn package assembly:assembly -Dmaven.test.skip=true

      编译产出结果在target/datax/datax/.目录下。

      说明

      hdfsreader,hdfswriter,ossreader和osswriter这四个插件需要额外的jar包,如果不需要这些插件,可以在DataX/pom.xml中删除这些插件的模块。

    2. 单独编译mysqlreader和selectdbwriter插件。

      mvn clean install -pl plugin-rdbms-util,mysqlreader,doriswriter -DskipTests

步骤二:构造需要导入的数据

  1. 创建MySQL测试表。

    CREATE TABLE `employees` (
      `emp_no` int NOT NULL,
      `birth_date` date NOT NULL,
      `first_name` varchar(14) NOT NULL,
      `last_name` varchar(16) NOT NULL,
      `gender` enum('M','F') NOT NULL,
      `hire_date` date NOT NULL,
      PRIMARY KEY (`emp_no`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3
  2. 使用DMS构建测试数据,详情请参见测试数据构建

步骤三:配置云数据库 SelectDB 版实例。

  1. 通过MySQL协议连接云数据库 SelectDB 版实例,详情请参见连接实例

  2. 创建测试数据库和测试表。

    1. 创建测试数据库。

      CREATE DATABASE test_db;
    2. 创建测试表。

      USE test_db;
      CREATE TABLE employees (
          emp_no       int NOT NULL,
          birth_date   date,
          first_name   varchar(20),
          last_name    varchar(20),
          gender       char(2),
          hire_date    date
      )
      UNIQUE KEY(`emp_no`)
      DISTRIBUTED BY HASH(`emp_no`) BUCKETS 1;

步骤四:通过DataX服务同步MySQL数据到SelectDB

  1. 开通云数据库 SelectDB 版公网地址,详情请参见申请和释放公网地址

  2. 将DataX主机的公网IP添加到IP白名单中,详情请参见设置白名单

  3. 创建配置文件mysqlToSelectDB.json,配置任务信息。

    {
      "job":{
        "content":[
          {
            "reader":{
                "name": "mysqlreader",
                "parameter": {
                    "column": [
                        "emp_no",
                        "birth_date",
                        "first_name",
                        "last_name",
                        "gender",
                        "hire_date"
                    ],
                    "where": "emp_no>0",
                    "connection": [
                        {
                            "jdbcUrl": [
                                "jdbc:mysql://host:port/test_db?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf-8"
                            ],
                            "table": [
                                "employees"
                            ]
                        }
                    ],
                    "password": "123456",
                    "splitPk": "emp_no",
                    "username": "admin"
                }
            },
            "writer":{
              "name":"doriswriter",
              "parameter":{
                "loadUrl":[
                  "selectdb-cn-xxx-public.selectdbfe.rds.aliyuncs.com:8080"
                ],
                "loadProps":{
                  "format":"json",
                  "strip_outer_array":"true"
                },
                "column":[
                    "emp_no",
                    "birth_date",
                    "first_name",
                    "last_name",
                    "gender",
                    "hire_date"
                ],
                "username":"admin",
                "password":"123456",
                "postSql":[
    
                ],
                "preSql":[
    
                ],
                "connection":[
                  {
                    "jdbcUrl":"jdbc:mysql://selectdb-cn-xxx-public.selectdbfe.rds.aliyuncs.com:9030/test_db",
                    "table":[
                      "employees"
                    ],
                    "selectedDatabase":"test_db"
                  }
                ],
                "maxBatchRows":1000000,
                "batchSize":536870912000
              }
            }
          }
        ],
        "setting":{
          "errorLimit":{
            "percentage":0.02,
            "record":0
          },
          "speed":{
            "channel":5
          }
        }
      }
    }

  4. 参数说明

    参数

    是否必填

    默认值

    描述

    jdbcUrl

    JDBC连接URLjdbc:mysql://<ip>:<port>

    您可以从云数据库 SelectDB 版控制台的实例详情 > 网络信息中获取IP地址和MySQL协议端口。

    IP地址:VPC地址公网地址

    端口:MySQL协议端口

    示例:jdbc:mysql://selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:9030

    说明

    如果DataX主机和SelectDB在同一VPC下,即可使用VPC地址。如果不在同一VPC下请使用公网地址。

    loadUrl

    SelectDB的HTTP协议访问地址<ip>:<port>

    您可以从云数据库 SelectDB 版控制台的实例详情 > 网络信息中获取IP地址和HTTP协议端口。

    IP地址:VPC地址公网地址

    端口:HTTP协议端口

    示例:selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:8080

    说明

    如果DataX主机和SelectDB在同一VPC下,即可使用VPC地址。如果不在同一VPC下请使用公网地址。

    username

    云数据库 SelectDB 版实例的用户名。

    password

    云数据库 SelectDB 版实例对应用户的密码。

    connection.selectedDatabase

    需要写入的云数据库 SelectDB 版数据库名称。

    connection.table

    需要写入的云数据库 SelectDB 版表名称。

    column

    目的表需要写入数据的字段,这些字段将作为生成的JSON数据的字段名。字段之间用英文逗号分隔。示例:"column": ["id","name","age"]

    preSql

    写入数据到目的表前,会先执行这里的标准语句。

    postSql

    写入数据到目的表后,会执行这里的标准语句。

    maxBatchRows

    500000

    每批次导入数据的最大行数。和batchSize共同控制每批次的导入数量。每批次数据达到两个阈值之一,即开始导入这一批次的数据。

    batchSize

    104857600

    每批次导入数据的最大数据量。和maxBatchRows共同控制每批次的导入数量。每批次数据达到两个阈值之一,即开始导入这一批次的数据。默认值为100 M。

    maxRetries

    3

    每批次导入数据失败后的重试次数。

    labelPrefix

    datax_doris_writer_

    每批次上传文件的label前缀。最终的label将由 'labelPrefix + UUID'组成全局唯一的label,确保数据不会重复导入。

    loadProps

    与Stream Load的请求参数相同。详情请参见Stream Load参数说明。配置导入数据格式使用参数format,导入数据格式默认使用CSV,支持JSON,详情请参考类型转换

    flushInterval

    30000

    数据写入批次的时间间隔。默认为30000ms

  5. 命令行提交任务。

    cd target/datax/datax/bin
    python datax.py ../mysqlToSelectDB.json

类型转换

默认传入的数据均会被转为字符串,并以\t作为列分隔符,\n作为行分隔符,组成csv文件进行SelectDB导入操作。 默认是csv格式导入,如需更改列分隔符, 则正确配置loadProps即可,示例如下。

"loadProps": {
    "format": "csv",
    "column_separator": "\\x01",
    "line_delimiter": "\\x02"
}

如需更改导入格式为JSON,则正确配置loadProps下的format即可,示例如下。

"loadProps": {
    "format": "json",
    "strip_outer_array": true
}

上一篇: 通过Spark导入数据 下一篇: 通过SeaTunnel导入数据