本文以MySQL数据库为例介绍如何使用Canal接入云消息队列 RocketMQ 版,实现MySQL数据库Binlog数据的变更处理。
背景信息
CDC(Change Data Capture)是一种监测并捕获数据库变更的典型技术方案,常应用于异构数据源之间的数据同步。Canal作为一款轻量级的CDC工具,可基于数据库增量日志解析,提供增量变更数据的订阅和消费能力。Canal可以将变更记录可靠地投递到云消息队列 RocketMQ 版中,借助云消息队列 RocketMQ 版丰富的消息处理策略实现多样化的业务逻辑。
Canal是一个开源项目,仓库地址请参见Canal。
应用场景
基于Binlog日志实现增量订阅和消费的典型业务场景如下:
数据库镜像
数据库实时备份
索引构建和实时维护(拆分异构索引、倒排索引等)
业务Cache刷新
带业务逻辑的增量数据处理
方案介绍
基于Canal和云消息队列 RocketMQ 版的CDC方案如下:
如上图所示,Canal将自己伪装成库,监听并接收数据库的Binlog,并同步到云消息队列 RocketMQ 版等存储或其他中间件系统。
具体操作步骤如下:
配置MySQL:开启MySQL的Binlog功能,创建测试需要的数据库和表。
部署Canal:部署一个canal-deployer(server),监听并接收MySQL数据库的Binlog。
测试验证:验证数据变动后消息发送的情况。
环境要求
资源要求
处于运行中状态的云消息队列 RocketMQ 版实例。实例创建,请参见创建消息队列RocketMQ版实例。
处于运行中的MySQL实例。本文以阿里云RDS MySQL为例,实例创建,请参见创建RDS MySQL实例。
用于部署运行Canal相关组件的机器。本文以ECS为例,实例创建和使用,请参见通过控制台使用ECS实例(快捷版)。
网络要求
部署canal-deployer(server)的节点可以连接数据库和云消息队列 RocketMQ 版实例,一般位于VPC内的ECS和容器都可以连接。
版本要求
服务 | 版本 | 说明 |
Canal | 1.1.6 | 其他版本请参见Canal Release。 |
MySQL | 8.0 | 支持源端MySQL版本包括5.1.x、5.5.x、5.6.x、5.7.x、8.0.x。 |
云消息队列 RocketMQ 版 | 5.x |
|
1.配置MySQL
1.1 开启Binlog功能
阿里云RDS MySQL
阿里云RDS MySQL默认已开启Binlog功能,并且账号默认具有Binlog dump权限,可以直接跳过这一步。
自建MySQL
开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下:
[mysqld] log-bin=mysql-bin # 开启binlog binlog-format=ROW # 选择ROW模式 server_id=1 # 配置MySQL replaction需要定义,不要和canal的slaveId重复
创建用户canal并授权MySQL slave 的权限。
CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; FLUSH PRIVILEGES;
1.2 创建数据库
执行下面的SQL,创建一个名为canal的数据库。
CREATE DATABASE canal DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;
1.3 创建表
执行下面的SQL,在canal数据库创建一个名为students的表。
CREATE TABLE students (
id INT AUTO_INCREMENT,
name VARCHAR(100) NOT NULL,
age INT,
gender VARCHAR(10),
PRIMARY KEY (id)
);
2.部署Canal
2.1 安装JDK
执行下面的命令,安装JDK。
sudo yum install java-1.8.0-openjdk
2.2 下载canal-deployer
执行下面的命令,下载canal-deployer安装包。
sudo wget https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.deployer-1.1.6.tar.gz
2.3 解压缩安装包
执行下面的命令,创建目录canal-server,并将下载的安装包解压缩到canal-server目录中。
# 创建目录canal-server
sudo mkdir -p /usr/local/canal-server
# 将下载的安装包解压缩到canal-server目录中
sudo tar -zxvf canal.deployer-1.1.6.tar.gz -C /usr/local/canal-server
2.4 修改配置
配置Canal启动的参数,详细配置请参见参数说明。
执行下面的命令,配置canal.properties。
sudo vi /usr/local/canal-server/conf/canal.properties
# 服务端模式
canal.serverMode = rocketMQ
# AccessKey ID,阿里云身份验证标识。
canal.aliyun.accessKey = 6W0xz2uPf******
# AccessKey Secret,阿里云身份验证密钥
canal.aliyun.secretKey = sK56k1DrGx******
# 消息队列接入的方式
canal.mq.accessChannel = cloud
# 消息发送格式。云消息队列 RocketMQ 版不支持批量发送,canal.mq.flatMessage需要设置成false;消费端获取到的消息body后需反序列化body内容;Java语言可使用com.alibaba.otter.canal.client.CanalMessageDeserializer#deserializer(byte[]) 方法转化,其他语言需参照该方法自行实现。
canal.mq.flatMessage = false
# 云消息队列 RocketMQ 版实例中Group名
rocketmq.producer.group = canal_test
# 是否开启消息轨迹
rocketmq.enable.message.trace = false
# message trace的topic
rocketmq.customized.trace.topic =
# 云消息队列 RocketMQ 版实例的命名空间。云消息队列 RocketMQ 版5.x实例无需填写该参数。
rocketmq.namespace =
# 云消息队列 RocketMQ 版实例的接入点。在云消息队列 RocketMQ 版控制台实例详情页面获取。
rocketmq.namesrv.addr = rmq-cn-xxx.{$RegionId}.rmq.aliyuncs.com:8080
# 重试次数
rocketmq.retry.times.when.send.failed = 0
# 是否启用VIP Netty通道发送消息
rocketmq.vip.channel.enabled = false
# 消息的tag配置
rocketmq.tag =
获取阿里云访问密钥AccessKey ID和AccessKey Secret。更多信息,请参见创建AccessKey。
执行下面的命令,配置instance.properties。
sudo vi /usr/local/canal-server/conf/example/instance.properties
# 阿里云RDS MySQL数据库的连接地址
canal.instance.master.address=rm-uf62****.rwlb.rds.aliyuncs.com:3306
# 阿里云RDS MySQL数据库的账号
canal.instance.dbUsername=xxx
# 阿里云RDS MySQL数据库的密码
canal.instance.dbPassword=xxx
# mysql 数据解析关注的表,Perl正则表达式,canal\\..*表示canal schema下所有表
canal.instance.filter.regex=canal\\..*
# 云消息队列 RocketMQ 版实例的topic名称
canal.mq.topic=canal_topic
2.5 启动canal-deployer
执行下面的命令启动canal-deployer。
/usr/local/canal-server/bin/startup.sh
2.6 验证启动
执行下面的命令查看canal.log日志文件,确认Canal成功启动。
sudo vi /usr/local/canal-server/logs/canal/canal.log
2024-07-15 17:24:12.154 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
2024-07-15 17:24:12.202 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2024-07-15 17:24:12.497 [main] INFO c.a.o.c.c.rocketmq.producer.CanalRocketMQProducer - ##Start RocketMQ producer##
2024-07-15 17:24:12.799 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
2024-07-15 17:24:12.984 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[172.17.XX.XX:11111]
2024-07-15 17:24:16.208 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......
执行下面的命令查看example.log日志文件,确认Canal Instance成功启动。
sudo vi /usr/local/canal-server/logs/example/example.log
2024-07-15 18:22:15.667 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2024-07-15 18:22:15.699 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^canal\..*$
2024-07-15 18:22:15.699 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql\.slave_.*$
2024-07-15 18:22:16.030 [main] INFO c.a.otter.canal.instance.core.AbstractCanalIn
3.测试验证
3.1 向MYSQL数据库中添加数据
执行下面的SQL,向步骤1.3 创建表所创建的表students添加一条数据。
INSERT INTO`students` (`name`, `age`, `gender`)VALUES('Tome', 18, 'male');
3.2 查看Canal发送的消息
登录云消息队列 RocketMQ 版控制台,找到部署时配置的实例,在消息查询页面查看消息如下:
相关参考
Canal支持更多的高级功能设置,完整的参数设置请参见官网文档Canal快速入门。