您可以通过Kafka表引擎导入数据至ClickHouse集群。本文为您介绍如何将Kafka中的数据导入至ClickHouse集群。
前提条件
- 已创建DataFlow集群,且选择了Kafka服务,详情请参见创建集群。
- 已创建ClickHouse集群,详情请参见创建集群。
使用限制
DataFlow集群和ClickHouse集群需要在同一VPC下。
语法
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
...
) ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'host1:port1,host2:port2',
kafka_topic_list = 'topic1,topic2,...',
kafka_group_name = 'group_name',
kafka_format = 'data_format';
其中,涉及参数描述如下表所示。
参数 |
描述 |
db |
数据库名。 |
table_name |
表名。 |
cluster |
集群标识。 |
name1/name2 |
列名。 |
tyep1/type2 |
列的类型。 |
kafka_broker_list |
Kafka Broker的地址及端口。
DataFlow集群所有节点的内网IP地址及端口,您可以在EMR控制台集群管理页签中的主机列表页面查看。
|
kafka_topic_list |
订阅的Topic名称。 |
kafka_group_name |
Kafka consumer的分组名称。 |
kafka_format |
数据的类型。例如,CSV和JSONEachRow等,详细信息请参见Formats for Input and Output Data。
|
示例
- 在ClickHouse集群中执行以下操作。
- 使用SSH方式登录ClickHouse集群,详情请参见登录集群。
- 执行如下命令,进入ClickHouse客户端。
clickhouse-client -h core-1-1 -m
说明 本示例登录core-1-1节点,如果您有多个Core节点,可以登录任意一个节点。
- 执行如下命令,创建数据库kafka。
CREATE DATABASE IF NOT EXISTS kafka ON CLUSTER cluster_emr;
说明 数据库名您可以自定义。本文示例中的cluster_emr
是集群默认的标识,如果您修改过,请填写正确的集群标识,您也可以在EMR控制台ClickHouse服务的配置页面,在搜索区域搜索clickhouse_remote_servers参数查看。
- 执行如下命令,创建Kafka表。
CREATE TABLE IF NOT EXISTS kafka.consumer ON CLUSTER cluster_emr
(
`uid` UInt32,
`date` DateTime,
`skuId` UInt32,
`order_revenue` UInt32
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = '192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092',
kafka_topic_list = 'clickhouse_test',
kafka_group_name = 'clickhouse_test',
kafka_format = 'CSV';
kafka_broker_list
为
DataFlow
集群所有节点的内网
IP
地址及端口,您可以在
EMR
控制台的
节点管理页面查看。其余参数含义请参见
语法。

- 执行如下命令,创建数据库product。
CREATE DATABASE IF NOT EXISTS product ON CLUSTER cluster_emr;
- 执行以下命令,创建本地表。
CREATE TABLE IF NOT EXISTS product.orders ON CLUSTER cluster_emr
(
`uid` UInt32,
`date` DateTime,
`skuId` UInt32,
`order_revenue` UInt32
)
Engine = ReplicatedMergeTree('/cluster_emr/product/orders/{shard}', '{replica}')
PARTITION BY toYYYYMMDD(date)
ORDER BY toYYYYMMDD(date);
- 执行以下命令,创建分布式表。
CREATE TABLE IF NOT EXISTS product.orders_all ON CLUSTER cluster_emr
(
`uid` UInt32,
`date` DateTime,
`skuId` UInt32,
`order_revenue` UInt32
)
Engine = Distributed(cluster_emr, product, orders, rand());
- 执行以下命令,创建MATERIALIZED VIEW自动导数据。
CREATE MATERIALIZED VIEW IF NOT EXISTS product.kafka_load ON CLUSTER cluster_emr TO product.orders AS
SELECT *
FROM kafka.consumer;
- 在DataFlow集群中执行以下操作。
- 使用SSH方式登录DataFlow集群,详情请参见登录集群。
- 在DataFlow集群的命令行窗口,执行如下命令运行Kafka的生产者。
/usr/lib/kafka-current/bin/kafka-console-producer.sh --broker-list 192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092 --topic clickhouse_test
- 执行以下命令,输入测试数据。
38826285,2021-08-03 10:47:29,25166907,27
10793515,2021-07-31 02:10:31,95584454,68
70246093,2021-08-01 00:00:08,82355887,97
70149691,2021-08-02 12:35:45,68748652,1
87307646,2021-08-03 19:45:23,16898681,71
61694574,2021-08-04 23:23:32,79494853,35
61337789,2021-08-02 07:10:42,23792355,55
66879038,2021-08-01 16:13:19,95820038,89
- 在ClickHouse命令窗口中,执行以下命令,可以查看从Kafka中导入至ClickHouse集群的数据。
您可以校验查询到的数据与源数据是否一致。
SELECT * FROM product.orders_all;
