This topic describes how to import data from the Alibaba Cloud Message Queue for Apache Kafka service into ClickHouse.

Procedure

  1. Connect to the ClickHouse cluster. For more information, see Connect to a cluster.
  2. Create a source table for Kafka in the ClickHouse cluster.
    The syntax for creating the table is as follows.
    CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
    (
        name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
        name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
        ...
    ) ENGINE = Kafka()
    SETTINGS
        kafka_broker_list = 'host:port',
        kafka_topic_list = 'topic1,topic2,...',
        kafka_group_name = 'group_name',
        kafka_format = 'data_format'[,]
        [kafka_row_delimiter = 'delimiter_symbol',]
        [kafka_schema = '',]
        [kafka_num_consumers = N,]
        [kafka_skip_broken_messages = N]
    The parameters are described below. Parameters shown in brackets in the syntax above are optional; the others are required.
    Parameter                     Description                                          Required
    kafka_broker_list             List of Kafka brokers, in host:port format.          Yes
    kafka_topic_list              List of Kafka topics, separated by commas.           Yes
    kafka_group_name              Name of the Kafka consumer group.                    Yes
    kafka_format                  Message format, for example CSV or JSONEachRow.      Yes
    kafka_row_delimiter           Delimiter that marks the end of a row (message).     No
    kafka_schema                  Schema definition, if required by kafka_format.      No
    kafka_num_consumers           Number of consumers per table.                       No
    kafka_skip_broken_messages    Maximum number of malformed messages to ignore.      No

    A sample table creation statement is shown below.

    CREATE TABLE kafka_src_table ON CLUSTER default
    (
        id Int32,
        age Int32,
        msg String,
        time Date
    ) ENGINE = Kafka()
    SETTINGS
        kafka_broker_list = '<ip1>:<port1>,<ip2>:<port2>,<ip3>:<port3>',
        kafka_topic_list = 'test_kafka',
        kafka_group_name = 'test_kafka_cg',
        kafka_format = 'CSV';
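
    Optionally, you can query the Kafka source table to confirm that messages are arriving. The statement below is only a debugging sketch: selecting from a Kafka engine table consumes the messages it returns (the consumer group offset advances), and on recent ClickHouse versions a direct SELECT may additionally require enabling the stream_like_engine_allow_direct_select setting.

    -- Debugging sketch only: this consumes the returned messages
    -- for the consumer group test_kafka_cg.
    SELECT * FROM kafka_src_table LIMIT 10;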
  3. Create a destination table in ClickHouse.
    -- Create the local table. The engine clause uses the legacy
    -- ReplicatedMergeTree(zk_path, replica, date_column, primary_key, index_granularity)
    -- argument form; an equivalent using the current PARTITION BY/ORDER BY
    -- syntax is sketched after this step.
    CREATE TABLE default.kafka_sink_table ON CLUSTER default (
      id Int32,
      age Int32,
      msg String,
      time Date
    ) ENGINE = ReplicatedMergeTree(
        '/clickhouse/tables/kafka_sink_table/{shard}',
        '{replica}',
        time,
        (id, age),
        8192);
    
    -- Create the distributed table over the local table.
    -- Distributed(cluster, database, local_table, sharding_key)
    CREATE TABLE kafka_sink_table_distributed ON CLUSTER default
     AS default.kafka_sink_table
    ENGINE = Distributed(default, default, kafka_sink_table, rand());
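
    If your ClickHouse version rejects the legacy engine arguments used above (newer releases only accept them when deprecated MergeTree syntax is explicitly allowed), the local table can instead be declared with the current PARTITION BY/ORDER BY syntax. The following is a minimal equivalent sketch; partitioning by month of the time column mirrors the behavior of the legacy date-column form.

    CREATE TABLE default.kafka_sink_table ON CLUSTER default (
      id Int32,
      age Int32,
      msg String,
      time Date
    ) ENGINE = ReplicatedMergeTree(
        '/clickhouse/tables/kafka_sink_table/{shard}',
        '{replica}')
    PARTITION BY toYYYYMM(time)
    ORDER BY (id, age)
    SETTINGS index_granularity = 8192;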
  4. Import data from the source table into the destination table.
    INSERT INTO <dest_table> SELECT * FROM <src_table>;
    For the example tables above, the statement is:
    INSERT INTO kafka_sink_table_distributed SELECT * FROM kafka_src_table;
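
    A one-off INSERT ... SELECT only copies the messages that are available when it runs. If you want new Kafka messages to flow into the destination table continuously, a common approach is to attach a materialized view to the Kafka source table so that each consumed batch is written to the destination table automatically. The sketch below assumes the example tables created above; the view name kafka_sink_mv is illustrative and not part of the original example.

    -- Minimal sketch: continuously deliver consumed Kafka messages
    -- to the destination table via a materialized view.
    CREATE MATERIALIZED VIEW kafka_sink_mv ON CLUSTER default
    TO kafka_sink_table_distributed
    AS SELECT id, age, msg, time FROM kafka_src_table;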