ClickHouse迁移上云后,可能会面临兼容性及性能方面的问题。为了确保迁移过程顺利,并避免对生产环境造成不必要的影响,在您正式开始迁移数据前,强烈建议在测试环境进行数据迁移,然后进行上云后的兼容性和性能瓶颈分析,并解决相关问题。
背景知识
用户在业务运行的早期阶段自建了ClickHouse,后期考虑到稳定性、运维成本和容灾能力等因素,希望将ClickHouse直接迁移至云端。迁移至云端后,可能面临以下问题:
版本兼容性问题。
MaterializedMySQL引擎兼容性问题。
SQL兼容性问题。
业务整体切换至云数据库ClickHouse后,出现CPU资源耗尽、内存不足等问题。
针对上述问题,自建上云过程中需重点关注兼容性与性能问题的解决。
兼容性分析与解决
参数兼容性
拉取自建ClickHouse和云数据库ClickHouse的配置参数。
SELECT name, groupArrayDistinct(value) AS value FROM clusterAllReplicas(`default`, system.settings) GROUP BY name ORDER BY name ASC
利用文本对比工具(例如VSCode)对比二者的参数,发现配置不一致的参数后,将云数据库ClickHouse的参数配置与自建ClickHouse参数一致。
影响兼容性的常见参数:compatibility、prefer_global_in_and_join、distributed_product_mode等。
影响性能的常见参数:max_threads、max_bytes_to_merge_at_max_space_in_pool、prefer_global_in_and_join等。
MaterializedMySQL兼容性
如果您的自建ClickHouse同步的是MySQL的数据,自建上云后,云数据库ClickHouse仍需持续同步MySQL的数据,您需注意MaterializedMySQL引擎的兼容性。
MaterializedMySQL引擎在每个节点创建ReplacingMergeTree表TABLE1,每个节点保留一样的数据。但社区版已经不再维护MaterializedMySQL引擎。由于MySQL数据同步至云数据库ClickHouse的主要方案是通过DTS同步,针对MaterializedMySQL引擎社区版不再维护问题,DTS将MySQL数据同步至云数据库ClickHouse时,其使用ReplacingMergeTree表代替MaterializedMySQL表,具体实现是为云数据库ClickHouse的每个节点创建分布式表TABLE1和ReplacingMergeTree表TABLE2,然后云数据库ClickHouse通过分布式表将数据分发到各自节点。此实现引发了一系列自建ClickHouse迁移至云数据库ClickHouse的兼容问题,可能对您的业务造成影响。常见问题如下:
问题一:上云之前,在自建的ClickHouse中,MaterializedMySQL会为每个分片同步数据;而通过DTS上云后,使用ReplacingMergeTree表代替MaterializedMySQL表,数据会通过分布式表分发到各自节点,这对业务中使用IN和JOIN关联查询分布式表产生了影响。更多详情,请参见分布式表使用子查询报错后,怎么办?。
问题二:上云后,使用ReplacingMergeTree表替代MaterializedMySQL后,由于ReplacingMergeTree数据合并速度不够快,导致查询结果比自建查询结果出现更多的重复数据。可通过以下方案解决:
方案一:在云数据库ClickHouse执行
set global final=1
,设置在查询时合并数据。此参数能保证查询数据不重复,但其会占用更多CPU和内存。方案二:在云数据库ClickHouse,修改目标ReplacingMergeTree表的min_age_to_force_merge_seconds与min_age_to_force_merge_on_partition_only参数,让目标表merge更频繁,防止产生太多重复数据。示例如下:
ALTER TABLE <AIM_TABLE> MODIFY SETTING min_age_to_force_merge_on_partition_only = 1, min_age_to_force_merge_seconds = 60;
min_age_to_force_merge_on_partition_only
含义:用于控制MergeTree表引擎的合并策略。当设置为1时,开启分区强制合并数据。
默认值:0(表示关闭)
min_age_to_force_merge_seconds
含义:强制合并part的时间间隔。
默认值:3600
单位:秒
SQL兼容性验证
安装Python环境。
此验证需基本Python3环境。建议您用阿里云ECS的Linux进行验证,因为其已具有Python3环境。如何购买ECS,请参见快速购买实例。
如果非阿里云ECS环境,您需自行安装Python环境,如何安装Python环境,请参见Python官网。
安装与ClickHouse进行交互的Python客户端库。
在终端或命令提示符中,执行以下命令。
pip3 install clickhouse_driver
确保用于验证的服务器与云数据库ClickHouse和自建ClickHouse网络互通。
如何解决用于验证的服务器与云数据库ClickHouse网络互通,请参见如何解决目标集群与数据源网络互通问题?
执行Python脚本,提取自建ClickHouse的SELECT请求在云数据库ClickHouse运行,验证自建ClickHouse的SQL在云数据库ClickHouse的兼容性。脚本如下:
from clickhouse_driver import connect import datetime import logging # 需要 pip3 install clickhouse_driver # 自建实例VPC地址 host_old='HOST_OLD' # 自建实例TCP端口 port_old=TCP_PORT_OLD # 自建实例用户名 user_old='USER_OLD' # 自建实例用户密码 password_old='PASSWORD_OLD' # 云ClickHouse VPC地址 host_new='HOST_NEW' # 云ClickHouse TCP端口 port_new=TCP_PORT_NEW # 云ClickHouse用户名 user_new='USER_NEW' # 云ClickHouse用户密码 password_new='PASSWORD_NEW' # 配置日志 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') def create_connection(host, port, user, password): """创建 ClickHouse 连接""" return connect(host=host, port=port, user=user, password=password) def get_query_hashes(cursor): """获取最近2天的 query_hash 列表""" get_queryhash_sql = ''' select distinct normalized_query_hash from system.query_log where type='QueryFinish' and `is_initial_query`=1 and `user` not in ('default', 'aurora') and lower(`query`) not like 'select 1%' and lower(`query`) not like 'select timezone()%' and lower(`query`) not like '%dms-websql%' and lower(`query`) like 'select%' and `event_time` > now() - INTERVAL 2 DAY; ''' cursor.execute(get_queryhash_sql) return cursor.fetchall() def get_sql_info(cursor, queryhash): """获取最近2天(搜索范围3天)指定 query_hash 的 SQL 信息""" get_sqlinfo_sql = f''' select `event_time`, `query_duration_ms`, `read_rows`, `read_bytes`, `memory_usage`, `databases`, `tables`, `query` from system.query_log where `event_time` > now() - INTERVAL 3 DAY and `type`='QueryFinish' and `normalized_query_hash`='{queryhash}' limit 1 ''' cursor.execute(get_sqlinfo_sql) sql_info = cursor.fetchone() if sql_info: return [info.strftime('%Y-%m-%d %H:%M:%S') if isinstance(info, datetime.datetime) else info for info in sql_info[:-1]],sql_info[-1] return None def execute_sql_on_new_db(cursor, query_sql, execute_failed_sql): """在新节点上执行 SQL 并记录失败的 SQL""" try: cursor.execute(query_sql) except Exception as error: logging.error(f'query_sql execute in new db failed: {query_sql}') execute_failed_sql[query_sql] = error def main(): # 创建连接 conn_old = create_connection( host=host_old, port=port_old, user=user_old, password=password_old ) conn_new = create_connection( host=host_new, port=port_new, user=user_new, password=password_new ) # 创建游标 cursor_old = conn_old.cursor() cursor_new = conn_new.cursor() # 获取老节点的 query_hash 列表 old_query_hashes = get_query_hashes(cursor_old) #老节点获取 SQL 执行信息 old_db_execute_dir = {} for queryhash in old_query_hashes: sql_info,query = get_sql_info(cursor_old, queryhash[0]) if sql_info: old_db_execute_dir[query] = sql_info # 关闭老节点的游标和连接 cursor_old.close() conn_old.close() # 在新节点上执行 SQL,最重要的验证步骤 execute_failed_sql = {} keys_list = list(old_db_execute_dir.keys()) for query_sql in old_db_execute_dir: position = keys_list.index(query_sql) logging.info(f"new db test the {position + 1}th/{len(old_db_execute_dir)}, running sql: {query_sql}\n") execute_sql_on_new_db(cursor_new, query_sql, execute_failed_sql) # 获取新节点的 query_hash 列表 new_query_hashes = get_query_hashes(cursor_new) new_db_execute_dir = {} for queryhash in new_query_hashes: sql_info,query = get_sql_info(cursor_new, queryhash[0]) if sql_info: new_db_execute_dir[query] = sql_info # 关闭新节点的游标和连接 cursor_new.close() conn_new.close() # 打印新老版本节点 SQL 执行信息 for query_sql in new_db_execute_dir: if query_sql in old_db_execute_dir: logging.info(f'succeed sql: {query_sql}') logging.info(f'old sql info: {old_db_execute_dir[query_sql]}') logging.info(f'new sql info: {new_db_execute_dir[query_sql]}\n') # 打印执行报错的 SQL for query_sql in execute_failed_sql: logging.error(f'failed sql: {query_sql}') logging.error(f'failed error: {execute_failed_sql[query_sql]}\n') if __name__ == "__main__": main()
验证结果与解决。
验证的SQL兼容性情况较为复杂,如遇到错误,您需根据错误信息自行进行排查与解决。
性能分析和优化
由于自建上云背景是已经有业务在运行,较难实现逐步上云测试和性能优化。业务整体切换到云ClickHouse后,实例可能出现CPU打满,内存不足等问题。在出现切换到云数据库ClickHouse后,云数据库ClickHouse性能劣化时,通过以下措施快速定位到表或请求,进而确认性能瓶颈问题。当然,如果已经知道某个表或查询导致目标实例性能较差,可以直接进入步骤三:SQL性能分析。
步骤一:定位整体性能瓶颈的关键表
火焰图与query_log分析是定位导致性能问题关键表的两种手段。火焰图制作比较繁琐,但信息更直观;query_log表分析操作简单,无需借助工具,但信息需自己分析。二者可结合使用。
制作火焰图
导出trace_log。
使用clickhouse-client,登录云数据库ClickHouse实例。如何登录,请参见通过命令行工具连接ClickHouse。
执行以下命令,导出trace_log,生成
cpu_trace_log.txt
。-- trace_type = 'CPU'表示跟踪CPU -- trace_type = 'Real'表示跟踪实际耗时 /clickhouse/bin/clickhouse-client -h <IP> --port <port> -q "SELECT arrayStringConcat(arrayReverse(arrayMap(x -> concat( addressToLine(x), '#', demangle(addressToSymbol(x)) ), trace)), ';') AS stack, count() AS samples FROM system.trace_log WHERE trace_type = 'CPU' and event_time >= '2025-01-08 19:31:00' and event_time < '2025-01-08 19:33:00' group by trace order by samples desc FORMAT TabSeparated settings allow_introspection_functions=1" > cpu_trace_log.txt
参数说明如下。
参数
描述
参数
描述
IP
云数据库ClickHouse的VPC地址。
port
云数据库ClickHouse的TCP端口。
除了配置上述参数外,您还需配置event_time,获取目标时间段的trace日志。
使用clickhouse-flamegraph制作火焰图。
安装clickhouse-flamegraph。下载与安装,请参见clickhouse-flamegraph。
执行以下命令,生成火焰图后,分析火焰图。
cat cpu_trace_log.txt | flamegraph.pl > cpu_trace_log.svg
以下火焰图中,可以观察到目标实例CPU消耗较大的函数,例如ReplacingSortedMerg函数。因此,您可以重点关注ReplacingMergeTree表中的查询SQL。
分析query_log
SELECT请求占用CPU和内存多的表,并不能直接说明整体性能较差,这仅仅表明其导致CPU和内存占用较多的概率更高。因此,有必要在自建的ClickHouse与云数据库ClickHouse中同时执行相关命令以进行TOP对比。在确定了差异较大的表之后,需进行相关表中SELECT语句的定位。定位CPU或内存问题表的示例如下。
SELECT
tables,
first_value(query),
count() AS cnt,
groupArrayDistinct(normalizedQueryHash(query)) AS normalized_query_hash,
sum(ProfileEvents.Values[indexOf(ProfileEvents.Names, 'UserTimeMicroseconds')]) AS sum_user_cpu,
sum(ProfileEvents.Values[indexOf(ProfileEvents.Names, 'SystemTimeMicroseconds')]) AS sum_system_cpu,
avg(ProfileEvents.Values[indexOf(ProfileEvents.Names, 'UserTimeMicroseconds')]) AS avg_user_cpu,
avg(ProfileEvents.Values[indexOf(ProfileEvents.Names, 'SystemTimeMicroseconds')]) AS avg_system_cpu,
sum(memory_usage) as sum_memory_usage,
avg(memory_usage) as avg_memory_usage
FROM clusterAllReplicas(default, system.query_log)
WHERE (event_time > '2025-01-08 19:30:00') AND (event_time < '2025-01-08 20:30:00') AND (query_kind = 'Select')
GROUP BY tables
ORDER BY sum_user_cpu DESC
LIMIT 5
SELECT
tables,
first_value(query),
count() AS cnt,
groupArrayDistinct(normalizedQueryHash(query)) AS normalized_query_hash,
sum(ProfileEvents.Values[indexOf(ProfileEvents.Names, 'UserTimeMicroseconds')]) AS sum_user_cpu,
sum(ProfileEvents.Values[indexOf(ProfileEvents.Names, 'SystemTimeMicroseconds')]) AS sum_system_cpu,
avg(ProfileEvents.Values[indexOf(ProfileEvents.Names, 'UserTimeMicroseconds')]) AS avg_user_cpu,
avg(ProfileEvents.Values[indexOf(ProfileEvents.Names, 'SystemTimeMicroseconds')]) AS avg_system_cpu,
sum(memory_usage) as sum_memory_usage,
avg(memory_usage) as avg_memory_usage
FROM clusterAllReplicas(default, system.query_log)
WHERE (event_time > '2025-01-08 19:30:00') AND (event_time < '2025-01-08 20:30:00') AND (query_kind = 'Select')
GROUP BY tables
ORDER BY sum_memory_usage
LIMIT 5
步骤二:定位性能瓶颈关键SQL
根据上个步骤确定导致性能瓶颈的表后,进一步定位导致性能瓶颈的某类SQL。示例如下所示。
SELECT
tables,
first_value(query),
count() AS cnt,
normalizedQueryHash(query) AS normalized_query_hash,
sum(ProfileEvents.Values[indexOf(ProfileEvents.Names, 'UserTimeMicroseconds')]) AS sum_user_cpu,
sum(ProfileEvents.Values[indexOf(ProfileEvents.Names, 'SystemTimeMicroseconds')]) AS sum_system_cpu,
avg(ProfileEvents.Values[indexOf(ProfileEvents.Names, 'UserTimeMicroseconds')]) AS avg_user_cpu,
avg(ProfileEvents.Values[indexOf(ProfileEvents.Names, 'SystemTimeMicroseconds')]) AS avg_system_cpu,
sum(memory_usage) as sum_memory_usage,
avg(memory_usage) as avg_memory_usage
FROM clusterAllReplicas(default, system.query_log)
WHERE (event_time > '2025-01-08 19:30:00') AND (event_time < '2025-01-08 20:30:00') AND (query_kind = 'Select') AND has(tables, '<AIM_TABLE>')
GROUP BY
tables,
normalized_query_hash
ORDER BY sum_user_cpu DESC
LIMIT 5
SELECT
tables,
first_value(query),
count() AS cnt,
normalizedQueryHash(query) AS normalized_query_hash,
sum(ProfileEvents.Values[indexOf(ProfileEvents.Names, 'UserTimeMicroseconds')]) AS sum_user_cpu,
sum(ProfileEvents.Values[indexOf(ProfileEvents.Names, 'SystemTimeMicroseconds')]) AS sum_system_cpu,
avg(ProfileEvents.Values[indexOf(ProfileEvents.Names, 'UserTimeMicroseconds')]) AS avg_user_cpu,
avg(ProfileEvents.Values[indexOf(ProfileEvents.Names, 'SystemTimeMicroseconds')]) AS avg_system_cpu,
sum(memory_usage) as sum_memory_usage,
avg(memory_usage) as avg_memory_usage
FROM clusterAllReplicas(default, system.query_log)
WHERE (event_time > '2025-01-08 19:30:00') AND (event_time < '2025-01-08 20:30:00') AND (query_kind = 'Select') AND has(tables, 'AIM_TABLE')
GROUP BY
tables,
normalized_query_hash
ORDER BY sum_memory_usage
LIMIT 5
参数说明如下:
AIM_TABLE:需要定位性能问题的表。
除了配置上述参数外,您还需配置event_time,获取目标时间段的数据。
步骤三:SQL性能分析
您可以使用EXPLAIN和system.query_log表进行分析目标语句。
利用EXPLAIN分析目标SQL。EXPLAIN中各字段含义,请参见EXPLAIN。
EXPLAIN PIPELINE <存在性能问题的SQL>
利用system.query_log表分析目标SQL。
SELECT hostname () AS host, * FROM clusterAllReplicas (`default`, system.query_log) WHERE event_time > '2025-01-18 00:00:00' AND event_time < '2025-01-18 03:00:00' AND initial_query_id = '<INITIAL_QUERY_ID>' AND type = 'QueryFinish' ORDER BY query_start_time_microseconds
参数说明如下。
INITIAL_QUERY_ID:目标查询语句的查询ID。
除了配置上述参数外,您还需配置event_time,获取目标时间段的数据。
您需重点关注以下返回字段:
ProfileEvents:此字段提供了详细的查询执行事件计数器和统计信息,帮助您分析查询的性能和资源使用情况进行指标对比。更多查询事件信息,请参见events。
Settings:此字段提供了查询执行时所使用的各种配置参数,这些参数可以影响查询的行为和性能。通过查看此字段,您可以更好地理解和优化目标SQL。更多查询参数信息,请参见settings。
query_duration_ms:此字段表示查询的总执行时间。如果发现某个子查询特别慢,可以使用子查询query_id在日志文件中查找执行细节。如何修改日志级别,请参见配置config.xml参数。
如果上述方法均无法定位到目标语句导致性能变差的问题,您可以在自建的ClickHouse上查询目标语句相关信息,并与目标语句在云数据库ClickHouse的相关信息进行对比,进而分析上云后目标语句性能下降的原因。
步骤四:SQL优化
定位到导致性能问题的SQL后,您可以对目标SQL进行相应的优化。以下提供几种优化方向,具体问题仍需根据实际情况进行分析和处理。
索引优化:分析目标语句,定位经常用于过滤的列,为其创建索引。
数据类型:合适的数据类型可以减少存储空间并提高查询性能。
查询优化:
查询列优化:避免使用SELECT * ,只选择需要的列,减少I/O和网络传输。
WHERE 子句优化:子句中使用主键和索引字段。
使用 PREWHERE:对于复杂的过滤条件,可以使用
PREWHERE
子句来提前过滤数据,减少后续处理的数据量。JOIN优化:对于分布式表,使用 GLOBAL JOIN可以提高 JOIN 性能。
参数设置优化:通过调整查询设置来优化性能。例如,增加
max_threads
可以利用更多 CPU 核心,但需注意不要过度配置导致资源争用。更多查询参数信息,请参见settings。如何查看查询中的参数设置,请参见SQL性能分析。物化视图:对于频繁执行的复杂查询,可以创建物化视图来预先计算结果,从而提高查询性能。
数据压缩:ClickHouse默认启用了数据压缩,可以通过调整压缩算法和级别来进一步优化存储和查询性能。
配置更大缓存:对于频繁执行且结果不经常变化的查询,云数据库ClickHouse使用查询缓存来提高性能。当您发现目前缓存不能满足需求时,可修改uncompressed_cache_size参数,设置更大缓存空间。如何修改参数,请参见配置config.xml参数。
- 本页导读 (1)
- 背景知识
- 兼容性分析与解决
- 参数兼容性
- MaterializedMySQL兼容性
- SQL兼容性验证
- 性能分析和优化
- 步骤一:定位整体性能瓶颈的关键表
- 制作火焰图
- 分析query_log
- 步骤二:定位性能瓶颈关键SQL
- 步骤三:SQL性能分析
- 步骤四:SQL优化