本文介绍如何使用MaterializeMySQL引擎将MySQL数据同步到ClickHouse。

概述

为了强化实时数仓的能力,ClickHouse推出了MaterializeMySQL数据库引擎,用于将MySQL服务器中的表映射到ClickHouse中。ClickHouse服务做为MySQL副本,读取Binlog并执行DDL和DML请求,实现了基于MySQL Binlog机制的业务数据库实时同步功能。

前提条件

  • 数据源RDS MySQL集群和目标ClickHouse集群必须属于同一个VPC网络。
  • 需要将ClickHouse集群地址白名单添加到RDS MySQL。
  • MaterializeMySQL表引擎用户必须具备MySQL库的RELOAD、REPLICATION SLAVE、REPLICATION CLIENT以及SELECT PRIVILEGE权限。

创建数据库

语法

CREATE DATABASE [IF NOT EXISTS] db_name [ON CLUSTER cluster]
ENGINE = MaterializeMySQL('host:port', ['database' | database], 'user', 'password') [SETTINGS ...]

引擎参数说明

参数 说明
host:port MySQL数据库的URL和端口号。
database MySQL数据库名称。
user MySQL数据库账号。
password MySQL数据库账号的密码。

默认新增字段

使用MaterializeMySQL数据库引擎,在ClickHouse集群上新建ReplacingMergeTree引擎的表,会默认在表中增加两个隐藏字段:
字段 说明
_version 事务计数器,记录数据版本信息。UInt64类型。
_sign 删除标记,标记该行是否删除。TypeInt8类型。
可选值:
  • 1:该行未删除;
  • -1:该行已删除。

支持的类型对应

MySQL ClickHouse
TINY Int8
SHORT Int16
INT24 Int32
LONG UInt32
LONG UInt64
FLOAT Float32
DOUBLE Float64
DECIMAL,NEWDECIMAL Decimal
DATE,NEWDATE Date
DATETIME,TIMESTAMP DateTime
DATETIME2,TIMESTAMP2 DateTime64
STRING String
VARCHAR,VAR_STRING String
BLOB String
BIT UInt64
SET UInt64
ENUM Enum16
JSON String
YEAR String
TIME String
GEOMETRY String

其他的MySQL数据类型将全部都转换为字符串,同时以上的所有类型均支持可为空。

使用细则

DDL查询

MySQL DDL查询被转换成相应的ClickHouse DDL查询(ALTER, CREATE, DROP, RENAME)。如果ClickHouse不能解析某些DDL查询,该查询将被忽略。

数据复制

MaterializeMySQL不支持直接插入、删除和更新查询,而是将DDL语句进行相应转换:
  • MySQL INSERT查询被转换为INSERT with _sign=1
  • MySQL DELETE查询被转换为INSERT with _sign=-1
  • MySQL UPDATE查询被转换成INSERT with _sign=1INSERT with _sign=-1

SELECT查询

  • 如果在SELECT查询中没有指定_version,则使用FINAL修饰符,返回_version的最大值对应的数据,即最新版本的数据。
  • 如果在SELECT查询中没有指定_sign,则默认使用WHERE _sign=1,即返回未删除状态(_sign=1)的数据。

索引转换

  • ClickHouse数据库表会自动将MySQL主键和索引子句转换为ORDER BY元组。
  • ClickHouse只有一个物理顺序,由ORDER BY子句决定。如果需要创建新的物理顺序,请使用物化视图。
说明
  • 带有_sign=-1的行不会从表中物理删除。
  • MaterializeMySQL引擎不支持级联UPDATE/DELETE查询。
  • 复制很容易被破坏。
  • 禁止对数据库和表进行手动操作。
  • MaterializeMySQL受optimize_on_insert设置的影响,当MySQL服务器中的一个表发生变化时,数据被合并到MaterializeMySQL数据库中相应的表中。

云数据库ClickHouse自主研发功能

说明 exclude_tables和include_tables两个配置项不能同时使用。

同步表名单配置

如果配置了同步表名单,则只有表名单内的表会被同步。通过include_tables参数指定字符串参数类型,匹配正则表达式。配置参考如下。
Creating a Database 
CREATE DATABASE [IF NOT EXISTS] db_name [ON CLUSTER cluster]
ENGINE = MaterializeMySQL('host:port', ['database' | database], 'user', 'password') 
SETTINGS  
include_tables ='a,b,c...';
参数值字符串和支持解析规则:
  • *:替换除/包括空字符串以外的任意数量的任何字符。
  • ?:替换任意单个字符。
  • {N..M} :替换N到M范围内的任何数字,包括两个边界。

排除表名单配置

排除同步表名单内的表不进行同步。通过exclude_tables参数指定字符串参数类型,匹配正则表达式。配置参考如下。
Creating a Database 
CREATE DATABASE [IF NOT EXISTS] db_name [ON CLUSTER cluster]
ENGINE = MaterializeMySQL('host:port', ['database' | database], 'user', 'password') 
SETTINGS  
exclude_tables ='a,b,c...';
参数值字符串和支持解析规则:
  • *:替换除/包括空字符串以外的任意数量的任何字符。
  • ?:替换任意单个字符。
  • {N..M} :替换N到M范围内的任何数字,包括两个边界。

修改同步配置

MaterializeMySQL引擎所有的配置项都可以进行修改,且动态生效。语法如下:
ALTER database db_name MODIFY SETTING  exclude_tables|include_tables = '*';

跳过同步错误

MaterializeMySQL引擎新增了skip_error_count参数,可以通过设置参数的值来确定跳过同步过程中的错误数。参数值含义如下:
  • -9223372036854775808~-1:跳过所有错误。
  • 0:不跳过任何一个错误。
  • 1~9223372036854775807:跳过对应数字的错误。
语法如下:
Creating a Database
CREATE DATABASE [IF NOT EXISTS] db_name [ON CLUSTER cluster]
ENGINE = MaterializeMySQL('host:port', ['database' | database], 'user', 'password')
SETTINGS skip_error_count = -1;

示例1

创建同步库
CREATE DATABASE [IF NOT EXISTS] db_name [ON CLUSTER cluster]
ENGINE = MaterializeMySQL('host:port', ['database' | database], 'user', 'password') 
SETTINGS 
include_tables ='a,b,c...',
exclude_tables ='e,f,g...',

示例2

在MySQL中查询
mysql> CREATE DATABASE db;
mysql> CREATE TABLE db.test (a INT PRIMARY KEY, b INT);
mysql> INSERT INTO db.test VALUES (1, 11), (2, 22);
mysql> DELETE FROM db.test WHERE a=1;
mysql> ALTER TABLE db.test ADD COLUMN c VARCHAR(16);
mysql> UPDATE db.test SET c='Wow!', b=222;
mysql> SELECT * FROM test;
+---+------+------+ 
| a |    b |    c |
+---+------+------+ 
| 2 |  222 | Wow! |
+---+------+------+

在ClickHouse与MySQL服务器交换数据

创建数据库和表:
CREATE DATABASE mysql ENGINE = MaterializeMySQL('localhost:3306', 'db', 'user', '***');
SHOW TABLES FROM mysql;
┌─name─┐
│ test │
└──────┘
插入数据:
SELECT * FROM mysql.test;
┌─a─┬──b─┐ 
│ 1 │ 11 │ 
│ 2 │ 22 │ 
└───┴────┘
删除数据后,添加列并更新:
SELECT * FROM mysql.test;
┌─a─┬───b─┬─c────┐ 
│ 2 │ 222 │ Wow! │ 
└───┴─────┴──────┘