您可以通过HDFS表引擎或表函数读写数据。本文为您介绍如何将HDFS中的数据导入至ClickHouse集群以及如何从ClickHouse集群导出数据到HDFS。
前提条件
已创建Hadoop集群,详情请参见创建集群。
已创建ClickHouse集群,详情请参见创建ClickHouse集群。
注意事项
本文代码示例中HDFS URL中的9000为非HA模式下NameNode的端口,如果使用的是HA模式下的NameNode,则端口通常为8020。
HDFS集群数据导入至ClickHouse集群
步骤一:创建业务表
使用SSH方式登录ClickHouse集群,详情请参见登录集群。
执行以下命令,进入ClickHouse客户端。
clickhouse-client -h core-1-1 -m
说明本示例登录core-1-1节点,如果您有多个Core节点,可以登录任意一个节点。
执行以下命令,创建数据库product,并在product数据库中创建业务表orders。
CREATE DATABASE IF NOT EXISTS product ON CLUSTER cluster_emr; CREATE TABLE IF NOT EXISTS product.orders ON CLUSTER cluster_emr ( `uid` UInt32, `date` DateTime, `skuId` UInt32, `order_revenue` UInt32 ) Engine = ReplicatedMergeTree('/cluster_emr/product/orders/{shard}', '{replica}') PARTITION BY toYYYYMMDD(date) ORDER BY toYYYYMMDD(date); CREATE TABLE IF NOT EXISTS product.orders_all ON CLUSTER cluster_emr ( `uid` UInt32, `date` DateTime, `skuId` UInt32, `order_revenue` UInt32 ) Engine = Distributed(cluster_emr, product, orders, rand());
说明示例中的{shard}和{replica}是阿里云EMR为ClickHouse集群自动生成的宏定义,可以直接使用。
步骤二:导入数据
通过HDFS表引擎导入数据
ClickHouse的HDFS表引擎能够从指定HDFS地址读取特定格式的文件数据,语法如下:
CREATE TABLE [IF NOT EXISTS] [db.]table_name
(
name1 [type1],
name2 [type2],
...
)
Engine = HDFS(uri, format);
参数 | 描述 |
| 数据库名。 |
| 表名。 |
| 列名。 |
| 列的类型。 |
| HDFS上文件的地址。 |
| 文件的类型。 |
其中uri不能为目录地址,且文件所属的目录需要存在,否则写数据时会报错。
创建HDFS引擎表并准备数据。
下载并上传示例数据orders.csv至HDFS集群的目录下,本文将文件上传到了HDFS集群的根目录下。
执行以下命令创建数据库hdfs和HDFS表。
CREATE DATABASE IF NOT EXISTS hdfs ON CLUSTER cluster_emr; CREATE TABLE IF NOT EXISTS hdfs.orders ON CLUSTER cluster_emr ( `uid` UInt32, `date` DateTime, `skuId` UInt32, `order_revenue` UInt32 ) ENGINE = HDFS('hdfs://192.168.**.**:9000/orders.csv', 'CSV');
说明本文示例是将示例数据上传到了HDFS集群的根目录下。代码中的
192.168.**.**
为HDFS集群的core-1-1节点的内网IP地址,您可以在EMR控制台的节点管理页签查看。
执行以下命令将数据导入product.orders_all表中。
INSERT INTO product.orders_all SELECT uid, date, skuId, order_revenue FROM hdfs.orders;
执行以下命令检查数据一致性。
SELECT a.* FROM hdfs.orders a LEFT ANTI JOIN product.orders_all USING uid;
通过HDFS表函数导入数据
ClickHouse的hdfs函数能够从指定HDFS地址读取文件数据,返回指定结构的表,语法如下:
hdfs(uri, format, structure);
参数 | 描述 |
| HDFS上文件的地址。 |
| 文件的类型。 |
| 表中字段的类型。例如,column1 UInt32,column2 String。 |
其中uri不能为目录地址,且文件所属的目录需要存在,否则写数据时会报错。
下载并上传示例数据orders.csv至HDFS集群的目录下,本文将文件上传到了HDFS集群的根目录下。
执行以下命令将数据导入product.orders_all表中。
INSERT INTO product.orders_all SELECT uid, date, skuId, order_revenue FROM hdfs('hdfs://192.168.**.**:9000/orders.csv', 'CSV', 'uid UInt32, date DateTime, skuId UInt32, order_revenue UInt32');
说明本文示例是将示例数据上传到了HDFS集群的根目录下。代码中的
192.168.**.**
为HDFS集群的core-1-1节点的内网IP地址,您可以在EMR控制台的节点管理页签查看。执行以下命令检查数据一致性。
SELECT a.* FROM hdfs.orders a LEFT ANTI JOIN product.orders_all USING uid;
ClickHouse集群数据导出至HDFS
步骤一:创建业务表
本文中导出操作使用的业务表结构与导入操作的业务表结构相同,具体创建操作可查看步骤一:创建业务表。
步骤二:数据准备
执行以下命令向product.orders_all业务表中插入数据,为后续导出操作准备数据。
INSERT INTO product.orders_all VALUES (60333391,'2021-08-04 11:26:01',49358700,89) (38826285,'2021-08-03 10:47:29',25166907,27) (10793515,'2021-07-31 02:10:31',95584454,68) (70246093,'2021-08-01 00:00:08',82355887,97) (70149691,'2021-08-02 12:35:45',68748652,1) (87307646,'2021-08-03 19:45:23',16898681,71) (61694574,'2021-08-04 23:23:32',79494853,35) (61337789,'2021-08-02 07:10:42',23792355,55) (66879038,'2021-08-01 16:13:19',95820038,89);
(可选)设置导出方式,EMR-5.8.0及之后、EMR-3.45.0及之后版本可通过设置写入方式来避免路径上文件已存在的问题。
增量导出
设置后若文件已存在会在对应目录下新建文件并存放数据。
set hdfs_create_new_file_on_insert=1
覆盖导出
设置后若文件已存在会覆盖原有数据,请谨慎设置。
set hdfs_truncate_on_insert=1
步骤三:导出数据
通过HDFS表引擎导出数据
执行以下命令创建HDFS表。
CREATE DATABASE IF NOT EXISTS hdfs ON CLUSTER cluster_emr; CREATE TABLE IF NOT EXISTS hdfs.orders ON CLUSTER cluster_emr ( `uid` UInt32, `date` DateTime, `skuId` UInt32, `order_revenue` UInt32 ) ENGINE = HDFS('hdfs://192.168.**.**:9000/orders.csv', 'CSV');
说明本文示例将数据导出至HDFS集群的根目录下。代码中的
192.168.**.**
为HDFS集群的core-1-1节点的内网IP地址,您可以在EMR控制台的节点管理页签查看。执行以下命令导出数据,数据通过HDFS表引擎存放到相应地址。
INSERT INTO hdfs.orders SELECT uid, date, skuId, order_revenue FROM product.orders_all;
说明ClickHouse在数据导出时会在相应地址上创建文件并写入数据,默认方式在文件已存在情况下导出失败。EMR-5.8.0、EMR-3.45.0之后的版本可通过配置参数来避免此问题。
执行以下命令,可以检查数据一致性。
SELECT a.* FROM hdfs.orders RIGHT ANTI JOIN product.orders_all a USING uid;
通过HDFS表函数导出数据
执行以下命令导出数据。
INSERT INTO FUNCTION hdfs('hdfs://192.168.**.**:9000/orders.csv', 'CSV', 'uid UInt32, date DateTime, skuId UInt32, order_revenue UInt32') SELECT uid, date, skuId, order_revenue FROM product.orders_all;
说明ClickHouse在数据导出时会在相应地址上创建文件并写入数据,默认方式在文件已存在情况下导出失败。EMR-5.8.0、EMR-3.45.0之后的版本可通过配置参数来避免此问题。
执行以下命令可以检查数据一致性。
SELECT
a.*
FROM
hdfs('hdfs://192.168.**.**:9000/orders.csv', 'CSV', 'uid UInt32, date DateTime, skuId UInt32, order_revenue UInt32')
RIGHT ANTI JOIN
product.orders_all a
USING uid;
配置
EMR ClickHouse允许使用对HDFS进行配置:
全局生效的HDFS配置。
<hdfs> <dfs_default_replica>3</dfs_default_replica> </hdfs>
HDFS参数的详细信息,请参见官网文档HDFS Configuration Reference。
说明查询参数时将下划线(_)替换为半角句号(.)即可。例如,您要查询EMR中的参数
dfs_default_replica
,则可以在官网文档中搜索dfs.default.replica
。仅对${user}用户生效的HDFS配置,用户配置与全局配置相同的键不同值时,会覆盖全局配置。
<hdfs_${user}> <dfs_default_replica>3</dfs_default_replica> </hdfs_${user}>