通过Canal将MySQL数据同步到阿里云ES

如果您需要将MySQL中的增量数据实时同步至阿里云Elasticsearch(简称ES)实例或ES Serverless应用,且您对数据同步的实时性要求较高,可以通过Canal来实现。

背景信息

Canal是阿里巴巴集团提供的一个开源产品,能够通过解析数据库的增量日志,提供增量数据的订阅和消费功能。Canal的功能原理及详细说明请参见Canal。使用Canal模拟成MySQL的Slave,实时接收MySQL的增量数据binlog,然后通过RESTful API将数据写入到阿里云ES实例或ES Serverless应用中,适用于对数据同步的实时性要求较高的场景。

前提条件

已创建RDS MySQL实例、阿里云ES实例、阿里云ECS实例。建议您在同一专有网络下创建相关实例。

使用限制

  • 本方案仅支持将MySQL增量数据同步至阿里云ES。

  • 安装的JDK版本必须大于等于1.8.0。

  • Canal 1.1.4版本不支持ES 7.x版本。

    ES 7.x版本和ES Serverless应用的数据写入需使用Canal 1.1.5版本,ES 8.x版本请选择1.1.7版本。您也可以通过其他方式(例如Logstash、DTS)实现MySQL数据同步。

  • 在进行数据同步时支持自定义索引Mapping,但需保证Mapping中定义的字段(名称+类型)与MySQL中一致。

  • 本方案需要您自行保证Canal的可用性,避免出现业务不可用或故障。例如:当出现ECS重启,Canal异常退出等场景时如何继续同步数据等。

  • Canal Adapter不支持使用HTTPS协议连接阿里云ES实例。

操作步骤

步骤一:准备MySQL数据源

进入RDS控制台,创建RDS MySQL数据库和表。具体操作请参见RDS MySQL快速入门。本文使用的建表语句如下。

-- create table
CREATE TABLE `es_test` (
    `id` bigint(32) NOT NULL,
    `name` text NOT NULL,
    `count` text NOT NULL,
    `color` text NOT NULL,
    PRIMARY KEY (`id`)
) ENGINE=InnoDB
DEFAULT CHARACTER SET=utf8;

步骤二:创建索引

  1. 登录目标阿里云ES实例的Kibana控制台,具体操作请参见登录Kibana控制台

    说明

    本文以阿里云ES 6.7.0版本为例,其他版本操作可能略有差别,请以实际界面为准。

  2. 在左侧导航栏,单击Dev Tools
  3. Console中,执行以下命令创建索引。

    以下示例创建的索引名称为es_test,包含count、id、name和color字段。

    重要

    mappings中的字段需要与步骤一:准备MySQL数据源中创建的字段(名称和类型)保持一致。

    PUT es_test?include_type_name=true
    {
    
        "settings" : {
          "index" : {
            "number_of_shards" : "5",
            "number_of_replicas" : "1"
          }
        },
        "mappings" : {
            "_doc" : {
                "properties" : {
                  "count": {          
                       "type": "text"       
                   },
                  "id": {
                       "type": "integer"
                   },
                   "name": {
                        "type" : "text",
                        "analyzer": "ik_smart"                   
                    },
                    "color" : {
                        "type" : "text"                    
                    }
                }
            }
        }
    }

    创建成功后,返回如下结果。

    {
      "acknowledged" : true,
      "shards_acknowledged" : true,
      "index" : "es_test"
    }

步骤三:安装JDK

  1. 连接ECS实例。

    具体操作请参见通过密码或密钥认证登录Linux实例

    说明

    本文档以普通用户权限为例。

  2. 查看可用的JDK软件包列表。

    sudo yum search java | grep -i --color JDK
  3. 选择合适的版本,安装JDK。

    本文选择java-1.8.0-openjdk-devel.x86_64

    sudo yum install java-1.8.0-openjdk-devel.x86_64
  4. 配置环境变量。

    1. 打开etc文件夹下的profile文件。

      vim ~/.bash_profile
    2. 在文件内添加如下的环境变量。

      export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.362.b08-1.el7_9.x86_64
      export CLASSPATH=.:$JAVA_HOME/jre/lib/rt.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
      export PATH=$PATH:$JAVA_HOME/bin
      重要

      JAVA_HOME需要替换为您JDK的安装路径,可通过find / -name 'java'命令查看。

    3. 按下Esc键,然后使用:wq保存文件并退出vi模式,随后执行以下命令使配置生效。

      source ~/.bash_profile
  5. 执行以下命令,验证JDK是否安装成功。

    java -version

    显示如下结果说明JDK安装成功。

    openjdk version "1.8.0_362"
    OpenJDK Runtime Environment (build 1.8.0_362-b08)
    OpenJDK 64-Bit Server VM (build 25.362-b08, mixed mode)

步骤四:安装并启动Canal-server

  1. 下载Canal-server。

    本文使用1.1.4版本。

    wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
    说明
    • 目前Canal 1.1.5版本已支持ES 7.0版本,如果您使用的是ES 7.0,需要下载Canal 1.1.5版本。详细信息请参见Canal release note

    • 下载Canal-server和Canal-adapter需要连接公网,请确保ECS实例可以访问公网。

  2. 解压。

    tar -zxvf canal.deployer-1.1.4.tar.gz
  3. 修改conf/example/instance.properties文件。

    vi conf/example/instance.properties

    修改conf/example/instance.properties文件

    配置项

    说明

    canal.instance.master.address

    需要设置为<RDS MySQL数据库的内网地址>:<内网端口>,相关信息可在RDS MySQL实例的基本信息页面获取。例如rm-bp1u1xxxxxxxxx6ph.mysql.rds.aliyuncs.com:3306。

    canal.instance.dbUsername

    RDS MySQL数据库的账号名称,可在实例的账号管理页面获取。

    canal.instance.dbPassword

    RDS MySQL数据库的密码。

  4. 按下Esc键,然后使用:wq命令保存文件并退出vi模式。

  5. 启动Canal-server,并查看日志。

    ./bin/startup.sh
    cat logs/canal/canal.log

    启动canal-server

步骤五:安装并启动Canal-adapter

  1. 下载Canal-adapter。

    本文使用1.1.4版本。

    wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.adapter-1.1.4.tar.gz
    说明
    • 目前Canal 1.1.5版本已支持ES 7.0版本,如果您使用的是ES 7.0,需要下载Canal 1.1.5版本。详细信息请参见Canal release note

    • 下载Canal-server和Canal-adapter需要连接公网,请确保ECS实例可以访问公网。

  2. 解压。

    tar -zxvf canal.adapter-1.1.4.tar.gz
  3. 修改conf/application.yml文件。

    vi conf/application.yml

    image..png

    配置项

    说明

    canal.conf.canalServerHost

    canalDeployer访问地址。保持默认(127.0.0.1:11111)即可。

    canal.conf.srcDataSources.defaultDS.url

    需要设置为jdbc:mysql://<RDS MySQL内网地址>:<内网端口>/<数据库名称>?useUnicode=true,相关信息可在RDS MySQL实例的基本信息页面获取。例如jdbc:mysql://rm-bp1xxxxxxxxxnd6ph.mysql.rds.aliyuncs.com:3306/elasticsearch?useUnicode=true。

    canal.conf.srcDataSources.defaultDS.username

    RDS MySQL数据库的账号名称,可在RDS MySQL实例的账号管理页面获取。

    canal.conf.srcDataSources.defaultDS.password

    RDS MySQL数据库的密码。

    canal.conf.canalAdapters.groups.outerAdapters.name

    • ES实例:无需操作

    • Serverless应用:定位到name:es的位置,将es改为es7。

    canal.conf.canalAdapters.groups.outerAdapters.hosts

    定位到name:es的位置,将hosts替换为<ES实例或Serverless应用的内网地址>:<内网端口>,相关信息可在ES实例或Serverless应用的基本信息页面获取。例如,es-cn-v64xxxxxxxxx3medp.elasticsearch.aliyuncs.com:9200或cj-******.public.cn-hangzhou.es-serverless.aliyuncs.com:9200

    canal.conf.canalAdapters.groups.outerAdapters.mode

    必须设置为rest。

    canal.conf.canalAdapters.groups.outerAdapters.properties.security.auth

    需要设置为<ES实例的账号或Serverless应用的用户名>:<密码>。例如elastic:es_password或test-t53:password

    canal.conf.canalAdapters.groups.outerAdapters.properties.cluster.name

    ES实例的ID或Serverless的应用名称,可在ES实例或Serverless应用的基本信息页面获取。例如es-cn-v64xxxxxxxxx3medp。

  4. 按下Esc键,然后使用:wq命令保存文件并退出vi模式。

  5. 同样的方式,修改conf/es/*.yml文件,定义MySQL数据到ES数据的映射字段。

    修改conf/es/*.yml文件

    配置项

    说明

    esMapping._index

    步骤二:创建索引章节中,在ES实例中所创建的索引的名称。本文使用es_test

    esMapping._type

    步骤二:创建索引章节中,在ES实例中所创建的索引的类型。本文使用_doc

    esMapping._id

    需要同步到ES实例的文档的id,可自定义。本文使用_id

    esMapping.sql

    SQL语句,用来查询需要同步到ES中的字段。本文使用select t.id as _id,t.id,t.count,t.name,t.color from es_test t

  6. 启动Canal-adapter服务,并查看日志。

    ./bin/startup.sh
    cat logs/adapter/adapter.log
    说明

    本文以MySQL 5.7版本为例,如果您使用的是MySQL其它版本,需要将MySQL驱动器替换为相应的版本,否则可能会导致启动失败,详细信息请参见常见问题

    服务启动正常时,结果如下所示。Canal-adapter服务日志

步骤六:验证增量数据同步

  1. 在RDS MySQL数据库中,新增、修改或删除数据库中es_test表的数据。

    insert `ES`.`es_test`(`count`,`id`,`name`,`color`) values('11',2,'canal_test2','red');
  2. 登录目标阿里云ES实例的Kibana控制台,具体操作请参见登录Kibana控制台

  3. 在左侧导航栏,单击Dev Tools
  4. Console中,执行以下命令查询同步成功的数据。

    GET /es_test/_search

    预期结果如下。数据同步成功结果

    重要

    Canal同步的是增量数据,不会同步之前的存量数据。

常见问题

Q:启动Canal-adapter时,adapter.log日志显示异常,如何解决?错误日志为:java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassCastException: com.alibaba.druid.pool.DruidDataSource cannot be cast to com.alibaba.druid.pool.DruidDataSource

at com.alibaba.otter.canal.client.adapter.es7x.ES7xAdapter.init(ES7xAdapter.java:54) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]

A:将canal.adapter-1.1.5\plugin下的client-adapter.es7x-1.1.5-jar-with-dependencies.jar替换为canal-1.1.5-alpha-2版本下的对应文件。

说明

您可以参考Canal的issues解决,请参见Canal issues

以root权限用户为例,操作步骤如下:

  1. 下载canal-1.1.5-alpha-2版本。详细信息请参见Canal release note

    wget https://github.com/alibaba/canal/releases/download/canal-1.1.5-alpha-2/canal.adapter-1.1.5-SNAPSHOT.tar.gz
  2. 解压文件。

    tar -zxvf canal.adapter-1.1.5-SNAPSHOT.tar.gz
  3. 拷贝canal-1.1.5-alpha-2 版本中plugin下的client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar放到canal.adapter-1.1.5\plugin目录下。

    说明

    实际拷贝文件的目录需根据您创建的目录结构来。

    cp canal.adapter-1.1.5-SNAPSHOT/plugin/client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar canal/canal.adapter/plugin
  4. 删除canal.adapter-1.1.5\plugin下的client-adapter.es7x-1.1.5-jar-with-dependencies.jar。

    rm -rf client-adapter.es7x-1.1.5-jar-with-dependencies.jar
  5. 修改名字。

    mv client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar client-adapter.es7x-1.1.5-jar-with-dependencies.jar 

Q:启动Canal-adapter时,adapter.log日志显示异常,如何解决?错误日志为java.sql.SQLException: Unknown system variable 'query_cache_size'

A:可能是由于Canal-adapter中自带的MySQL驱动版本与连接的MySQL数据库版本不一致导致,例如:使用的canal.adapter-1.1.4时,其自带的MySQL驱动器为mysql-connector-java-5.1.40.jar,在连接MySQL8时就会出现如上所述的异常信息。可更换Canal-adapter中的MySQL驱动器版本进行解决。

Q:使用Canal同步MySQL 8.0版本的数据,如何将MySQL驱动器版本替换成8.0?

A:以下操作步骤以root权限用户为例。

  1. 下载8.0版本的MySQL驱动器。

    wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-8.0.29.zip
  2. 解压。

    unzip mysql-connector-java-8.0.29.zip
  3. 拷贝文件到Canal-adapter的lib目录下。

    mv mysql-connector-java-8.0.29/mysql-connector-java-8.0.29.jar lib/
  4. 添加权限。

    chmod 777 lib/mysql-connector-java-8.0.29.jar
    chmod +st lib/mysql-connector-java-8.0.29.jar
  5. 删除5.x版本的驱动器。

    rm -rf lib/mysql-connector-java-5.1.40.jar

相关文档