您可以通过Kafka表引擎导入数据至ClickHouse集群。本文为您介绍如何将Kafka中的数据导入至ClickHouse集群。

前提条件

使用限制

Kafka集群和ClickHouse集群需要在同一VPC下。

语法

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'host1:port1,host2:port2',
    kafka_topic_list = 'topic1,topic2,...',
    kafka_group_name = 'group_name',
    kafka_format = 'data_format';
其中,涉及参数描述如下表所示。
参数 描述
db 数据库名。
table_name 表名。
cluster 集群标识。
name1/name2 列名。
tyep1/type2 列的类型。
kafka_broker_list Kafka Broker的地址及端口。

Kafka集群所有节点的内网IP地址及端口,您可以在EMR控制台集群管理页签中的主机列表页面查看。

kafka_topic_list 订阅的Topic名称。
kafka_group_name Kafka consumer的分组名称。
kafka_format 数据的类型。例如,CSV和JSONEachRow等,详细信息请参见Formats for Input and Output Data

示例

  1. 在ClickHouse集群中执行以下操作。
    1. 使用SSH方式登录ClickHouse集群,详情请参见登录集群
    2. 执行如下命令,进入ClickHouse客户端。
      clickhouse-client
    3. 执行如下命令,创建数据库kafka。
      CREATE DATABASE IF NOT EXISTS kafka ON CLUSTER cluster_emr;
      说明 数据库名您可以自定义。本文示例中的cluster_emr是集群默认的标识,如果您修改过,请填写正确的集群标识,您也可以在EMR控制台ClickHouse服务的配置页面,在搜索区域搜索clickhouse_remote_servers参数查看。
    4. 执行如下命令,创建Kafka表。
      CREATE TABLE IF NOT EXISTS kafka.consumer ON CLUSTER cluster_emr
      (
          `uid` UInt32,
          `date` DateTime,
          `skuId` UInt32,
          `order_revenue` UInt32
      )
      ENGINE = Kafka()
      SETTINGS
        kafka_broker_list = '192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092',
        kafka_topic_list = 'clickhouse_test',
        kafka_group_name = 'clickhouse_test',
        kafka_format = 'CSV';
      kafka_broker_list为Kafka集群所有节点的内网IP地址及端口,您可以在EMR控制台集群管理页签中的主机列表页面查看。其余参数含义请参见语法Intranet IP address
    5. 执行如下命令,创建数据库product。
      CREATE DATABASE IF NOT EXISTS product ON CLUSTER cluster_emr;
    6. 执行以下命令,创建本地表。
      CREATE TABLE IF NOT EXISTS product.orders ON CLUSTER cluster_emr
      (
          `uid` UInt32,
          `date` DateTime,
          `skuId` UInt32,
          `order_revenue` UInt32
      )
      Engine = ReplicatedMergeTree('/cluster_emr/product/orders/{shard}', '{replica}')
      PARTITION BY toYYYYMMDD(date)
      ORDER BY toYYYYMMDD(date);
    7. 执行以下命令,创建分布式表。
      CREATE TABLE IF NOT EXISTS product.orders_all ON CLUSTER cluster_emr
      (
          `uid` UInt32,
          `date` DateTime,
          `skuId` UInt32,
          `order_revenue` UInt32
      )
      Engine = Distributed(cluster_emr, product, orders, rand());
    8. 执行以下命令,创建MATERIALIZED VIEW自动导数据。
      CREATE MATERIALIZED VIEW IF NOT EXISTS product.kafka_load ON CLUSTER cluster_emr TO product.orders AS
      SELECT *
      FROM kafka.consumer;
  2. 在Kafka集群中执行以下操作。
    1. 使用SSH方式登录Kafka集群,详情请参见登录集群
    2. 在Kafka集群的命令行窗口,执行如下命令运行Kafka的生产者。
      /usr/lib/kafka-current/bin/kafka-console-producer.sh --broker-list 192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092 --topic clickhouse_test
    3. 执行以下命令,输入测试数据。
      38826285,2021-08-03 10:47:29,25166907,27
      10793515,2021-07-31 02:10:31,95584454,68
      70246093,2021-08-01 00:00:08,82355887,97
      70149691,2021-08-02 12:35:45,68748652,1
      87307646,2021-08-03 19:45:23,16898681,71
      61694574,2021-08-04 23:23:32,79494853,35
      61337789,2021-08-02 07:10:42,23792355,55
      66879038,2021-08-01 16:13:19,95820038,89
      Data
  3. 在ClickHouse命令窗口中,执行以下命令,可以查看从Kafka中导入至ClickHouse集群的数据。

    您可以校验查询到的数据与源数据是否一致。

    SELECT * FROM product.orders_all;
    Result_click