本文介绍使用DataX Doris Writer同步数据至云数据库 SelectDB 版。
概述
DataX是阿里巴巴开源的一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。您可以通过DataX服务读取上游数据,然后由DataX Doris Writer将数据写入到云数据库 SelectDB 版。
前提条件
- 已安装Maven环境。 
- 已安装Python3.6及以上版本。 
使用示例
如下以MySQL数据源为例,介绍在Linux环境下如何通过DataX将MySQL数据导入至云数据库 SelectDB 版。
步骤一:配置DataX环境
- 下载DataX程序包代码,插件代码下载请访问Doris社区。 
- 运行DataX程序包中的init-env.sh脚本,构建DataX开发环境。 - sh init-env.sh
- 编译mysqlreader和doriswriter。 - 编译整个DataX项目。 - cd DataX/ mvn package assembly:assembly -Dmaven.test.skip=true- 编译产出结果在target/datax/datax/.目录下。 说明- hdfsreader,hdfswriter,ossreader和osswriter这四个插件需要额外的jar包,如果不需要这些插件,可以在DataX/pom.xml中删除这些插件的模块。 
- 单独编译mysqlreader和selectdbwriter插件。 - mvn clean install -pl plugin-rdbms-util,mysqlreader,doriswriter -DskipTests
 
步骤二:构造需要导入的数据
- 创建MySQL测试表。 - CREATE TABLE `employees` ( `emp_no` int NOT NULL, `birth_date` date NOT NULL, `first_name` varchar(14) NOT NULL, `last_name` varchar(16) NOT NULL, `gender` enum('M','F') NOT NULL, `hire_date` date NOT NULL, PRIMARY KEY (`emp_no`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3
- 使用DMS构建测试数据,详情请参见测试数据构建。 
步骤三:配置云数据库 SelectDB 版实例。
- 通过MySQL协议连接云数据库 SelectDB 版实例,详情请参见连接实例。 
- 创建测试数据库和测试表。 - 创建测试数据库。 - CREATE DATABASE test_db;
- 创建测试表。 - USE test_db; CREATE TABLE employees ( emp_no int NOT NULL, birth_date date, first_name varchar(20), last_name varchar(20), gender char(2), hire_date date ) UNIQUE KEY(`emp_no`) DISTRIBUTED BY HASH(`emp_no`) BUCKETS 1;
 
步骤四:通过DataX服务同步MySQL数据到SelectDB
- 开通云数据库 SelectDB 版公网地址,详情请参见申请和释放公网地址。 
- 将DataX主机的公网IP添加到IP白名单中,详情请参见设置白名单。 
- 创建配置文件 - mysqlToSelectDB.json,配置任务信息。- { "job":{ "content":[ { "reader":{ "name": "mysqlreader", "parameter": { "column": [ "emp_no", "birth_date", "first_name", "last_name", "gender", "hire_date" ], "where": "emp_no>0", "connection": [ { "jdbcUrl": [ "jdbc:mysql://host:port/test_db?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf-8" ], "table": [ "employees" ] } ], "password": "123456", "splitPk": "emp_no", "username": "admin" } }, "writer":{ "name":"doriswriter", "parameter":{ "loadUrl":[ "selectdb-cn-xxx-public.selectdbfe.rds.aliyuncs.com:8080" ], "loadProps":{ "format":"json", "strip_outer_array":"true" }, "column":[ "emp_no", "birth_date", "first_name", "last_name", "gender", "hire_date" ], "username":"admin", "password":"123456", "postSql":[ ], "preSql":[ ], "connection":[ { "jdbcUrl":"jdbc:mysql://selectdb-cn-xxx-public.selectdbfe.rds.aliyuncs.com:9030/test_db", "table":[ "employees" ], "selectedDatabase":"test_db" } ], "maxBatchRows":1000000, "batchSize":536870912000 } } } ], "setting":{ "errorLimit":{ "percentage":0.02, "record":0 }, "speed":{ "channel":5 } } } }
- 参数说明 - 参数 - 是否必填 - 默认值 - 描述 - jdbcUrl - 是 - 无 - JDBC连接URL - jdbc:mysql://<ip>:<port>。- 您可以从云数据库 SelectDB 版控制台的实例详情 > 网络信息中获取IP地址和MySQL协议端口。 - IP地址:VPC地址或公网地址。 - 端口:MySQL协议端口。 - 示例: - jdbc:mysql://selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:9030说明- 如果DataX主机和SelectDB在同一VPC下,即可使用VPC地址。如果不在同一VPC下请使用公网地址。 - loadUrl - 是 - 无 - SelectDB的HTTP协议访问地址 - <ip>:<port>。- 您可以从云数据库 SelectDB 版控制台的实例详情 > 网络信息中获取IP地址和HTTP协议端口。 - IP地址:VPC地址或公网地址。 - 端口:HTTP协议端口。 - 示例: - selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:8080说明- 如果DataX主机和SelectDB在同一VPC下,即可使用VPC地址。如果不在同一VPC下请使用公网地址。 - username - 是 - 无 - 云数据库 SelectDB 版实例的用户名。 - password - 是 - 无 - 云数据库 SelectDB 版实例对应用户的密码。 - connection.selectedDatabase - 是 - 无 - 需要写入的云数据库 SelectDB 版数据库名称。 - connection.table - 是 - 无 - 需要写入的云数据库 SelectDB 版表名称。 - column - 是 - 无 - 目的表需要写入数据的字段,这些字段将作为生成的JSON数据的字段名。字段之间用英文逗号分隔。示例: - "column": ["id","name","age"]。- preSql - 否 - 无 - 写入数据到目的表前,会先执行这里的标准语句。 - postSql - 否 - 无 - 写入数据到目的表后,会执行这里的标准语句。 - maxBatchRows - 否 - 500000 - 每批次导入数据的最大行数。和batchSize共同控制每批次的导入数量。每批次数据达到两个阈值之一,即开始导入这一批次的数据。 - batchSize - 否 - 104857600 - 每批次导入数据的最大数据量。和maxBatchRows共同控制每批次的导入数量。每批次数据达到两个阈值之一,即开始导入这一批次的数据。默认值为100 M。 - maxRetries - 否 - 3 - 每批次导入数据失败后的重试次数。 - labelPrefix - 否 - datax_doris_writer_ - 每批次上传文件的label前缀。最终的label将由 'labelPrefix + UUID'组成全局唯一的label,确保数据不会重复导入。 - loadProps - 否 - 无 - 与Stream Load的请求参数相同。详情请参见Stream Load参数说明。配置导入数据格式使用参数format,导入数据格式默认使用CSV,支持JSON,详情请参考类型转换。 - flushInterval - 否 - 30000 - 数据写入批次的时间间隔。默认为30000ms 
- 命令行提交任务。 - cd target/datax/datax/bin python datax.py ../mysqlToSelectDB.json
类型转换
默认传入的数据均会被转为字符串,并以\t作为列分隔符,\n作为行分隔符,组成csv文件进行SelectDB导入操作。 默认是csv格式导入,如需更改列分隔符, 则正确配置loadProps即可,示例如下。
"loadProps": {
    "format": "csv",
    "column_separator": "\\x01",
    "line_delimiter": "\\x02"
}如需更改导入格式为JSON,则正确配置loadProps下的format即可,示例如下。
"loadProps": {
    "format": "json",
    "strip_outer_array": true
}