使用Canal和RocketMQ实现数据库变更订阅处理
本文以MySQL数据库为例介绍如何使用Canal接入云消息队列 RocketMQ 版,实现MySQL数据库Binlog数据的变更处理。
背景信息
CDC(Change Data Capture)是一种监测并捕获数据库变更的典型技术方案,常应用于异构数据源之间的数据同步。Canal作为一款轻量级的CDC工具,可基于数据库增量日志解析,提供增量变更数据的订阅和消费能力。Canal可以将变更记录可靠地投递到云消息队列 RocketMQ 版中,借助云消息队列 RocketMQ 版丰富的消息处理策略实现多样化的业务逻辑。
Canal是一个开源项目,仓库地址请参见Canal。
方案介绍
基于Binlog日志实现增量订阅和消费的典型业务场景如下:
数据库镜像、数据库实时备份
索引构建和实时维护(拆分异构索引、倒排索引等)
业务Cache更新
和业务逻辑相关的增量数据处理
基于Canal和云消息队列 RocketMQ 版的CDC方案如下:

Canal的主要组件有deployer、adapter和admin。
canal-deployer(server):如上图所示,Canal将自己伪装成库,监听并接收数据库的Binlog,再同步到云消息队列 RocketMQ 版等存储或其他中间件系统。
canal-adapter (client) :不是一个必选的组件,它从deployer中获取数据并转发。
与Server进行数据同步模式的区别是,adapter通过gRPC协议与Server通信,canal-adapter支持多种语言的原生客户端,应用场景更加广泛。
canal-admin:为Canal提供配置管理、节点运维等功能。支持Web-UI界面,配置更加方便灵活,您可以在直接在Web界面查看和修改相关配置信息。
环境要求
资源要求
处于运行中状态的云消息队列 RocketMQ 版实例。实例创建,请参见创建消息队列RocketMQ版实例。
处于运行中的MySQL实例。本文以阿里云RDS MySQL为例,实例创建,请参见创建RDS MySQL实例。
用于部署运行Canal相关组件的机器。
网络要求
部署canal-deployer(server)的节点可以连接数据库和云消息队列 RocketMQ 版实例,一般位于VPC内的ECS和容器都可以连接。
版本要求
本文以Canal 1.1.6连接MySQL为例,其他数据源版本请参见Canal Release。
源端MySQL版本支持5.1x、5.5x、5.6x、5.7x、8.0x。
目标端云消息队列 RocketMQ 版实例版本支持4.x和5.x版本,推荐使用5.x版本。
5.x版本使用教程,请参见使用Canal和RocketMQ 5.x实现数据库变更订阅处理。
步骤一:部署Canal Admin管理节点
canal-admin依赖MySQL来维护存储配置、ACL等相关元数据信息。
本操作以Docker容器的方式运行canal-admin为例介绍如何部署canal-admin节点,若已有管理节点,可以跳过该步骤直接执行步骤二:开启MySQL数据库的Binlog功能。
# 下载脚本。
wget https://raw.githubusercontent.com/alibaba/canal/master/docker/run_admin.sh
# 使用8089端口启动canal-admin。
sh run_admin.sh -e server.port=8089 \
-e canal.adminUser=admin \
-e canal.adminPasswd=123456
其中使用的Canal账号是用于admin和server两个组件通信时的双向认证,默认用户名和密码为:admin/123456。
部署Canal Admin后的效果如下:

步骤二:开启MySQL数据库的Binlog功能
本操作介绍如何搭建一个默认开启Binlog的MySQL 8.0实例。
登录创建好的MySQL数据库。
docker run -d -p 33060:3306 --name mysql8.0 \ --restart=always \ --privileged=true \ -e MYSQL_ROOT_PASSWORD=root123456 \ mysql:8.0
说明CentOS 7中的安全模块selinux会禁止访问
docker-entrypoint-initdb.d
,因此这里使用了特权容器。查看Binlog是否开启,若开启则取值为ON。
show variables like 'binlog_format'; show variables like 'log_bin';
创建一个数据库账号,用于canal-deployer获取数据。
CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; FLUSH PRIVILEGES; show grants for 'canal'@'%';
获取登录密码。
由于密码在元数据库以加密方式存储,因此需要使用命令查询admin账号的登录密码。
# mysql8.0执行结果为4ACFE3202A5FF5CF******FC58AAB1D615029441。 select upper(sha1(unhex(sha1('admin')))) # mysql其他版本。 select password('admin')
步骤三:部署Canal Deployer(Server)
本操作目的是部署一个canal-deployer(server),将Canal伪装成从库,监听并接收MySQL数据库的Binlog。
本操作以Docker容器方式为例运行canal-deployer,更多参数说明,请参见Docker运行。
# 下载脚本。
wget https://raw.githubusercontent.com/alibaba/canal/master/docker/run.sh
# 启动deployer进行同步。
sh run.sh -e canal.auto.scan=false \
-e canal.destinations=canal_test \
-e canal.instance.master.address=172.17.0.1:33060 \
-e canal.instance.dbUsername=canal \
-e canal.instance.dbPassword=canal \
-e canal.instance.connectionCharset=UTF-8 \
-e canal.instance.tsdb.enable=true \
-e canal.instance.gtidon=false \
-e canal.serverMode=rocketMQ \
-e canal.aliyun.accessKey=xxx \
-e canal.aliyun.secretKey=xxx \
-e canal.mq.accessChannel=cloud \
-e rocketmq.namesrv.addr='http://xxx.aliyuncs.com:80' \
-e rocketmq.vip.channel.enabled=false \
-e rocketmq.namespace=MQ_INST_xxx \
-e canal.mq.topic=canal_topic \
-e rocketmq.producer.group=test \
-e rocketmq.tag=tag \
-e rocketmq.enable.message.trace=false \
-e rocketmq.customized.trace.topic=
参数 | 说明 | 取值示例 |
canal.auto.scan | 是否开启Canal本身的Instance扫描。
| false |
canal.instance.master.address | 源数据库连接地址。本示例指前提条件中创建的MySQL数据库的连接地址。获取方式,请参见连接地址与端口。 | 172.16.x.x/12 |
canal.instance.dbUsername | 源数据库的账号名称,即步骤二中创建的数据库账号。 | xxx |
canal.instance.dbPassword | 源数据库的账号密码,即步骤二中获取到的登录密码。 | xxx |
canal.instance.connectionCharset | 连接源数据库使用的字符集。 | UTF-8 |
canal.instance.tsdb.enable | v1.0.25版本新增,是否开启table meta的时间序列版本记录功能。
| false |
canal.instance.gtidon | 是否开启数据库GTID模式。开启后能够减少位点不一致的情况下找不到位点的问题;关闭时从当前位点开始同步。
| false |
参数 | 说明 | 取值示例 |
canal.serverMode | 服务端模式,本示例取值为rocketMQ,表示通过服务端模式将消息数据转发到云消息队列 RocketMQ 版。 | rocketMQ |
canal.aliyun.accessKey | AccessKey ID,阿里云身份验证标识。获取方式,请参见创建AccessKey。 | LTAI4GBY9J8e7YukuU****** |
canal.aliyun. secretKey | AccessKey Secret,阿里云身份验证密钥。获取方式,请参见创建AccessKey。 | bu5s0DVvIS2ZhoOFpBTWdGP0****** |
canal.mq.accessChannel | 云消息队列 RocketMQ 版接入方式。 | cloud |
rocketmq.namesrv.addr | 云消息队列 RocketMQ 版实例的接入点。在云消息队列 RocketMQ 版控制台实例详情页面获取。 | http://MQ_INST_12665404654******_BYSLmTAL.{$RegionId}.mq.aliyuncs.com:80 |
rocketmq.vip.channel.enabled | 是否启用VIP Netty通道发送消息。
| false |
rocketmq.namespace | 云消息队列 RocketMQ 版实例的ID,在云消息队列 RocketMQ 版控制台实例详情页面获取。 使用Canal连接云消息队列 RocketMQ 版4.x实例时,该参数必须填写。 | MQ_INST_1266540465******_BYSLmTAL |
高阶参数配置
数据同步Topic和顺序性设置
Canal传输数据库变更事件到云消息队列 RocketMQ 版默认遵循一定的顺序性原则,在早期版本中每个任务默认是发送到云消息队列 RocketMQ 版单个Topic的单个分区。Topic设置方法可以使用canal.mq.topic参数指定传输使用的Topic。
在v1.15版本之后的版本,Canal支持多Topic、多分区配置,即在单个任务内可以支持按照级别拆分Topic、按主键拆分分区,以此提升并发度。具体配置,请参见顺序性配置。
批量传输设置
批量处理功能是在一定延迟范围内提升整体的吞吐量,相关参数如下:
参数
说明
默认值
canal.mq.canalBatchSize
从Canal获取数据的批次大小。单位:KB。
50
canal.mq.canalGetTimeout
从Canal获取数据的超时时间。单位:毫秒。
100
canal.mq.send.thread.size
云消息队列 RocketMQ 版消息发送并行度。
30
canal.mq.build.thread.size
云消息队列 RocketMQ 版消息构建并行度。
8
canal.mq.flatMessage
云消息队列 RocketMQ 版收到的消息是否为JSON格式。
true:是
false:否。云消息队列 RocketMQ 版收到的消息为Protobuf格式,需要通过CanalMessageDeserializer进行解码。
false
链路轨迹追踪设置
使用Canal发送变更事件到云消息队列 RocketMQ 版时,可以选择开启云消息队列 RocketMQ 版的云上消息轨迹功能,以便出现异常时可以快速定位链路问题。
开启链路轨迹追踪功能,需要设置如下参数:
rocketmq.enable.message.trace设置为true
canal.mq.accessChannel设置为cloud
序列化方式设置
Canal向云消息队列 RocketMQ 版写入的数据是Canal指定的序列化格式,具体代码示例,请参见CanalRocketMQClientExample.java。
相关参考
Canal支持更多的高级功能设置,完整的参数设置请参见官网文档Canal快速入门。