本文为您介绍如何将消息队列Kafka的数据实时同步至云数据库ClickHouse。
使用限制
仅支持同步云消息队列Kafka和部署在ECS上的自建Kafka的数据。
前提条件
注意事项
云数据库ClickHouse的Kafka外表所订阅的Topic不能存在其他消费者。
创建kafka外表、物化视图和本地表时,三张表的字段类型需要一一对应。
操作步骤
本示例为将云消息队列Kafka同步至云数据库ClickHouse社区兼容集群的数据库default库中的kafka_table_distributed分布式表中。
步骤一:了解同步原理
云数据库ClickHouse同步Kafka数据主要依赖于其内置的Kafka表引擎和物化视图(Materialized View)机制,实现实时数据消费和存储。具体数据链路如下。
Kafka主题:需要同步的源数据。
云数据库ClickHouse的Kafka外表(Kafka表引擎的表):从指定Kafka主题拉取源数据。
物化视图:通过Kafka外表读取源数据并执行插入操作,将数据写到云数据库ClickHouse本地表。
本地表:存储同步的数据。
步骤二:登录云数据库ClickHouse
如何登录,请参见通过DMS连接ClickHouse。
步骤三:创建Kafka外表
云数据库ClickHouse同步Kafka数据依赖于其内置的Kafka表引擎从指定Kafka主题拉取源数据。此表有以下特点:
Kafka外表默认不能直接查询。
Kafka外表只是用来消费Kafka数据,没有真正地存储数据,需要通过物化视图将数据加工并插入到目标表中存储。
建表语法如下。
Kafka外表字段格式须与Kafka数据类型一致。
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
...
) ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'host:port1,host:port2,host:port3',
kafka_topic_list = 'topic_name1,topic_name2,...',
kafka_group_name = 'group_name',
kafka_format = 'data_format'[,]
[kafka_row_delimiter = 'delimiter_symbol',]
[kafka_num_consumers = N,]
[kafka_thread_per_consumer = 1,]
[kafka_max_block_size = 0,]
[kafka_skip_broken_messages = N,]
[kafka_commit_every_batch = 0,]
[kafka_auto_offset_reset = N]
常用参数说明如下。
名称 | 是否必选 | 说明 |
kafka_broker_list | 是 | 以英文逗号(,)分隔的Kafka的接入点地址列表。如何查看接入点,请参见查看接入点。
|
kafka_topic_list | 是 | 以英文逗号(,)分隔的Topic名称列表。如何查看Topic名称,请参见创建Topic。 |
kafka_group_name | 是 | Kafka的消费组名称。更多信息,请参见创建Group。 |
kafka_format | 是 | 云数据库ClickHouse支持处理的消息体格式。 说明 云数据库ClickHouse支持的消息体格式,更多信息请参见Formats for Input and Output Data。 |
kafka_row_delimiter | 否 | 行分隔符,用于分隔不同的数据行。默认为“\n”,您也可以根据数据写入的实际分隔格式进行设置。 |
kafka_num_consumers | 否 | 单个表的消费者数量,默认值为1。 说明
|
kafka_thread_per_consumer | 否 | 指定每个消费者是否启动独立线程进行消费,默认值为0。取值说明如下。
提升消费速度更多信息请参见Kafka性能调优。 |
kafka_max_block_size | 否 | Kafka消息的最大批次大小,单位:Byte,默认值为65536。 |
kafka_skip_broken_messages | 否 | Kafka消息解析器对于脏数据的容忍度,默认值为0。如果 |
kafka_commit_every_batch | 否 | 执行Kafka Commit的频率,默认值为0,取值说明如下:
|
kafka_auto_offset_reset | 否 | 消息的偏移量,从哪个offset开始读取Kafka数据,取值说明如下:
说明 21.8版本的云数据库ClickHouse集群不支持该参数。 |
更多参数说明,请参见Kafka。
示例语句如下。
CREATE TABLE default.kafka_src_table ON CLUSTER `default`
(-- 定义表结构的字段
id Int32,
name String
) ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'alikafka-post-cn-****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-****1-2-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-****-3-vpc.alikafka.aliyuncs.com:9092',
kafka_topic_list = 'testforCK',
kafka_group_name = 'GroupForTestCK',
kafka_format = 'CSV';
步骤四:创建目标存储表
您需根据集群版本选择建表语句。
企业版集群仅需创建本地表,社区兼容版集群则可能需要根据您的环境和需求创建分布式表。以下为示例语句,更多建表语法,请参见CREATE TABLE。
企业版
CREATE TABLE default.kafka_table_local ON CLUSTER default (
id Int32,
name String
) ENGINE = MergeTree()
ORDER BY (id);
如果您执行此语句时报ON CLUSTER is not allowed for Replicated database
的错误提示,可尝试通过升级版本解决此问题,如何升级版本,请参见升级内核小版本。
社区兼容版
单副本和双副本的引擎有所不同,请根据您的副本类型选择相应的引擎。
在双副本集群中建表时,必须使用MergeTree系列引擎中支持数据复制的Replicated系列引擎。如果您在双副本集群中,创建了非Replicated系列引擎的表,将导致副本之间无法进行数据复制,从而导致副本数据可能不一致。
单副本
创建本地表。
CREATE TABLE default.kafka_table_local ON CLUSTER default ( id Int32, name String ) ENGINE = MergeTree() ORDER BY (id);
(可选)创建分布式表。
如果您只需将文件导入至本地表中,可跳过此步骤。
如果您的集群为多节点集群,建议您创建分布式表。
CREATE TABLE kafka_table_distributed ON CLUSTER default AS default.kafka_table_local ENGINE = Distributed(default, default, kafka_table_local, id);
双副本
创建本地表。
CREATE TABLE default.kafka_table_local ON CLUSTER default ( id Int32, name String ) ENGINE = ReplicatedMergeTree() ORDER BY (id);
(可选)创建分布式表。
如果您只需将文件导入至本地表中,可跳过此步骤。
如果您的集群为多节点集群,建议您创建分布式表。
CREATE TABLE kafka_table_distributed ON CLUSTER default AS default.kafka_table_local ENGINE = Distributed(default, default, kafka_table_local, id);
步骤五:创建物化视图
云数据库ClickHouse同步数据依赖于物化视图通过Kafka外表读取源数据并触发插入操作,将数据写入到云数据库ClickHouse本地表。
创建物化视图语法如下。
您需确保SELECT字段与目标表结构一致,或通过转换函数处理数据格式使其与目标表一致。
CREATE MATERIALIZED VIEW <view_name> ON CLUSTER default TO <dest_table> AS SELECT * FROM <src_table>;
参数说明如下。
参数名称 | 是否必填 | 描述 | 示例 |
view_name | 是 | 视图名称。 | consumer |
dest_table | 是 | 用于存储Kafka数据的目标表。
|
|
src_table | 是 | kafka外表。 | kafka_src_table |
示例语句如下。
企业版
CREATE MATERIALIZED VIEW consumer ON CLUSTER default TO kafka_table_local AS SELECT * FROM kafka_src_table;
社区兼容版
此处示例时将源数据存储至kafka_table_distributed分布式表中。
CREATE MATERIALIZED VIEW consumer ON CLUSTER default TO kafka_table_distributed AS SELECT * FROM kafka_src_table;
步骤六:验证同步是否生效
在云消息队列Kafka版的Topic端发送消息。
登录消息队列Kafka控制台。
在实例列表页面,单击目标实例名称。
在Topic管理页面,单击目标Topic操作列的
。在快速体验消息收发页面,输入发送的消息内容。
本文以发送消息
1,a
和2,b
为例。单击确定。
登录云数据库ClickHouse,查询分布式表,确认数据是否同步成功。
如何登录云数据库ClickHouse,请参见通过DMS连接ClickHouse。
验证数据的查询语句如下。
企业版
SELECT * FROM kafka_table_local;
社区兼容版
以下是查询分布式表的示例。
如果您同步的目的表是本地表,需将查询语句中的分布式表名更换为本地表名,再进行查询。
如果您是社区版集群,并且是多节点集群,强烈建议您查询分布式表;若未查询分布式表,则只能获取集群中一个节点的数据,这可能会导致查询结果少于您导入的数据。
SELECT * FROM kafka_table_distributed;
当您执行查询语句并成功返回结果时,说明数据已从Kafka同步至云数据库ClickHouse。
查询结果如下。
┌─id─┬─name─┐ │ 1 │ a │ │ 2 │ b │ └────┴──────┘
如果查询结果与您期望的不同,您可通过步骤七(可选):查看Kafka外表的消费状态进一步排查问题。
步骤七(可选):查看Kafka外表的消费状态
如果您同步的数据与Kafka的数据不一致时,可通过系统表帮助您查看Kafka外表的消费状态,排查消息消费异常的问题。
内核版本大于等于23.8的社区兼容版集群及企业版集群
通过查询系统表system.kafka_consumers
查看Kafka外表的消费状态,查询语句如下。
select * from system.kafka_consumers;
system.kafka_consumers
表字段说明如下。
字段名称 | 说明 |
database | 使用Kafka外表所在的数据库。 |
table | 使用Kafka外表的表名。 |
consumer_id | Kafka消费者标识符。 一个表可以有多个消费者,由创建Kafka外表时的kafka_num_consumers参数指定。 |
assignments.topic | Kafka主题。 |
assignments.partition_id | Kafka分区ID。 一个分区只能分配给一个消费者。 |
assignments.current_offset | 当前偏移量。 |
exceptions.time | 最近10个异常生成的时间戳。 |
exceptions.text | 最近10个异常的文本。 |
last_poll_time | 最近一次轮询的时间戳。 |
num_messages_read | 消费者读取的消息数量。 |
last_commit_time | 最近一次提交的时间戳。 |
num_commits | 消费者的总提交次数。 |
last_rebalance_time | 最近一次Kafka重新平衡的时间戳。 |
num_rebalance_revocations | 消费者被撤销分区的次数。 |
num_rebalance_assignments | 消费者被分配到Kafka集群的次数。 |
is_currently_used | 消费者是否正在使用中。 |
last_used | 该消费者最后一次使用的时间,以微秒为单位的Unix时间。 |
rdkafka_stat | 库内部统计信息。更多详情,请参见librdkafka。 默认为3000,表示每3秒生成一次统计信息。 说明 当云数据库ClickHouse设置 statistics_interval_ms=0时,可禁止Kafka外表的统计信息收集。 |
内核版本小于23.8以下的社区兼容版集群
通过查询系统表system.kafka
查看Kafka外表的消费状态,查询语句如下。
SELECT * FROM system.kafka;
system.kafka
表字段说明如下。
字段名称 | 说明 |
database | Kafka外表的数据库名称。 |
table | Kafka外表的表名。 |
topic | Kafka外表消费的topic名称。 |
consumer_group | Kafka外表消费的group名称。 |
last_read_message_count | 拉取到的Kafka外表的消费消息的数量。 |
status | Kafka外表的消费Kafka消息的状态。取值说明:
|
exception | 异常详情。 说明 当status取值为error时,该参数返回异常详情。 |