本文为您介绍MySQL的CDC(Change Data Capture)源表DDL定义、WITH参数和类型映射。
什么是MySQL的CDC源表
MySQL的CDC源表,即MySQL的流式源表,会先读取数据库的历史全量数据,并平滑切换到Binlog读取上,保证不多读一条也不少读一条数据。即使发生故障,也能保证通过Exactly Once语义处理数据。MySQL CDC Connector支持并发地读取全量数据,通过增量快照算法实现了全程无锁和断点续传。
- 流批一体,支持读取全量和增量数据,无需维护两套流程。
- 支持并发读取全量数据,性能水平扩展。
- 全量读取无缝切换增量读取,自动缩容,节省计算资源。
- 全量阶段读取支持断点续传,更稳定。
- 无锁读取全量数据,不影响在线业务。
实现原理
Source在启动时会扫描全表,将表按照主键分成多个chunk。并使用增量快照算法逐个读取每个chunk的数据。作业会周期性执行Checkpoint,记录下已经完成的chunk。当发生Failover时,只需要继续读取未完成的chunk。当chunk全部读取完后,会从之前获取的Binlog位点读取增量的变更记录。Flink作业会继续周期性执行Checkpoint,记录下Binlog位点,当作业发生Failover,便会从之前记录的Binlog位点继续处理,从而实现Exactly Once语义。更详细的增量快照算法,请参见MySQL CDC Connector。
前提条件
- MySQL和VVP的网络连通。
- MySQL服务器配置如下:
- MySQL版本为5.7和8.0.X。
- 已开启了Binlog。
- Binlog格式已设置为ROW。
- binlog_row_image已设置为FULL。
- 已在MySQL配置文件中配置了交互超时或等待超时参数。
- 已创建MySQL用户,并授予了SELECT、 SHOW DATABASES 、REPLICATION SLAVE和REPLICATION CLIENT权限。
使用限制
- 仅vvr-4.0.8-flink-1.13及以上引擎版本支持无锁读取和并发读取功能。
- MySQL CDC Connector支持读取的MySQL版本为5.7和8.0.X。
说明 VVR 4.0.11及以上版本支持读取MySQL 5.6。
- MySQL CDC 源表暂不支持定义Watermark。如果您需要进行窗口聚合,您可以采用非窗口聚合的方式,详情请参见不支持定义Watermark,那如何进行窗口聚合?。
- MySQL的CDC源表需要一个有特定权限(包括SELECT、SHOW DATABASES、REPLICATION SLAVE和REPLICATION CLIENT)的MySQL用户,才能读取全量和增量数据。
注意事项
- 每个作业需显式配置不同的Server ID。
每个同步数据库数据的客户端,都会有一个唯一ID,即Server ID。MySQL SERVER会根据该ID来维护网络连接以及Binlog位点。因此如果有大量不同的Server ID的客户端一起连接MySQL SERVER,可能导致MySQL SERVER的CPU陡增,影响线上业务稳定性。
此外,多个作业共享相同的Server ID,会导致Binlog位点错乱,多读或少读数据。因此建议每个CDC作业都配置不同的Server ID。建议通过动态Hints来配置Server ID,而不是在DDL参数中配置Server ID。配置不同的Server ID代码示例如下。
动态Hints详情请参见动态Hints。SELECT * FROM source_table /*+ OPTIONS('server-id'='123456') */ ;
说明 如果开启增量快照读取数据,则server-id配置项需要配置成与作业并发匹配的Server ID范围。 - 仅VVR 4.0.8 及以上版本支持全量阶段的无锁读取、并发读取、断点续传等功能。
如果您使用的是VVR 4.0.8以下版本,需要对MySQL用户授予RELOAD权限用来获取全局读锁,保证数据读取的一致性。全局读锁会阻塞写入操作,持锁时间可能达到秒级,因此可能对线上业务造成影响。
此外,VVR 4.0.8以下版本在全量读取阶段无法执行Checkpoint,全量阶段的作业失败会导致作业重新读取全量数据,稳定性不佳。因此建议您将作业升级到VVR 4.0.8及以上版本。
DDL定义
CREATE TABLE mysqlcdc_source (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '<yourHostname>',
'port' = '3306',
'username' = '<yourUsername>',
'password' = '<yourPassword>',
'database-name' = '<yourDatabaseName>',
'table-name' = '<yourTableName>'
);
WITH参数
参数 | 说明 | 是否必填 | 数据类型 | 备注 |
---|---|---|---|---|
connector | 源表类型。 | 是 | STRING | 可以填写为mysql-cdc 或者mysql ,二者等价。
|
hostname | MySQL数据库的IP地址或者Hostname。 | 是 | STRING | 无。 |
username | MySQL数据库服务的用户名。 | 是 | STRING | 无。 |
password | MySQL数据库服务的密码。 | 是 | STRING | 无。 |
database-name | MySQL数据库名称。 | 是 | STRING | 数据库名称支持正则表达式以读取多个数据库的数据。 |
table-name | MySQL表名。 | 是 | STRING | 表名支持正则表达式以读取多个表的数据。 |
port | MySQL数据库服务的端口号。 | 否 | INTEGER | 默认值为3306。 |
server-id | 数据库客户端的一个数字 ID。 | 否 | STRING | 该ID必须是MySQL集群中全局唯一的。建议针对同一个数据库的每个作业都设置一个不同的ID。默认会随机生成一个5400~6400的值。
该参数也支持ID范围的格式,例如5400-5408。在开启增量读取模式时支持多并发读取,此时推荐设定为ID范围,使得每个并发使用不同的ID。 |
scan.incremental.snapshot.enabled | 是否开启增量快照。 | 否 | BOOLEAN | 默认开启增量快照。增量快照是一种读取全量数据快照的新机制。与旧的快照读取相比,增量快照有很多优点,包括:
如果您希望Source支持并发读取,每个并发的Reader需要有一个唯一的服务器ID,因此server-id必须是5400-6400这样的范围,并且范围必须大于等于并发数。 |
scan.incremental.snapshot.chunk.size | 表的chunk的大小(行数)。 | 否 | Integer | 默认值为8096。当开启增量快照读取时,表会被切分成多个chunk读取。在读完chunk的数据之前,chunk的数据会先缓存在内存中,因此chunk 太大,可能导致内存OOM。chunk越小,故障恢复的粒度也越小,但也会降低吞吐。 |
scan.snapshot.fetch.size | 当读取表的全量数据时,每次最多拉取的记录数。 | 否 | Integer | 默认值为1024。 |
scan.startup.mode | 消费数据时的启动模式。 | 否 | STRING | 参数取值如下:
|
server-time-zone | 数据库在使用的会话时区。 | 否 | STRING | 例如Asia/Shanghai,该参数控制了MySQL中的TIMESTAMP类型如何转成STRING类型。更多信息请参见Debezium时间类型。 |
debezium.min.row.count.to.stream.results | 当表的条数大于该值时,会使用分批读取模式。 | 否 | INTEGER | 默认值为1000。Flink采用以下方式读取MySQL源表数据:
|
connect.timeout | 连接MySQL数据库服务器超时时,重试连接之前等待超时的最长时间。 | 否 | Duration | 默认值为30秒。 |
connect.max-retries | 连接MySQL数据库服务时,连接失败后重试的最大次数。 | 否 | Integer | 默认值为3。 |
connection.pool.size | 数据库连接池大小。 | 否 | Integer | 默认值为20个。
数据库连接池用于复用连接,可以降低数据库连接数量。 |
jdbc.properties.* | JDBC URL中的自定义连接参数。 | 否 | String |
您可以传递自定义的连接参数,例如不使用SSL协议,则可配置为'jdbc.properties.useSSL' = 'false'。 支持的连接参数请参见Mysql Configuration Properties。 |
heartbeat.interval | Source通过心跳事件推动Binlog位点前进的时间间隔。 | 否 | Duration | 默认值为30s。
心跳事件用于推动Source中的Binlog位点前进,这对MySQL中更新缓慢的表非常有用。对于更新缓慢的表,Binlog位点无法自动前进,通过够心跳事件可以推到Binlog位点前进,可以避免Binlog位点不前进引起Binlog位点过期问题,Binlog位点过期会导致作业失败无法恢复,只能无状态重启。 |
元数据
元数据在分库分表合并同步场景非常实用,因为分库分表合并后,一般业务还是希望区分每条数据的库名和表名来源,而元数据列可以访问源表的库名和表名信息。因此通过元数据列可以非常方便地将多张分表合并到一张目的表。
元数据key | 元数据类型 | 描述 |
---|---|---|
database_name | STRING NOT NULL | 包含该行记录的库名。 |
table_name | STRING NOT NULL | 包含该行记录的表名。 |
op_ts | TIMESTAMP_LTZ(3) NOT NULL | 该行记录在数据库中的变更时间,如果该记录来自表的存量历史数据而不是Binlog中获取,则该值总是0。 |
CREATE TABLE mysql_orders (
db_name STRING METADATA FROM 'database_name' VIRTUAL, -- 读取库名。
table_name STRING METADATA FROM 'table_name' VIRTUAL, -- 读取表名。
operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, -- 读取变更时间。
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'flinkuser',
'password' = 'flinkpw',
'database-name' = 'mydb_.*', -- 正则匹配多个分库。
'table-name' = 'orders_.*' -- 正则匹配多张分表。
);
INSERT INTO holo_orders SELECT * FROM mysql_orders;
并发控制
MySQL-CDC connector 支持多并发读取全量数据,能够提高数据加载效率。同时配合Flink VVP平台的Autopilot自动调优功能,在多并发读取完成后增量阶段,能够自动缩容,节约计算资源。
- 基础模式设置的并发数为整个作业的全局并发数。
- 专家模式支持按需为某个VERTEX设置并发数。
Autopilot自动缩容
全量阶段积累了大量历史数据,为了提高读取效率,通常采用并发的方式读取历史数据。而在Binlog增量阶段,因为Binlog数据量少且为了保证全局有序,通常只需要单并发读取。全量阶段和增量阶段对资源的不同需求,可以通过自动调优功能自动帮您实现性能和资源的平衡。
类型映射
MySQL CDC字段类型 | Flink字段类型 |
---|---|
TINYINT | TINYINT |
SMALLINT | SMALLINT |
TINYINT UNSIGNED | |
TINYINT UNSIGNED ZEROFILL | |
INT | INT |
MEDIUMINT | |
SMALLINT UNSIGNED | |
SMALLINT UNSIGNED ZEROFILL | |
BIGINT | BIGINT |
INT UNSIGNED | |
INT UNSIGNED ZEROFILL | |
MEDIUMINT UNSIGNED | |
MEDIUMINT UNSIGNED ZEROFILL | |
BIGINT UNSIGNED | DECIMAL(20, 0) |
BIGINT UNSIGNED ZEROFILL | |
SERIAL | |
FLOAT [UNSIGNED] [ZEROFILL] | FLOAT |
DOUBLE [UNSIGNED] [ZEROFILL] | DOUBLE |
DOUBLE PRECISION [UNSIGNED] [ZEROFILL] | |
REAL [UNSIGNED] [ZEROFILL] | |
NUMERIC(p, s) [UNSIGNED] [ZEROFILL] | DECIMAL(p, s) |
DECIMAL(p, s) [UNSIGNED] [ZEROFILL] | |
BOOLEAN | BOOLEAN |
TINYINT(1) | |
DATE | DATE |
TIME [(p)] | TIME [(p)] [WITHOUT TIMEZONE] |
DATETIME [(p)] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
TIMESTAMP [(p)] | TIMESTAMP [(p)] |
TIMESTAMP [(p)] WITH LOCAL TIME ZONE | |
CHAR(n) | STRING |
VARCHAR(n) | |
TEXT | |
BINARY | BYTES |
VARBINARY | |
BLOB |
常见问题
技术原理及企业版特性
- Flink CDC企业版特性
- Flink CDC技术