从Kafka同步数据

当您需要将消息队列Kafka的数据实时同步至云数据库ClickHouse中时,本文档为您提供了详细的解决方案,以满足您的数据实时处理需求。

说明

云数据库ClickHouse集群从Kafka进行数据同步目前仅支持云消息队列Kafka和部署在ECS上的自建Kafka。本文以云消息队列Kafka进行数据同步为例。

前提条件

  • 已创建目标云数据库ClickHouse集群,且保证消息队列Kafka和目标云数据库ClickHouse集群在同一地域并使用相同的VPC。如何创建,请参见新建集群

  • 目标云数据库ClickHouse集群已创建登录数据库的账号且已具备数据库的操作权限。如何创建,请参见创建账号准备权限

操作步骤

  1. 登录云数据库ClickHouse控制台

  2. 在页面左上角,选择目标集群所在的地域。

  3. 集群列表页面,选择目标集群对应类型的实例列表,单击目标集群ID。

  4. 集群信息页面,单击右上方导航栏的登录数据库

  5. 在DMS数据管理服务控制台的登录实例页面,输入数据库账号密码,单击登录

  6. 创建Kafka消费表。

    说明
    • 创建Kafka消费表是为了将Kafka消费表的数据同步到云数据库ClickHouse的表中。

    • Kafka消费表不能直接使用。

    • Kafka消费表只是用来消费Kafka数据,没有真正地存储数据,最终消费数据需要实现消息批处理后进行消费。

    建表语法如下。

    CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
    (
        name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
        name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
        ...
    ) ENGINE = Kafka()
    SETTINGS
        kafka_broker_list = 'host:port1,host:port2,host:port3',
        kafka_topic_list = 'topic_name1,topic_name2,...',
        kafka_group_name = 'group_name',
        kafka_format = 'data_format'[,]
        [kafka_row_delimiter = 'delimiter_symbol',]
        [kafka_num_consumers = N,]
        [kafka_thread_per_consume = 1,]
        [kafka_max_block_size = 0,]
        [kafka_skip_broken_messages = N,]
        [kafka_commit_every_batch = 0,]
        [kafka_auto_offset_reset = N]

    常用参数说明如下。

    名称

    是否必选

    说明

    kafka_broker_list

    以英文逗号(,)分隔的Kafka的接入点地址列表。如何查看接入点,请参见查看接入点

    kafka_topic_list

    以英文逗号(,)分隔的Topic名称列表。如何查看Topic名称,请参见创建Topic

    kafka_group_name

    Kafka的消费组名称。更多信息,请参见创建Group

    kafka_format

    云数据库ClickHouse支持处理的消息体格式。

    说明

    云数据库ClickHouse支持的消息体格式,更多信息请参见Formats for Input and Output Data

    kafka_row_delimiter

    行分隔符,用于分隔不同的数据行。默认为“\n”,您也可以根据数据写入的实际分隔格式进行设置。

    kafka_num_consumers

    单个表的消费者数量,默认值为1。

    说明
    • 一个消费者的吞吐量不足时,需要指定更多的消费者。

    • 消费者的总数不应超过Topic中的分区数,因为每个分区只能分配一个消费者。

    kafka_thread_per_consumer

    指定每个消费者是否启动独立线程进行消费,默认值为0。取值说明如下。

    • 0:表示所有消费者共同使用1个线程消费。

    • 1:表示每个消费者启动独立线程进行消费。

    提升消费速度更多信息请参见Kafka性能调优

    kafka_max_block_size

    Kafka消息的最大批次大小,单位:Byte,默认值为65536。

    kafka_skip_broken_messages

    Kafka消息解析器对于脏数据的容忍度,默认值为0。如果kafka_skip_broken_messages=N,则引擎将跳过N条无法解析的Kafka消息(一条消息等于一行数据)。

    kafka_commit_every_batch

    执行Kafka Commit的频率,默认值为0,取值说明如下:

    • 0:完全写入一整个Block数据块的数据后才执行Commit。

    • 1:每写完一个Batch批次的数据就执行一次Commit。

    kafka_auto_offset_reset

    消息的偏移量,从哪个offset开始读取Kafka数据,取值说明如下:

    • earliest(默认值):从最早的offset开始读取Kafka数据。

    • latest:从最晚的offset开始读取Kafka数据。

    说明

    21.8版本的云数据库ClickHouse集群不支持该参数。

    说明

    更多参数说明,请参见Kafka

    示例语句如下。

    CREATE TABLE default.kafka_src_table ON CLUSTER default
    (   --定义表结构的字段
        id Int32,
        name String               
    ) ENGINE = Kafka()
    SETTINGS
        kafka_broker_list = 'alikafka-post-cn-tl32i5sc****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-tl32i5sc****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-tl32i5sc****-3-vpc.alikafka.aliyuncs.com:9092',
        kafka_topic_list = 'test',
        kafka_group_name = 'test',
        kafka_format = 'CSV';
  7. 创建云数据库ClickHouse表。

    说明

    创建本地表和分布式表的目的。更多信息,请参见基本概念

    1. 创建本地表。

      CREATE TABLE default.kafka_table_local ON CLUSTER default (
        id Int32,
        name String
      ) ENGINE = MergeTree()
      ORDER BY (id);
    2. 创建分布式表。

      说明

      如果您只需要同步数据至本地表,可跳过此步骤。

      CREATE TABLE kafka_table_distributed ON CLUSTER default AS default.kafka_table_local
      ENGINE = Distributed(default, default, kafka_table_local, id);
  8. 创建视图将Kafka消费表的数据同步到云数据库ClickHouse的分布式表。

    说明

    如果您同步的目的表是本地表,请将分布式表名更换为本地表名,再进行同步。

    创建视图语法如下。

    CREATE MATERIALIZED VIEW [view.name] ON CLUSTER default TO [dest_table] AS SELECT * FROM [src_table];

    示例语句如下。

    CREATE MATERIALIZED VIEW consumer ON CLUSTER default TO kafka_table_distributed AS SELECT * FROM kafka_src_table;

验证同步结果

您可以选择以下任意一种方式验证同步结果。

查询云数据库ClickHouse分布式表验证同步结果

  1. 在云消息队列Kafka版的Topic端发送消息。

    1. 登录消息队列Kafka控制台

    2. 实例列表页面,单击目标实例名称。

    3. Topic管理页面,单击目标Topic操作列的更多 > 体验发送消息

    4. 快速体验消息收发页面,输入发送的消息内容

      本文以发送消息1,a2,b为例。

    5. 单击确定

  2. 查询云数据库ClickHouse分布式表,确认数据是否同步成功,查询语句如下。

    SELECT * FROM kafka_table_distributed; 
    说明

    如果您同步的目的表是本地表,请将查询语句中的分布式表名更换为本地表名,再进行查询。

    查询结果如下。

    ┌─id─┬─name─┐
    │  1 │  a   │
    │  2 │  b   │
    └────┴──────┘
    说明

    当您执行查询语句并成功返回结果时,说明数据已从Kafka同步至云数据库ClickHouse

查询系统表验证同步结果

通过查询系统表system.kafka查看Kafka消费表的消费状态,查询语句如下。

SELECT * FROM system.kafka

查询结果如下。

┌─database─┬──────────table──────────────┬─topic─┬─consumer_group─┬─last_read_message_count─┬───────status──────┬─exception─┐
│  default │  kafka_table_distributed    │ test  │   test         │          2              │   attach_view     │           │  
└──────────┴─────────────────────────────┴───────┴────────────────┴─────────────────────────┴───────────────────┴───────────┘

查询结果说明如下。

名称

说明

database

Kafka消费表的数据库名称。

table

Kafka消费表的表名。

topic

Kafka消费表的topic名称。

consumer_group

Kafka消费表的group名称。

last_read_message_count

拉取到的Kafka消费表的消息数量。

status

Kafka消费表的消费状态。取值说明:

  • no_view:Kafka消费表没有创建视图。

  • attach_view:Kafka消费表创建了视图。

  • normal:正常状态。

    说明

    当kafka消费表有消费数据时,Kafka消费表的消费状态为normal

  • skip_parse:跳过错误解析。

  • error:消费异常。

exception

异常详情。

说明

status取值为error时,该参数返回异常详情。

常见问题

从Kafka同步数据到云数据库ClickHouse的常见问题及处理方法,请参见常见问题