使用程序导入数据

本文将介绍如何通过编写代码的方式,将数据导入到PolarDB-X中。

以如下表结构示例进行说明:

CREATE TABLE `sourcedb` (
    `id` int(11) NOT NULL,
    `k` int(11) NOT NULL DEFAULT '0',
    `c` char(120) NOT NULL DEFAULT '',
    `pad` char(60) NOT NULL DEFAULT '',
    PRIMARY KEY (`id`),
    KEY `k_1` (`k`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 dbpartition by hash(`id`);

从数据库中导出源数据

源数据可以由用户自行生成,也可以从数据库中导出,本文以mysql -e命令的方式导出进行示例,具体方法如下:

mysql -h ip  -P port -u usr -pPassword db_name -N -e "SELECT id,k,c,pad FROM sourcedb;" >/home/data_1000w.txt
## 原始数据以制表符分隔,数据格式:188092293    27267211    59775766593-64673028018-...-09474402685    01705051424-...-54211554755

mysql -h ip  -P port -u usr -pPassword db_name -N -e "SELECT id,k,c,pad FROM sourcedb;" | sed 's/\t/,/g' >/home/data_1000w.csv
## csv文件格式以逗号分隔,数据格式:188092293,27267211,59775766593-64673028018-...-09474402685,01705051424-...-54211554755

推荐将字符串进行处理,转变成csv文件格式,方便后续程序读取数据。

PolarDB-X中创建目标表

源数据不包括建表语句,需要手动在PolarDB-X目标数据库上创建表,示例如下:

CREATE TABLE `targetdb` (
    `id` int(11) NOT NULL,
    `k` int(11) NOT NULL DEFAULT '0',
    `c` char(120) NOT NULL DEFAULT '',
    `pad` char(60) NOT NULL DEFAULT '',
    PRIMARY KEY (`id`),
    KEY `k_1` (`k`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 dbpartition by hash(`id`);

使用程序导入数据到PolarDB-X

您可以自行编写程序,连接PolarDB-X,然后读取本地数据,通过Batch Insert语句导入到PolarDB-X中。

下面是一个简单的Java程序示例:

// 需要mysql-connector-java.jar, 详情界面:https://mvnrepository.com/artifact/mysql/mysql-connector-java
// 下载链接:https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.20/mysql-connector-java-8.0.20.jar
// 注:不同版本的mysql-connector-java.jar,可能Class.forName("com.mysql.cj.jdbc.Driver")类路径不同
// 编译 javac LoadData.java
// 运行 java -cp .:mysql-connector-java-8.0.20.jar LoadData

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class LoadData {
    public static void main(String[] args) throws IOException, ClassNotFoundException, SQLException {
        File dataFile = new File("/home/data_1000w.csv");
        String sql = "insert into targetdb(id, k, c, pad) values(?, ?, ?, ?)";
        int batchSize = 1000;
        try (
            Connection connection = getConnection("ip", 3306, "db", "usr", "password");
            BufferedReader br = new BufferedReader(new FileReader(dataFile))) {
            String line;
            PreparedStatement st = connection.prepareStatement(sql);
            long startTime = System.currentTimeMillis();
            int batchCount = 0;

            while ((line = br.readLine()) != null) {
                String[] data = line.split(",");
                st.setInt(1, Integer.valueOf(data[0]));
                st.setInt(2, Integer.valueOf(data[1]));
                st.setObject(3, data[2]);
                st.setObject(4, data[3]);

                st.addBatch();
                if (++batchCount % batchSize == 0) {
                    st.executeBatch();
                    System.out.println(String.format("insert %d records", batchCount));
                }
            }
            if (batchCount % batchSize != 0) {
                st.executeBatch();
            }
            long cost = System.currentTimeMillis() - startTime;
            System.out.println(String.format("Take %d second,insert %d records, tps %d", cost/1000, batchCount, batchCount/(cost/1000)));
        }
    }
    /**
     * 获取数据库连接
     *
     * @param host     数据库地址
     * @param port     端口
     * @param database 数据库名称
     * @param username 用户名
     * @param password 密码
     * @return
     * @throws ClassNotFoundException
     * @throws SQLException
     */
    private static Connection getConnection(String host, int port, String database, String username, String password)
        throws ClassNotFoundException, SQLException {
        Class.forName("com.mysql.cj.jdbc.Driver");
        String url = String.format(
            "jdbc:mysql://%s:%d/%s?autoReconnect=true&socketTimeout=600000&rewriteBatchedStatements=true", host, port,
            database);
        Connection con = DriverManager.getConnection(url, username, password);
        return con;
    }
}

您可以根据实际应用场景编写程序,设置合适的batch size和多线程导入,来提高导入的性能。