通过SeaTunnel导入数据

SeaTunnel集成云数据库 SelectDB 版,支持使用SeaTunnel SelectDB Sink导入表数据至云数据库 SelectDB 版。本文将为您介绍使用SeaTunnel SelectDB Sink同步数据至云数据库 SelectDB 版的使用方式。

概述

SeaTunnel是一款简单易用、高性能的分布式数据集成平台,支持海量数据实时同步。您可以通过SeaTunnel平台读取MySQL、Hive、Kafka等数据源中的海量数据,然后由SeaTunnel SelectDB Sink将数据写入到云数据库 SelectDB 版中。

前提条件

SeaTunnel 2.3.1版本及以上。

使用方式

SeaTunnel支持以JSON格式或CSV格式将上游数据写入到云数据库 SelectDB 版,不同写入方式的配置语法如下。

JSON格式

sink { 
  SelectDB { 
    load-url="ip:http_port" 
    jdbc-url="ip:mysql_port" 
    cluster-name="Cluster" 
    table.identifier="test_db.test_table" 
    username="admin" 
    password="****" 
    selectdb.config { 
      file.type="json" 
    } 
  }
}

CSV格式

sink { 
  SelectDB { 
    load-url="ip:http_port" 
    jdbc-url="ip:mysql_port" 
    cluster-name="Cluster" 
    table.identifier="test_db.test_table" 
    username="admin" 
    password="****" 
    selectdb.config { 
      file.type="csv" 
      file.column_separator="," 
      file.line_delimiter="\n" 
    } 
  }
}

参数说明如下。

参数

是否必填

说明

load-url

云数据库 SelectDB 版实例的访问地址和HTTP协议端口。

您可以从云数据库 SelectDB 版控制台的实例详情 > 网络信息中获取VPC地址(或公网地址)和HTTP协议端口

示例:selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:8080

jdbc-url

云数据库 SelectDB 版实例的访问地址和MySQL协议端口。

您可以从云数据库 SelectDB 版控制台的实例详情 > 网络信息中获取VPC地址(或公网地址)和MySQL协议端口

示例:selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:9030

cluster-name

云数据库 SelectDB 版实例中的集群名称。实例中可能包含多个集群,可按需选择。

username

云数据库 SelectDB 版实例的用户名。

password

云数据库 SelectDB 版实例对应用户名的密码。

table.identifier

云数据库 SelectDB 版实例的表名,格式为库名.表名。示例:test_db.test_table

selectdb.config

写入任务的属性配置。

  • CSV写入如下:

    selectdb.config { file.type='csv' file.column_separator=',' file.line_delimiter='\n' } 
  • JSON写入如下:

    selectdb.config { file.type="json" file.strip_outer_array="false" }

sink.enable-delete

是否开启批量删除功能(仅支持Unique表)。

sink.buffer-size

缓存的最大容量,单位字节,默认为:10MB,当缓存超过最大容量时,会将缓存中的内容全部flush到对象存储上,不建议修改。

sink.buffer-count

缓存的最大条数,默认为:10000,当缓存超过最大条数时,会将缓存中的内容全部flush到对象存储上,不建议修改。

sink.max-retries

Commit阶段的最大重试次数。默认3次。

sink.enable-2pc

是否启用两阶段提交,以确保exact-once语义。默认为true。

使用示例

MySQL数据源为例,为您介绍如何通过SeaTunnel将上游的MySQL数据导入至云数据库 SelectDB 版。示例中各软件版本如下:

环境

版本

JDK

1.8

SeaTunnel

2.3.3

SelectDB

3.0.4

环境准备

  1. 配置SeaTunnel环境。

    1. 下载并解压SeaTunnel安装包。本示例中使用SeaTunnel安装包:apache-seatunnel-2.3.3-bin.tar.gz。

      wget https://dlcdn.apache.org/seatunnel/2.3.3/apache-seatunnel-2.3.3-bin.tar.gz
      tar -xzvf apache-seatunnel-2.3.3-bin.tar.gz
    2. 修改SEATUNNEL_HOME/config/plugin_config配置文件,保留需要的Connector插件。

      --connectors-v2--
      connector-cdc-mysql
      connector-selectdb-cloud
      connector-jdbc
      connector-fake
      connector-console
      connector-assert
      --end--
    3. 安装SeaTunnel Connector插件。

      sh bin/install-plugin.sh
    4. 下载MySQL驱动并放至SEATUNNEL_HOME/jar目录。

      cd lib/
      wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar
  2. 构造需要导入的数据。本文以MySQL为例,构造少量样例数据来完成导入。

    1. 创建MySQL测试表。

      CREATE TABLE `employees` (
        `emp_no` INT NOT NULL,
        `birth_date` DATE NOT NULL,
        `first_name` VARCHAR(14) NOT NULL,
        `last_name` VARCHAR(16) NOT NULL,
        `gender` ENUM('M','F') NOT NULL,
        `hire_date` DATE NOT NULL,
        PRIMARY KEY (`emp_no`)
      ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3
    2. 使用DMS构建测试数据,详情请参见测试数据构建

  3. 配置云数据库 SelectDB 版实例。

    1. 创建云数据库 SelectDB 版实例,详情请参见创建实例

    2. 通过MySQL协议连接云数据库 SelectDB 版实例,详情请参见连接实例

    3. 创建测试数据库和测试表。

      1. 创建测试数据库。

        CREATE DATABASE test_db;
      2. 创建测试表。

        USE test_db;
        CREATE TABLE employees (
            emp_no       INT NOT NULL,
            birth_date   DATE,
            first_name   VARCHAR(20),
            last_name    VARCHAR(20),
            gender       CHAR(2),
            hire_date    DATE
        )
        UNIQUE KEY(`emp_no`)
        DISTRIBUTED BY HASH(`emp_no`) BUCKETS 1;
    4. 开通云数据库 SelectDB 版公网地址,详情请参见申请和释放公网地址

    5. SeaTunnel环境的公网IP添加到IP白名单中,详情请参见设置白名单

通过SuaTunnel本地引擎同步MySQL数据到SelectDB

  1. 创建配置文件mysqlToSelectDB.conf,配置任务信息。

    env {
      execution.parallelism = 2
      job.mode = "BATCH"
      checkpoint.interval = 10000
    }
    
    source{
      jdbc {
        url = "jdbc:mysql://host:ip/test_db"
        driver = "com.mysql.cj.jdbc.Driver"
        user = "admin"
        password = "****"
        query = "select * from employees"
      }
    }
     
    sink {
      SelectDBCloud {
        load-url="selectdb-cn-pe33hab****-public.selectdbfe.rds.aliyuncs.com:8080"
        jdbc-url="selectdb-cn-pe33hab****-public.selectdbfe.rds.aliyuncs.com:9030"
        cluster-name="new_cluster"
        table.identifier="test_db.employees"
        username="admin"
        password="****"
        selectdb.config {
            file.type="json"
        }
      }
    }
  2. 命令行提交任务。

    sh ./bin/seatunnel.sh --config ./mysqlToSelectDB.conf -e local