Canal reads MySQL binary logs (binlogs) to capture incremental changes and stream them to DataHub topics using the Kafka protocol.
How Canal works with DataHub
Canal supports MySQL 5.1.x, 5.5.x, 5.6.x, 5.7.x, and 8.0.x as source databases.
DataHub is compatible with the Kafka protocol, so Canal can write MySQL incremental data directly to DataHub topics. The DataHub-customized Canal package differs from the open-source version in two ways:
The logic that replaces periods (.) with underscores (_) in Kafka topic names is removed. Canal therefore passes a topic name in the ProjectName.TopicName format through unchanged, and DataHub maps it to the correct topic.
The JVM option -Djava.security.auth.login.config=$kafka_jaas_conf is added to the startup script. This makes Canal load the Kafka client login configuration and supports PLAIN Simple Authentication and Security Layer (SASL) authentication.
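The effect of the first change can be illustrated with a short sketch. The sketch makes two assumptions: the project and topic are separated at the first period, and neither part may be empty. The function names are hypothetical and do not come from Canal or DataHub.

```python
# Hypothetical sketch: compares how a Canal topic name reaches DataHub
# with and without the DataHub customization described above.


def open_source_topic(name: str) -> str:
    """Open-source Canal replaces periods with underscores in topic names."""
    return name.replace(".", "_")


def customized_topic(name: str) -> str:
    """The DataHub-customized package keeps the topic name unchanged."""
    return name


def to_datahub_target(kafka_topic: str) -> tuple[str, str]:
    """Split a Kafka topic name into (ProjectName, TopicName).

    Assumption: the two parts are separated by the first period and
    neither part may be empty.
    """
    project, sep, topic = kafka_topic.partition(".")
    if not sep or not project or not topic:
        raise ValueError(f"expected ProjectName.TopicName, got {kafka_topic!r}")
    return project, topic


if __name__ == "__main__":
    name = "test_project.test_topic"
    print(to_datahub_target(customized_topic(name)))  # ('test_project', 'test_topic')
    try:
        # The underscore-only name no longer contains a project separator.
        to_datahub_target(open_source_topic(name))
    except ValueError as err:
        print("open-source name cannot be mapped:", err)
```

With the open-source behavior, test_project.test_topic becomes test_project_test_topic. That name no longer identifies a project, which is why the unmodified release may not be able to write to DataHub.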
Prerequisites
Before you begin, ensure that you have:
A MySQL database running version 5.1.x, 5.5.x, 5.6.x, 5.7.x, or 8.0.x. See QuickStart for setup details.
A DataHub topic of the TUPLE type created in the target project. For topic requirements, see Kafka compatibility.
For all Canal configuration parameters and advanced options, see Canal.
Set up Canal to stream MySQL data to DataHub
Step 1: Download the Canal package
Download the DataHub-customized Canal package: canal.deployer-1.1.5-SNAPSHOT.tar.gz
Use this package instead of the standard open-source Canal release. The open-source release converts periods in topic names to underscores and its startup script does not load the JAAS configuration, so it may fail to write data to DataHub.
Step 2: Extract the package
mkdir -p /usr/local/canal
tar -zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz -C /usr/local/canal
Step 3: Configure Canal
Edit three configuration files before starting Canal.
3.1 Instance configuration: conf/example/instance.properties
# Modify the database information as needed.
#################################################
...
canal.instance.master.address=192.168.1.20:3306
# username/password: the username and password of the database.
...
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
...
# mq config
canal.mq.topic=test_project.test_topic
# Specify a dynamic topic based on the database name or table name.
#canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
# Partition hash keys in the format database.table:column, where column is the table's unique primary key. Separate multiple tables with commas (,).
#canal.mq.partitionHash=mytest.person:id,mytest.role:id
#################################################
Set canal.mq.topic to <ProjectName>.<TopicName>. The customized package keeps the period, so this value maps directly to the topic TopicName in the DataHub project ProjectName. For dynamic topic routing based on database or table names, and for configuring partition hash keys, see MQ-related parameters.
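You can catch typos in instance.properties before you start Canal by running a small pre-flight check. The following is a minimal sketch and does not cover all of the Java properties syntax. It rests on two assumptions:
- The file uses simple key=value lines.
- Project and topic names contain only letters, digits, and underscores.

The helper names are hypothetical.

```python
# Hypothetical helper: checks the DataHub-related entries of
# conf/example/instance.properties before Canal is started.
import re

# Assumption: project and topic names consist of letters, digits, and underscores.
TOPIC_PATTERN = re.compile(r"^[A-Za-z0-9_]+\.[A-Za-z0-9_]+$")
REQUIRED_KEYS = (
    "canal.instance.master.address",
    "canal.instance.dbUsername",
    "canal.instance.dbPassword",
    "canal.mq.topic",
)


def parse_properties(text: str) -> dict[str, str]:
    """Minimal key=value parser: skips blank lines, comments, and '...' elisions."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")) or "=" not in line:
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def check_instance(text: str) -> list[str]:
    """Return a list of problems; an empty list means the checks passed."""
    props = parse_properties(text)
    problems = [f"missing {key}" for key in REQUIRED_KEYS if not props.get(key)]
    topic = props.get("canal.mq.topic", "")
    if topic and not TOPIC_PATTERN.match(topic):
        problems.append(f"canal.mq.topic should be ProjectName.TopicName, got {topic!r}")
    return problems
```

For example, a file that sets canal.mq.topic=test_topic without a project prefix is reported as a format problem.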
3.2 Canal configuration: conf/canal.properties
# ...
canal.serverMode = kafka
# ...
kafka.bootstrap.servers = dh-cn-hangzhou.aliyuncs.com:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0
kafka.security.protocol = SASL_SSL
kafka.sasl.mechanism = PLAIN
The following parameters are required:
| Parameter | Value | Description |
|---|---|---|
| canal.serverMode | kafka | Tells Canal to use the Kafka producer protocol. |
| kafka.bootstrap.servers | DataHub endpoint | The endpoint of DataHub in the region where the destination topic resides. For available endpoints, see Kafka compatibility. |
| kafka.security.protocol | SASL_SSL | Required for DataHub authentication. |
| kafka.sasl.mechanism | PLAIN | Required for DataHub SASL/PLAIN authentication. |
All other parameters are optional and can be tuned to match your throughput requirements.
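A similar sketch can check the required entries in canal.properties. Each expected value below is taken from the configuration shown above. Two limits apply:
- The endpoint check only verifies the host:port shape. It does not check that the endpoint matches your region.
- A single endpoint is assumed.

The function names are hypothetical.

```python
# Hypothetical pre-flight check for the required entries in conf/canal.properties.

REQUIRED = {
    "canal.serverMode": "kafka",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "PLAIN",
}


def parse_properties(text: str) -> dict[str, str]:
    """Minimal key=value parser that skips blank lines and comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")) or "=" not in line:
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def check_canal_properties(text: str) -> list[str]:
    """Return a list of problems; an empty list means the checks passed."""
    props = parse_properties(text)
    problems = []
    for key, expected in REQUIRED.items():
        actual = props.get(key)
        if actual != expected:
            problems.append(f"{key} should be {expected!r}, got {actual!r}")
    # Assumption: a single endpoint in host:port form.
    servers = props.get("kafka.bootstrap.servers", "")
    host, sep, port = servers.rpartition(":")
    if not sep or not host or not port.isdigit():
        problems.append(f"kafka.bootstrap.servers should be host:port, got {servers!r}")
    return problems
```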
3.3 Java Authentication and Authorization Service (JAAS) configuration: conf/kafka_client_producer_jaas.conf
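KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="accessId"
  password="accessKey";
};
Replace accessId and accessKey with the AccessKey ID and AccessKey secret of an Alibaba Cloud account or RAM user that has permission to write to the destination topic. Kafka clients look up the KafkaClient login section by default, so keep that section name unchanged.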
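You may prefer not to keep the AccessKey pair in a file that is checked into version control. In that case, you can generate the JAAS file at deploy time. The following is a minimal sketch, not part of Canal:
- DATAHUB_ACCESS_ID and DATAHUB_ACCESS_KEY are hypothetical environment variable names.
- The default path assumes the /usr/local/canal installation directory used in this guide.
- The section name is KafkaClient because that is the login context Kafka clients use by default.

```python
# Hypothetical helper: writes conf/kafka_client_producer_jaas.conf from environment
# variables. DATAHUB_ACCESS_ID and DATAHUB_ACCESS_KEY are illustrative names only.
import os
from pathlib import Path

DEFAULT_PATH = "/usr/local/canal/conf/kafka_client_producer_jaas.conf"

# Doubled braces are literal braces in str.format templates.
TEMPLATE = """KafkaClient {{
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="{username}"
  password="{password}";
}};
"""


def render_jaas(username: str, password: str) -> str:
    """Render the JAAS file; reject values that would break the quoted strings."""
    for value in (username, password):
        if not value or '"' in value or "\\" in value:
            raise ValueError("credentials must be non-empty and contain no quotes or backslashes")
    return TEMPLATE.format(username=username, password=password)


def write_jaas(path: str = DEFAULT_PATH) -> None:
    content = render_jaas(os.environ["DATAHUB_ACCESS_ID"], os.environ["DATAHUB_ACCESS_KEY"])
    target = Path(path)
    target.write_text(content)
    target.chmod(0o600)  # the file holds a secret, so restrict it to the owner
```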
Step 4: Start Canal and verify the data flow
Start Canal
cd /usr/local/canal/
sh bin/startup.sh
Verify Canal is running
Run vi logs/canal/canal.log to view the main Canal log and confirm that the server started successfully:
2013-02-05 22:45:27.967 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2013-02-05 22:45:28.113 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.29.120:11111]
2013-02-05 22:45:28.210 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
Run vi logs/example/example.log to check the instance log:
2013-02-05 22:50:45.636 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2013-02-05 22:50:45.641 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2013-02-05 22:50:45.803 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2013-02-05 22:50:45.810 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....
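In a deployment script, you can poll for these messages instead of opening the logs by hand. This sketch makes the following assumptions:
- It searches for the marker strings from the sample output above.
- It uses the log paths of the example instance.
- The installation directory is /usr/local/canal.

The function and parameter names are hypothetical.

```python
# Hypothetical startup check: polls the main and instance logs until the
# success messages from the sample output appear, or gives up after a timeout.
import time
from pathlib import Path

# Marker strings copied from the sample log output above.
SERVER_MARKER = "the canal server is running now"
INSTANCE_MARKER = "start successful"


def log_contains(path: Path, marker: str) -> bool:
    """True if the log exists and contains the marker."""
    try:
        return marker in path.read_text(errors="replace")
    except FileNotFoundError:
        return False


def wait_for_canal(base: str = "/usr/local/canal", instance: str = "example",
                   timeout: float = 60.0, interval: float = 2.0) -> bool:
    """Return True once both logs report success, False if the timeout expires."""
    checks = [
        (Path(base, "logs", "canal", "canal.log"), SERVER_MARKER),
        (Path(base, "logs", instance, f"{instance}.log"), INSTANCE_MARKER),
    ]
    deadline = time.monotonic() + timeout
    while True:
        if all(log_contains(path, marker) for path, marker in checks):
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
```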
Verify data capture
Canal writes an entry to logs/example/meta.log for each insert, delete, and update that it captures from the database. Each entry records the current binlog file and position, so new entries confirm that Canal is collecting data:
tail -f logs/example/meta.log
2020-07-29 09:21:05.110 - clientId:1001 cursor:[log.000001,29723,1591190230000,1,] address[/127.0.0.1:3306]
2020-07-29 09:23:46.109 - clientId:1001 cursor:[log.000001,30047,1595985825000,1,] address[localhost/127.0.0.1:3306]
2020-07-29 09:24:50.547 - clientId:1001 cursor:[log.000001,30047,1595985825000,1,] address[/127.0.0.1:3306]
2020-07-29 09:26:45.547 - clientId:1001 cursor:[log.000001,30143,1595986005000,1,] address[localhost/127.0.0.1:3306]
2020-07-29 09:30:04.546 - clientId:1001 cursor:[log.000001,30467,1595986204000,1,] address[localhost/127.0.0.1:3306]
2020-07-29 09:30:16.546 - clientId:1001 cursor:[log.000001,30734,1595986215000,1,] address[localhost/127.0.0.1:3306]
2020-07-29 09:30:36.547 - clientId:1001 cursor:[log.000001,31001,1595986236000,1,] address[localhost/127.0.0.1:3306]
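To check programmatically that capture is progressing, you can parse these entries and confirm that the binlog position does not move backwards. This sketch interprets only some of the cursor fields:
- The first field is the binlog file name.
- The second field is the position.
- The third field appears to be the binlog event timestamp in milliseconds.

The remaining fields are left uninterpreted. The names are hypothetical.

```python
# Hypothetical parser for meta.log entries like the sample above.
import re
from typing import NamedTuple

LINE_RE = re.compile(
    r"^(?P<logged>\S+ \S+) - clientId:(?P<client>\d+) "
    r"cursor:\[(?P<cursor>[^\]]*)\] address\[(?P<address>[^\]]*)\]"
)


class Cursor(NamedTuple):
    logged_at: str
    client_id: int
    binlog_file: str
    position: int
    event_time_ms: int  # assumption: binlog event timestamp in milliseconds
    address: str


def parse_meta_line(line: str) -> Cursor:
    m = LINE_RE.match(line.strip())
    if m is None:
        raise ValueError(f"unrecognized meta.log line: {line!r}")
    fields = m.group("cursor").split(",")
    return Cursor(m.group("logged"), int(m.group("client")), fields[0],
                  int(fields[1]), int(fields[2]), m.group("address"))


def is_advancing(lines: list[str]) -> bool:
    """True if (binlog file, position) never decreases across the entries."""
    positions = [(c.binlog_file, c.position) for c in map(parse_meta_line, lines)]
    return all(a <= b for a, b in zip(positions, positions[1:]))
```

Comparing (file, position) tuples works across binlog rotation because the file names are zero-padded, for example log.000001 and log.000002.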
Stop Canal
cd /usr/local/canal/
sh bin/stop.sh
Example
The following example shows how MySQL row changes appear as DataHub records after Canal processes them.
DataHub topic schema
The destination topic is of the TUPLE type with this schema:
+-------+------+----------+-------------+
| Index | name | type | allow NULL |
+-------+------+----------+-------------+
| 0 | key | STRING | true |
| 1 | val | STRING | true |
+-------+------+----------+-------------+
Source MySQL table
mysql> desc orders;
+-------+---------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------+---------+------+-----+---------+-------+
| oid | int(11) | YES | | NULL | |
| pid | int(11) | YES | | NULL | |
| num | int(11) | YES | | NULL | |
+-------+---------+------+-----+---------+-------+
3 rows in set (0.00 sec)
Resulting DataHub record
After the statement insert into orders values(1,2,3); is executed in MySQL, Canal writes the change to DataHub. The key field is null, and the val field contains the full change event as a JSON string:
{
"data":[
{
"oid":"1",
"pid":"2",
"num":"3"
}
],
"database":"ggtt",
"es":1591092305000,
"id":2,
"isDdl":false,
"mysqlType":{
"oid":"int(11)",
"pid":"int(11)",
"num":"int(11)"
},
"old":null,
"pkNames":null,
"sql":"",
"sqlType":{
"oid":4,
"pid":4,
"num":4
},
"table":"orders",
"ts":1591092305813,
"type":"INSERT"
}
What's next
After Canal is running and new entries appear in meta.log, open the DataHub console and check the destination topic to confirm that records are arriving. You can then connect a consumer, such as a Flink job or a downstream application, to process the incremental data.