文档

使用Canal同步MySQL数据

更新时间:
重要

本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。

本文介绍如何使用Canal将MySQL数据同步到表格存储。Canal部署简单,易于运维,适用于中小规模的MySQL数据同步。

方案概览

Canal是阿里巴巴集团提供的开源产品,通过解析MySQL数据库增量日志,提供增量数据的订阅和消费功能。Canal模拟成MySQL的Slave,向MySQL Master发送dump请求。MySQL Master收到dump请求,开始推送Binary log给Canal,Canal解析Binary log来同步数据。Canal的功能原理及详细说明,请参见Canal官网

使用Canal同步MySQL数据到Tablestore中只需4步:

  1. 部署Deployer服务:Deployer服务负责从上游拉取binlog数据、记录位点等。

  2. 部署Client-Adapter服务:Client-Adapter服务负责对接Deployer解析过的数据,并将数据传输到目标库中。

    说明

    部署完成后,Canal默认自动同步MySQL增量数据。

  3. 同步MySQL全量数据:如果您需要同步MySQL全量数据,请手动调用Client-Adapter服务的方法触发同步任务。

    说明

    全量数据同步完成后,Canal会自动开始增量同步。

  4. 验证增量数据同步:查询Tablestore目标表中数据的同步情况。

fig_binlogtotablestore

使用说明

  • 开发语言:Java

  • Java版本:推荐使用Java1.8及以上版本。

  • 测试环境:本文中示例已经通过CentOS 7的环境验证。

  • 源数据源:RDS MySQL

  • 目的数据源:Tablestore

注意事项

  • 下载Canal-server和Canal-adapter需要连接公网,请确保ECS实例可以访问公网。

  • 目标数据表的主键列个数、顺序和数据类型必须与源数据表的主键列个数、顺序和数据类型相匹配。

  • Canal工具只支持同步MySQL单表数据到表格存储,不支持同步多表数据。

前提条件

  • 在MySQL实例侧完成如下操作:

    • 如果源实例为自建MySQL。需开启binlog写入功能,配置binlog-format为ROW模式。具体操作,请参见附录1:自建MySQL实例的准备工作

    • 如果源实例为阿里云RDS for MySQL。默认打开了binlog,并且账号默认具有binlog dump权限,不需要任何权限或者binlog设置。

  • 在表格存储服务侧已完成如下操作:

  • 在访问控制RAM服务侧完成如下操作:

    • 已创建RAM用户并为RAM用户授予管理表格存储权限(AliyunOTSFullAccess)。具体操作,请参见创建RAM用户为RAM用户授权

      警告

      阿里云账号AccessKey泄露会威胁您所有资源的安全。建议您使用RAM用户AccessKey进行操作,可以有效降低AccessKey泄露的风险。

    • 已为RAM用户创建AccessKey。具体操作,请参见创建AccessKey

  • 已准备带有Linux系统的服务器。

    说明

    如果当前没有安装Linux系统的服务器,推荐您使用云服务器ECS部署Linux系统后再进行操作。本文创建的实例镜像CentOS 7.6 64位。更多信息,请参见通过控制台使用ECS实例(快捷版)

1. 部署Deployer服务

1.1 下载canal.deployer包并解压

  1. 下载canal.deployer压缩包,本文以1.1.7版本为例。

    具体下载路径,请参见Canal下载地址

    wget https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.deployer-1.1.7.tar.gz
  2. 创建canal.deployer的安装目录,本文以/home/doc/tools/canal.deployer路径为例。

    mkdir -p /home/doc/tools/canal.deployer
  3. canal.deployer压缩包,复制到/home/doc/tools/canal.deployer路径并解压。

    tar -zxvf canal.deployer-1.1.7.tar.gz -C /home/doc/tools/canal.deployer

    解压后,您可以看到bin、conf、lib、logs和plugin目录。

1.2 编辑配置文件

  1. 自定义Canal实例名称。Canal实例名称默认为example,本文自定义为test_ots。

    进入canal.deployer安装目录的conf路径,编辑canal.properties文件。将canal.destinations的值修改为自定义的Canal实例名称,其他配置均保持默认即可。

    canal.destinations = test_ots  
  2. 编辑配置文件。

    1. canal.deployer安装目录的conf路径下,创建以Canal实例名称命名的目录test_ots。

      mkdir test_ots
    2. canal.deployer安装目录的conf路径下,将example目录中的instance.properties文件,复制到test_ots目录。

      cp example/instance.properties test_ots/instance.properties
    3. canal.deployer安装目录的conf/test_ots路径,编辑instance.properties文件。

      • 详细配置项说明请参见下表。

        配置项

        是否必填

        示例值

        描述

        canal.instance.master.address

        rm-bp15p07134rkvf7****.mysql.rds.aliyuncs.com:****

        Canal监听的源数据库地址,格式为host:port

        canal.instance.rds.accesskey

        LTAn*********************

        当MySQL为阿里云产品RDS库时,填写RAM用户的AccessKey ID和AccessKey Secret。如果非RDS库,无需填写此项。

        canal.instance.rds.secretkey

        zbnK**************************

        canal.instance.rds.instanceId

        rm-bp15p0713****

        当MySQL为阿里云产品RDS库时,填写实例ID。如果非RDS库,无需填写此项。

        canal.instance.dbUsername

        test

        源数据库账号用户名。

        canal.instance.dbPassword

        db****

        源数据库账号密码。

        canal.instance.filter.regex

        .*\\..*

        Canal实例关注的表。通过正则表达式匹配。此处表示匹配所有数据库下的所有表。

      • instance.properties文件的完整配置示例如下。

        #################################################
        ## mysql serverId , v1.0.26+ will autoGen
        # canal.instance.mysql.slaveId=0
        
        # enable gtid use true/false
        canal.instance.gtidon=false
        
        # position info
        canal.instance.master.address=rm-bp15p07134rkvf7****.mysql.rds.aliyuncs.com:****
        canal.instance.master.journal.name=
        canal.instance.master.position=
        canal.instance.master.timestamp=
        canal.instance.master.gtid=
        
        # rds oss binlog
        canal.instance.rds.accesskey=LTAn*********************
        canal.instance.rds.secretkey=zbnK**************************
        canal.instance.rds.instanceId=rm-bp15p07134rkvf7****
        
        # table meta tsdb info
        canal.instance.tsdb.enable=true
        #canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
        #canal.instance.tsdb.dbUsername=canal
        #canal.instance.tsdb.dbPassword=canal
        
        #canal.instance.standby.address =
        #canal.instance.standby.journal.name =
        #canal.instance.standby.position =
        #canal.instance.standby.timestamp =
        #canal.instance.standby.gtid=
        
        # username/password
        canal.instance.dbUsername=test
        canal.instance.dbPassword=db****
        canal.instance.connectionCharset = UTF-8
        # enable druid Decrypt database password
        canal.instance.enableDruid=false
        #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
        
        # table regex
        canal.instance.filter.regex=.*\\..*
        # table black regex
        canal.instance.filter.black.regex=mysql\\.slave_.*
        # table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
        #canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
        # table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
        #canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
        
        # mq config
        canal.mq.topic=example
        # dynamic topic route by schema or table regex
        #canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..*
        canal.mq.partition=0
        # hash partition config
        #canal.mq.enableDynamicQueuePartition=false
        #canal.mq.partitionsNum=3
        #canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
        #canal.mq.partitionHash=test.table:id^name,.*\\..*
        #
        # multi stream for polardbx
        canal.instance.multi.stream.on=false
        #################################################

1.3 启动Deployer服务

canal.deployer安装目录的bin路径,执行以下命令。

sh bin/startup.sh
说明

相关命令:

  • 查看server日志:在canal.deployer安装目录,执行tail -n100 logs/canal/canal.log命令。

  • 查看instance日志:在canal.deployer安装目录,执行tail -n100 logs/test_ots/test_ots.log命令。

  • 重启Deployer服务:在canal.deployer安装目录,执行sh bin/restart.sh命令。

  • 关闭Deployer服务:在canal.deployer安装目录,执行sh bin/stop.sh命令。

2. 部署Client-Adapter服务

2.1 下载canal.adapter包并解压

  1. 下载canal.adapter压缩包。

    具体下载路径,请参见Canal-adapter下载地址

  2. 创建canal.adapter的安装目录,本文以/home/doc/tools/canal.adapter路径为例。

    mkdir -p /home/doc/tools/canal.adapter
  3. canal.adapter压缩包,复制到/home/doc/tools/canal.adapter路径并解压。

    解压后,您可以看到bin、conf、lib、logs和plugin目录。

2.2 编辑配置文件

  1. 进入canal.adapter安装目录的conf路径下,编辑application.yml文件。

    • 详细配置项说明请参见下表,其他配置项的说明请参见ClientAdapter配置项说明

      配置项

      是否必填

      示例值

      描述

      srcDataSources:defaultDS:url:

      rm-bp15p07134rkvf7****.mysql.rds.aliyuncs.com:****

      源数据库地址,格式为host:port

      srcDataSources:defaultDS:username:

      test

      源数据库账号用户名。

      srcDataSources:defaultDS:password:

      db****

      源数据库账号密码。

      canal.conf:canalAdapters:- instance:

      test_ots

      Canal实例名称,必须与部署Depolyer时自定义的canal实例名称相同。

      canal.conf:canalAdapters:key:

      defaultDS

      源数据库标识。

      canal.conf:canalAdapters:outerAdapters: -name:

      tablestore

      定义适配器类型。设置此项为tablestore,表示此适配器下游写入Tablestore库。

      canal.conf:canalAdapters:outerAdapters: properties:tablestore.endpoint

      https://test-2009.cn-hangzhou.ots.aliyuncs.com

      填写目标Tablestore实例的服务地址

      canal.conf:canalAdapters:outerAdapters: properties:tablestore.accessSecretId

      LTAn********************

      登录账号的AccessKey ID和AccessKey Secret,获取方式请参见创建AccessKey

      canal.conf:canalAdapters:outerAdapters: properties:tablestore.accessSecretKey

      zbnK**************************

      canal.conf:canalAdapters:outerAdapters: properties:tablestore.instanceName

      test-2009

      Tablestore实例的名称。

      canal.conf: syncBatchSize

      1000

      一个请求批次中涉及的行数,统计DML(Data Manipulation Language)操作中的数据大小。

      如果一次拉取到的DML中涉及到的行数过大,且行数大于此参数值,则该批次数据会被拆分处理。

      canal.conf: retries

      3

      Adapterprocessor层的重试次数。默认值为3。

      canal.conf: timeout

      1000

      Adapterprocessor层通过consumer向上游拉取数据的超时时间。单位为毫秒。

      如果设置timeout为0,则拉取数据时,Adapterprocessor层会处于阻塞状态直到拉取到数据。

      canal.conf : consumerProperties:canal.tcp.batch.size

      500

      consumer向上游拉取的DML个数。

      canal.conf: terminateOnException

      true

      默认为false。如果配置为true,则数据同步重试后仍失败,程序会暂停实时同步任务,等待用户手动处理。

    • application.yml文件的完整配置示例如下。

      server:
        port: 8081
      spring:
        jackson:
          date-format: yyyy-MM-dd HH:mm:ss
          time-zone: GMT+8
          default-property-inclusion: non_null
      
      canal.conf:
        mode: tcp #tcp kafka rocketMQ rabbitMQ
        flatMessage: true
        zookeeperHosts:
        syncBatchSize: 1000
        retries: 3
        timeout:
        accessKey:
        secretKey:
        terminateOnException: true
        consumerProperties:
          # canal tcp consumer
          canal.tcp.server.host: 127.0.0.1:11111
          canal.tcp.zookeeper.hosts:
          canal.tcp.batch.size: 500
          canal.tcp.username:
          canal.tcp.password:
      
        srcDataSources:
          defaultDS:
            url: jdbc:mysql://rm-bp15p07134rkvf7****.mysql.rds.aliyuncs.com:****/test_ots?useUnicode=true
            username: test
            password: db****
        canalAdapters:
        - instance: test_ots # canal instance Name or mq topic name
          groups:
          - groupId: g1
            outerAdapters:
            - name: logger
            - name: tablestore
              key: ts
              properties:
                tablestore.endpoint: https://test-2009.cn-hangzhou.ots.aliyuncs.com
                tablestore.accessSecretId: LTAn********************
                tablestore.accessSecretKey: zbnK**************************
                tablestore.instanceName: test-2009
  2. 进入canal.adapter安装目录的conf/tablestore路径下,创建.yml格式文件。

    说明

    如果不存在conf/tablestore路径,请手动创建该路径。

    1. 创建.yml格式的文件,本文以mytest.yml为例。

      vim mytest.yml
    2. 在mytest.yml文件中配置相应参数。

      • 详细配置项说明请参见下表。

        配置项

        是否必填

        示例值

        描述

        dataSourceKey

        defaultDS

        该任务的源数据库标识,您可以在application.ymlcanal.conf: srcDataSources下找到该标识所对应的数据库。

        destination

        test_ots

        Canal实例名称,必须与application.yml中的canal.conf: canalAdaptersinstance值相同。

        groupId

        g1

        分组ID,与application.ymlcanal.conf: canalAdapters: groupsgroupId值相同即可。在MQ模式下才需要使用,此处无需关注。

        outerAdapterKey

        ts

        使用的Adapter标识,必须与application.ymlcanal.conf: canalAdapters: groups: outerAdapterskey值相同。

        threads

        8

        Bucket数量,默认值为1,对应于tablestorewriter中的bucket数量。

        dbMapping.database

        test_ots

        源数据库名称。

        dbMapping.table

        order_contract_canal

        源表名称。

        dbMapping.targetTable

        canal_target_order

        目标表名称。

        dbMapping.targetPk

        oId: oId

        主键配置,格式为id: target_id,即源表主键: 目标表主键

        当表存在多个主键时需要配置多个,多个主键配置顺序必须与Tablestore中的主键顺序相同。

        dbMapping.targetPk中支持配置主键列自增,格式为$$: target_id,表示在目标表中生成一列名称为target_id的主键且该主键列为自增列。当上游数据写入Tablestore时,canal adapter会自动填充该列的值。关于主键列自增的更多信息,请参见主键列自增

        dbMapping.targetColumns

        create_time: createTime$string

        配置需要同步的列名以及列映射,支持配置类型转换。

        重要

        在dbMapping.targetPk中配置的非自增的主键列也需要在此处再进行配置,自增主键列不需要再进行配置。

        如果不配置类型转换,则canal会根据源表中字段类型推断目标字段类型。更多信息,请参见附录2:MySQL源表和Tablestore目标表中字段映射

        支持配置的格式包含如下4种,请根据实际需要配置。

        说明

        在配置映射的字段类型时,字段类型大小写不敏感。

        • id: target_id$string:表示源表中id字段同步到目标表后为target_id字段,且字段类型映射为string。

        • id: target_id:表示源表中id字段同步到目标表后为target_id字段。

        • id::表示id字段同步前后字段名不变,字段类型采用默认映射。

        • id: $string:等同于id: id$string,表示id字段同步前后字段名不变,且字段类型映射为string。

        dbMapping.etlCondition

        where create_time > "2021-01-01"

        全量抽取数据时的过滤条件,其中字段名称为源表字段名称。

        dbMapping.commitBatch

        200

        一次批量RPC请求导入的行数,对应于tablestorewriter中的maxBatchRowsCount,默认为writerConfig中的默认值200。

        updateChangeColumns

        false

        行覆盖或行更新。默认值为false,表示行覆盖,即一行数据更新时,使用该行最新的整行值覆盖Tablestore中的旧行。如果设置为true,则表示行更新,即一行数据更新时,只对变化的字段进行操作。

      • mytest.yml文件的完整配置示例如下。

        dataSourceKey: defaultDS
        destination: test_ots
        groupId: g1
        outerAdapterKey: ts
        threads: 8
        updateChangeColumns: false
        dbMapping:
          database: test_ots
          table: order_contract_canal
          targetTable: canal_target_order
          targetPk:
            oId: oId
          targetColumns:
            oId:
            create_time: createTime$string
            pay_time: $string
            update_time: updateTime
          etlCondition: 
          commitBatch: 200 # 批量提交的行数。

2.4 启动Client-Adapter服务

canal.adapter安装目录的bin路径,执行以下命令。

sh bin/startup.sh
说明

相关命令:

  • 查看server日志:canal.adapter安装目录,执行tail -n100 logs/adapter/adapter.log命令。

  • 重启Deployer服务:canal.adapter安装目录,执行sh bin/restart.sh命令。

  • 关闭Deployer服务:canal.adapter安装目录,执行sh bin/stop.sh命令。

3. 同步MySQL全量数据

执行以下命令调用Client-Adapter服务的方法触发同步任务。

说明

执行命令后,Canal会先中止增量数据传输,然后同步全量数据。待全量数据同步完成后,Canal会自动进行增量数据同步。

  • 命令格式

    curl "hostip:port/etl/type/key/task" -X POST
  • 示例

    curl "localhost:8081/etl/tablestore/ts/mytest.yml" -X POST

详细配置项说明请参见下表。

配置项

是否必选

示例

描述

hostip

localhost

部署canal服务的机器IP地址和端口。

当在部署canal服务的机器上执行此命令时,可设置hostip为localhost。

port

8081

type

tablestore

下游数据库类型,必须设置为tablestore。

key

ts

使用的Adapter标识,必须与application.ymlcanal.conf: canalAdapters: groups: outerAdapterskey值相同。

task

mytest.yml

任务配置文件的名称,必须与2. 部署Client-Adapter服务中创建的.yml格式的文件名称相同。

全量数据同步开始后,您可以在日志中查看Adapter中TablestoreWriter的传输日志变化,如下图所示。fig_datasyncmysqlall

4. 验证增量数据同步

  1. 在RDS MySQL数据库中,新增源数据表order_contract_canal的数据。

    INSERT INTO order_contract_canal (oId, create_time, pay_time, update_time) VALUES (3, "2024-1-1",  "2024-2-2", "2024-3-3");
  2. 在Tablestore数据库中,查询目标数据表canal_target_order的数据。

    1. 登录表格存储控制台

    2. 在页面上方,选择资源组和地域。

    3. 概览页面,单击实例名称或在实例操作列单击实例管理

    4. 实例详情页签,单击数据表列表查询同步的表数据。

计费说明

  • 当导入数据到表格存储时,表格存储会根据数据存储量收取相应存储费用。

  • 通过迁移工具访问表格存储时,表格存储会根据所用计费方式收取读写数据的费用。

    说明

    关于实例类型和CU的更多信息,请分别参见实例读写吞吐量

    计费模式

    计算能力说明

    VCU模式(预留模式)

    按照资源评估结果预先购买预留VCU或开启弹性能力后按实际使用量支付计算性能消耗费用。计算能力中涵盖数据读写的计算消耗。

    CU模式(按量模式)

    根据具体的读写请求按照读写吞吐量计量计费。同时根据实例类型不同,计费时需要区分按量读写CU以及预留读写CU。

    说明

    关于实例类型和CU的更多信息,请分别参见实例读写吞吐量

相关文档

附录资料

附录1:自建MySQL实例的准备工作

  1. 开启binlog写入功能,配置binlog-format为ROW模式,my.cnf中配置如下。

    [mysqld]
    log-bin=mysql-bin #开启binlog
    binlog-format=ROW #选择ROW模式
    server_id=1 #配置MySQLreplaction需要定义,不要和canal的slaveId重复
  2. 授权canal链接MySQL账号具有作为MySQL slave的权限。

    1. 登录root账号或管理员账户。

      启动并登录MySQL,以root用户为例。

      $ mysql -u root -p
      
      # 在Enter password后输入root用户密码
      Enter password: password
    2. 创建账号。

      在MySQL命令行中执行以下命令,创建一个账号canal

      mysql> CREATE USER 'canal' IDENTIFIED BY 'password';

      canalpassword是自定义的用户名和密码,可以根据实际情况进行替换。

    3. 授权。

      在命令行中执行授权命令,对canal账户进行授权。

      mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
      -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
      
      -- 重新加载权限表,使账号授权生效。
      mysql> FLUSH PRIVILEGES;

附录2:MySQL源表和Tablestore目标表中字段映射

Canal支持MySQL源表Tablestore目标表的字段名称和字段类型映射,对应canal.deployer安装目录的conf/tablestore路径下.yml格式文件中dbMapping.targetColumns的映射配置。支持作为目标类型配置在$后面的字段类型请参见下表。

源表中字段类型

目标Tablestore表中字段类型

string

string

int

int

integer

bool

bool

boolean

binary

binary

double

double

float

decimal