本文介绍如何从消息队列Kafka版中同步数据至云数据库ClickHouse。
前提条件
操作步骤
- 登录云数据库ClickHouse控制台。
- 在页面左上角,选择目标集群所在的地域。
- 登录数据库。
如果您的集群是社区兼容版,请执行以下操作。
- 在集群列表页面,选择默认实例列表,单击目标集群ID。
- 在集群信息页面,单击右上方导航栏的登录数据库。
- 在DMS数据管理服务控制台的登录实例页面,输入数据库账号和密码,单击登录。
如果您的集群是云原生版,请执行以下操作。
- 在集群列表页面,选择云原生版本实例列表,单击目标集群ID。
- 在集群信息页面,单击集群计算组操作列的登录。
- 在DMS数据管理服务控制台的登录实例页面,输入数据库账号和密码,单击登录。
- 创建Kafka消费表。
说明
- 创建Kafka消费表的目的是为了将Kafka消费表的数据同步到云数据库ClickHouse的表中。
- Kafka消费表不能直接使用。
- Kafka消费表只是用来消费Kafka数据,没有真正地存储数据。
建表语法如下。CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster] ( name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1], name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2], ... ) ENGINE = Kafka() SETTINGS kafka_broker_list = 'host:port1,host:port2,host:port3', kafka_topic_list = 'topic_name1,topic_name2,...', kafka_group_name = 'group_name', kafka_format = 'data_format'[,] [kafka_row_delimiter = 'delimiter_symbol',] [kafka_num_consumers = N,] [kafka_max_block_size = 0,] [kafka_skip_broken_messages = N,] [kafka_commit_every_batch = 0,] [kafka_ auto_offset_reset = N]
常用参数说明如下。名称 是否必选 说明 kafka_broker_list 是 以英文逗号(,)分隔的Kafka的接入点地址列表。如何查看接入点,请参见查看接入点。 kafka_topic_list 是 以英文逗号(,)分隔的Topic名称列表。如何查看Topic名称,请参见创建Topic。 kafka_group_name 是 Kafka的消费组名称。更多信息,请参见创建Group。 kafka_format 是 云数据库ClickHouse支持处理的消息体格式。 说明 云数据库ClickHouse支持的消息体格式,具体请参见 Formats for Input and Output Data。kafka_row_delimiter 否 行分隔符,用于分隔不同的数据行。默认为“\n”,您也可以根据数据写入的实际分隔格式进行设置。 kafka_num_consumers 否 单个表的消费者数量,默认值为1。 说明- 一个消费者的吞吐量不足时,需要指定更多的消费者。
- 消费者的总数不应超过Topic中的分区数,因为每个分区只能分配一个消费者。
kafka_max_block_size 否 Kafka消息的最大批次大小,单位:Byte,默认值为65536。 kafka_skip_broken_messages 否 kafka消息解析器对于脏数据的容忍度,默认值为0。如果 kafka_skip_broken_messages=N
,则引擎将跳过N条无法解析的Kafka消息(一条消息等于一行数据)。kafka_commit_every_batch 否 执行Kafka commit的频率,取值说明如下: - 0(默认值):完全写入一整个Block数据块的数据后才执行commit。
- 1:每写完一个Batch批次的数据就执行一次commit。
kafka_auto_offset_reset 否 消息的偏移量,从哪个offset开始读取Kafka数据,取值说明如下: - earliest(默认值):从最早的offset开始读取Kafka数据。
- latest:从最晚的offset开始读取Kafka数据。
说明 21.8版本的 云数据库ClickHouse集群不支持该参数。说明 更多参数说明,请参见 Kafka。示例语句如下。
CREATE TABLE default.kafka_src_table ON CLUSTER default ( //定义表结构的字段 id Int32, name String ) ENGINE = Kafka() SETTINGS kafka_broker_list = 'alikafka-post-cn-tl32i5sc****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-tl32i5sc****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-tl32i5sc****-3-vpc.alikafka.aliyuncs.com:9092', kafka_topic_list = 'test', kafka_group_name = 'test', kafka_format = 'CSV';
- 创建云数据库ClickHouse表。
说明 创建本地表和分布式表的目的。具体信息,请参见 基本概念。
- 创建视图将Kafka消费表的数据同步到云数据库ClickHouse的分布式表。
说明 如果您同步的目的表是本地表,请将分布式表名更换为本地表名,再进行同步。
创建视图语法如下。
CREATE MATERIALIZED VIEW [view.name] ON CLUSTER default TO [dest_table] AS SELECT * FROM [src_table];
示例语句如下。
CREATE MATERIALIZED VIEW consumer ON CLUSTER default TO kafka_table_distributed AS SELECT * FROM kafka_src_table;
验证同步结果
您可以选择以下任意一种方式验证同步结果。
查询云数据库ClickHouse分布式表验证同步结果
- 在消息队列Kafka版的Topic端发送消息。
- 查询云数据库ClickHouse分布式表,确认数据是否同步成功,查询语句如下。
Select * from kafka_table_distributed;
说明 如果您同步的目的表是本地表,请将查询语句中的分布式表名更换为本地表名,再进行查询。查询结果如下。┌─id─┬─name─┐ │ 1 │ a │ │ 2 │ b │ └────┴──────┘
说明 当您执行查询语句并成功返回结果时,说明数据已从Kafka同步至 云数据库ClickHouse。
通过查询系统表验证同步结果
通过查询系统表
system.kafka
查看kafka消费表的消费状态,查询语句如下。
Select * from system.kafka
查询结果如下。
┌─database─┬──────────table──────────────┬─topic─┬─consumer_group─┬─last_read_message_count─┬───────status──────┬─exception─┐
│ default │ kafka_table_distributed │ test │ test │ 2 │ attach_view │ │
└──────────┴─────────────────────────────┴───────┴────────────────┴─────────────────────────┴───────────────────┴───────────┘
查询结果说明如下。
名称 | 说明 |
---|---|
database | kafka消费表的数据库名称。 |
table | kafka消费表的表名。 |
topic | kafka消费表的topic名称。 |
consumer_group | kafka消费表的group名称。 |
last_read_message_count | 拉取到的kafka消费表的消息数量。 |
status | kafka消费表的消费状态。取值说明:
|
exception | 异常详情。
说明 当
status取值为
error时,该参数返回异常详情。
|
常见问题
从Kafka同步数据到云数据库ClickHouse的常见问题及处理方法,请参见常见问题。