canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。 早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。 当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。
一、背景介绍
canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。 早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。 当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。
目前canal支持写入kafka,并且DataHub兼容kafka协议,所以您可以使用canal将MySql的增量数据写入DataHub。同时为了保证Canal能够像写Kafka一样写入DataHub,对开源Canal做了一些必要的改动,主要改动如下:
kafka的TopicName为DataHub的ProjectName.TopicName的组合,因此去掉canal将kafka TopicName中的”.”替换为”_”的行为,保证kafka TopicName能够正确映射到DataHub的Topic
DataHub使用SASL PLAIN进行认证,所以修改了启动脚本,添加了环境变量
-Djava.security.auth.login.config=$kafka_jaas_conf
二、使用说明
本文只是给出了一个canal写入DataHub(kafka)的基础示例,更多参数配置和参数含义请参考canal官网。
1. 下载canal deployer压缩包
首先下载canal压缩包canal.deployer-1.1.5-SNAPSHOT.tar.gz。如果使用未经DataHub改动的canal,可能会无法写入DataHub。
2. 将canal.deployer 复制到固定目录并解压
mkdir -p /usr/local/canal
tar -zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz -C /usr/local/canal
3.修改配置参数
3.1 修改instance配置 conf/example/instance.properties
# 按需修改成自己的数据库信息
#################################################
...
canal.instance.master.address=192.168.1.20:3306
# username/password,数据库的用户名和密码
...
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
...
# mq config
canal.mq.topic=test_project.test_topic
# 针对库名或者表名发送动态topic
#canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#库名.表名: 唯一主键,多个表之间用逗号分隔
#canal.mq.partitionHash=mytest.person:id,mytest.role:id
#################################################
对应IP地址的MySQL 数据库需进行相关初始化与设置, 可参考 Canal QuickStart。针对库名的动态TopicName和根据主键哈希的设置,可参考mq参数说明。
3.2 修改canal配置文件 conf/canal.properties
# ...
canal.serverMode = kafka
# ...
kafka.bootstrap.servers = dh-cn-hangzhou.aliyuncs.com:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0
kafka.security.protocol = SASL_SSL
kafka.sasl.mechanism = PLAIN
其中必须配置参数为canal.serverMode、kafka.bootstrap.servers、kafka.security.protocol、kafka.sasl.mechanism
,其他参数用户可根据实际情况自主进行调优,kafka.bootstrap.servers
需要选择topic所在region的endpoint,endpoint可前往DataHub兼容kafka协议查看。
3.3 修改jass配置文件 conf/kafka_client_producer_jaas.conf
kafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="accessId"
password="accessKey";
};
4. 启动和关闭
启动之前需要保证DataHub有相应的Topic,创建的Topic要求可以参考DataHub兼容kafka协议。
4.1 启动
cd /usr/local/canal/
sh bin/startup.sh
4.2 查看日志
查看 logs/canal/canal.logvi logs/canal/canal.log
2013-02-05 22:45:27.967 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2013-02-05 22:45:28.113 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.29.120:11111]
2013-02-05 22:45:28.210 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
查看instance的日志:vi logs/example/example.log
2013-02-05 22:50:45.636 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2013-02-05 22:50:45.641 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2013-02-05 22:50:45.803 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2013-02-05 22:50:45.810 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....
查看meta的日志,vi logs/example/meta.log
数据库的每次增删改操作,都会在meta.log中生成一条记录,查看该日志可以确认canal是否有采集到数据。
tail -f example/meta.log
2020-07-29 09:21:05.110 - clientId:1001 cursor:[log.000001,29723,1591190230000,1,] address[/127.0.0.1:3306]
2020-07-29 09:23:46.109 - clientId:1001 cursor:[log.000001,30047,1595985825000,1,] address[localhost/127.0.0.1:3306]
2020-07-29 09:24:50.547 - clientId:1001 cursor:[log.000001,30047,1595985825000,1,] address[/127.0.0.1:3306]
2020-07-29 09:26:45.547 - clientId:1001 cursor:[log.000001,30143,1595986005000,1,] address[localhost/127.0.0.1:3306]
2020-07-29 09:30:04.546 - clientId:1001 cursor:[log.000001,30467,1595986204000,1,] address[localhost/127.0.0.1:3306]
2020-07-29 09:30:16.546 - clientId:1001 cursor:[log.000001,30734,1595986215000,1,] address[localhost/127.0.0.1:3306]
2020-07-29 09:30:36.547 - clientId:1001 cursor:[log.000001,31001,1595986236000,1,] address[localhost/127.0.0.1:3306]
4.3 关闭
cd /usr/local/canal/
sh bin/stop.sh
三、 数据示例
DataHub Topic
DataHub Topic为tuple topic,Schema为
+-------+------+----------+-------------+
| Index | name | type | allow NULL |
+-------+------+----------+-------------+
| 0 | key | STRING | true |
| 1 | val | STRING | true |
+-------+------+----------+-------------+
MySQL
MySQL表结构
mysql> desc orders;
+-------+---------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------+---------+------+-----+---------+-------+
| oid | int(11) | YES | | NULL | |
| pid | int(11) | YES | | NULL | |
| num | int(11) | YES | | NULL | |
+-------+---------+------+-----+---------+-------+
3 rows in set (0.00 sec)
数据
写入DataHub后,key为null,val为JSON字符串。
mysql> insert into orders values(1,2,3);
{
"data":[
{
"oid":"1",
"pid":"2",
"num":"3"
}
],
"database":"ggtt",
"es":1591092305000,
"id":2,
"isDdl":false,
"mysqlType":{
"oid":"int(11)",
"pid":"int(11)",
"num":"int(11)"
},
"old":null,
"pkNames":null,
"sql":"",
"sqlType":{
"oid":4,
"pid":4,
"num":4
},
"table":"orders",
"ts":1591092305813,
"type":"INSERT"
}