本文为您介绍如何将MySQL整库同步Kafka,从而降低多个任务对MySQL数据库造成的压力。
背景信息
MySQL CDC数据表主要用于获取MySQL数据,并可以实时同步数据表中的修改,经常用在复杂的计算场景。例如,作为一张维表和其他数据表做Join操作。在使用中,同一张MySQL表可能被多个作业依赖,当多个任务使用同一张MySQL表做处理时,MySQL数据库会启动多个连接,对MySQL服务器和网络造成很大的压力。
为了缓解对上游MySQL数据库的压力,阿里云Flink实时计算已提供MySQL整库同步到Kafka的能力,通过引入Kafka作为中间层,并使用CDAS整库同步或CTAS整表同步到Kafka来解决。
具体操作是使用CDAS或CTAS语法,在一个作业里将上游的MySQL的数据实时同步到Kafka中。在MySQL整库同步任务启动后,由Kafka JSON Catalog创建Topic,每张MySQL表以Upsert Kafka 的方式写入对应topic。然后直接使用Kafka JSON Catalog中的表代替MySQL表,从而降低多个任务对MySQL数据库造成的压力。

使用限制
- 同步的MySQL表必须包含主键。
- 支持使用自建Kafka集群,EMR的Kafka集群和阿里云消息队列Kafka版。使用阿里云消息队列Kafka版时,只能通过默认接入点使用。
- upsert-kafka表暂未支持作为CTAS和CDAS语法的源表,upsert-kafka表只能作为CTAS和CDAS同步的结果表。
- Kafka集群的存储空间必须大于源表数据的存储空间,否则会因存储空间不足导致数据丢失。因为整库同步Kafka建立的topic都是compacted topic,即topic的每个消息键(Key)仅保留最近的一条消息,但是数据不会过期,compacted topic里相当于保存了与源库的表相同大小的数据。
操作步骤
应用示例
例如,在订单评论实时分析场景下,假设有用户表(user),订单表(order)和用户评论表(feedback)三张表。各个表包含数据如下图所示。

在展示用户订单信息和用户评论时,需要通过关联用户表(user)来获取用户名(name字段)信息。代码示例如下。
-- 将订单信息和用户表做join,展示每个订单的用户名和商品名。
SELECT order.id as order_id, product, user.name as user_name
FROM order LEFT JOIN user
ON order.user_id = user.id;
-- 将评论和用户表做join,展示每个评论的内容和对应用户名。
SELECT feedback.id as feedback_id, comment, user.name as user_name
FROM feedback LEFT JOIN user
ON feedback.user_id = user.id;
对于以上两个SQL任务,user表在两个作业中都被使用了一次。运行时,两个作业都会读取MySQL的全量数据和增量数据。全量读取需要创建MySQL连接,增量读取需要创建Binlog Client。随着作业的不断增多,MySQL连接和Binlog Client资源也会对应增长,会给上游数据库产生极大的压力。
为了缓解对上游MySQL数据库的压力,可以通过CDAS或CTAS语法在一个作业里将上游的MySQL数据实时同步到Kafka中,然后提供给多个下游作业消费。代码示例如下。
CREATE DATABASE IF NOT EXISTS `kafka-catalog`.`kafka`
AS DATABASE `mysql-catalog`.`database` INCLUDING ALL TABLES;
同步任务成功启动后,上游MySQL数据库中的数据会以JSON格式写入Kafka中,一个Kafka Topic可以提供给多个下游作业消费,从而避免多个MySQL CDC
Source直连数据库产生压力。代码示例如下。
-- 将订单信息和Kafka JSON Catalog中的用户表做join,展示每个订单的用户名和商品名。
SELECT order.id as order_id, product, user.value_name as user_name
FROM order LEFT JOIN `kafka-catalog`.`kafka`.`user` as user
ON order.user_id = user.id;
-- 将评论和Kafka JSON Catalog中的用户表做join,展示每个评论的内容和对应用户名。
SELECT feedback.id as feedback_id, comment, user.value_name as user_name
FROM feedback LEFT JOIN `kafka-catalog`.`kafka`.`user` as user
ON feedback.user_id = user.id;