HDFS与ClickHouse间的数据导入与导出

您可以通过HDFS表引擎或表函数读写数据。本文为您介绍如何将HDFS中的数据导入至ClickHouse集群以及如何从ClickHouse集群导出数据到HDFS。

前提条件

注意事项

本文代码示例中HDFS URL中的9000为非HA模式下NameNode的端口,如果使用的是HA模式下的NameNode,则端口通常为8020。

HDFS集群数据导入至ClickHouse集群

步骤一:创建业务表

  1. 使用SSH方式登录ClickHouse集群,详情请参见登录集群

  2. 执行以下命令,进入ClickHouse客户端。

    clickhouse-client -h core-1-1 -m
    说明

    本示例登录core-1-1节点,如果您有多个Core节点,可以登录任意一个节点。

  3. 执行以下命令,创建数据库product,并在product数据库中创建业务表orders。

    CREATE DATABASE IF NOT EXISTS product ON CLUSTER cluster_emr;
    CREATE TABLE IF NOT EXISTS product.orders ON CLUSTER cluster_emr
    (
        `uid` UInt32,
        `date` DateTime,
        `skuId` UInt32,
        `order_revenue` UInt32
    )
    Engine = ReplicatedMergeTree('/cluster_emr/product/orders/{shard}', '{replica}')
    PARTITION BY toYYYYMMDD(date)
    ORDER BY toYYYYMMDD(date);
    CREATE TABLE IF NOT EXISTS product.orders_all ON CLUSTER cluster_emr
    (
        `uid` UInt32,
        `date` DateTime,
        `skuId` UInt32,
        `order_revenue` UInt32
    )
    Engine = Distributed(cluster_emr, product, orders, rand());
    说明

    示例中的{shard}{replica}是阿里云EMR为ClickHouse集群自动生成的宏定义,可以直接使用。

步骤二:导入数据

通过HDFS表引擎导入数据

ClickHouse的HDFS表引擎能够从指定HDFS地址读取特定格式的文件数据,语法如下:

CREATE TABLE [IF NOT EXISTS] [db.]table_name
(
  name1 [type1],
  name2 [type2],
  ...
)
Engine = HDFS(uri, format);

参数

描述

db

数据库名。

table_name

表名。

name1/name2

列名。

tyep1/type2

列的类型。

uri

HDFS上文件的地址。

format

文件的类型。

说明

其中uri不能为目录地址,且文件所属的目录需要存在,否则写数据时会报错。

  1. 创建HDFS引擎表并准备数据。

    1. 下载并上传示例数据orders.csv至HDFS集群的目录下,本文将文件上传到了HDFS集群的根目录下。

    2. 执行以下命令创建数据库hdfs和HDFS表。

      CREATE DATABASE IF NOT EXISTS hdfs ON CLUSTER cluster_emr;
      CREATE TABLE IF NOT EXISTS hdfs.orders ON CLUSTER cluster_emr
      (
          `uid` UInt32,
          `date` DateTime,
          `skuId` UInt32,
          `order_revenue` UInt32
      )
      ENGINE = HDFS('hdfs://192.168.**.**:9000/orders.csv', 'CSV');
      说明

      本文示例是将示例数据上传到了HDFS集群的根目录下。代码中的192.168.**.**为HDFS集群的core-1-1节点的内网IP地址,您可以在EMR控制台的节点管理页签查看。

  2. 执行以下命令将数据导入product.orders_all表中。

    INSERT INTO product.orders_all
    SELECT
      uid,
      date,
      skuId,
      order_revenue
    FROM
      hdfs.orders;
  3. 执行以下命令检查数据一致性。

    SELECT
      a.*
    FROM
      hdfs.orders a
    LEFT ANTI JOIN
      product.orders_all
    USING uid;

通过HDFS表函数导入数据

ClickHouse的hdfs函数能够从指定HDFS地址读取文件数据,返回指定结构的表,语法如下:

hdfs(uri, format, structure);

参数

描述

uri

HDFS上文件的地址。

format

文件的类型。

structure

表中字段的类型。例如,column1 UInt32,column2 String。

说明

其中uri不能为目录地址,且文件所属的目录需要存在,否则写数据时会报错。

  1. 下载并上传示例数据orders.csv至HDFS集群的目录下,本文将文件上传到了HDFS集群的根目录下。

  2. 执行以下命令将数据导入product.orders_all表中。

    INSERT INTO product.orders_all
    SELECT
      uid,
      date,
      skuId,
      order_revenue
    FROM
      hdfs('hdfs://192.168.**.**:9000/orders.csv', 'CSV', 'uid UInt32, date DateTime, skuId UInt32, order_revenue UInt32');
    说明

    本文示例是将示例数据上传到了HDFS集群的根目录下。代码中的192.168.**.**为HDFS集群的core-1-1节点的内网IP地址,您可以在EMR控制台的节点管理页签查看。

  3. 执行以下命令检查数据一致性。

    SELECT
      a.*
    FROM
      hdfs.orders a
    LEFT ANTI JOIN
      product.orders_all
    USING uid;

ClickHouse集群数据导出至HDFS

步骤一:创建业务表

本文中导出操作使用的业务表结构与导入操作的业务表结构相同,具体创建操作可查看步骤一:创建业务表

步骤二:数据准备

  1. 执行以下命令向product.orders_all业务表中插入数据,为后续导出操作准备数据。

    INSERT INTO product.orders_all VALUES 
      (60333391,'2021-08-04 11:26:01',49358700,89) 
      (38826285,'2021-08-03 10:47:29',25166907,27) 
      (10793515,'2021-07-31 02:10:31',95584454,68) 
      (70246093,'2021-08-01 00:00:08',82355887,97) 
      (70149691,'2021-08-02 12:35:45',68748652,1)  
      (87307646,'2021-08-03 19:45:23',16898681,71) 
      (61694574,'2021-08-04 23:23:32',79494853,35) 
      (61337789,'2021-08-02 07:10:42',23792355,55) 
      (66879038,'2021-08-01 16:13:19',95820038,89);
  2. (可选)设置导出方式,EMR-5.8.0及之后、EMR-3.45.0及之后版本可通过设置写入方式来避免路径上文件已存在的问题。

    增量导出

    设置后若文件已存在会在对应目录下新建文件并存放数据。

    set hdfs_create_new_file_on_insert=1

    覆盖导出

    设置后若文件已存在会覆盖原有数据,请谨慎设置。

    set hdfs_truncate_on_insert=1

步骤三:导出数据

通过HDFS表引擎导出数据

  1. 执行以下命令创建HDFS表。

    CREATE DATABASE IF NOT EXISTS hdfs ON CLUSTER cluster_emr;
    CREATE TABLE IF NOT EXISTS hdfs.orders ON CLUSTER cluster_emr
    (
        `uid` UInt32,
        `date` DateTime,
        `skuId` UInt32,
        `order_revenue` UInt32
    )
    ENGINE = HDFS('hdfs://192.168.**.**:9000/orders.csv', 'CSV');
    说明

    本文示例将数据导出至HDFS集群的根目录下。代码中的192.168.**.**为HDFS集群的core-1-1节点的内网IP地址,您可以在EMR控制台的节点管理页签查看。

  2. 执行以下命令导出数据,数据通过HDFS表引擎存放到相应地址。

    INSERT INTO hdfs.orders
    SELECT
      uid,
      date,
      skuId,
      order_revenue
    FROM
     product.orders_all;
    说明

    ClickHouse在数据导出时会在相应地址上创建文件并写入数据,默认方式在文件已存在情况下导出失败。EMR-5.8.0、EMR-3.45.0之后的版本可通过配置参数来避免此问题。

  3. 执行以下命令,可以检查数据一致性。

    SELECT
      a.*
    FROM
      hdfs.orders
    RIGHT ANTI JOIN
      product.orders_all a
    USING uid;

通过HDFS表函数导出数据

  1. 执行以下命令导出数据。

    INSERT INTO FUNCTION
      hdfs('hdfs://192.168.**.**:9000/orders.csv', 'CSV', 'uid UInt32, date DateTime, skuId UInt32, order_revenue UInt32')
    SELECT
      uid,
      date,
      skuId,
      order_revenue
    FROM
      product.orders_all;
    说明

    ClickHouse在数据导出时会在相应地址上创建文件并写入数据,默认方式在文件已存在情况下导出失败。EMR-5.8.0、EMR-3.45.0之后的版本可通过配置参数来避免此问题。

  2. 执行以下命令可以检查数据一致性。

SELECT
  a.*
FROM
  hdfs('hdfs://192.168.**.**:9000/orders.csv', 'CSV', 'uid UInt32, date DateTime, skuId UInt32, order_revenue UInt32')
RIGHT ANTI JOIN
  product.orders_all a
USING uid;

配置

EMR ClickHouse允许使用对HDFS进行配置:

  • 全局生效的HDFS配置。

    <hdfs>
      <dfs_default_replica>3</dfs_default_replica>
    </hdfs>

    HDFS参数的详细信息,请参见官网文档HDFS Configuration Reference

    说明

    查询参数时将下划线(_)替换为半角句号(.)即可。例如,您要查询EMR中的参数dfs_default_replica,则可以在官网文档中搜索dfs.default.replica

  • 仅对${user}用户生效的HDFS配置,用户配置与全局配置相同的键不同值时,会覆盖全局配置。

    <hdfs_${user}>
      <dfs_default_replica>3</dfs_default_replica>
    </hdfs_${user}>