文档

使用Canal和RocketMQ实现数据库变更订阅处理

更新时间:
一键部署

本文以MySQL数据库为例介绍如何使用Canal接入云消息队列 RocketMQ 版,实现MySQL数据库Binlog数据的变更处理。

背景信息

CDC(Change Data Capture)是一种监测并捕获数据库变更的典型技术方案,常应用于异构数据源之间的数据同步。Canal作为一款轻量级的CDC工具,可基于数据库增量日志解析,提供增量变更数据的订阅和消费能力。Canal可以将变更记录可靠地投递到云消息队列 RocketMQ 版中,借助云消息队列 RocketMQ 版丰富的消息处理策略实现多样化的业务逻辑。

Canal是一个开源项目,仓库地址请参见Canal

方案介绍

基于Binlog日志实现增量订阅和消费的典型业务场景如下:

  • 数据库镜像、数据库实时备份

  • 索引构建和实时维护(拆分异构索引、倒排索引等)

  • 业务Cache更新

  • 和业务逻辑相关的增量数据处理

基于Canal和云消息队列 RocketMQ 版的CDC方案如下:

CDC方案

Canal的主要组件有deployer、adapter和admin。

  • canal-deployer(server):如上图所示,Canal将自己伪装成库,监听并接收数据库的Binlog,再同步到云消息队列 RocketMQ 版等存储或其他中间件系统。

  • canal-adapter (client) :不是一个必选的组件,它从deployer中获取数据并转发。

    与Server进行数据同步模式的区别是,adapter通过gRPC协议与Server通信,canal-adapter支持多种语言的原生客户端,应用场景更加广泛。

  • canal-admin:为Canal提供配置管理、节点运维等功能。支持Web-UI界面,配置更加方便灵活,您可以直接在Web界面查看和修改相关配置信息。

环境要求

资源要求

  • 处于运行中状态的云消息队列 RocketMQ 版实例。实例创建,请参见创建消息队列RocketMQ版实例

  • 处于运行中的MySQL实例。本文以阿里云RDS MySQL为例,实例创建,请参见创建RDS MySQL实例

  • 用于部署运行Canal相关组件的机器。

网络要求

  • 部署canal-deployer(server)的节点可以连接数据库和云消息队列 RocketMQ 版实例,一般位于VPC内的ECS和容器都可以连接。

版本要求

本文以Canal 1.1.6连接MySQL为例,其他数据源版本请参见Canal Release

步骤一:部署Canal Admin管理节点

canal-admin依赖MySQL来维护存储配置、ACL等相关元数据信息。

本操作以Docker容器的方式运行canal-admin为例介绍如何部署canal-admin节点,若已有管理节点,可以跳过该步骤直接执行步骤二:开启MySQL数据库的Binlog功能

# 下载脚本。
wget https://raw.githubusercontent.com/alibaba/canal/master/docker/run_admin.sh
# 使用8089端口启动canal-admin。
sh run_admin.sh -e server.port=8089 \
                -e canal.adminUser=admin \
                -e canal.adminPasswd=123456

其中使用的Canal账号是用于admin和server两个组件通信时的双向认证,默认用户名和密码为:admin/123456。

部署Canal Admin后的效果如下:

部署admin

步骤二:开启MySQL数据库的Binlog功能

本操作介绍如何搭建一个默认开启Binlog的MySQL 8.0实例。

  1. 登录创建好的MySQL数据库。

    docker run -d -p 33060:3306 --name mysql8.0 \
        --restart=always \
        --privileged=true \
        -e MYSQL_ROOT_PASSWORD=root123456 \
        mysql:8.0
    说明

    CentOS 7中的安全模块SELinux会禁止访问docker-entrypoint-initdb.d,因此这里使用了特权容器。

  2. 查看Binlog是否开启,若开启则取值为ON

    show variables like 'binlog_format';
    show variables like 'log_bin';
  3. 创建一个数据库账号,用于canal-deployer获取数据。

    CREATE USER canal IDENTIFIED BY 'canal';
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    FLUSH PRIVILEGES;
    show grants for 'canal'@'%';
  4. 获取登录密码。

    由于密码在元数据库以加密方式存储,因此需要使用命令查询admin账号的登录密码。

    # mysql8.0执行结果为4ACFE3202A5FF5CF******FC58AAB1D615029441。
    select upper(sha1(unhex(sha1('admin'))))
    
    # mysql其他版本。
    select password('admin')

步骤三:部署Canal Deployer(Server)

本操作目的是部署一个canal-deployer(server),将Canal伪装成从库,监听并接收MySQL数据库的Binlog。

本操作以Docker容器方式为例运行canal-deployer,更多参数说明,请参见Docker运行

# 下载脚本。
wget https://raw.githubusercontent.com/alibaba/canal/master/docker/run.sh

# 启动deployer进行同步。
sh run.sh -e canal.auto.scan=false \
          -e canal.destinations=canal_test \
          -e canal.instance.master.address=172.17.0.1:33060 \
          -e canal.instance.dbUsername=canal \
          -e canal.instance.dbPassword=canal \
          -e canal.instance.connectionCharset=UTF-8 \
          -e canal.instance.tsdb.enable=true \
          -e canal.instance.gtidon=false \
          -e canal.serverMode=rocketMQ \
          -e canal.aliyun.accessKey=xxx \
          -e canal.aliyun.secretKey=xxx \
          -e canal.mq.accessChannel=cloud \
          -e rocketmq.namesrv.addr='http://xxx.aliyuncs.com:80' \
          -e rocketmq.vip.channel.enabled=false \
          -e rocketmq.namespace=MQ_INST_xxx \
          -e canal.mq.topic=canal_topic \
          -e rocketmq.producer.group=test \
          -e rocketmq.tag=tag \
          -e rocketmq.enable.message.trace=false \
          -e rocketmq.customized.trace.topic=
表 1. 源端MySQL参数说明

参数

说明

取值示例

canal.auto.scan

是否开启Canal本身的Instance扫描。

  • true:开启

  • false:关闭

false

canal.instance.master.address

源数据库连接地址。本示例指前提条件中创建的MySQL数据库的连接地址。获取方式,请参见连接地址与端口

172.16.x.x/12

canal.instance.dbUsername

源数据库的账号名称,即步骤二中创建的数据库账号。

xxx

canal.instance.dbPassword

源数据库的账号密码,即步骤二中获取到的登录密码。

xxx

canal.instance.connectionCharset

连接源数据库使用的字符集。

UTF-8

canal.instance.tsdb.enable

v1.0.25版本新增,是否开启table meta的时间序列版本记录功能。

  • true:开启

  • false:关闭

false

canal.instance.gtidon

是否开启数据库GTID模式。开启后能够减少位点不一致的情况下找不到位点的问题;关闭时从当前位点开始同步。

  • true:开启

  • false:关闭

false

表 2. 目标端消息队列RocketMQ版参数说明

参数

说明

取值示例

canal.serverMode

服务端模式,本示例取值为rocketMQ,表示通过服务端模式将消息数据转发到云消息队列 RocketMQ 版

rocketMQ

canal.aliyun.accessKey

AccessKey ID,阿里云身份验证标识。获取方式,请参见创建AccessKey

LTAI4GBY9J8e7YukuU******

canal.aliyun. secretKey

AccessKey Secret,阿里云身份验证密钥。获取方式,请参见创建AccessKey

bu5s0DVvIS2ZhoOFpBTWdGP0******

canal.mq.accessChannel

云消息队列 RocketMQ 版接入方式。

cloud

rocketmq.namesrv.addr

云消息队列 RocketMQ 版实例的接入点。在云消息队列 RocketMQ 版控制台实例详情页面获取。

http://MQ_INST_12665404654******_BYSLmTAL.{$RegionId}.mq.aliyuncs.com:80

rocketmq.vip.channel.enabled

是否启用VIP Netty通道发送消息。

  • true:开启

  • false:关闭

false

rocketmq.namespace

云消息队列 RocketMQ 版实例的ID,在云消息队列 RocketMQ 版控制台实例详情页面获取。

使用Canal连接云消息队列 RocketMQ 版4.x实例时,该参数必须填写。

MQ_INST_1266540465******_BYSLmTAL

高阶参数配置

  • 数据同步Topic和顺序性设置

    Canal传输数据库变更事件到云消息队列 RocketMQ 版默认遵循一定的顺序性原则,在早期版本中每个任务默认是发送到云消息队列 RocketMQ 版单个Topic的单个分区。Topic设置方法可以使用canal.mq.topic参数指定传输使用的Topic。

    在v1.15版本之后的版本,Canal支持多Topic、多分区配置,即在单个任务内可以支持按照级别拆分Topic、按主键拆分分区,以此提升并发度。具体配置,请参见顺序性配置

  • 批量传输设置

    批量处理功能是在一定延迟范围内提升整体的吞吐量,相关参数如下:

    参数

    说明

    默认值

    canal.mq.canalBatchSize

    从Canal获取数据的批次大小。单位:KB。

    50

    canal.mq.canalGetTimeout

    从Canal获取数据的超时时间。单位:毫秒。

    100

    canal.mq.send.thread.size

    云消息队列 RocketMQ 版消息发送并行度。

    30

    canal.mq.build.thread.size

    云消息队列 RocketMQ 版消息构建并行度。

    8

    canal.mq.flatMessage

    云消息队列 RocketMQ 版收到的消息是否为JSON格式。

    • true:是

    • false:否。云消息队列 RocketMQ 版收到的消息为Protobuf格式,需要通过CanalMessageDeserializer进行解码。

    false

  • 链路轨迹追踪设置

    使用Canal发送变更事件到云消息队列 RocketMQ 版时,可以选择开启云消息队列 RocketMQ 版的云上消息轨迹功能,以便出现异常时可以快速定位链路问题。

    开启链路轨迹追踪功能,需要设置如下参数:

    • rocketmq.enable.message.trace设置为true

    • canal.mq.accessChannel设置为cloud

  • 序列化方式设置

    Canal向云消息队列 RocketMQ 版写入的数据是Canal指定的序列化格式,具体代码示例,请参见CanalRocketMQClientExample.java

相关参考

Canal支持更多的高级功能设置,完整的参数设置请参见官网文档Canal快速入门