全部产品

Canal插件

一、背景介绍

canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。

早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。

18-1

目前canal支持写入kafka,并且DataHub兼容kafka协议,所以您可以使用canal将MySql的增量数据写入DataHub。同时为了保证Canal能够像写Kakfa一样写入DataHub,对开源Canal做了一些必要的改动,主要改动如下:

  • Kafka的TopicName为Datahub的ProjectName.TopicName的组合,因此去掉canal将Kafka TopicName中的”.”替换为”_”的行为,保证Kafka TopicName能够正确映射到DataHub的Topic
  • DataHub使用SASL PLAIN进行认证,所以修改了启动脚本,添加了环境变量-Djava.security.auth.login.config=$kafka_jaas_conf

二、使用说明

本文只是给出了一个canal写入DataHub(Kafka)的基础示例,更多参数配置和参数含义请参考canal官网

1. 下载canal deployer压缩包

首先下载canal压缩包canal.deployer-1.1.5-SNAPSHOT.tar.gz。如果使用未经DataHub改动的canal,可能会无法写入DataHub。

2. 将canal.deployer 复制到固定目录并解压

   
  1. mkdir -p /usr/local/canal
  2. tar -zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz -C /usr/local/canal

3.修改配置参数

3.1 修改instance配置 conf/example/instance.properties

   
  1. # 按需修改成自己的数据库信息
  2. #################################################
  3. ...
  4. canal.instance.master.address=192.168.1.20:3306
  5. # username/password,数据库的用户名和密码
  6. ...
  7. canal.instance.dbUsername = canal
  8. canal.instance.dbPassword = canal
  9. ...
  10. # mq config
  11. canal.mq.topic=test_project.test_topic
  12. # 针对库名或者表名发送动态topic
  13. #canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..*
  14. canal.mq.partition=0
  15. # hash partition config
  16. #canal.mq.partitionsNum=3
  17. #库名.表名: 唯一主键,多个表之间用逗号分隔
  18. #canal.mq.partitionHash=mytest.person:id,mytest.role:id
  19. #################################################

对应ip 地址的MySQL 数据库需进行相关初始化与设置, 可参考 Canal QuickStart。针对库名的动态TopicName和根据主键哈希的设置,可参考mq参数说明

3.2 修改canal配置文件 conf/canal.properties

   
  1. # ...
  2. canal.serverMode = kafka
  3. # ...
  4. kafka.bootstrap.servers = dh-cn-hangzhou.aliyuncs.com:9092
  5. kafka.acks = all
  6. kafka.compression.type = none
  7. kafka.batch.size = 16384
  8. kafka.linger.ms = 1
  9. kafka.max.request.size = 1048576
  10. kafka.buffer.memory = 33554432
  11. kafka.max.in.flight.requests.per.connection = 1
  12. kafka.retries = 0
  13. kafka.security.protocol = SASL_SSL
  14. kafka.sasl.mechanism = PLAIN

其中必须配置参数为canal.serverMode、kafka.bootstrap.servers、kafka.security.protocol、kafka.sasl.mechanism,其他参数用户可根据实际情况自主进行调优,kafka.bootstrap.servers需要选择topic所在region的endpoint,endpoint可前往DataHub兼容Kafka协议查看。

3.3 修改jass配置文件 conf/kafka_client_producer_jaas.conf

   
  1. KafkaClient {
  2. org.apache.kafka.common.security.plain.PlainLoginModule required
  3. username="accessId"
  4. password="accessKey";
  5. };

4. 启动和关闭

启动之前需要保证DataHub有相应的Topic,创建的Topic要求可以参考DataHub兼容Kafka协议

4.1 启动

   
  1. cd /usr/local/canal/
  2. sh bin/startup.sh

4.2 查看日志

查看 logs/canal/canal.logvi logs/canal/canal.log

   
  1. 2013-02-05 22:45:27.967 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
  2. 2013-02-05 22:45:28.113 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.29.120:11111]
  3. 2013-02-05 22:45:28.210 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......

查看instance的日志:vi logs/example/example.log

   
  1. 2013-02-05 22:50:45.636 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
  2. 2013-02-05 22:50:45.641 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
  3. 2013-02-05 22:50:45.803 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
  4. 2013-02-05 22:50:45.810 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....

查看meta的日志,vi logs/example/meta.log

数据库的每次增删改操作,都会在meta.log中生成一条记录,查看该日志可以确认canal是否有采集到数据。

   
  1. tail -f example/meta.log
  2. 2020-07-29 09:21:05.110 - clientId:1001 cursor:[log.000001,29723,1591190230000,1,] address[/127.0.0.1:3306]
  3. 2020-07-29 09:23:46.109 - clientId:1001 cursor:[log.000001,30047,1595985825000,1,] address[localhost/127.0.0.1:3306]
  4. 2020-07-29 09:24:50.547 - clientId:1001 cursor:[log.000001,30047,1595985825000,1,] address[/127.0.0.1:3306]
  5. 2020-07-29 09:26:45.547 - clientId:1001 cursor:[log.000001,30143,1595986005000,1,] address[localhost/127.0.0.1:3306]
  6. 2020-07-29 09:30:04.546 - clientId:1001 cursor:[log.000001,30467,1595986204000,1,] address[localhost/127.0.0.1:3306]
  7. 2020-07-29 09:30:16.546 - clientId:1001 cursor:[log.000001,30734,1595986215000,1,] address[localhost/127.0.0.1:3306]
  8. 2020-07-29 09:30:36.547 - clientId:1001 cursor:[log.000001,31001,1595986236000,1,] address[localhost/127.0.0.1:3306]

4.3 关闭

   
  1. cd /usr/local/canal/
  2. sh bin/stop.sh

三、 数据示例

DataHub Topic

DataHub Topic为tuple topic,Schema为

   
  1. +-------+------+----------+-------------+
  2. | Index | name | type | allow NULL |
  3. +-------+------+----------+-------------+
  4. | 0 | key | STRING | true |
  5. | 1 | val | STRING | true |
  6. +-------+------+----------+-------------+

MySQL

MySQL表结构

   
  1. mysql> desc orders;
  2. +-------+---------+------+-----+---------+-------+
  3. | Field | Type | Null | Key | Default | Extra |
  4. +-------+---------+------+-----+---------+-------+
  5. | oid | int(11) | YES | | NULL | |
  6. | pid | int(11) | YES | | NULL | |
  7. | num | int(11) | YES | | NULL | |
  8. +-------+---------+------+-----+---------+-------+
  9. 3 rows in set (0.00 sec)

数据

写入DataHub后,key为null,val为json字符串。

mysql> insert into orders values(1,2,3);

   
  1. {
  2. "data":[
  3. {
  4. "oid":"1",
  5. "pid":"2",
  6. "num":"3"
  7. }
  8. ],
  9. "database":"ggtt",
  10. "es":1591092305000,
  11. "id":2,
  12. "isDdl":false,
  13. "mysqlType":{
  14. "oid":"int(11)",
  15. "pid":"int(11)",
  16. "num":"int(11)"
  17. },
  18. "old":null,
  19. "pkNames":null,
  20. "sql":"",
  21. "sqlType":{
  22. "oid":4,
  23. "pid":4,
  24. "num":4
  25. },
  26. "table":"orders",
  27. "ts":1591092305813,
  28. "type":"INSERT"
  29. }