使用Canal同步MySQL数据

更新时间:2025-02-17 09:25:55

本文介绍如何使用CanalMySQL数据库(自建MySQLRDS MySQL)中的数据同步到表格存储(Tablestore)。您可以通过Canal实现MySQL增量数据的实时同步,也可以将MySQL的全量数据同步到表格存储。

背景信息

Canal是阿里的开源项目,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。其工作原理是伪装成MySQLSlave,模拟MySQL Slave的交互协议,向MySQL Master发送dump请求,获取binary log并解析。更多信息,请参见Canal

准备工作

  • 准备需要同步的 MySQL 信息,包括用户名、密码、数据库地址、JDBC 连接信息等。

    重要

    通过Canal同步MySQL数据需要开启MySQLbinlog功能,确认binlog是否开启以及开启方式请参见附录1:MySQL准备工作

  • 开通表格存储服务,创建实例和数据表,用于存放同步的数据。具体操作,请参见开通服务并创建实例创建数据表

    说明

    创建数据表时,建议使用MySQL原主键或唯一索引作为表格存储数据表的主键。本文使用的MySQL和表格存储数据表样例,请参见附录3:样例数据

  • 获取表格存储的实例名称和服务地址(Endpoint)。

    1. 登录表格存储控制台

    2. 在页面上方,选择资源组和地域。

    3. 概览页面,单击实例别名或在操作列单击实例管理

    4. 实例详情页签,查看实例的名称和服务地址。

  • 获取 AccessKey 信息。请使用阿里云账号或RAM用户的 AccessKey 进行配置。获取AccessKey的具体操作,请参见如何获取AccessKey

    说明

    出于安全考虑,强烈建议您通过RAM用户使用表格存储功能。您可以创建RAM用户、授予该用户管理表格存储权限( AliyunOTSFullAccess )并为该RAM用户创建AccessKey。具体操作,请参见使用RAM用户访问密钥访问表格存储

操作步骤

本文操作使用的服务器为云服务器ECS,操作系统为Alibaba Cloud Linux 3.2104 LTS 64位和Ubuntu 24.04 64位。

一、安装JDK

Canal的运行依赖JDK(1.8及以上,推荐1.8),本文介绍在ECSAlibaba Cloud LinuxUbuntu系统中安装JDK 1.8的方法,如果您使用的是其他服务器,请自行进行安装。

Alibaba Cloud Linux
Ubuntu
yum -y install java-1.8.0-openjdk-devel.x86_64
apt update && apt upgrade
apt install openjdk-8-jdk

二、部署Deployer服务

  1. 创建Deployer服务部署目录。

    cd && mkdir -p canal/deployer
    cd canal/deployer
  2. 下载Deployer软件包并解压。

    1. 下载Deployer软件包。

      wget https://github.com/alibaba/canal/releases/download/canal-1.1.8/canal.deployer-1.1.8.tar.gz

      如果网络不佳,您也可以直接下载canal.deployer-1.1.8.tar.gz并上传至服务器。

    2. 解压软件包。

      tar -zxvf canal.deployer-1.1.8.tar.gz
  3. 编辑配置文件。

    1. 创建实例空间,将默认配置文件复制到创建的实例空间文件夹。

      mkdir conf/tablestore
      cp conf/example/instance.properties conf/tablestore
    2. 修改配置文件。

      vi conf/tablestore/instance.properties

      修改以下配置项,指定监听的MySQL源数据库。

      配置项

      示例值

      说明

      配置项

      示例值

      说明

      canal.instance.master.address

      • 自建MySQL:47.**.**.44:3306

      • RDS MySQL:rm-cn-rp************mo.rwlb.rds.aliyuncs.com:3306

      Canal监听的MySQL源数据库地址,格式为 host:port

      canal.instance.dbUsername

      canal

      MySQL源数据库用户的用户名。

      重要

      该用户需要有 SELECTREPLICATION SLAVEREPLICATION CLIENT 权限。授权方式请参见授权MySQL用户具有MySQL Slave的权限

      canal.instance.dbPassword

      Ca*******88

      MySQL源数据库用户的密码。

      完整示例配置文件

      #################################################
      ## mysql serverId , v1.0.26+ will autoGen
      # canal.instance.mysql.slaveId=0
      
      # enable gtid use true/false
      canal.instance.gtidon=false
      
      # rds oss binlog
      canal.instance.rds.accesskey=
      canal.instance.rds.secretkey=
      canal.instance.rds.instanceId=
      
      # position info
      canal.instance.master.address=47.**.**.44:3306
      canal.instance.master.journal.name=
      canal.instance.master.position=
      canal.instance.master.timestamp=
      canal.instance.master.gtid=
      
      # multi stream for polardbx
      canal.instance.multi.stream.on=false
      
      # ssl
      #canal.instance.master.sslMode=DISABLED
      #canal.instance.master.tlsVersions=
      #canal.instance.master.trustCertificateKeyStoreType=
      #canal.instance.master.trustCertificateKeyStoreUrl=
      #canal.instance.master.trustCertificateKeyStorePassword=
      #canal.instance.master.clientCertificateKeyStoreType=
      #canal.instance.master.clientCertificateKeyStoreUrl=
      #canal.instance.master.clientCertificateKeyStorePassword=
      
      # table meta tsdb info
      canal.instance.tsdb.enable=true
      #canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
      #canal.instance.tsdb.dbUsername=canal
      #canal.instance.tsdb.dbPassword=Ca*******88
      
      #canal.instance.standby.address =
      #canal.instance.standby.journal.name =
      #canal.instance.standby.position =
      #canal.instance.standby.timestamp =
      #canal.instance.standby.gtid=
      
      # username/password
      canal.instance.dbUsername=canal
      canal.instance.dbPassword=Ca*******88
      canal.instance.connectionCharset = UTF-8
      # enable druid Decrypt database password
      canal.instance.enableDruid=false
      #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
      
      # table regex
      canal.instance.filter.regex=.*\\..*
      # table black regex
      canal.instance.filter.black.regex=mysql\\.slave_.*
      # table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
      #canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
      # table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
      #canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
      
      # mq config
      canal.mq.topic=example
      # dynamic topic route by schema or table regex
      #canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..*
      canal.mq.partition=0
      # hash partition config
      #canal.mq.enableDynamicQueuePartition=false
      #canal.mq.partitionsNum=3
      #canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
      #canal.mq.partitionHash=test.table:id^name,.*\\..*
      #################################################
    3. 修改canal.properties文件,自定义实例名称。

      vi conf/canal.properties

      修改以下配置项,指定实例名称为tablestore。

      canal.destinations = tablestore
  4. 启动Deployer服务。

    sh bin/startup.sh

    您可以通过以下命令查看日志,以判断服务是否启动成功。

    tail -f logs/tablestore/tablestore.log

    启动成功会有如下信息展示。

    2025-02-13 19:45:40.622 [destination = tablestore , address = /47.**.**.44:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=binlog.000001,position=249075669,serverId=1,gtid=,timestamp=1739446563000] cost : 22134ms , the next step is binlog dump
    2025-02-13 19:45:40.737 [destination = tablestore , address = /47.**.**.44:3306 , EventParser] WARN  c.a.otter.canal.parse.inbound.mysql.MysqlConnection - load MySQL @@version_comment : Source distribution

    如需关闭Deployer服务,您可以执行以下命令。

    sh bin/stop.sh

三、部署Adapter服务

  1. 创建Adapter服务部署目录。

    cd && mkdir -p canal/adapter
    cd canal/adapter
  2. 下载Adapter软件包并解压。

    1. 下载Adapter软件包canal.adapter-1.0.1.tar.gz,并上传至服务器的部署目录。

      重要

      GitHub上的Adapter软件包暂不支持将数据写入表格存储,请使用本文提供的Adapter软件包。

    2. 解压软件包。

      tar -zxvf canal.adapter-1.0.1.tar.gz
  3. 编辑配置文件。

    1. 修改配置文件application.yml,定义适配器信息。

      vi conf/application.yml

      修改以下配置项,指定MySQL数据库连接信息和适配器信息。

      配置项

      示例值

      说明

      配置项

      示例值

      说明

      srcDataSources

      defaultDS

      源数据库标识。保持默认即可。

      srcDataSources.defaultDS.url

      • 自建MySQL:jdbc:mysql://47.**.**.44:3306/ki**at?useSSL=false

      • RDS MySQL:jdbc:mysql://rm-cn-rp************mo.rwlb.rds.aliyuncs.com/ki**at?useSSL=false

      MySQL源数据库的JDBC连接信息。

      srcDataSources.defaultDS.username

      canal

      MySQL源数据库用户的用户名。

      srcDataSources.defaultDS.password

      Ca*******88

      MySQL源数据库用户的密码。

      canalAdapters.instance

      tablestore

      Canal实例名称。必须与Deployer的实例名称相同。

      canalAdapters.groups.groupId

      g1

      适配器组ID。保持默认即可。

      canalAdapters.groups.groupId.outerAdapters.name

      tablestore

      适配器类型。必须设置为tablestore,表示此适配器将数据写入到表格存储。

      canalAdapters.groups.groupId.outerAdapters.key

      orders

      适配器标识。如果需要同步多张表,可以通过该参数进行区分。

      canalAdapters.groups.groupId.outerAdapters.properties.tablestore.endpoint

      https://i0********44.cn-hangzhou.ots.aliyuncs.com

      表格存储实例访问地址。

      重要

      为了保证资源访问安全,表格存储新创建的实例默认已关闭公网访问。如果您使用的是公网访问地址,请在实例管理 > 网络管理中允许网络类型勾选公网。

      canalAdapters.groups.groupId.outerAdapters.properties.tablestore.accessSecretId

      LT********************bE

      阿里云账号或RAM用户的AccessKey ID。

      canalAdapters.groups.groupId.outerAdapters.properties.tablestore.accessSecretKey

      CR**************************HD

      阿里云账号或RAM用户的AccessKey Secret。

      canalAdapters.groups.groupId.outerAdapters.properties.tablestore.instanceName

      i0********44

      表格存储实例名称。

      完整示例配置文件内容

      server:
        port: 8081
      spring:
        jackson:
          date-format: yyyy-MM-dd HH:mm:ss
          time-zone: GMT+8
          default-property-inclusion: non_null
      
      canal.conf:
        mode: tcp #tcp kafka rocketMQ rabbitMQ
        flatMessage: true
        zookeeperHosts:
        syncBatchSize: 1
        retries: 3
        timeout:
        accessKey:
        secretKey:
        terminateOnException: true
        consumerProperties:
          # canal tcp consumer
          canal.tcp.server.host: 127.0.0.1:11111
          canal.tcp.zookeeper.hosts:
          canal.tcp.batch.size: 500
          canal.tcp.username:
          canal.tcp.password:
      
        srcDataSources:
          defaultDS:
            url: jdbc:mysql://47.**.**.44:3306/ki**at?useSSL=false
            username: canal
            password: Ca*******88
        canalAdapters:
        - instance: tablestore # canal instance Name or mq topic name
          groups:
          - groupId: g1
            outerAdapters:
            - name: logger
            - name: tablestore
              key: orders
              properties:
                tablestore.endpoint: https://i0********44.cn-hangzhou.ots.aliyuncs.com
                tablestore.accessSecretId: LT********************bE
                tablestore.accessSecretKey: CR**************************HD
                tablestore.instanceName: i0********44

      多表同步示例配置文件内容

      server:
        port: 8081
      spring:
        jackson:
          date-format: yyyy-MM-dd HH:mm:ss
          time-zone: GMT+8
          default-property-inclusion: non_null
      
      canal.conf:
        mode: tcp #tcp kafka rocketMQ rabbitMQ
        flatMessage: true
        zookeeperHosts:
        syncBatchSize: 1
        retries: 3
        timeout:
        accessKey:
        secretKey:
        terminateOnException: true
        consumerProperties:
          # canal tcp consumer
          canal.tcp.server.host: 127.0.0.1:11111
          canal.tcp.zookeeper.hosts:
          canal.tcp.batch.size: 500
          canal.tcp.username:
          canal.tcp.password:
      
        srcDataSources:
          defaultDS:
            url: jdbc:mysql://47.**.**.44:3306/ki**at?useSSL=false
            username: canal
            password: Ca*******88
        canalAdapters:
        - instance: tablestore # canal instance Name or mq topic name
          groups:
          - groupId: g1
            outerAdapters:
            - name: logger
            - name: tablestore
              key: orders
              properties:
                tablestore.endpoint: https://i0********44.cn-hangzhou.ots.aliyuncs.com
                tablestore.accessSecretId: LT********************bE
                tablestore.accessSecretKey: CR**************************HD
                tablestore.instanceName: i0********44
            - name: tablestore
              key: sku
              properties:
                tablestore.endpoint: https://n0********16.cn-hangzhou.ots.aliyuncs.com
                tablestore.accessSecretId: LT********************6r
                tablestore.accessSecretKey: 5z**************************TP
                tablestore.instanceName: n0********16
    2. 修改适配器配置文件,指定需要消费的数据以及表映射。

      说明

      如果需要多表同步,需要在Adapter安装目录的 conf/tablestore 目录下创建多个 .yml 格式的配置文件,通过配置文件内的 outerAdapterKeyapplication.yml文件中的 canalAdapters.groups.groupId.outerAdapters.key 进行匹配。

      vi conf/tablestore/orders.yml

      修改以下配置项。

      配置项

      示例值

      说明

      配置项

      示例值

      说明

      dataSourceKey

      defaultDS

      源数据库标识。与application.yml文件中的srcDataSources保持一致。

      destination

      tablestore

      实例名称。与application.yml文件中的canalAdapters.instance保持一致。

      groupId

      g1

      适配器组ID。与application.yml文件中的canalAdapters.groups.groupId保持一致。

      outerAdapterKey

      orders

      适配器标识。与application.yml文件中的canalAdapters.groups.groupId.outerAdapters.key保持一致。

      threads

      1

      TablestoreWriter中的bucket数量,默认值为1。

      updateChangeColumns

      false

      行覆盖或行更新。默认值为false,表示行覆盖,即一行数据更新时,使用该行最新的整行值覆盖Tablestore中的旧行。如果设置为true,则表示行更新,即一行数据更新时,只对变化的字段进行操作。

      dbMapping.database

      ki**at

      需要同步的MySQL数据库名称。

      dbMapping.table

      orders

      需要同步的MySQL数据表源表名称。

      dbMapping.targetTable

      orders_canal

      写入到表格存储的目标表名称。

      dbMapping.targetPk

      order_id: order_id

      主键配置,格式为 pk: target_pk,即 源表主键名称: 目标表主键名称

      说明
      • 多个主键列配置时顺序必须与Tablestore中的主键顺序相同。

      • Tablestore适配器支持配置主键列自增,格式为 $$: target_pk,表示在目标表中生成一列名称为target_pk的主键且该主键列为自增列。当上游数据写入Tablestore时,Tablestore Adapter会自动填充该列的值。

      dbMapping.targetColumns

      modified_time: $string

      配置需要同步的数据列及映射,支持配置类型转换。支持以下4种配置格式:

      • id: target_id$string:表示源表的id字段同步到目标表的target_id字段,字段类型映射为string。

      • id: target_id:表示源表的id字段同步到目标表后的target_id字段,字段类型采用默认映射。

      • id: $string:表示id字段同步前后字段名不变,字段类型映射为string。

      • id: :表示id字段同步前后字段名不变,字段类型采用默认映射。

      重要
      • dbMapping.targetPk中配置的非自增的主键列也需要在此处进行配置,自增主键列不需要进行配置。

      • 如果不配置类型转换,Canal会根据源表中的字段类型推断目标字段类型。更多信息,请参见附录2:MySQL源表和表格存储目标表字段类型映射

      • 配置映射的字段类型时,字段类型大小写不敏感。

      dbMapping.commitBatch

      200

      一次批量RPC请求导入的数据行数,对应于TablestoreWriter中的maxBatchRowsCount。

      dbMapping.etlCondition

      where create_time >= {}

      全量抽取数据时的过滤条件,过滤条件中的字段名称为源表的字段名称,可不填写。

      重要

      只有在同步全量数据命令中带params参数时,过滤条件才会生效,否则即使设置了etlCondition,也会同步全量数据。

      完整示例配置文件内容

      dataSourceKey: defaultDS
      destination: tablestore
      groupId: g1
      outerAdapterKey: orders
      threads: 1
      updateChangeColumns: false
      dbMapping:
        database: ki**at
        table: orders
        targetTable: orders_canal
        targetPk:
          order_id: order_id
        targetColumns:
          order_id:
          user_id:
          sku_id:
          price:
          num:
          total_price:
          order_status:
          create_time: $string
          modified_time: $string
        etlCondition:
        commitBatch: 200 # 批量提交的大小
  4. 启动Adapter服务。

    sh bin/startup.sh

    您可以通过以下命令查看日志,以判断服务是否启动成功。

    tail -f logs/adapter/adapter.log

    启动成功会有如下信息展示。

    2025-02-13 16:16:21.415 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Start adapter for canal-client mq topic: tablestore-g1 succeed
    2025-02-13 16:16:21.415 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterService - ## the canal client adapters are running now ......
    2025-02-13 16:16:21.415 [Thread-5] INFO  c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Start to connect destination: tablestore <=============
    2025-02-13 16:16:21.421 [main] INFO  org.apache.coyote.http11.Http11NioProtocol - Starting ProtocolHandler ["http-nio-8081"]
    2025-02-13 16:16:21.423 [main] INFO  org.apache.tomcat.util.net.NioSelectorPool - Using a shared selector for servlet write/read
    2025-02-13 16:16:21.447 [main] INFO  o.s.boot.web.embedded.tomcat.TomcatWebServer - Tomcat started on port(s): 8081 (http) with context path ''
    2025-02-13 16:16:21.452 [main] INFO  c.a.otter.canal.adapter.launcher.CanalAdapterApplication - Started CanalAdapterApplication in 3.931 seconds (JVM running for 4.515)
    2025-02-13 16:16:21.521 [Thread-5] INFO  c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Subscribe destination: tablestore succeed <=============

Adapter服务启动成功后,Canal默认自动开始同步MySQL增量数据并定期打印同步日志。

2025-02-13 19:56:06.400 [writer-scheduled-pool-%d0] INFO  com.alicloud.openservices.tablestore.TableStoreWriter - WriterStatistics: WriterStatistics: {
    totalRequestCount=0,
    totalRowsCount=0,
    totalSucceedRowsCount=0,
    totalFailedRowsCount=0,
    totalSingleRowRequestCount=0,
}

如需关闭Adapter服务,您可以执行以下命令。

sh bin/stop.sh

四、验证增量数据同步

  1. MySQL的源表中插入一行数据。

    如果同步成功,adapter.log会打印同步日志如下。

    2025-02-13 16:14:25.512 [writer-scheduled-pool-%d0] INFO  com.alicloud.openservices.tablestore.TableStoreWriter - WriterStatistics: WriterStatistics: {
        totalRequestCount=1,
        totalRowsCount=1,
        totalSucceedRowsCount=1,
        totalFailedRowsCount=0,
        totalSingleRowRequestCount=0,
    }
  2. 表格存储控制台查看数据。

五、(可选)同步全量数据

执行以下命令调用Adapter服务的方法触发同步任务。

说明

执行命令后,Canal会先中止增量数据传输,然后同步全量数据。待全量数据同步完成后,Canal会自动进行增量数据同步。

命令格式:

curl "hostip:port/etl/type/key/task" -X POST

示例:

curl "localhost:8081/etl/tablestore/orders/orders.yml" -X POST

带过滤条件的同步命令示例,调用时会使用params的值替换dbMapping.etlCondition中的 {}

curl "localhost:8081/etl/tablestore/orders/orders.yml?params='2025-01-01'" -X POST

参数说明如下:

配置项

示例值

说明

配置项

示例值

说明

hostip

localhost

部署Canal服务的服务器IP地址,本地运行可设置为localhost。

port

8081

部署Canal服务的端口。

type

tablestore

适配器类型,必须设置为tablestore。

key

orders

适配器标识。与application.yml文件中的canalAdapters.groups.groupId.outerAdapters.key保持一致。

task

orders.yml

配置文件名称。

同步成功后,adapter.log会打印同步日志。

2025-02-13 19:59:40.836 [http-nio-8081-exec-1] INFO  o.a.catalina.core.ContainerBase.[Tomcat].[localhost].[/] - Initializing Spring FrameworkServlet 'dispatcherServlet'
2025-02-13 19:59:40.836 [http-nio-8081-exec-1] INFO  org.springframework.web.servlet.DispatcherServlet - FrameworkServlet 'dispatcherServlet': initialization started
2025-02-13 19:59:40.865 [http-nio-8081-exec-1] INFO  org.springframework.web.servlet.DispatcherServlet - FrameworkServlet 'dispatcherServlet': initialization completed in 28 ms
2025-02-13 19:59:40.954 [http-nio-8081-exec-1] INFO  com.alicloud.openservices.tablestore.TableStoreWriter - Start initialize ots writer, table name: orders_canal.
2025-02-13 19:59:41.012 [http-nio-8081-exec-1] INFO  com.alicloud.openservices.tablestore.TableStoreWriter - End initialize with table meta: TableName: orders_canal, PrimaryKeySchema: order_id:STRING, DefinedColumnSchema: .
2025-02-13 19:59:41.017 [writer-scheduled-pool-%d0] INFO  com.alicloud.openservices.tablestore.TableStoreWriter - WriterStatistics: WriterStatistics: {
    totalRequestCount=0,
    totalRowsCount=0,
    totalSucceedRowsCount=0,
    totalFailedRowsCount=0,
    totalSingleRowRequestCount=0,
}
2025-02-13 19:59:41.149 [pool-8-thread-2] INFO  com.alicloud.openservices.tablestore.TableStoreWriter - WriterStatistics: WriterStatistics: {
    totalRequestCount=0,
    totalRowsCount=133,
    totalSucceedRowsCount=0,
    totalFailedRowsCount=0,
    totalSingleRowRequestCount=0,
}
2025-02-13 19:59:41.982 [pool-8-thread-1] INFO  com.alicloud.openservices.tablestore.TableStoreWriter - WriterStatistics: WriterStatistics: {
    totalRequestCount=44,
    totalRowsCount=8943,
    totalSucceedRowsCount=6740,
    totalFailedRowsCount=0,
    totalSingleRowRequestCount=0,
}
2025-02-13 19:59:42.085 [http-nio-8081-exec-1] INFO  c.a.o.c.c.a.tablestore.service.TablestoreEtlService - 数据全量导入完成, 一共导入 10001 条数据, 耗时: 1064
2025-02-13 19:59:42.086 [http-nio-8081-exec-1] INFO  com.alicloud.openservices.tablestore.TableStoreWriter - WriterStatistics: WriterStatistics: {
    totalRequestCount=54,
    totalRowsCount=10001,
    totalSucceedRowsCount=10001,
    totalFailedRowsCount=0,
    totalSingleRowRequestCount=0,
}

附录

附录1:MySQL准备工作

开启binlog

MySQL 8.0RDS MySQL新建实例已默认开启binlog,您可以登录MySQL后通过以下命令确认binlog是否已开启。

show variables like 'log_bin';

如果binlog已开启,会显示如下信息。

mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+
1 row in set (0.01 sec)

如果 log_bin 参数的值为 OFF,表示binlog未开启,需要在my.cnf配置文件中添加如下内容开启binlog。

[mysqld]
log-bin=mysql-bin #开启 binlog
binlog-format=ROW #选择 ROW 模式
server_id=1 #配置 MySQL replaction 需要定义,不要和 Canal 的 SlaveId 重复

授权MySQL用户具有MySQL Slave的权限

您可以为已有用户授权,也可以登录MySQL后创建一个新用户并进行授权。

  • 创建MySQL用户命令如下。

    create user 'canal' identified by 'password';
  • 授权命令如下。

    grant select, replication slave, replication client on *.* to 'canal'@'%';
  • 授权后,需要刷新权限。

    flush privileges;

附录2:MySQL源表和表格存储目标表字段类型映射

配置MySQL源表和表格存储目标表的字段映射时,如果不手动指定映射后的字段类型,Canal会根据源表字段类型自动判断表格存储目标表的字段类型。字段类型映射关系如下表所示。

源表字段类型

表格存储目标表字段类型

源表字段类型

表格存储目标表字段类型

string

string

int

int

integer

bool

bool

boolean

binary

binary

double

double

float

decimal

附录3:样例数据

MySQL源表

MySQL源表建表语句如下。

create table orders (
  order_id varchar(50) primary key comment '订单ID',
  user_id varchar(10) not null comment '用户ID',
  sku_id varchar(10) not null comment '商品ID',
  price decimal(12, 2) not null comment '商品购买单价',
  num int not null comment '商品购买数量',
  total_price decimal(12, 2) not null comment '订单总价',
  order_status varchar(2) not null comment '订单状态',
  create_time timestamp not null default current_timestamp comment '订单创建时间',
  modified_time timestamp not null default current_timestamp on update current_timestamp comment '最后修改时间'
);

表格存储目标表

由于表格存储是schema-free的,创建目标表时仅需指定主键 order_id 即可。目标表基本详情如下。

image

  • 本页导读 (1)
  • 背景信息
  • 准备工作
  • 操作步骤
  • 一、安装JDK
  • 二、部署Deployer服务
  • 三、部署Adapter服务
  • 四、验证增量数据同步
  • 五、(可选)同步全量数据
  • 附录
  • 附录1:MySQL准备工作
  • 附录2:MySQL源表和表格存储目标表字段类型映射
  • 附录3:样例数据