本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。
本文介绍如何使用Canal将MySQL数据同步到表格存储。Canal部署简单,易于运维,适用于中小规模的MySQL数据同步。
方案概览
Canal是阿里巴巴集团提供的开源产品,通过解析MySQL数据库增量日志,提供增量数据的订阅和消费功能。Canal模拟成MySQL的Slave,向MySQL Master发送dump请求。MySQL Master收到dump请求,开始推送Binary log给Canal,Canal解析Binary log来同步数据。Canal的功能原理及详细说明,请参见Canal官网。
使用Canal同步MySQL数据到Tablestore中只需4步:
部署Deployer服务:Deployer服务负责从上游拉取binlog数据、记录位点等。
部署Client-Adapter服务:Client-Adapter服务负责对接Deployer解析过的数据,并将数据传输到目标库中。
说明部署完成后,Canal默认自动同步MySQL增量数据。
同步MySQL全量数据:如果您需要同步MySQL全量数据,请手动调用Client-Adapter服务的方法触发同步任务。
说明全量数据同步完成后,Canal会自动开始增量同步。
验证增量数据同步:查询Tablestore目标表中数据的同步情况。
使用说明
开发语言:Java
Java版本:推荐使用Java1.8及以上版本。
测试环境:本文中示例已经通过CentOS 7的环境验证。
源数据源:RDS MySQL
目的数据源:Tablestore
注意事项
下载Canal-server和Canal-adapter需要连接公网,请确保ECS实例可以访问公网。
目标数据表的主键列个数、顺序和数据类型必须与源数据表的主键列个数、顺序和数据类型相匹配。
Canal工具只支持同步MySQL单表数据到表格存储,不支持同步多表数据。
前提条件
在MySQL实例侧完成如下操作:
如果源实例为自建MySQL。需开启binlog写入功能,配置binlog-format为ROW模式。具体操作,请参见附录1:自建MySQL实例的准备工作。
如果源实例为阿里云RDS for MySQL。默认打开了binlog,并且账号默认具有binlog dump权限,不需要任何权限或者binlog设置。
在表格存储服务侧已完成如下操作:
在访问控制RAM服务侧完成如下操作:
已创建RAM用户并为RAM用户授予管理表格存储权限(AliyunOTSFullAccess)。具体操作,请参见创建RAM用户和为RAM用户授权。
警告阿里云账号AccessKey泄露会威胁您所有资源的安全。建议您使用RAM用户AccessKey进行操作,可以有效降低AccessKey泄露的风险。
已为RAM用户创建AccessKey。具体操作,请参见创建AccessKey。
已准备带有Linux系统的服务器。
说明如果当前没有安装Linux系统的服务器,推荐您使用云服务器ECS部署Linux系统后再进行操作。本文创建的实例镜像为CentOS 7.6 64位。更多信息,请参见通过控制台使用ECS实例(快捷版)。
1. 部署Deployer服务
1.1 下载canal.deployer
包并解压
下载
canal.deployer
压缩包,本文以1.1.7版本为例。具体下载路径,请参见Canal下载地址。
wget https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.deployer-1.1.7.tar.gz
创建
canal.deployer
的安装目录,本文以/home/doc/tools/canal.deployer
路径为例。mkdir -p /home/doc/tools/canal.deployer
将
canal.deployer
压缩包,复制到/home/doc/tools/canal.deployer
路径并解压。tar -zxvf canal.deployer-1.1.7.tar.gz -C /home/doc/tools/canal.deployer
解压后,您可以看到bin、conf、lib、logs和plugin目录。
1.2 编辑配置文件
自定义Canal实例名称。Canal实例名称默认为example,本文自定义为test_ots。
进入
canal.deployer
安装目录的conf
路径,编辑canal.properties
文件。将canal.destinations
的值修改为自定义的Canal实例名称,其他配置均保持默认即可。canal.destinations = test_ots
编辑配置文件。
在
canal.deployer
安装目录的conf
路径下,创建以Canal实例名称命名的目录test_ots。mkdir test_ots
在
canal.deployer
安装目录的conf
路径下,将example
目录中的instance.properties
文件,复制到test_ots
目录。cp example/instance.properties test_ots/instance.properties
在
canal.deployer
安装目录的conf/test_ots
路径,编辑instance.properties
文件。详细配置项说明请参见下表。
配置项
是否必填
示例值
描述
canal.instance.master.address
是
rm-bp15p07134rkvf7****.mysql.rds.aliyuncs.com:****
Canal监听的源数据库地址,格式为
host:port
。canal.instance.rds.accesskey
否
LTAn*********************
当MySQL为阿里云产品RDS库时,填写RAM用户的AccessKey ID和AccessKey Secret。如果非RDS库,无需填写此项。
canal.instance.rds.secretkey
否
zbnK**************************
canal.instance.rds.instanceId
否
rm-bp15p0713****
当MySQL为阿里云产品RDS库时,填写实例ID。如果非RDS库,无需填写此项。
canal.instance.dbUsername
是
test
源数据库账号用户名。
canal.instance.dbPassword
是
db****
源数据库账号密码。
canal.instance.filter.regex
是
.*\\..*
Canal实例关注的表。通过正则表达式匹配。此处表示匹配所有数据库下的所有表。
instance.properties
文件的完整配置示例如下。################################################# ## mysql serverId , v1.0.26+ will autoGen # canal.instance.mysql.slaveId=0 # enable gtid use true/false canal.instance.gtidon=false # position info canal.instance.master.address=rm-bp15p07134rkvf7****.mysql.rds.aliyuncs.com:**** canal.instance.master.journal.name= canal.instance.master.position= canal.instance.master.timestamp= canal.instance.master.gtid= # rds oss binlog canal.instance.rds.accesskey=LTAn********************* canal.instance.rds.secretkey=zbnK************************** canal.instance.rds.instanceId=rm-bp15p07134rkvf7**** # table meta tsdb info canal.instance.tsdb.enable=true #canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb #canal.instance.tsdb.dbUsername=canal #canal.instance.tsdb.dbPassword=canal #canal.instance.standby.address = #canal.instance.standby.journal.name = #canal.instance.standby.position = #canal.instance.standby.timestamp = #canal.instance.standby.gtid= # username/password canal.instance.dbUsername=test canal.instance.dbPassword=db**** canal.instance.connectionCharset = UTF-8 # enable druid Decrypt database password canal.instance.enableDruid=false #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ== # table regex canal.instance.filter.regex=.*\\..* # table black regex canal.instance.filter.black.regex=mysql\\.slave_.* # table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2) #canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch # table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2) #canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch # mq config canal.mq.topic=example # dynamic topic route by schema or table regex #canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..* canal.mq.partition=0 # hash partition config #canal.mq.enableDynamicQueuePartition=false #canal.mq.partitionsNum=3 #canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6 #canal.mq.partitionHash=test.table:id^name,.*\\..* # # multi stream for polardbx canal.instance.multi.stream.on=false #################################################
1.3 启动Deployer服务
在canal.deployer
安装目录的bin
路径,执行以下命令。
sh bin/startup.sh
相关命令:
查看server日志:在
canal.deployer
安装目录,执行tail -n100 logs/canal/canal.log
命令。查看instance日志:在
canal.deployer
安装目录,执行tail -n100 logs/test_ots/test_ots.log
命令。重启Deployer服务:在
canal.deployer
安装目录,执行sh bin/restart.sh
命令。关闭Deployer服务:在
canal.deployer
安装目录,执行sh bin/stop.sh
命令。
2. 部署Client-Adapter服务
2.1 下载canal.adapter
包并解压
下载
canal.adapter
压缩包。具体下载路径,请参见Canal-adapter下载地址。
创建
canal.adapter
的安装目录,本文以/home/doc/tools/canal.adapter
路径为例。mkdir -p /home/doc/tools/canal.adapter
将
canal.adapter
压缩包,复制到/home/doc/tools/canal.adapter
路径并解压。解压后,您可以看到bin、conf、lib、logs和plugin目录。
2.2 编辑配置文件
进入
canal.adapter
安装目录的conf
路径下,编辑application.yml
文件。详细配置项说明请参见下表,其他配置项的说明请参见ClientAdapter配置项说明。
配置项
是否必填
示例值
描述
srcDataSources:defaultDS:url:
是
rm-bp15p07134rkvf7****.mysql.rds.aliyuncs.com:****
源数据库地址,格式为
host:port
。srcDataSources:defaultDS:username:
是
test
源数据库账号用户名。
srcDataSources:defaultDS:password:
是
db****
源数据库账号密码。
canal.conf:canalAdapters:- instance:
是
test_ots
Canal实例名称,必须与部署Depolyer时自定义的canal实例名称相同。
canal.conf:canalAdapters:key:
是
defaultDS
源数据库标识。
canal.conf:canalAdapters:outerAdapters: -name:
是
tablestore
定义适配器类型。设置此项为tablestore,表示此适配器下游写入Tablestore库。
canal.conf:canalAdapters:outerAdapters: properties:tablestore.endpoint
是
https://test-2009.cn-hangzhou.ots.aliyuncs.com
填写目标Tablestore实例的服务地址。
canal.conf:canalAdapters:outerAdapters: properties:tablestore.accessSecretId
是
LTAn********************
登录账号的AccessKey ID和AccessKey Secret,获取方式请参见创建AccessKey。
canal.conf:canalAdapters:outerAdapters: properties:tablestore.accessSecretKey
是
zbnK**************************
canal.conf:canalAdapters:outerAdapters: properties:tablestore.instanceName
是
test-2009
Tablestore实例的名称。
canal.conf: syncBatchSize
否
1000
一个请求批次中涉及的行数,统计DML(Data Manipulation Language)操作中的数据大小。
如果一次拉取到的DML中涉及到的行数过大,且行数大于此参数值,则该批次数据会被拆分处理。
canal.conf: retries
否
3
Adapterprocessor层的重试次数。默认值为3。
canal.conf: timeout
否
1000
Adapterprocessor层通过consumer向上游拉取数据的超时时间。单位为毫秒。
如果设置timeout为0,则拉取数据时,Adapterprocessor层会处于阻塞状态直到拉取到数据。
canal.conf : consumerProperties:canal.tcp.batch.size
否
500
consumer向上游拉取的DML个数。
canal.conf: terminateOnException
否
true
默认为false。如果配置为true,则数据同步重试后仍失败,程序会暂停实时同步任务,等待用户手动处理。
application.yml
文件的完整配置示例如下。server: port: 8081 spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 default-property-inclusion: non_null canal.conf: mode: tcp #tcp kafka rocketMQ rabbitMQ flatMessage: true zookeeperHosts: syncBatchSize: 1000 retries: 3 timeout: accessKey: secretKey: terminateOnException: true consumerProperties: # canal tcp consumer canal.tcp.server.host: 127.0.0.1:11111 canal.tcp.zookeeper.hosts: canal.tcp.batch.size: 500 canal.tcp.username: canal.tcp.password: srcDataSources: defaultDS: url: jdbc:mysql://rm-bp15p07134rkvf7****.mysql.rds.aliyuncs.com:****/test_ots?useUnicode=true username: test password: db**** canalAdapters: - instance: test_ots # canal instance Name or mq topic name groups: - groupId: g1 outerAdapters: - name: logger - name: tablestore key: ts properties: tablestore.endpoint: https://test-2009.cn-hangzhou.ots.aliyuncs.com tablestore.accessSecretId: LTAn******************** tablestore.accessSecretKey: zbnK************************** tablestore.instanceName: test-2009
进入
canal.adapter
安装目录的conf/tablestore
路径下,创建.yml
格式文件。说明如果不存在conf/tablestore路径,请手动创建该路径。
创建
.yml
格式的文件,本文以mytest.yml
为例。vim mytest.yml
在mytest.yml文件中配置相应参数。
详细配置项说明请参见下表。
配置项
是否必填
示例值
描述
dataSourceKey
是
defaultDS
该任务的源数据库标识,您可以在
application.yml
中canal.conf: srcDataSources
下找到该标识所对应的数据库。destination
是
test_ots
Canal实例名称,必须与
application.yml
中的canal.conf: canalAdapters
下instance
值相同。groupId
是
g1
分组ID,与
application.yml
中canal.conf: canalAdapters: groups
下groupId
值相同即可。在MQ模式下才需要使用,此处无需关注。outerAdapterKey
是
ts
使用的Adapter标识,必须与
application.yml
中canal.conf: canalAdapters: groups: outerAdapters
下key
值相同。threads
是
8
Bucket数量,默认值为1,对应于tablestorewriter中的bucket数量。
dbMapping.database
是
test_ots
源数据库名称。
dbMapping.table
是
order_contract_canal
源表名称。
dbMapping.targetTable
是
canal_target_order
目标表名称。
dbMapping.targetPk
是
oId: oId
主键配置,格式为
id: target_id
,即源表主键: 目标表主键
。当表存在多个主键时需要配置多个,多个主键配置顺序必须与Tablestore中的主键顺序相同。
dbMapping.targetPk中支持配置主键列自增,格式为
$$: target_id
,表示在目标表中生成一列名称为target_id的主键且该主键列为自增列。当上游数据写入Tablestore时,canal adapter会自动填充该列的值。关于主键列自增的更多信息,请参见主键列自增。dbMapping.targetColumns
是
create_time: createTime$string
配置需要同步的列名以及列映射,支持配置类型转换。
重要在dbMapping.targetPk中配置的非自增的主键列也需要在此处再进行配置,自增主键列不需要再进行配置。
如果不配置类型转换,则canal会根据源表中字段类型推断目标字段类型。更多信息,请参见附录2:MySQL源表和Tablestore目标表中字段映射。
支持配置的格式包含如下4种,请根据实际需要配置。
说明在配置映射的字段类型时,字段类型大小写不敏感。
id: target_id$string
:表示源表中id字段同步到目标表后为target_id字段,且字段类型映射为string。id: target_id
:表示源表中id字段同步到目标表后为target_id字段。id:
:表示id字段同步前后字段名不变,字段类型采用默认映射。id: $string
:等同于id: id$string
,表示id字段同步前后字段名不变,且字段类型映射为string。
dbMapping.etlCondition
否
where create_time > "2021-01-01"
全量抽取数据时的过滤条件,其中字段名称为源表字段名称。
dbMapping.commitBatch
否
200
一次批量RPC请求导入的行数,对应于tablestorewriter中的maxBatchRowsCount,默认为writerConfig中的默认值200。
updateChangeColumns
否
false
行覆盖或行更新。默认值为false,表示行覆盖,即一行数据更新时,使用该行最新的整行值覆盖Tablestore中的旧行。如果设置为true,则表示行更新,即一行数据更新时,只对变化的字段进行操作。
mytest.yml文件的完整配置示例如下。
dataSourceKey: defaultDS destination: test_ots groupId: g1 outerAdapterKey: ts threads: 8 updateChangeColumns: false dbMapping: database: test_ots table: order_contract_canal targetTable: canal_target_order targetPk: oId: oId targetColumns: oId: create_time: createTime$string pay_time: $string update_time: updateTime etlCondition: commitBatch: 200 # 批量提交的行数。
2.4 启动Client-Adapter服务
在canal.adapter
安装目录的bin
路径,执行以下命令。
sh bin/startup.sh
相关命令:
查看server日志:
canal.adapter
安装目录,执行tail -n100 logs/adapter/adapter.log
命令。重启Deployer服务:
canal.adapter
安装目录,执行sh bin/restart.sh
命令。关闭Deployer服务:
canal.adapter
安装目录,执行sh bin/stop.sh
命令。
3. 同步MySQL全量数据
执行以下命令调用Client-Adapter服务的方法触发同步任务。
执行命令后,Canal会先中止增量数据传输,然后同步全量数据。待全量数据同步完成后,Canal会自动进行增量数据同步。
命令格式
curl "hostip:port/etl/type/key/task" -X POST
示例
curl "localhost:8081/etl/tablestore/ts/mytest.yml" -X POST
详细配置项说明请参见下表。
配置项 | 是否必选 | 示例 | 描述 |
hostip | 是 | localhost | 部署canal服务的机器IP地址和端口。 当在部署canal服务的机器上执行此命令时,可设置hostip为localhost。 |
port | 是 | 8081 | |
type | 是 | tablestore | 下游数据库类型,必须设置为tablestore。 |
key | 是 | ts | 使用的Adapter标识,必须与 |
task | 是 | mytest.yml | 任务配置文件的名称,必须与2. 部署Client-Adapter服务中创建的 |
全量数据同步开始后,您可以在日志中查看Adapter中TablestoreWriter的传输日志变化,如下图所示。
4. 验证增量数据同步
在RDS MySQL数据库中,新增源数据表
order_contract_canal
的数据。INSERT INTO order_contract_canal (oId, create_time, pay_time, update_time) VALUES (3, "2024-1-1", "2024-2-2", "2024-3-3");
在Tablestore数据库中,查询目标数据表
canal_target_order
的数据。登录表格存储控制台。
在页面上方,选择资源组和地域。
在概览页面,单击实例名称或在实例操作列单击实例管理。
在实例详情页签,单击数据表列表查询同步的表数据。
计费说明
当导入数据到表格存储时,表格存储会根据数据存储量收取相应存储费用。
通过迁移工具访问表格存储时,表格存储会根据所用计费方式收取读写数据的费用。
计费模式
计算能力说明
按照资源评估结果预先购买预留VCU或开启弹性能力后按实际使用量支付计算性能消耗费用。计算能力中涵盖数据读写的计算消耗。
根据具体的读写请求按照读写吞吐量计量计费。同时根据实例类型不同,计费时需要区分按量读写CU以及预留读写CU。