您可以通过RDS MySQL表引擎或表函数导入数据至ClickHouse集群。本文为您介绍如何将RDS中的数据导入至ClickHouse集群。

前提条件

使用RDS MySQL表引擎导入数据

语法

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1] [TTL expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2] [TTL expr2],
    ...
) ENGINE = MySQL('host:port', 'database', 'table', 'user', 'password'[, replace_query, 'on_duplicate_clause']);
其中,涉及参数描述如下表所示。
参数 描述
db 数据库名。
table_name 表名。
cluster 集群标识。
name1/name2 列名。
tyep1/type2 列的类型。
host:port RDS MySQL的地址,可以在RDS MySQL管理控制台中数据库连接中进行查看。
database RDS MySQL中的数据库名。
table RDS MySQL中的表名。
user 用户名,该用户具有访问上述RDS MySQL中库中的表的权限。
password user对应的密码。
replace_query 是否将INSERT INTO查询转换为REPLACE INTO的标志。设置为1,表示替换查询。
on_duplicate_clause 会被添加到INSERT语句中。例如,INSERT INTO t (c1,c2) VALUES ('a', 2) ON DUPLICATE KEY UPDATE c2 = c2 + 1,此时需要指定on_duplicate_clauseUPDATE c2 = c2 + 1

示例

  1. 在RDS MySQL实例中,创建原始数据表并导入原始数据。
    1. 连接MySQL实例,详情请参见通过客户端、命令行连接RDS MySQL
    2. 执行以下命令,创建原始数据表。
      CREATE TABLE `origin`.`orders` (
        `uid` int(10) unsigned DEFAULT NULL,
        `date` datetime DEFAULT NULL,
        `skuId` int(10) unsigned DEFAULT NULL,
        `order_revenue` int(10) unsigned DEFAULT NULL
      ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    3. 执行以下命令,导入原始数据。
      INSERT INTO `origin`.`orders` VALUES(60333391, '2021-08-04 11:26:01', 49358700, 89),
             (38826285, '2021-08-03 10:47:29', 25166907, 27),
             (10793515, '2021-07-31 02:10:31', 95584454, 68),
             (70246093, '2021-08-01 00:00:08', 82355887, 97),
             (70149691, '2021-08-02 12:35:45', 68748652, 1),
             (87307646, '2021-08-03 19:45:23', 16898681, 71),
             (61694574, '2021-08-04 23:23:32', 79494853, 35),
             (61337789, '2021-08-02 07:10:42', 23792355, 55),
             (66879038, '2021-08-01 16:13:19', 95820038, 89);
  2. 在ClickHouse集群中,执行以下操作。
    1. 使用SSH方式登录ClickHouse集群,详情请参见登录集群
    2. 执行以下命令,进入ClickHouse客户端。
      clickhouse-client -h core-1-1 -m
      说明 本示例登录core-1-1节点,如果您有多个Core节点,可以登录任意一个节点。
    3. 执行以下命令,创建数据库mysql。
      CREATE DATABASE IF NOT EXISTS mysql;
    4. 执行以下命令,创建表orders。
      CREATE TABLE mysql.orders
      (
          `uid` UInt32,
          `date` DateTime,
          `skuId` UInt32,
          `order_revenue` UInt32
      )
      ENGINE = MySQL('host:port', 'origin', 'orders', 'user', 'password');
    5. 执行以下命令,创建数据库product。
      CREATE DATABASE IF NOT EXISTS product ON CLUSTER cluster_emr;
    6. 执行以下命令,创建业务表orders。
      CREATE TABLE IF NOT EXISTS product.orders ON CLUSTER cluster_emr
      (
          `uid` UInt32,
          `date` DateTime,
          `skuId` UInt32,
          `order_revenue` UInt32
      )
      Engine = ReplicatedMergeTree('/cluster_emr/product/orders/{shard}', '{replica}')
      PARTITION BY toYYYYMMDD(date)
      ORDER BY toYYYYMMDD(date);
    7. 执行以下命令,创建业务表orders_all。
      CREATE TABLE IF NOT EXISTS product.orders_all ON CLUSTER cluster_emr
      (
          `uid` UInt32,
          `date` DateTime,
          `skuId` UInt32,
          `order_revenue` UInt32
      )
      Engine = Distributed(cluster_emr, product, orders, rand());
    8. 执行以下命令,导入数据。
      INSERT INTO product.orders_all
      SELECT
        uid,
        date,
        skuId,
        order_revenue
      FROM
        mysql.orders;
    9. 执行以下命令,查询数据。
      SELECT a.*
      FROM mysql.orders AS a
      ANTI LEFT JOIN product.orders_all USING (uid);
      说明 查询数据为空时正常。

使用RDS MySQL表函数导入数据

语法

mysql('host:port', 'database', 'table', 'user', 'password'[, replace_query, 'on_duplicate_clause'])
其中,涉及参数描述如下表所示。
参数 描述
host:port RDS MySQL的地址,您可以在RDS MySQL管理控制台中的数据库连接中查看。
database RDS MySQL中的数据库名。
table RDS MySQL中的表名。
user 用户名,该用户具有访问上述RDS MySQL中库中的表的权限。
password user对应的密码。
replace_query 是否将INSERT INTO查询转换为REPLACE INTO的标志。设置为1,表示替换查询。
on_duplicate_clause 会被添加到INSERT语句中。例如,INSERT INTO t (c1,c2) VALUES ('a', 2) ON DUPLICATE KEY UPDATE c2 = c2 + 1,此时需要指定on_duplicate_clauseUPDATE c2 = c2 + 1

示例

  1. 在RDS MySQL实例中,创建表并插入数据。
    1. 连接MySQL实例,详情请参见通过客户端、命令行连接RDS MySQL
    2. 执行以下命令,创建表orders。
      CREATE TABLE `origin`.`orders` (
        `uid` int(10) unsigned DEFAULT NULL,
        `date` datetime DEFAULT NULL,
        `skuId` int(10) unsigned DEFAULT NULL,
        `order_revenue` int(10) unsigned DEFAULT NULL
      ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    3. 执行以下命令,插入数据。
      INSERT INTO `origin`.`orders` VALUES(60333391, '2021-08-04 11:26:01', 49358700, 89),
             (38826285, '2021-08-03 10:47:29', 25166907, 27),
             (10793515, '2021-07-31 02:10:31', 95584454, 68),
             (70246093, '2021-08-01 00:00:08', 82355887, 97),
             (70149691, '2021-08-02 12:35:45', 68748652, 1),
             (87307646, '2021-08-03 19:45:23', 16898681, 71),
             (61694574, '2021-08-04 23:23:32', 79494853, 35),
             (61337789, '2021-08-02 07:10:42', 23792355, 55),
             (66879038, '2021-08-01 16:13:19', 95820038, 89);
  2. 在ClickHouse集群中,执行以下操作。
    1. 使用SSH方式登录ClickHouse集群,详情请参见登录集群
    2. 执行以下命令,进入ClickHouse客户端。
      clickhouse-client -h core-1-1 -m
      说明 本示例登录core-1-1节点,如果您有多个Core节点,可以登录任意一个节点。
    3. 执行以下命令,创建数据库product。
      CREATE DATABASE IF NOT EXISTS product ON CLUSTER cluster_emr;
    4. 执行以下命令,创建表orders。
      CREATE TABLE IF NOT EXISTS product.orders ON CLUSTER cluster_emr
      (
          `uid` UInt32,
          `date` DateTime,
          `skuId` UInt32,
          `order_revenue` UInt32
      )
      Engine = ReplicatedMergeTree('/cluster_emr/product/orders/{shard}', '{replica}')
      PARTITION BY toYYYYMMDD(date)
      ORDER BY toYYYYMMDD(date);
    5. 执行以下命令,创建表orders_all。
      CREATE TABLE IF NOT EXISTS product.orders_all ON CLUSTER cluster_emr
      (
          `uid` UInt32,
          `date` DateTime,
          `skuId` UInt32,
          `order_revenue` UInt32
      )
      Engine = Distributed(cluster_emr, product, orders, rand());
    6. 执行以下命令,导入数据。
      INSERT INTO product.orders_all
      SELECT
        uid,
        date,
        skuId,
        order_revenue
      FROM
        mysql('host:port', 'origin', 'orders', 'user', 'password');
    7. 执行以下命令,查询数据。
      SELECT a.*
      FROM
        mysql('host:port', 'origin', 'orders', 'user', 'password') AS a
      ANTI LEFT JOIN product.orders_all USING (uid);
      说明 查询数据为空时正常。
      如果您需要导出数据,则将业务表数据写入MySQL表函数即可。写入命令如下。
      INSERT INTO FUNCTION
        mysql('host:port', 'origin', 'orders', 'user', 'password')
      FROM
        product.orders_all;