从Kafka同步数据

本文为您介绍如何将消息队列Kafka的数据实时同步至云数据库ClickHouse

使用限制

仅支持同步云消息队列Kafka和部署在ECS上的自建Kafka的数据。

前提条件

  • 云数据库ClickHouse

    • 已创建目标集群,且保证消息队列Kafka和目标集群在同一地域并使用相同的VPC。如何创建,请参见新建集群

    • 目标集群已创建登录数据库的账号且已具备数据库的操作权限。具体操作,请参见账号管理

  • 云消息队列Kafka:

注意事项

  • 云数据库ClickHouseKafka外表所订阅的Topic不能存在其他消费者。

  • 创建kafka外表、物化视图和本地表时,三张表的字段类型需要一一对应。

操作步骤

本示例为将云消息队列Kafka同步至云数据库ClickHouse社区兼容集群的数据库default库中的kafka_table_distributed分布式表中。

步骤一:了解同步原理

云数据库ClickHouse同步Kafka数据主要依赖于其内置的Kafka表引擎和物化视图(Materialized View)机制,实现实时数据消费和存储。具体数据链路如下。

image
  • Kafka主题:需要同步的源数据。

  • 云数据库ClickHouseKafka外表(Kafka表引擎的表):从指定Kafka主题拉取源数据。

  • 物化视图:通过Kafka外表读取源数据并执行插入操作,将数据写到云数据库ClickHouse本地表。

  • 本地表:存储同步的数据。

步骤二:登录云数据库ClickHouse

如何登录,请参见通过DMS连接ClickHouse

步骤三:创建Kafka外表

云数据库ClickHouse同步Kafka数据依赖于其内置的Kafka表引擎从指定Kafka主题拉取源数据。此表有以下特点:

  • Kafka外表默认不能直接查询。

  • Kafka外表只是用来消费Kafka数据,没有真正地存储数据,需要通过物化视图将数据加工并插入到目标表中存储。

建表语法如下。

重要

Kafka外表字段格式须与Kafka数据类型一致。

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'host:port1,host:port2,host:port3',
    kafka_topic_list = 'topic_name1,topic_name2,...',
    kafka_group_name = 'group_name',
    kafka_format = 'data_format'[,]
    [kafka_row_delimiter = 'delimiter_symbol',]
    [kafka_num_consumers = N,]
    [kafka_thread_per_consumer = 1,]
    [kafka_max_block_size = 0,]
    [kafka_skip_broken_messages = N,]
    [kafka_commit_every_batch = 0,]
    [kafka_auto_offset_reset = N]

常用参数说明如下。

名称

是否必选

说明

kafka_broker_list

以英文逗号(,)分隔的Kafka的接入点地址列表。如何查看接入点,请参见查看接入点

  • 如果您使用阿里云消息队列Kafka,Clickhouse默认支持解析对应的云Kafka域名。

  • 如果您使用了自建Kafka,云数据库Clickhouse目前支持通过IP或者固定格式的自定义域名连接Kafka,支持的域名规则如下:

    1. 以.com结尾的域名。

    2. 以.local结尾,并包含kafka、mysqlrabbitmq任意一个关键词的域名。

kafka_topic_list

以英文逗号(,)分隔的Topic名称列表。如何查看Topic名称,请参见创建Topic

kafka_group_name

Kafka的消费组名称。更多信息,请参见创建Group

kafka_format

云数据库ClickHouse支持处理的消息体格式。

说明

云数据库ClickHouse支持的消息体格式,更多信息请参见Formats for Input and Output Data

kafka_row_delimiter

行分隔符,用于分隔不同的数据行。默认为“\n”,您也可以根据数据写入的实际分隔格式进行设置。

kafka_num_consumers

单个表的消费者数量,默认值为1。

说明
  1. 一个消费者的吞吐量不足时,需要指定更多的消费者。

  2. 消费者的总数不应超过Topic中的分区数,因为每个分区只能分配一个消费者。

kafka_thread_per_consumer

指定每个消费者是否启动独立线程进行消费,默认值为0。取值说明如下。

  1. 0:表示所有消费者共同使用1个线程消费。

  2. 1:表示每个消费者启动独立线程进行消费。

提升消费速度更多信息请参见Kafka性能调优

kafka_max_block_size

Kafka消息的最大批次大小,单位:Byte,默认值为65536。

kafka_skip_broken_messages

Kafka消息解析器对于脏数据的容忍度,默认值为0。如果kafka_skip_broken_messages=N,则引擎将跳过N条无法解析的Kafka消息(一条消息等于一行数据)。

kafka_commit_every_batch

执行Kafka Commit的频率,默认值为0,取值说明如下:

  1. 0:完全写入一整个Block数据块的数据后才执行Commit。

  2. 1:每写完一个Batch批次的数据就执行一次Commit。

kafka_auto_offset_reset

消息的偏移量,从哪个offset开始读取Kafka数据,取值说明如下:

  1. earliest(默认值):从最早的offset开始读取Kafka数据。

  2. latest:从最晚的offset开始读取Kafka数据。

说明

21.8版本的云数据库ClickHouse集群不支持该参数。

更多参数说明,请参见Kafka

示例语句如下。

CREATE TABLE default.kafka_src_table ON CLUSTER `default`
(-- 定义表结构的字段
    id Int32,
    name String               
) ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'alikafka-post-cn-****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-****1-2-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-****-3-vpc.alikafka.aliyuncs.com:9092',
    kafka_topic_list = 'testforCK',
    kafka_group_name = 'GroupForTestCK',
    kafka_format = 'CSV';

步骤四:创建目标存储表

您需根据集群版本选择建表语句。

企业版集群仅需创建本地表,社区兼容版集群则可能需要根据您的环境和需求创建分布式表。以下为示例语句,更多建表语法,请参见CREATE TABLE

企业版

CREATE TABLE default.kafka_table_local ON CLUSTER default (
  id Int32,
  name String
) ENGINE = MergeTree()
ORDER BY (id);

如果您执行此语句时报ON CLUSTER is not allowed for Replicated database的错误提示,可尝试通过升级版本解决此问题,如何升级版本,请参见升级内核小版本

社区兼容版

单副本和双副本的引擎有所不同,请根据您的副本类型选择相应的引擎。

重要

在双副本集群中建表时,必须使用MergeTree系列引擎中支持数据复制的Replicated系列引擎。如果您在双副本集群中,创建了非Replicated系列引擎的表,将导致副本之间无法进行数据复制,从而导致副本数据可能不一致。

单副本

  1. 创建本地表。

    CREATE TABLE default.kafka_table_local ON CLUSTER default (
      id Int32,
      name String
    ) ENGINE = MergeTree()
    ORDER BY (id);
  2. (可选)创建分布式表。

    如果您只需将文件导入至本地表中,可跳过此步骤。

    如果您的集群为多节点集群,建议您创建分布式表。

    CREATE TABLE kafka_table_distributed ON CLUSTER default AS default.kafka_table_local
    ENGINE = Distributed(default, default, kafka_table_local, id);

双副本

  1. 创建本地表。

    CREATE TABLE default.kafka_table_local ON CLUSTER default (
      id Int32,
      name String
    ) ENGINE = ReplicatedMergeTree()
    ORDER BY (id);
  2. (可选)创建分布式表。

    如果您只需将文件导入至本地表中,可跳过此步骤。

    如果您的集群为多节点集群,建议您创建分布式表。

    CREATE TABLE kafka_table_distributed ON CLUSTER default AS default.kafka_table_local
    ENGINE = Distributed(default, default, kafka_table_local, id);

步骤五:创建物化视图

云数据库ClickHouse同步数据依赖于物化视图通过Kafka外表读取源数据并触发插入操作,将数据写入到云数据库ClickHouse本地表。

创建物化视图语法如下。

重要

您需确保SELECT字段与目标表结构一致,或通过转换函数处理数据格式使其与目标表一致。

CREATE MATERIALIZED VIEW <view_name> ON CLUSTER default TO <dest_table> AS SELECT * FROM <src_table>;

参数说明如下。

参数名称

是否必填

描述

示例

view_name

视图名称。

consumer

dest_table

用于存储Kafka数据的目标表。

  • 社区兼容版集群:

    • 多节点集群,建议将数据导入至分布式表中。

    • 如果您同步的目标表是本地表,此处表为本地表。

  • 企业版集群:企业版没有分布式表,此处为本地表。

  • 社区兼容版示例:kafka_table_distributed

  • 企业版示例:kafka_table_local

src_table

kafka外表。

kafka_src_table

示例语句如下。

企业版

CREATE MATERIALIZED VIEW consumer ON CLUSTER default TO kafka_table_local AS SELECT * FROM kafka_src_table;

社区兼容版

此处示例时将源数据存储至kafka_table_distributed分布式表中。

CREATE MATERIALIZED VIEW consumer ON CLUSTER default TO kafka_table_distributed AS SELECT * FROM kafka_src_table;

步骤六:验证同步是否生效

  1. 在云消息队列Kafka版的Topic端发送消息。

    1. 登录消息队列Kafka控制台

    2. 实例列表页面,单击目标实例名称。

    3. Topic管理页面,单击目标Topic操作列的更多 > 体验发送消息

    4. 快速体验消息收发页面,输入发送的消息内容

      本文以发送消息1,a2,b为例。

    5. 单击确定

  2. 登录云数据库ClickHouse,查询分布式表,确认数据是否同步成功。

    如何登录云数据库ClickHouse,请参见通过DMS连接ClickHouse

    验证数据的查询语句如下。

    企业版

    SELECT * FROM kafka_table_local; 

    社区兼容版

    以下是查询分布式表的示例。

    • 如果您同步的目的表是本地表,需将查询语句中的分布式表名更换为本地表名,再进行查询。

    • 如果您是社区版集群,并且是多节点集群,强烈建议您查询分布式表;若未查询分布式表,则只能获取集群中一个节点的数据,这可能会导致查询结果少于您导入的数据。

    SELECT * FROM kafka_table_distributed; 

    当您执行查询语句并成功返回结果时,说明数据已从Kafka同步至云数据库ClickHouse

    查询结果如下。

    ┌─id─┬─name─┐
    │  1 │  a   │
    │  2 │  b   │
    └────┴──────┘

    如果查询结果与您期望的不同,您可通过步骤七(可选):查看Kafka外表的消费状态进一步排查问题。

步骤七(可选):查看Kafka外表的消费状态

如果您同步的数据与Kafka的数据不一致时,可通过系统表帮助您查看Kafka外表的消费状态,排查消息消费异常的问题。

内核版本大于等于23.8的社区兼容版集群及企业版集群

通过查询系统表system.kafka_consumers查看Kafka外表的消费状态,查询语句如下。

select * from system.kafka_consumers;

system.kafka_consumers表字段说明如下。

字段名称

说明

database

使用Kafka外表所在的数据库。

table

使用Kafka外表的表名。

consumer_id

Kafka消费者标识符。

一个表可以有多个消费者,由创建Kafka外表时的kafka_num_consumers参数指定。

assignments.topic

Kafka主题。

assignments.partition_id

Kafka分区ID。

一个分区只能分配给一个消费者。

assignments.current_offset

当前偏移量。

exceptions.time

最近10个异常生成的时间戳。

exceptions.text

最近10个异常的文本。

last_poll_time

最近一次轮询的时间戳。

num_messages_read

消费者读取的消息数量。

last_commit_time

最近一次提交的时间戳。

num_commits

消费者的总提交次数。

last_rebalance_time

最近一次Kafka重新平衡的时间戳。

num_rebalance_revocations

消费者被撤销分区的次数。

num_rebalance_assignments

消费者被分配到Kafka集群的次数。

is_currently_used

消费者是否正在使用中。

last_used

该消费者最后一次使用的时间,以微秒为单位的Unix时间。

rdkafka_stat

库内部统计信息。更多详情,请参见librdkafka

默认为3000,表示每3秒生成一次统计信息。

说明

云数据库ClickHouse设置 statistics_interval_ms=0时,可禁止Kafka外表的统计信息收集。

内核版本小于23.8以下的社区兼容版集群

通过查询系统表system.kafka查看Kafka外表的消费状态,查询语句如下。

SELECT * FROM system.kafka;

system.kafka表字段说明如下。

字段名称

说明

database

Kafka外表的数据库名称。

table

Kafka外表的表名。

topic

Kafka外表消费的topic名称。

consumer_group

Kafka外表消费的group名称。

last_read_message_count

拉取到的Kafka外表的消费消息的数量。

status

Kafka外表的消费Kafka消息的状态。取值说明:

  • no_view:Kafka外表没有创建视图。

  • attach_view:Kafka外表创建了视图。

  • normal:正常状态。

    kafka外表有消费数据时,Kafka外表的消费状态为normal

  • skip_parse:跳过错误解析。

  • error:消费异常。

exception

异常详情。

说明

status取值为error时,该参数返回异常详情。

常见问题