本节介绍如何从阿里云消息队列服务Kafka中导入数据。

前提条件

数据源Kafka集群和目标ClickHouse集群必须在同一个VPC下才能保证导入成功。

操作步骤

  1. 在ClickHouse集群中新建Kafka消费表。
    CREATE TABLE default.kafka_src_table ON CLUSTER default
    (   //定义表结构的字段
        id Int32,
        age Int32,
        msg String               
    ) ENGINE = Kafka()
    SETTINGS
        kafka_broker_list = '*',
        kafka_topic_list = 'test',
        kafka_group_name = 'test',
        kafka_format = 'JSONEachRow';
    参数说明
    • kafka_broker_list:对应的Kafka集群地址,可以在Kafka控制台单击目标Kafka集群的实例详情,在实例详情页找到默认接入点复制即可。
    • kafka_topic_list:对应消费的topic。
    • kafka_group_name:消费topic的group,需要先在Kafka控制台中创建。请参见步骤二:创建Consumer Group
    • kafka_format:ClickHouse可以处理的数据类型。JSONEachRow表示每行一条数据的json格式。一般如果是json格式的话,设置JSONEachRow即可。如果需要输入嵌套的json,请设置input_format_import_nested_json=1。

    关于ClickHouse支持的各种格式,可以参考官网:Formats for Input and Output Data

    更多属性设置,可以参考:ClickHouse集成kafka

  2. 创建ClickHouse目的表。
    1. 创建本地表。
      create table default.kafka_table_local ON CLUSTER default (
        id Int32,
        age UInt32,
        msg String
      ) ENGINE = ReplicatedMergeTree(
          '/clickhouse/tables/kafka_sink_table/{shard}',
          '{replica}')
          ORDER BY (id);
    2. 创建分布式表。
      CREATE TABLE kafka_table_distributed ON CLUSTER default
      ENGINE = Distributed(default, default, kafka_table_local, id);
  3. 创建view把Kafka消费表消费到的数据导入ClickHouse目的表。
    CREATE MATERIALIZED VIEW consumer TO kafka_table_distributed AS SELECT * FROM kafka_src_table;
    说明 Kafka消费表不能直接作为结果表使用。Kafka消费表只是用来消费Kafka数据,没有真正的存储所有数据。