ClickHouse自建上云兼容性和性能瓶颈分析与解决

更新时间:2025-02-24 06:55:32

ClickHouse迁移上云后,可能会面临兼容性及性能方面的问题。为了确保迁移过程顺利,并避免对生产环境造成不必要的影响,在您正式开始迁移数据前,强烈建议在测试环境进行数据迁移,然后进行上云后的兼容性和性能瓶颈分析,并解决相关问题。

背景知识

用户在业务运行的早期阶段自建了ClickHouse,后期考虑到稳定性、运维成本和容灾能力等因素,希望将ClickHouse直接迁移至云端。迁移至云端后,可能面临以下问题:

  • 版本兼容性问题。

    • MaterializedMySQL引擎兼容性问题。

    • SQL兼容性问题。

  • 业务整体切换至云数据库ClickHouse后,出现CPU资源耗尽、内存不足等问题。

针对上述问题,自建上云过程中需重点关注兼容性与性能问题的解决。

兼容性分析与解决

参数兼容性

  1. 拉取自建ClickHouse云数据库ClickHouse的配置参数。

    SELECT
        name,
        groupArrayDistinct(value) AS value
    FROM clusterAllReplicas(`default`, system.settings)
    GROUP BY name
    ORDER BY name ASC
  2. 利用文本对比工具(例如VSCode)对比二者的参数,发现配置不一致的参数后,将云数据库ClickHouse的参数配置与自建ClickHouse参数一致。

    • 影响兼容性的常见参数:compatibility、prefer_global_in_and_join、distributed_product_mode等。

    • 影响性能的常见参数:max_threads、max_bytes_to_merge_at_max_space_in_pool、prefer_global_in_and_join等。

    image.png

MaterializedMySQL兼容性

如果您的自建ClickHouse同步的是MySQL的数据,自建上云后,云数据库ClickHouse仍需持续同步MySQL的数据,您需注意MaterializedMySQL引擎的兼容性。

MaterializedMySQL引擎在每个节点创建ReplacingMergeTreeTABLE1,每个节点保留一样的数据。但社区版已经不再维护MaterializedMySQL引擎。由于MySQL数据同步至云数据库ClickHouse的主要方案是通过DTS同步,针对MaterializedMySQL引擎社区版不再维护问题,DTSMySQL数据同步至云数据库ClickHouse时,其使用ReplacingMergeTree表代替MaterializedMySQL表,具体实现是为云数据库ClickHouse的每个节点创建分布式表TABLE1ReplacingMergeTreeTABLE2,然后云数据库ClickHouse通过分布式表将数据分发到各自节点。此实现引发了一系列自建ClickHouse迁移至云数据库ClickHouse的兼容问题,可能对您的业务造成影响。常见问题如下:

  • 问题一:上云之前,在自建的ClickHouse中,MaterializedMySQL会为每个分片同步数据;而通过DTS上云后,使用ReplacingMergeTree表代替MaterializedMySQL表,数据会通过分布式表分发到各自节点,这对业务中使用INJOIN关联查询分布式表产生了影响。更多详情,请参见分布式表使用子查询报错后,怎么办?

  • 问题二:上云后,使用ReplacingMergeTree表替代MaterializedMySQL后,由于ReplacingMergeTree数据合并速度不够快,导致查询结果比自建查询结果出现更多的重复数据。可通过以下方案解决:

    方案一:在云数据库ClickHouse执行set global final=1,设置在查询时合并数据。此参数能保证查询数据不重复,但其会占用更多CPU和内存。

    方案二:在云数据库ClickHouse,修改目标ReplacingMergeTree表的min_age_to_force_merge_secondsmin_age_to_force_merge_on_partition_only参数,让目标表merge更频繁,防止产生太多重复数据。示例如下:

    ALTER TABLE <AIM_TABLE>
        MODIFY SETTING
            min_age_to_force_merge_on_partition_only = 1,
            min_age_to_force_merge_seconds = 60;
    • min_age_to_force_merge_on_partition_only

      含义:用于控制MergeTree表引擎的合并策略。当设置为1时,开启分区强制合并数据。

      默认值:0(表示关闭)

    • min_age_to_force_merge_seconds

      含义:强制合并part的时间间隔。

      默认值:3600

      单位:秒

SQL兼容性验证

  1. 安装Python环境。

    此验证需基本Python3环境。建议您用阿里云ECSLinux进行验证,因为其已具有Python3环境。如何购买ECS,请参见快速购买实例

    如果非阿里云ECS环境,您需自行安装Python环境,如何安装Python环境,请参见Python官网

  2. 安装与ClickHouse进行交互的Python客户端库。

    在终端或命令提示符中,执行以下命令。

    pip3 install clickhouse_driver
  3. 确保用于验证的服务器与云数据库ClickHouse和自建ClickHouse网络互通。

    如何解决用于验证的服务器与云数据库ClickHouse网络互通,请参见如何解决目标集群与数据源网络互通问题?

  4. 执行Python脚本,提取自建ClickHouseSELECT请求在云数据库ClickHouse运行,验证自建ClickHouseSQL云数据库ClickHouse的兼容性。脚本如下:

    from clickhouse_driver import connect
    import datetime
    import logging
    # 需要 pip3 install clickhouse_driver
    
    # 自建实例VPC地址
    host_old='HOST_OLD'
    # 自建实例TCP端口
    port_old=TCP_PORT_OLD
    # 自建实例用户名
    user_old='USER_OLD'
    # 自建实例用户密码
    password_old='PASSWORD_OLD'
    
    # 云ClickHouse VPC地址
    host_new='HOST_NEW'
    # 云ClickHouse TCP端口
    port_new=TCP_PORT_NEW
    # 云ClickHouse用户名
    user_new='USER_NEW'
    # 云ClickHouse用户密码
    password_new='PASSWORD_NEW'
    
    # 配置日志
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
    
    def create_connection(host, port, user, password):
        """创建 ClickHouse 连接"""
        return connect(host=host, port=port, user=user, password=password)
    
    def get_query_hashes(cursor):
        """获取最近2天的 query_hash 列表"""
        get_queryhash_sql = '''
        select distinct normalized_query_hash from system.query_log
        where type='QueryFinish'
          and `is_initial_query`=1
          and `user` not in ('default', 'aurora')
          and lower(`query`) not like 'select 1%'
          and lower(`query`) not like 'select timezone()%'
          and lower(`query`) not like '%dms-websql%'
          and lower(`query`) like 'select%'
          and `event_time` > now() - INTERVAL 2 DAY;
        '''
        cursor.execute(get_queryhash_sql)
        return cursor.fetchall()
    
    def get_sql_info(cursor, queryhash):
        """获取最近2天(搜索范围3天)指定 query_hash 的 SQL 信息"""
        get_sqlinfo_sql = f'''
        select `event_time`, `query_duration_ms`, `read_rows`, `read_bytes`, `memory_usage`, `databases`, `tables`, `query`
        from system.query_log
        where `event_time` > now() - INTERVAL 3 DAY and `type`='QueryFinish' and `normalized_query_hash`='{queryhash}' limit 1
        '''
        cursor.execute(get_sqlinfo_sql)
        sql_info = cursor.fetchone()
        if sql_info:
            return [info.strftime('%Y-%m-%d %H:%M:%S') if isinstance(info, datetime.datetime) else info for info in sql_info[:-1]],sql_info[-1]
        return None
    
    def execute_sql_on_new_db(cursor, query_sql, execute_failed_sql):
        """在新节点上执行 SQL 并记录失败的 SQL"""
        try:
            cursor.execute(query_sql)
        except Exception as error:
            logging.error(f'query_sql execute in new db failed: {query_sql}')
            execute_failed_sql[query_sql] = error
    
    def main():
        # 创建连接
        conn_old = create_connection(
            host=host_old,
            port=port_old,
            user=user_old,
            password=password_old
        )
        conn_new = create_connection(
            host=host_new,
            port=port_new,
            user=user_new,
            password=password_new
        )
    
        # 创建游标
        cursor_old = conn_old.cursor()
        cursor_new = conn_new.cursor()
    
        # 获取老节点的 query_hash 列表
        old_query_hashes = get_query_hashes(cursor_old)
        #老节点获取 SQL 执行信息
        old_db_execute_dir = {}
        for queryhash in old_query_hashes:
            sql_info,query = get_sql_info(cursor_old, queryhash[0])
            if sql_info:
                old_db_execute_dir[query] = sql_info
    
        # 关闭老节点的游标和连接
        cursor_old.close()
        conn_old.close()
    
        # 在新节点上执行 SQL,最重要的验证步骤
        execute_failed_sql = {}
        keys_list = list(old_db_execute_dir.keys())
        for query_sql in old_db_execute_dir:
            position = keys_list.index(query_sql)
            logging.info(f"new db test the {position + 1}th/{len(old_db_execute_dir)}, running sql: {query_sql}\n")
            execute_sql_on_new_db(cursor_new, query_sql, execute_failed_sql)
    
        # 获取新节点的 query_hash 列表
        new_query_hashes = get_query_hashes(cursor_new)
        new_db_execute_dir = {}
        for queryhash in new_query_hashes:
            sql_info,query = get_sql_info(cursor_new, queryhash[0])
            if sql_info:
                new_db_execute_dir[query] = sql_info
    
        # 关闭新节点的游标和连接
        cursor_new.close()
        conn_new.close()
    
        # 打印新老版本节点 SQL 执行信息
        for query_sql in new_db_execute_dir:
            if query_sql in old_db_execute_dir:
                logging.info(f'succeed sql: {query_sql}')
                logging.info(f'old sql info: {old_db_execute_dir[query_sql]}')
                logging.info(f'new sql info: {new_db_execute_dir[query_sql]}\n')
    
    
        # 打印执行报错的 SQL
        for query_sql in execute_failed_sql:
            logging.error(f'failed sql: {query_sql}')
            logging.error(f'failed error: {execute_failed_sql[query_sql]}\n')
    
    if __name__ == "__main__":
        main()
  5. 验证结果与解决。

    验证的SQL兼容性情况较为复杂,如遇到错误,您需根据错误信息自行进行排查与解决。

性能分析和优化

由于自建上云背景是已经有业务在运行,较难实现逐步上云测试和性能优化。业务整体切换到云ClickHouse后,实例可能出现CPU打满,内存不足等问题。在出现切换到云数据库ClickHouse后,云数据库ClickHouse性能劣化时,通过以下措施快速定位到表或请求,进而确认性能瓶颈问题。当然,如果已经知道某个表或查询导致目标实例性能较差,可以直接进入步骤三:SQL性能分析

步骤一:定位整体性能瓶颈的关键表

火焰图与query_log分析是定位导致性能问题关键表的两种手段。火焰图制作比较繁琐,但信息更直观;query_log表分析操作简单,无需借助工具,但信息需自己分析。二者可结合使用。

制作火焰图

  1. 导出trace_log。

    1. 使用clickhouse-client,登录云数据库ClickHouse实例。如何登录,请参见通过命令行工具连接ClickHouse

    2. 执行以下命令,导出trace_log,生成cpu_trace_log.txt

      -- trace_type = 'CPU'表示跟踪CPU
      -- trace_type = 'Real'表示跟踪实际耗时
      /clickhouse/bin/clickhouse-client -h <IP> --port <port> -q "SELECT arrayStringConcat(arrayReverse(arrayMap(x -> concat( addressToLine(x), '#', demangle(addressToSymbol(x)) ), trace)), ';') AS stack, count() AS samples FROM system.trace_log WHERE trace_type = 'CPU' and event_time >= '2025-01-08 19:31:00' and event_time < '2025-01-08 19:33:00' group by trace order by samples desc FORMAT TabSeparated settings allow_introspection_functions=1" >  cpu_trace_log.txt

      参数说明如下。

      参数

      描述

      参数

      描述

      IP

      云数据库ClickHouseVPC地址。

      port

      云数据库ClickHouseTCP端口。

      除了配置上述参数外,您还需配置event_time,获取目标时间段的trace日志。

  2. 使用clickhouse-flamegraph制作火焰图。

    1. 安装clickhouse-flamegraph。下载与安装,请参见clickhouse-flamegraph

    2. 执行以下命令,生成火焰图后,分析火焰图。

      cat cpu_trace_log.txt | flamegraph.pl > cpu_trace_log.svg

      以下火焰图中,可以观察到目标实例CPU消耗较大的函数,例如ReplacingSortedMerg函数。因此,您可以重点关注ReplacingMergeTree表中的查询SQL。

      image

分析query_log

SELECT请求占用CPU和内存多的表,并不能直接说明整体性能较差,这仅仅表明其导致CPU和内存占用较多的概率更高。因此,有必要在自建的ClickHouse云数据库ClickHouse中同时执行相关命令以进行TOP对比。在确定了差异较大的表之后,需进行相关表中SELECT语句的定位。定位CPU或内存问题表的示例如下。

定位CPU问题的表
定位内存问题表
SELECT
    tables,
    first_value(query),
    count() AS cnt,
    groupArrayDistinct(normalizedQueryHash(query)) AS normalized_query_hash,
    sum(ProfileEvents.Values[indexOf(ProfileEvents.Names, 'UserTimeMicroseconds')]) AS sum_user_cpu,
    sum(ProfileEvents.Values[indexOf(ProfileEvents.Names, 'SystemTimeMicroseconds')]) AS sum_system_cpu,
    avg(ProfileEvents.Values[indexOf(ProfileEvents.Names, 'UserTimeMicroseconds')]) AS avg_user_cpu,
    avg(ProfileEvents.Values[indexOf(ProfileEvents.Names, 'SystemTimeMicroseconds')]) AS avg_system_cpu,
    sum(memory_usage) as sum_memory_usage,
    avg(memory_usage) as avg_memory_usage
FROM clusterAllReplicas(default, system.query_log)
WHERE (event_time > '2025-01-08 19:30:00') AND (event_time < '2025-01-08 20:30:00') AND (query_kind = 'Select')
GROUP BY tables
ORDER BY sum_user_cpu DESC
LIMIT 5
SELECT
    tables,
    first_value(query),
    count() AS cnt,
    groupArrayDistinct(normalizedQueryHash(query)) AS normalized_query_hash,
    sum(ProfileEvents.Values[indexOf(ProfileEvents.Names, 'UserTimeMicroseconds')]) AS sum_user_cpu,
    sum(ProfileEvents.Values[indexOf(ProfileEvents.Names, 'SystemTimeMicroseconds')]) AS sum_system_cpu,
    avg(ProfileEvents.Values[indexOf(ProfileEvents.Names, 'UserTimeMicroseconds')]) AS avg_user_cpu,
    avg(ProfileEvents.Values[indexOf(ProfileEvents.Names, 'SystemTimeMicroseconds')]) AS avg_system_cpu,
    sum(memory_usage) as sum_memory_usage,
    avg(memory_usage) as avg_memory_usage
FROM clusterAllReplicas(default, system.query_log)
WHERE (event_time > '2025-01-08 19:30:00') AND (event_time < '2025-01-08 20:30:00') AND (query_kind = 'Select')
GROUP BY tables
ORDER BY sum_memory_usage
LIMIT 5

步骤二:定位性能瓶颈关键SQL

根据上个步骤确定导致性能瓶颈的表后,进一步定位导致性能瓶颈的某类SQL。示例如下所示。

定位导致CPU问题的SQL
定位导致内存问题的SQL
SELECT
    tables,
    first_value(query),
    count() AS cnt,
    normalizedQueryHash(query) AS normalized_query_hash,
    sum(ProfileEvents.Values[indexOf(ProfileEvents.Names, 'UserTimeMicroseconds')]) AS sum_user_cpu,
    sum(ProfileEvents.Values[indexOf(ProfileEvents.Names, 'SystemTimeMicroseconds')]) AS sum_system_cpu,
    avg(ProfileEvents.Values[indexOf(ProfileEvents.Names, 'UserTimeMicroseconds')]) AS avg_user_cpu,
    avg(ProfileEvents.Values[indexOf(ProfileEvents.Names, 'SystemTimeMicroseconds')]) AS avg_system_cpu,
    sum(memory_usage) as sum_memory_usage,
    avg(memory_usage) as avg_memory_usage
FROM clusterAllReplicas(default, system.query_log)
WHERE (event_time > '2025-01-08 19:30:00') AND (event_time < '2025-01-08 20:30:00') AND (query_kind = 'Select') AND has(tables, '<AIM_TABLE>')
GROUP BY
    tables,
    normalized_query_hash
ORDER BY sum_user_cpu DESC
LIMIT 5
SELECT
    tables,
    first_value(query),
    count() AS cnt,
    normalizedQueryHash(query) AS normalized_query_hash,
    sum(ProfileEvents.Values[indexOf(ProfileEvents.Names, 'UserTimeMicroseconds')]) AS sum_user_cpu,
    sum(ProfileEvents.Values[indexOf(ProfileEvents.Names, 'SystemTimeMicroseconds')]) AS sum_system_cpu,
    avg(ProfileEvents.Values[indexOf(ProfileEvents.Names, 'UserTimeMicroseconds')]) AS avg_user_cpu,
    avg(ProfileEvents.Values[indexOf(ProfileEvents.Names, 'SystemTimeMicroseconds')]) AS avg_system_cpu,
    sum(memory_usage) as sum_memory_usage,
    avg(memory_usage) as avg_memory_usage
FROM clusterAllReplicas(default, system.query_log)
WHERE (event_time > '2025-01-08 19:30:00') AND (event_time < '2025-01-08 20:30:00') AND (query_kind = 'Select') AND has(tables, 'AIM_TABLE')
GROUP BY
    tables,
    normalized_query_hash
ORDER BY sum_memory_usage
LIMIT 5

参数说明如下:

AIM_TABLE:需要定位性能问题的表。

除了配置上述参数外,您还需配置event_time,获取目标时间段的数据。

步骤三:SQL性能分析

您可以使用EXPLAINsystem.query_log表进行分析目标语句。

  • 利用EXPLAIN分析目标SQL。EXPLAIN中各字段含义,请参见EXPLAIN

    EXPLAIN PIPELINE <存在性能问题的SQL>
  • 利用system.query_log表分析目标SQL。

    SELECT
      hostname () AS  host,
      *
    FROM
      clusterAllReplicas (`default`, system.query_log)
    WHERE 
      event_time > '2025-01-18 00:00:00'
      AND event_time < '2025-01-18 03:00:00'
      AND initial_query_id = '<INITIAL_QUERY_ID>'
      AND type = 'QueryFinish'
    ORDER BY
      query_start_time_microseconds

    参数说明如下。

    INITIAL_QUERY_ID:目标查询语句的查询ID。

    除了配置上述参数外,您还需配置event_time,获取目标时间段的数据。

    您需重点关注以下返回字段:

    • ProfileEvents:此字段提供了详细的查询执行事件计数器和统计信息,帮助您分析查询的性能和资源使用情况进行指标对比。更多查询事件信息,请参见events

    • Settings:此字段提供了查询执行时所使用的各种配置参数,这些参数可以影响查询的行为和性能。通过查看此字段,您可以更好地理解和优化目标SQL。更多查询参数信息,请参见settings

    • query_duration_ms:此字段表示查询的总执行时间。如果发现某个子查询特别慢,可以使用子查询query_id在日志文件中查找执行细节。如何修改日志级别,请参见配置config.xml参数

如果上述方法均无法定位到目标语句导致性能变差的问题,您可以在自建的ClickHouse上查询目标语句相关信息,并与目标语句在云数据库ClickHouse的相关信息进行对比,进而分析上云后目标语句性能下降的原因。

步骤四:SQL优化

定位到导致性能问题的SQL后,您可以对目标SQL进行相应的优化。以下提供几种优化方向,具体问题仍需根据实际情况进行分析和处理。

  • 索引优化:分析目标语句,定位经常用于过滤的列,为其创建索引。

  • 数据类型:合适的数据类型可以减少存储空间并提高查询性能。

  • 查询优化:

    • 查询列优化:避免使用SELECT * ,只选择需要的列,减少I/O和网络传输。

    • WHERE 子句优化:子句中使用主键和索引字段。

    • 使用 PREWHERE:对于复杂的过滤条件,可以使用 PREWHERE 子句来提前过滤数据,减少后续处理的数据量。

    • JOIN优化:对于分布式表,使用 GLOBAL JOIN可以提高 JOIN 性能。

  • 参数设置优化:通过调整查询设置来优化性能。例如,增加 max_threads 可以利用更多 CPU 核心,但需注意不要过度配置导致资源争用。更多查询参数信息,请参见settings。如何查看查询中的参数设置,请参见SQL性能分析

  • 物化视图:对于频繁执行的复杂查询,可以创建物化视图来预先计算结果,从而提高查询性能。

  • 数据压缩:ClickHouse默认启用了数据压缩,可以通过调整压缩算法和级别来进一步优化存储和查询性能。

  • 配置更大缓存:对于频繁执行且结果不经常变化的查询,云数据库ClickHouse使用查询缓存来提高性能。当您发现目前缓存不能满足需求时,可修改uncompressed_cache_size参数,设置更大缓存空间。如何修改参数,请参见配置config.xml参数

  • 本页导读 (1)
  • 背景知识
  • 兼容性分析与解决
  • 参数兼容性
  • MaterializedMySQL兼容性
  • SQL兼容性验证
  • 性能分析和优化
  • 步骤一:定位整体性能瓶颈的关键表
  • 制作火焰图
  • 分析query_log
  • 步骤二:定位性能瓶颈关键SQL
  • 步骤三:SQL性能分析
  • 步骤四:SQL优化
AI助理

点击开启售前

在线咨询服务

你好,我是AI助理

可以解答问题、推荐解决方案等