本文以MySQL数据库为例介绍如何使用Canal接入消息队列RocketMQ版,实现MySQL数据库Binlog数据的变更处理。

背景信息

CDC(Change Data Capture)是一种监测并捕获数据库变更的典型技术方案,常应用于异构数据源之间的数据同步。Canal作为一款轻量级的CDC工具,可基于数据库增量日志解析,提供增量变更数据的订阅和消费能力。Canal可以将变更记录可靠地投递到消息队列RocketMQ版中,借助消息队列RocketMQ版丰富的消息处理策略实现多样化的业务逻辑。

Canal是一个开源项目,仓库地址请参见Canal

方案介绍

基于Binlog日志实现增量订阅和消费的典型业务场景如下:
  • 数据库镜像、数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务Cache更新
  • 和业务逻辑相关的增量数据处理

基于Canal和消息队列RocketMQ版的CDC方案如下:

CDC方案
Canal的主要组件有deployer、adapter和admin。
  • canal-deployer(server):如上图所示,Canal将自己伪装成库,监听并接收数据库的Binlog,再同步到消息队列RocketMQ版等存储或其他中间件系统。
  • canal-adapter (client) :不是一个必选的组件,它从deployer中获取数据并转发。

    与Server进行数据同步模式的区别是,adapter通过gRPC协议与Server通信,canal-adapter支持多种语言的原生客户端,应用场景更加广泛。

  • canal-admin:为Canal提供配置管理、节点运维等功能。支持Web-UI界面,配置更加方便灵活,您可以在直接在Web界面查看和修改相关配置信息。

环境要求

资源要求

  • 处于运行中状态的消息队列RocketMQ版实例。实例创建,请参见创建消息队列RocketMQ版实例
  • 处于运行中的MySQL实例。本文以阿里云RDS MySQL为例,实例创建,请参见创建RDS MySQL实例
  • 用于部署运行Canal相关组件的机器。

网络要求

  • 部署canal-deployer(server)的节点可以连接数据库和消息队列RocketMQ版实例,一般位于VPC内的ECS和容器都可以连接。

版本要求

本文以Canal 1.1.6连接MySQL为例,其他数据源版本请参见Canal Release

  • 源端MySQL版本支持5.1x、5.5x、5.6x、5.7x、8.0x。
  • 目标端消息队列RocketMQ版实例版本支持4.x和5.x版本,推荐使用5.x版本实例。

步骤一:部署Canal Admin管理节点

canal-admin依赖MySQL来维护存储配置、ACL等相关元数据信息。

本操作以Docker容器的方式运行canal-admin为例介绍如何部署canal-admin节点,若已有管理节点,可以跳过该步骤直接执行步骤二:开启MySQL数据库的Binlog功能

# 下载脚本。
wget https://raw.githubusercontent.com/alibaba/canal/master/docker/run_admin.sh
# 使用8089端口启动canal-admin。
sh run_admin.sh -e server.port=8089 \
                -e canal.adminUser=admin \
                -e canal.adminPasswd=123456

其中使用的Canal账号是用于admin和server两个组件通信时的双向认证,默认用户名和密码为:admin/123456。

部署Canal Admin后的效果如下:

部署admin

步骤二:开启MySQL数据库的Binlog功能

本操作介绍如何搭建一个默认开启Binlog的MySQL 8.0实例。

  1. 登录创建好的MySQL数据库。
    docker run -d -p 33060:3306 --name mysql8.0 \
        --restart=always \
        --privileged=true \
        -e MYSQL_ROOT_PASSWORD=root123456 \
        mysql:8.0
    说明 CentOS 7中的安全模块selinux会禁止访问 docker-entrypoint-initdb.d,因此这里使用了特权容器。
  2. 查看Binlog是否开启,若开启则取值为ON
    show variables like 'binlog_format';
    show variables like 'log_bin';
  3. 创建一个数据库账号,用于canal-deployer获取数据。
    CREATE USER canal IDENTIFIED BY 'canal';
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    FLUSH PRIVILEGES;
    show grants for 'canal'@'%';
  4. 获取登录密码。
    由于密码在元数据库以加密方式存储,因此需要使用命令查询admin账号的登录密码。
    # mysql8.0执行结果为4ACFE3202A5FF5CF******FC58AAB1D615029441。
    select upper(sha1(unhex(sha1('admin'))))
    
    # mysql其他版本。
    select password('admin')

步骤三:部署Canal Deployer(Server)

本操作目的是部署一个canal-deployer(server),将Canal伪装成从库,监听并接收MySQL数据库的Binlog。

本操作以Docker容器方式为例运行canal-deployer,更多参数说明,请参见Docker运行

# 下载脚本。
wget https://raw.githubusercontent.com/alibaba/canal/master/docker/run.sh

# 启动deployer进行同步。
sh run.sh -e canal.auto.scan=false \
          -e canal.destinations=canal_test \
          -e canal.instance.master.address=172.17.0.1:33060 \
          -e canal.instance.dbUsername=canal \
          -e canal.instance.dbPassword=canal \
          -e canal.instance.connectionCharset=UTF-8 \
          -e canal.instance.tsdb.enable=true \
          -e canal.instance.gtidon=false \
          -e canal.serverMode=rocketMQ \
          -e canal.aliyun.accessKey=xxx \
          -e canal.aliyun.secretKey=xxx \
          -e canal.mq.accessChannel=cloud \
          -e rocketmq.namesrv.addr='http://xxx.aliyuncs.com:80' \
          -e rocketmq.vip.channel.enabled=false \
          -e rocketmq.namespace=MQ_INST_xxx \
          -e canal.mq.topic=canal_topic \
          -e rocketmq.producer.group=test \
          -e rocketmq.tag=tag \
          -e rocketmq.enable.message.trace=false \
          -e rocketmq.customized.trace.topic=
表 1. 源端MySQL参数说明
参数 说明 取值示例
canal.auto.scan
是否开启Canal本身的Instance扫描。
  • true:开启
  • false:关闭
false
canal.instance.master.address

源数据库连接地址。本示例指前提条件中创建的MySQL数据库的连接地址。获取方式,请参见连接地址和端口

172.16.x.x/12
canal.instance.dbUsername

源数据库的账号名称,即步骤二中创建的数据库账号。

xxx
canal.instance.dbPassword

源数据库的账号密码,即步骤二中获取到的登录密码。

xxx
canal.instance.connectionCharset

连接源数据库使用的字符集。

UTF-8
canal.instance.tsdb.enable
v1.0.25版本新增,是否开启table meta的时间序列版本记录功能。
  • true:开启
  • false:关闭
false
canal.instance.gtidon
是否开启数据库GTID模式。开启后能够减少位点不一致的情况下找不到位点的问题;关闭时从当前位点开始同步。
  • true:开启
  • false:关闭
false
表 2. 目标端消息队列RocketMQ版参数说明
参数 说明 取值示例
canal.serverMode

服务端模式,本示例取值为rocketMQ,表示通过服务端模式将消息数据转发到消息队列RocketMQ版

rocketMQ
canal.aliyun.accessKey
消息队列RocketMQ版实例的用户名和密码。在 消息队列RocketMQ版控制台 实例详情页面获取。
  • 使用公网访问消息队列RocketMQ版实例时,必须填写实例用户名和密码。
  • 如果是在阿里云ECS内网访问消息队列RocketMQ版实例,无需填写实例用户名和密码,服务端会根据内网VPC信息自动获取。更多信息,请参见示例代码
6W0xz2uPf******
canal.aliyun. secretKey sK56k1DrGx******
canal.mq.accessChannel

消息队列RocketMQ版接入方式。

cloud
rocketmq.namesrv.addr

消息队列RocketMQ版实例的接入点。在消息队列RocketMQ版控制台实例详情页面获取。

如果是在阿里云ECS内网访问,填写实例的VPC接入点。

rmq-cn-xxx.{$RegionId}.rmq.aliyuncs.com:8080
rocketmq.vip.channel.enabled
是否启用VIP Netty通道发送消息。
  • true:开启
  • false:关闭
false
rocketmq.namespace

消息队列RocketMQ版实例的命名空间。

消息队列RocketMQ版5.x实例无需填写该参数。

不涉及

高阶参数配置

数据同步Topic和顺序性设置

Canal传输数据库变更事件到消息队列RocketMQ版默认遵循一定的顺序性原则,在早期版本中每个任务默认是发送到消息队列RocketMQ版单个Topic的单个分区。Topic设置方法可以使用canal.mq.topic参数指定传输使用的Topic。

在v1.15版本之后的版本,Canal支持多Topic、多分区配置,即在单个任务内可以支持按照级别拆分Topic、按主键拆分分区,以此提升并发度。具体配置,请参见顺序性配置

批量传输设置

批量处理功能是在一定延迟范围内提升整体的吞吐量,相关参数如下:

参数 说明 默认值
canal.mq.canalBatchSize 从Canal获取数据的批次大小。单位:KB。 50
canal.mq.canalGetTimeout 从Canal获取数据的超时时间。单位:毫秒。 100
canal.mq.send.thread.size 消息队列RocketMQ版消息发送并行度。 30
canal.mq.build.thread.size 消息队列RocketMQ版消息构建并行度。 8
canal.mq.flatMessage 消息队列RocketMQ版收到的消息是否为JSON格式。
  • true:是
  • false:否。消息队列RocketMQ版收到的消息为Protobuf格式,需要通过CanalMessageDeserializer进行解码。
false

链路轨迹追踪设置

使用Canal发送变更事件到消息队列RocketMQ版时,可以选择开启消息队列RocketMQ版的云上消息轨迹功能,以便出现异常时可以快速定位链路问题。

开启链路轨迹追踪功能,需要设置如下参数:
  • rocketmq.enable.message.trace设置为true
  • canal.mq.accessChannel设置为cloud

序列化方式设置

Canal向消息队列RocketMQ版写入的数据是Canal指定的序列化格式,具体代码示例,请参见CanalRocketMQClientExample.java

相关参考

Canal支持更多的高级功能设置,完整的参数设置请参见官网文档Canal快速入门