本文为您介绍MySQL连接器在常见使用场景下的最佳实践。
设置Server ID,避免Binlog消费冲突
每个同步数据库数据的客户端,都会有一个唯一ID,即Server ID。如果不同作业使用了相同的Server ID,会因为冲突导致作业报错。建议为每个MySQL CDC数据源配置不同的Server ID。
Server ID配置方式
Server ID可以在Flink建表语句中指定,也可以通过动态Hints配置。
建议通过动态Hints来配置Server ID,而不是在建表的WITH参数中配置Server ID。动态Hints详情请参见动态Hints。
不同场景下Server ID的配置
未开启增量快照框架或并行度为1
当未开启增量快照框架或并行度为1时,可以指定一个特定的Server ID。
SELECT * FROM source_table /*+ OPTIONS('server-id'='123456') */ ;
开启增量快照框架且并行度大于1
当开启增量快照框架且并行度大于1时,需要指定Server ID范围,确保范围内可用的Server ID数量不小于并行度。假设并行度为3,可以如下配置:
SELECT * FROM source_table /*+ OPTIONS('server-id'='123456-123458') */ ;
结合CTAS进行数据同步
当结合CTAS进行数据同步时,如果CDC数据源配置相同,会自动对数据源进行合并复用,此时可以为多个CDC数据源配置相同的Server ID。详情请参见代码示例四:多CTAS语句。
同一作业包含多个MySQL CDC源表(非CTAS)
当作业中包含多个MySQL CDC源表,且不是使用CTAS语句同步时,如果没有开启Source复用(详情请参见开启Source复用,减少Binlog数据连接),需要为每一个CDC源表提供不同的Server ID。同理,如果开启增量快照框架且并行度大于1,需要指定Server ID范围。
select * from source_table1 /*+ OPTIONS('server-id'='123456-123457') */ left join source_table2 /*+ OPTIONS('server-id'='123458-123459') */ on source_table1.id=source_table2.id;
设置分片参数,优化内存空间
MySQL CDC源表在启动时扫描全表,将表按照主键分成多个分片(chunk),记录下此时的Binlog位点。并使用增量快照算法通过Select语句,逐个读取每个分片的数据。作业会周期性执行Checkpoint,记录下已经完成的分片。当发生Failover时,只需要继续读取未完成的分片。当分片全部读取完后,会从之前获取的Binlog位点读取增量的变更记录。Flink作业会继续周期性执行Checkpoint,记录下Binlog位点,当作业发生Failover,便会从之前记录的Binlog位点继续处理,从而实现Exactly Once语义。
更详细的增量快照算法,请参见MySQL CDC Connector。
对于只有一个字段的主键表,默认使用该字段进行分片。对于有联合主键的MySQL物理表,默认使用主键里的第一个字段进行分片。Flink计算引擎VVR 6.0.7及以上版本支持读取无主键源表,需要设置scan.incremental.snapshot.chunk.key-column指定一个非空类型的字段进行分片。
分片参数优化
分片数据和分片信息会保存在内存中,在一些情况下,可能会出现OOM的问题。可以根据出现OOM的组件进行参数调整:
JobManager
JobManager保存所有分片的信息,如果分片数量过多会出现OOM,需要通过增加scan.incremental.snapshot.chunk.size值来减少分片数。也可以在运行参数配置中设置jobmanager.memory.heap.size以增大JobManager堆内存,参见Flink参数配置。
TaskManager
TaskManager读取每个分片的数据,如果分片里数据条数过多会出现OOM,需要通过减少scan.incremental.snapshot.chunk.size值来减少分片里的数据条数。也可以在运行参数配置中调整Task Manager Memory为更大值以增加TaskManager堆内存。
在VVR 8.0.8及之前版本,最后一个分片需要读取的数据量可能比较大,导致TaskManager出现OOM,建议升级到VVR 8.0.9及以上以避免该问题。
对于有联合主键的MySQL CDC源表,会使用主键里的第一个字段进行分片,如果存在大量的数据在该字段中为相同字段值的情况,对应分片的数据会更多,可能会导致TaskManager出现OOM,可以设置scan.incremental.snapshot.chunk.key-column指定主键中的其他字段进行分片划分。
开启Source复用,减少Binlog数据连接
在作业中包含了多张MySQL源表时,开启Source复用能够复用Binlog连接,从而减少数据库的压力。该功能仅在实时计算Flink版本提供,社区版MySQL CDC连接器不支持。
您可以在SQL作业中使用SET命令开启Source复用功能:
SET 'table.optimizer.source-merge.enabled' = 'true';
建议在新创建的作业中就开启Source复用功能。对已有作业启用Source复用后,需要无状态启动。原因是Source复用会导致作业拓扑改变,从原有作业状态可能无法启动或者丢失数据。
开启Source复用后,具有相同配置参数的MySQL源表会进行合并。如果您的作业中所有源表的配置都相同,作业的Binlog连接数可以按照如下方式计算:
全量读取阶段,Binlog连接数等于Source并发度。
增量读取阶段,Binlog连接数等于1。
VVR 8.0.8及8.0.9版本,在开启CDC Source复用时,还需要额外设置
SET 'sql-gateway.exec-plan.enabled' = 'false';
。在开启CDC Source复用后,不建议将作业配置项
pipeline.operator-chaining
设为false,因为将算子链断开后,Source发送给下游算子的数据会增加序列化和反序列的开销,当合并的Source越多时,开销会越大。在实时计算引擎VVR 8.0.7版本,将
pipeline.operator-chaining
设为false时会出现序列化的问题。
开启Binlog解析参数,加速增量数据读取
MySQL连接器作为源表或数据摄入数据源使用时,在增量阶段会解析Binlog文件生成各种变更消息,Binlog文件使用二进制记录着所有表的变更,可以通过以下方式加速Binlog文件解析。
开启并行解析和解析过滤配置(该功能仅在实时计算Flink版本提供,社区版MySQL CDC连接器不支持)
开启配置项
scan.only.deserialize.captured.tables.changelog.enabled
:仅对指定表的变更事件进行解析。开启配置项
scan.only.deserialize.captured.tables.changelog.enabled
:采用多线程对Binlog文件进行解析,并按顺序投放到消费队列。开启该配置时通常需要增加Task Manager CPU进行配合。
优化Debezium参数
debezium.max.queue.size: 162580 debezium.max.batch.size: 40960 debezium.poll.interval.ms: 50
debezium.max.queue.size
:阻塞队列可以容纳的记录的最大数量。当Debezium从数据库读取事件流时,它会在将事件写入下游之前将它们放入阻塞队列。默认值为8192。debezium.max.batch.size
:该连接器每次迭代处理的事件条数最大值。默认值为2048。debezium.poll.interval.ms
:连接器应该在请求新的变更事件前等待多少毫秒。默认值为1000毫秒,即1秒。
使用示例:
CREATE TABLE mysql_source (...) WITH (
'connector' = 'mysql-cdc',
-- Debezium配置
'debezium.max.queue.size' = '162580',
'debezium.max.batch.size' = '40960',
'debezium.poll.interval.ms' = '50',
-- 开启并行解析和解析过滤
'scan.only.deserialize.captured.tables.changelog.enabled' = 'true', -- 仅对指定表的变更事件进行解析。
'scan.parallel-deserialize-changelog.enabled' = 'true' -- 使用多线程对Binlog进行解析。
...
)
分析数据延迟,优化作业吞吐
在增量阶段出现数据延迟时,可以按照以下步骤进行分析:
参见概览中的currentFetchEventTimeLag和currentEmitEventTimeLag两个指标,currentFetchEventTimeLag代表从Binlog读取到数据的延迟,currentEmitEventTimeLag代表从Binlog读取到作业相关的表的数据的延迟。
场景
详情
currentFetchEventTimeLag延迟较小而currentEmitEventTimeLag延迟较大,并且currentEmitEventTimeLag几乎不更新。
currentFetchEventTimeLag延迟较小说明从数据库拉取Binlog的延迟较低,但是Binlog中属于作业需要读取的表的数据较少,因此currentEmitEventTimeLag几乎不更新,属于正常现象。
currentFetchEventTimeLag延迟和currentEmitEventTimeLag延迟都比较大。
说明Source表拉取能力较弱,可以参见本小节的后续步骤进行调优。
反压的存在会导致Source端数据发送至下游算子的速率下降,您可能会观察到sourceIdleTime周期性上升,currentFetchEventTimeLag和currentEmitEventTimeLag不断增长。可以通过增大反压源头所在节点的并发度来避免该情况。
参见CPU中的TM CPU Usage指标和JVM中的TM GC Time指标,确认是否出现CPU或者内存资源不足的情况,可以适当增加作业资源以优化读取性能,还可以开启mini-batch参数以提升吞吐量,参见高性能Flink SQL优化技巧。
在作业中存在SinkUpsertMaterializer算子并且存在大状态时,会影响读取性能,请考虑增加作业并发度或者避免使用SinkUpsertMaterializer算子,详情请参见避免使用SinkUpsertMaterializer。对已有作业配置去掉SinkUpsertMaterializer算子时,需要无状态启动。原因是作业拓扑发生改变,从原有作业状态可能无法启动或者丢失数据。
开启读取 RDS Binlog,避免Binlog过期
使用阿里云RDS MySQL实例作为Source数据源时,支持读取保存在OSS的日志备份。当指定的时间戳或者Binlog位点对应的文件保存在OSS时,会自动拉取OSS日志文件到Flink集群本地进行读取,当指定的时间戳或者Binlog位点对应的文件保存在数据库本地时,会自动切换到使用数据库连接进行读取。该功能仅在实时计算Flink版本提供,社区版MySQL CDC连接器不支持。
开启读取OSS日志备份功能需要配置RDS的连接参数,使用示例:
CREATE TABLE mysql_source (...) WITH (
'connector' = 'mysql-cdc',
'rds.region-id' = 'cn-beijing',
'rds.access-key-id' = 'your_access_key_id',
'rds.access-key-secret' = 'your_access_key_secret',
'rds.db-instance-id' = 'rm-xxxxxxxx', // 数据库实例id。
'rds.main-db-id' = '12345678', // 主库编号。
'rds.endpoint' = 'rds.aliyuncs.com'
...
)
使用数据摄入进行整库同步,表结构变更同步
对于只包含数据同步逻辑的作业,建议使用数据摄入运行,数据摄入作业基于数据集成场景进行了深度优化,使用方式参见数据摄入YAML作业快速入门以及数据摄入YAML作业开发(公测中)。
如下代码提供了将MySQL的app_db整库同步到Hologres的示例,对于上游app_db库中的表结构变更,数据摄入作业会将该变更同步到下游数据库:
source:
type: mysql
hostname: <hostname>
port: 3306
username: ${secret_values.mysqlusername}
password: ${secret_values.mysqlpassword}
tables: app_db.\.*
server-id: 5400-5404
sink:
type: hologres
name: Hologres Sink
endpoint: <endpoint>
dbname: <database-name>
username: ${secret_values.holousername}
password: ${secret_values.holopassword}
pipeline:
name: Sync MySQL Database to Hologres