您可以通过RDS MySQL表引擎或表函数导入数据至ClickHouse集群。本文为您介绍如何将RDS中的数据导入至ClickHouse集群。
使用RDS MySQL表引擎导入数据
语法
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1] [TTL expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2] [TTL expr2],
...
) ENGINE = MySQL('host:port', 'database', 'table', 'user', 'password'[, replace_query, 'on_duplicate_clause']);
其中,涉及参数描述如下表所示。
参数 |
描述 |
db |
数据库名。 |
table_name |
表名。 |
cluster |
集群标识。 |
name1/name2 |
列名。 |
tyep1/type2 |
列的类型。 |
host:port |
RDS MySQL的地址,可以在RDS MySQL管理控制台中数据库连接中进行查看。 |
database |
RDS MySQL中的数据库名。 |
table |
RDS MySQL中的表名。 |
user |
用户名,该用户具有访问上述RDS MySQL中库中的表的权限。 |
password |
user 对应的密码。
|
replace_query |
是否将INSERT INTO查询转换为REPLACE INTO的标志。设置为1,表示替换查询。 |
on_duplicate_clause |
会被添加到INSERT语句中。例如,INSERT INTO t (c1,c2) VALUES ('a', 2) ON DUPLICATE KEY UPDATE c2 = c2 + 1 ,此时需要指定on_duplicate_clause 为UPDATE c2 = c2 + 1 。
|
示例
- 在RDS MySQL实例中,创建原始数据表并导入原始数据。
- 连接MySQL实例,详情请参见通过客户端、命令行连接RDS MySQL。
- 执行以下命令,创建原始数据表。
CREATE TABLE `origin`.`orders` (
`uid` int(10) unsigned DEFAULT NULL,
`date` datetime DEFAULT NULL,
`skuId` int(10) unsigned DEFAULT NULL,
`order_revenue` int(10) unsigned DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
- 执行以下命令,导入原始数据。
INSERT INTO `origin`.`orders` VALUES(60333391, '2021-08-04 11:26:01', 49358700, 89),
(38826285, '2021-08-03 10:47:29', 25166907, 27),
(10793515, '2021-07-31 02:10:31', 95584454, 68),
(70246093, '2021-08-01 00:00:08', 82355887, 97),
(70149691, '2021-08-02 12:35:45', 68748652, 1),
(87307646, '2021-08-03 19:45:23', 16898681, 71),
(61694574, '2021-08-04 23:23:32', 79494853, 35),
(61337789, '2021-08-02 07:10:42', 23792355, 55),
(66879038, '2021-08-01 16:13:19', 95820038, 89);
- 在ClickHouse集群中,执行以下操作。
- 使用SSH方式登录ClickHouse集群,详情请参见登录集群。
- 执行以下命令,进入ClickHouse客户端。
clickhouse-client -h core-1-1 -m
说明 本示例登录core-1-1节点,如果您有多个Core节点,可以登录任意一个节点。
- 执行以下命令,创建数据库mysql。
CREATE DATABASE IF NOT EXISTS mysql;
- 执行以下命令,创建表orders。
CREATE TABLE mysql.orders
(
`uid` UInt32,
`date` DateTime,
`skuId` UInt32,
`order_revenue` UInt32
)
ENGINE = MySQL('host:port', 'origin', 'orders', 'user', 'password');
- 执行以下命令,创建数据库product。
CREATE DATABASE IF NOT EXISTS product ON CLUSTER cluster_emr;
- 执行以下命令,创建业务表orders。
CREATE TABLE IF NOT EXISTS product.orders ON CLUSTER cluster_emr
(
`uid` UInt32,
`date` DateTime,
`skuId` UInt32,
`order_revenue` UInt32
)
Engine = ReplicatedMergeTree('/cluster_emr/product/orders/{shard}', '{replica}')
PARTITION BY toYYYYMMDD(date)
ORDER BY toYYYYMMDD(date);
- 执行以下命令,创建业务表orders_all。
CREATE TABLE IF NOT EXISTS product.orders_all ON CLUSTER cluster_emr
(
`uid` UInt32,
`date` DateTime,
`skuId` UInt32,
`order_revenue` UInt32
)
Engine = Distributed(cluster_emr, product, orders, rand());
- 执行以下命令,导入数据。
INSERT INTO product.orders_all
SELECT
uid,
date,
skuId,
order_revenue
FROM
mysql.orders;
- 执行以下命令,查询数据。
SELECT a.*
FROM mysql.orders AS a
ANTI LEFT JOIN product.orders_all USING (uid);
使用RDS MySQL表函数导入数据
语法
mysql('host:port', 'database', 'table', 'user', 'password'[, replace_query, 'on_duplicate_clause'])
其中,涉及参数描述如下表所示。
参数 |
描述 |
host:port |
RDS MySQL的地址,您可以在RDS MySQL管理控制台中的数据库连接中查看。 |
database |
RDS MySQL中的数据库名。 |
table |
RDS MySQL中的表名。 |
user |
用户名,该用户具有访问上述RDS MySQL中库中的表的权限。 |
password |
user 对应的密码。
|
replace_query |
是否将INSERT INTO查询转换为REPLACE INTO的标志。设置为1,表示替换查询。 |
on_duplicate_clause |
会被添加到INSERT语句中。例如,INSERT INTO t (c1,c2) VALUES ('a', 2) ON DUPLICATE KEY UPDATE c2 = c2 + 1 ,此时需要指定on_duplicate_clause 为UPDATE c2 = c2 + 1 。
|
示例
- 在RDS MySQL实例中,创建表并插入数据。
- 连接MySQL实例,详情请参见通过客户端、命令行连接RDS MySQL。
- 执行以下命令,创建表orders。
CREATE TABLE `origin`.`orders` (
`uid` int(10) unsigned DEFAULT NULL,
`date` datetime DEFAULT NULL,
`skuId` int(10) unsigned DEFAULT NULL,
`order_revenue` int(10) unsigned DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
- 执行以下命令,插入数据。
INSERT INTO `origin`.`orders` VALUES(60333391, '2021-08-04 11:26:01', 49358700, 89),
(38826285, '2021-08-03 10:47:29', 25166907, 27),
(10793515, '2021-07-31 02:10:31', 95584454, 68),
(70246093, '2021-08-01 00:00:08', 82355887, 97),
(70149691, '2021-08-02 12:35:45', 68748652, 1),
(87307646, '2021-08-03 19:45:23', 16898681, 71),
(61694574, '2021-08-04 23:23:32', 79494853, 35),
(61337789, '2021-08-02 07:10:42', 23792355, 55),
(66879038, '2021-08-01 16:13:19', 95820038, 89);
- 在ClickHouse集群中,执行以下操作。
- 使用SSH方式登录ClickHouse集群,详情请参见登录集群。
- 执行以下命令,进入ClickHouse客户端。
clickhouse-client -h core-1-1 -m
说明 本示例登录core-1-1节点,如果您有多个Core节点,可以登录任意一个节点。
- 执行以下命令,创建数据库product。
CREATE DATABASE IF NOT EXISTS product ON CLUSTER cluster_emr;
- 执行以下命令,创建表orders。
CREATE TABLE IF NOT EXISTS product.orders ON CLUSTER cluster_emr
(
`uid` UInt32,
`date` DateTime,
`skuId` UInt32,
`order_revenue` UInt32
)
Engine = ReplicatedMergeTree('/cluster_emr/product/orders/{shard}', '{replica}')
PARTITION BY toYYYYMMDD(date)
ORDER BY toYYYYMMDD(date);
- 执行以下命令,创建表orders_all。
CREATE TABLE IF NOT EXISTS product.orders_all ON CLUSTER cluster_emr
(
`uid` UInt32,
`date` DateTime,
`skuId` UInt32,
`order_revenue` UInt32
)
Engine = Distributed(cluster_emr, product, orders, rand());
- 执行以下命令,导入数据。
INSERT INTO product.orders_all
SELECT
uid,
date,
skuId,
order_revenue
FROM
mysql('host:port', 'origin', 'orders', 'user', 'password');
- 执行以下命令,查询数据。
SELECT a.*
FROM
mysql('host:port', 'origin', 'orders', 'user', 'password') AS a
ANTI LEFT JOIN product.orders_all USING (uid);
如果您需要导出数据,则将业务表数据写入MySQL表函数即可。写入命令如下。
INSERT INTO FUNCTION
mysql('host:port', 'origin', 'orders', 'user', 'password')
FROM
product.orders_all;