Canal plug-in

Canal reads MySQL binary logs (binlogs) to capture incremental changes and stream them to DataHub topics using the Kafka protocol.

How Canal works with DataHub

Note

Canal supports MySQL 5.1.x, 5.5.x, 5.6.x, 5.7.x, and 8.0.x as source databases.

DataHub is compatible with the Kafka protocol, so Canal can write MySQL incremental data directly to DataHub topics. The DataHub-customized Canal package includes two changes from the open-source version:

  • The logic that replaces periods (.) with underscores (_) in Kafka topic names is removed. A Kafka topic name in the ProjectName.TopicName format is therefore passed through unchanged and maps to the corresponding DataHub topic.

  • The JVM option -Djava.security.auth.login.config=$kafka_jaas_conf is added to the startup script so that Canal can authenticate by using the PLAIN mechanism of Simple Authentication and Security Layer (SASL).

Prerequisites

Before you begin, ensure that you have:

  • A MySQL database running version 5.1.x, 5.5.x, 5.6.x, 5.7.x, or 8.0.x. See QuickStart for setup details.

  • A DataHub topic of the TUPLE type created in the target project. For topic requirements, see Kafka compatibility.

For all Canal configuration parameters and advanced options, see Canal.

Set up Canal to stream MySQL data to DataHub

Step 1: Download the Canal package

Download the DataHub-customized Canal package: canal.deployer-1.1.5-SNAPSHOT.tar.gz

Use this package instead of the standard open-source Canal release. An unmodified Canal release may fail to write data to DataHub.

Step 2: Extract the package

mkdir -p /usr/local/canal
tar -zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz -C /usr/local/canal

Step 3: Configure Canal

Edit three configuration files before starting Canal.

3.1 Instance configuration: conf/example/instance.properties

# Modify the database information as needed.
#################################################
...
canal.instance.master.address=192.168.1.20:3306
# username/password: the username and password of the database.
...
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
...
# mq config
canal.mq.topic=test_project.test_topic
# Specify a dynamic topic based on the database name or table name.
#canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
# Format: <database name>.<table name>:<unique primary key>. Separate multiple tables with commas (,).
#canal.mq.partitionHash=mytest.person:id,mytest.role:id
#################################################

Set canal.mq.topic to <ProjectName>.<TopicName>. This dot-separated format maps directly to a DataHub topic. To route data to topics dynamically based on database or table names, or to configure partition hash keys, see MQ-related parameters.
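The <ProjectName>.<TopicName> convention can be checked before Canal is started. The following is a minimal Python sketch of such a check. split_datahub_topic is a hypothetical helper, not part of Canal or DataHub, and it assumes that neither the project name nor the topic name contains a period.

```python
from typing import Tuple


def split_datahub_topic(mq_topic: str) -> Tuple[str, str]:
    """Split a canal.mq.topic value into (project, topic).

    Hypothetical helper for illustration only. It assumes exactly one
    period separates the DataHub project name from the topic name.
    """
    project, sep, topic = mq_topic.strip().partition(".")
    if not sep or not project or not topic or "." in topic:
        raise ValueError(
            f"expected <ProjectName>.<TopicName>, got {mq_topic!r}"
        )
    return project, topic


if __name__ == "__main__":
    # The value used in the sample instance.properties above.
    print(split_datahub_topic("test_project.test_topic"))
```

A value without a period, such as test_topic, raises ValueError, which catches a missing project name before any data is sent.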

3.2 Canal configuration: conf/canal.properties

# ...
canal.serverMode = kafka
# ...
kafka.bootstrap.servers = dh-cn-hangzhou.aliyuncs.com:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0

kafka.security.protocol = SASL_SSL
kafka.sasl.mechanism = PLAIN

The following parameters are required:

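+-------------------------+------------------+--------------------------------------------------+
| Parameter               | Value            | Description                                      |
+-------------------------+------------------+--------------------------------------------------+
| canal.serverMode        | kafka            | Tells Canal to use the Kafka producer protocol.  |
+-------------------------+------------------+--------------------------------------------------+
| kafka.bootstrap.servers | DataHub endpoint | The endpoint of DataHub in the region where the  |
|                         |                  | destination topic resides. For available         |
|                         |                  | endpoints, see Kafka compatibility.              |
+-------------------------+------------------+--------------------------------------------------+
| kafka.security.protocol | SASL_SSL         | Required for DataHub authentication.             |
+-------------------------+------------------+--------------------------------------------------+
| kafka.sasl.mechanism    | PLAIN            | Required for DataHub SASL/PLAIN authentication.  |
+-------------------------+------------------+--------------------------------------------------+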

All other parameters are optional and can be tuned to match your throughput requirements.
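The required settings can be verified with a short script before Canal is started. The following Python sketch is one possible check, not a tool shipped with Canal. parse_properties and check_canal_properties are hypothetical names, and the parser handles only simple key = value lines and # comments, not the full Java properties syntax.

```python
# Required values taken from the parameter list above.
REQUIRED = {
    "canal.serverMode": "kafka",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "PLAIN",
}


def parse_properties(text):
    """Parse simple 'key = value' lines, skipping blank lines and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def check_canal_properties(text):
    """Return a list of problems; an empty list means the required settings are present."""
    props = parse_properties(text)
    problems = []
    for key, expected in REQUIRED.items():
        actual = props.get(key)
        if actual != expected:
            problems.append(f"{key} should be {expected!r}, found {actual!r}")
    # The endpoint differs by region, so only its presence is checked.
    if not props.get("kafka.bootstrap.servers"):
        problems.append("kafka.bootstrap.servers is not set")
    return problems


if __name__ == "__main__":
    sample = """
    canal.serverMode = kafka
    kafka.bootstrap.servers = dh-cn-hangzhou.aliyuncs.com:9092
    kafka.security.protocol = SASL_SSL
    kafka.sasl.mechanism = PLAIN
    """
    print(check_canal_properties(sample))
```

To check a real deployment, pass the contents of conf/canal.properties to check_canal_properties instead of the inline sample.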

3.3 Java Authentication and Authorization Service (JAAS) configuration: conf/kafka_client_producer_jaas.conf

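KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="accessId"
  password="accessKey";
};

The section name must be KafkaClient, which is the case-sensitive login context that Kafka clients look up. Replace accessId and accessKey with your AccessKey ID and AccessKey secret.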
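To keep credentials out of version control, the JAAS file can be generated at deployment time. The following Python sketch shows one way to do that under stated assumptions: render_jaas is a hypothetical helper, and DATAHUB_ACCESS_ID and DATAHUB_ACCESS_KEY are hypothetical environment variable names, not names defined by Canal or DataHub.

```python
import os

# KafkaClient is the login context name that Kafka clients look up.
JAAS_TEMPLATE = """KafkaClient {{
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="{username}"
  password="{password}";
}};
"""


def render_jaas(username, password):
    """Return the JAAS file content for SASL/PLAIN authentication."""
    for value in (username, password):
        # Quotes and backslashes would need escaping; this sketch rejects them.
        if not value or '"' in value or "\\" in value:
            raise ValueError(
                "credentials must be non-empty and contain no quotes or backslashes"
            )
    return JAAS_TEMPLATE.format(username=username, password=password)


if __name__ == "__main__":
    # Hypothetical variable names; falls back to the placeholders from this page.
    access_id = os.environ.get("DATAHUB_ACCESS_ID", "accessId")
    access_key = os.environ.get("DATAHUB_ACCESS_KEY", "accessKey")
    print(render_jaas(access_id, access_key), end="")
```

Redirect the output to conf/kafka_client_producer_jaas.conf and restrict the file permissions, because the file contains the AccessKey secret in plain text.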

Step 4: Start Canal and verify the data flow

Start Canal

cd /usr/local/canal/
sh bin/startup.sh

Verify Canal is running

Run vi logs/canal/canal.log to view the main Canal log and confirm that the server started successfully:

2013-02-05 22:45:27.967 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2013-02-05 22:45:28.113 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.29.120:11111]
2013-02-05 22:45:28.210 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......

Run vi logs/example/example.log to view the instance log and confirm that the instance started:

2013-02-05 22:50:45.636 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2013-02-05 22:50:45.641 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2013-02-05 22:50:45.803 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 
2013-02-05 22:50:45.810 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....

Verify data capture

Canal writes a record to the logs/example/meta.log file for each insert, update, and delete operation on the database. View this file to confirm that Canal is capturing database changes:

tail -f logs/example/meta.log
2020-07-29 09:21:05.110 - clientId:1001 cursor:[log.000001,29723,1591190230000,1,] address[/127.0.0.1:3306]
2020-07-29 09:23:46.109 - clientId:1001 cursor:[log.000001,30047,1595985825000,1,] address[localhost/127.0.0.1:3306]
2020-07-29 09:24:50.547 - clientId:1001 cursor:[log.000001,30047,1595985825000,1,] address[/127.0.0.1:3306]
2020-07-29 09:26:45.547 - clientId:1001 cursor:[log.000001,30143,1595986005000,1,] address[localhost/127.0.0.1:3306]
2020-07-29 09:30:04.546 - clientId:1001 cursor:[log.000001,30467,1595986204000,1,] address[localhost/127.0.0.1:3306]
2020-07-29 09:30:16.546 - clientId:1001 cursor:[log.000001,30734,1595986215000,1,] address[localhost/127.0.0.1:3306]
2020-07-29 09:30:36.547 - clientId:1001 cursor:[log.000001,31001,1595986236000,1,] address[localhost/127.0.0.1:3306]
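To check progress without reading the log by eye, the cursor entries can be parsed. The following Python sketch assumes, based on the sample lines above, that the first three cursor fields are the binlog file name, the binlog offset, and the event timestamp in milliseconds. parse_meta_line and positions_advanced are hypothetical helper names.

```python
import re
from datetime import datetime, timezone

# Matches: clientId:1001 cursor:[log.000001,29723,1591190230000,...
CURSOR_RE = re.compile(
    r"clientId:(?P<client_id>\d+) "
    r"cursor:\[(?P<binlog>[^,]*),(?P<position>\d+),(?P<timestamp_ms>\d+),"
)


def parse_meta_line(line):
    """Return the cursor fields of one meta.log line, or None if it does not match."""
    match = CURSOR_RE.search(line)
    if match is None:
        return None
    return {
        "client_id": int(match["client_id"]),
        "binlog": match["binlog"],
        "position": int(match["position"]),
        # Assumed to be the event time in milliseconds since the epoch.
        "event_time": datetime.fromtimestamp(
            int(match["timestamp_ms"]) / 1000, tz=timezone.utc
        ),
    }


def positions_advanced(lines):
    """Return True if the last cursor is ahead of the first one.

    Binlog names are compared as strings, which assumes a common prefix
    and zero-padded sequence numbers, as in log.000001.
    """
    cursors = [c for c in map(parse_meta_line, lines) if c]
    if len(cursors) < 2:
        return False
    first, last = cursors[0], cursors[-1]
    return (last["binlog"], last["position"]) > (first["binlog"], first["position"])


if __name__ == "__main__":
    sample = (
        "2020-07-29 09:21:05.110 - clientId:1001 "
        "cursor:[log.000001,29723,1591190230000,1,] address[/127.0.0.1:3306]"
    )
    print(parse_meta_line(sample))
```

If positions_advanced returns False after rows are changed in MySQL, Canal is probably not reading the binlog, and the instance log is the next place to look.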

Stop Canal

cd /usr/local/canal/
sh bin/stop.sh

Example

The following example shows how MySQL row changes appear as DataHub records after Canal processes them.

DataHub topic schema

The destination topic is of the TUPLE type with this schema:

+-------+------+----------+-------------+
| Index | name |   type   |  allow NULL |
+-------+------+----------+-------------+
|   0   |  key |  STRING  |     true    |
|   1   |  val |  STRING  |     true    |
+-------+------+----------+-------------+

Source MySQL table

mysql> desc orders;
+-------+---------+------+-----+---------+-------+
| Field | Type    | Null | Key | Default | Extra |
+-------+---------+------+-----+---------+-------+
| oid   | int(11) | YES  |     | NULL    |       |
| pid   | int(11) | YES  |     | NULL    |       |
| num   | int(11) | YES  |     | NULL    |       |
+-------+---------+------+-----+---------+-------+
3 rows in set (0.00 sec)

Resulting DataHub record

After the statement insert into orders values(1,2,3); is executed in MySQL, Canal writes the change to DataHub. The key field is null, and the val field contains the full change event as a JSON string:

{
    "data":[
        {
            "oid":"1",
            "pid":"2",
            "num":"3"
        }
    ],
    "database":"ggtt",
    "es":1591092305000,
    "id":2,
    "isDdl":false,
    "mysqlType":{
        "oid":"int(11)",
        "pid":"int(11)",
        "num":"int(11)"
    },
    "old":null,
    "pkNames":null,
    "sql":"",
    "sqlType":{
        "oid":4,
        "pid":4,
        "num":4
    },
    "table":"orders",
    "ts":1591092305813,
    "type":"INSERT"
}
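Because column values arrive as strings inside the val field, a consumer usually needs to decode them. The following Python sketch shows one way to do this with the sqlType codes, which correspond to java.sql.Types constants (4 is INTEGER). decode_rows is a hypothetical helper, and the sketch converts only integer columns and leaves every other type as a string.

```python
import json

# java.sql.Types codes: TINYINT, SMALLINT, INTEGER, BIGINT.
INTEGER_SQL_TYPES = {-6, 5, 4, -5}


def decode_rows(val):
    """Parse a Canal change event and return (event type, list of row dicts).

    Integer columns are converted from strings to int. Other columns are
    returned unchanged. DDL events and events without row data yield an
    empty row list.
    """
    event = json.loads(val)
    if event.get("isDdl") or not event.get("data"):
        return event.get("type"), []
    sql_types = event.get("sqlType") or {}
    rows = []
    for raw in event["data"]:
        row = {}
        for column, value in raw.items():
            if value is not None and sql_types.get(column) in INTEGER_SQL_TYPES:
                row[column] = int(value)
            else:
                row[column] = value
        rows.append(row)
    return event["type"], rows


if __name__ == "__main__":
    # A shortened version of the record shown above.
    val = (
        '{"data":[{"oid":"1","pid":"2","num":"3"}],"isDdl":false,'
        '"sqlType":{"oid":4,"pid":4,"num":4},"table":"orders","type":"INSERT"}'
    )
    print(decode_rows(val))
```

For the record above, the helper returns the event type INSERT and a single row with oid, pid, and num as the integers 1, 2, and 3.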

What's next

After Canal is running and records appear in meta.log, go to the DataHub console and check the destination topic to confirm that records are arriving. You can then connect a consumer, such as a Flink job or a downstream application, to process the incremental data.