SeaTunnel集成云数据库 SelectDB 版,支持使用SeaTunnel SelectDB Sink导入表数据至云数据库 SelectDB 版。本文将为您介绍使用SeaTunnel SelectDB Sink同步数据至云数据库 SelectDB 版的使用方式。
概述
SeaTunnel是一款简单易用、高性能的分布式数据集成平台,支持海量数据实时同步。您可以通过SeaTunnel平台读取MySQL、Hive、Kafka等数据源中的海量数据,然后由SeaTunnel SelectDB Sink将数据写入到云数据库 SelectDB 版中。
前提条件
SeaTunnel 2.3.1版本及以上。
使用方式
SeaTunnel支持以JSON格式或CSV格式将上游数据写入到云数据库 SelectDB 版,不同写入方式的配置语法如下。
JSON格式
sink {
SelectDB {
load-url="ip:http_port"
jdbc-url="ip:mysql_port"
cluster-name="Cluster"
table.identifier="test_db.test_table"
username="admin"
password="****"
selectdb.config {
file.type="json"
}
}
}
CSV格式
sink {
SelectDB {
load-url="ip:http_port"
jdbc-url="ip:mysql_port"
cluster-name="Cluster"
table.identifier="test_db.test_table"
username="admin"
password="****"
selectdb.config {
file.type="csv"
file.column_separator=","
file.line_delimiter="\n"
}
}
}
参数说明如下。
参数 | 是否必填 | 说明 |
load-url | 是 | 云数据库 SelectDB 版实例的访问地址和HTTP协议端口。 您可以从云数据库 SelectDB 版控制台的实例详情 > 网络信息中获取VPC地址(或公网地址)和HTTP协议端口。 示例: |
jdbc-url | 是 | 云数据库 SelectDB 版实例的访问地址和MySQL协议端口。 您可以从云数据库 SelectDB 版控制台的实例详情 > 网络信息中获取VPC地址(或公网地址)和MySQL协议端口。 示例: |
cluster-name | 是 | 云数据库 SelectDB 版实例中的集群名称。实例中可能包含多个集群,可按需选择。 |
username | 是 | 云数据库 SelectDB 版实例的用户名。 |
password | 是 | 云数据库 SelectDB 版实例对应用户名的密码。 |
table.identifier | 是 | 云数据库 SelectDB 版实例的表名,格式为 |
selectdb.config | 是 | 写入任务的属性配置。
|
sink.enable-delete | 否 | 是否开启批量删除功能(仅支持Unique表)。 |
sink.buffer-size | 否 | 缓存的最大容量,单位字节,默认为:10MB,当缓存超过最大容量时,会将缓存中的内容全部flush到对象存储上,不建议修改。 |
sink.buffer-count | 否 | 缓存的最大条数,默认为:10000,当缓存超过最大条数时,会将缓存中的内容全部flush到对象存储上,不建议修改。 |
sink.max-retries | 否 | Commit阶段的最大重试次数。默认3次。 |
sink.enable-2pc | 否 | 是否启用两阶段提交,以确保exact-once语义。默认为true。 |
使用示例
以MySQL数据源为例,为您介绍如何通过SeaTunnel将上游的MySQL数据导入至云数据库 SelectDB 版。示例中各软件版本如下:
环境 | 版本 |
JDK | 1.8 |
SeaTunnel | 2.3.3 |
SelectDB | 3.0.4 |
环境准备
配置SeaTunnel环境。
下载并解压SeaTunnel安装包。本示例中使用SeaTunnel安装包:apache-seatunnel-2.3.3-bin.tar.gz。
wget https://dlcdn.apache.org/seatunnel/2.3.3/apache-seatunnel-2.3.3-bin.tar.gz tar -xzvf apache-seatunnel-2.3.3-bin.tar.gz
修改SEATUNNEL_HOME/config/plugin_config配置文件,保留需要的Connector插件。
--connectors-v2-- connector-cdc-mysql connector-selectdb-cloud connector-jdbc connector-fake connector-console connector-assert --end--
安装SeaTunnel Connector插件。
sh bin/install-plugin.sh
下载MySQL驱动并放至SEATUNNEL_HOME/jar目录。
cd lib/ wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar
构造需要导入的数据。本文以MySQL为例,构造少量样例数据来完成导入。
创建MySQL测试表。
CREATE TABLE `employees` ( `emp_no` INT NOT NULL, `birth_date` DATE NOT NULL, `first_name` VARCHAR(14) NOT NULL, `last_name` VARCHAR(16) NOT NULL, `gender` ENUM('M','F') NOT NULL, `hire_date` DATE NOT NULL, PRIMARY KEY (`emp_no`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3
使用DMS构建测试数据,详情请参见测试数据构建。
配置云数据库 SelectDB 版实例。
创建云数据库 SelectDB 版实例,详情请参见创建实例。
通过MySQL协议连接云数据库 SelectDB 版实例,详情请参见连接实例。
创建测试数据库和测试表。
创建测试数据库。
CREATE DATABASE test_db;
创建测试表。
USE test_db; CREATE TABLE employees ( emp_no INT NOT NULL, birth_date DATE, first_name VARCHAR(20), last_name VARCHAR(20), gender CHAR(2), hire_date DATE ) UNIQUE KEY(`emp_no`) DISTRIBUTED BY HASH(`emp_no`) BUCKETS 1;
开通云数据库 SelectDB 版公网地址,详情请参见申请和释放公网地址。
将SeaTunnel环境的公网IP添加到IP白名单中,详情请参见设置白名单。
通过SuaTunnel本地引擎同步MySQL数据到SelectDB
创建配置文件
mysqlToSelectDB.conf
,配置任务信息。env { execution.parallelism = 2 job.mode = "BATCH" checkpoint.interval = 10000 } source{ jdbc { url = "jdbc:mysql://host:ip/test_db" driver = "com.mysql.cj.jdbc.Driver" user = "admin" password = "****" query = "select * from employees" } } sink { SelectDBCloud { load-url="selectdb-cn-pe33hab****-public.selectdbfe.rds.aliyuncs.com:8080" jdbc-url="selectdb-cn-pe33hab****-public.selectdbfe.rds.aliyuncs.com:9030" cluster-name="new_cluster" table.identifier="test_db.employees" username="admin" password="****" selectdb.config { file.type="json" } } }
命令行提交任务。
sh ./bin/seatunnel.sh --config ./mysqlToSelectDB.conf -e local