如果您需要将MySQL中的增量数据实时同步至阿里云Elasticsearch(简称ES)实例或ES Serverless应用,且您对数据同步的实时性要求较高,可以通过Canal来实现。
背景信息
Canal是阿里巴巴集团提供的一个开源产品,能够通过解析数据库的增量日志,提供增量数据的订阅和消费功能。Canal的功能原理及详细说明请参见Canal。使用Canal模拟成MySQL的Slave,实时接收MySQL的增量数据binlog,然后通过RESTful API将数据写入到阿里云ES实例或ES Serverless应用中,适用于对数据同步的实时性要求较高的场景。
前提条件
已创建RDS MySQL实例、阿里云ES实例、阿里云ECS实例。建议您在同一专有网络下创建相关实例。
- 已创建RDS MySQL实例。具体操作请参见创建RDS MySQL实例。本文以MySQL 5.7版本为例。 
- 已创建阿里云ES实例或ES Serverless应用。具体操作请参见创建阿里云Elasticsearch实例、创建Serverless应用。本文以阿里云ES 6.7内核增强版为例。 说明- 通过canal将数据写入到ES实例或Serverless应用中,需将阿里云ECS实例的IP地址加入ES实例或Serverless应用中。具体操作,请参见配置ES实例公网或私网访问白名单、配置Serverless应用公网访问白名单。 
- 已创建阿里云ECS实例。用于部署Canal-server和Canal-adapter。具体操作请参见自定义购买实例。本文创建的实例的镜像为CentOS 7.6 64位。 
使用限制
- 本方案仅支持将MySQL增量数据同步至阿里云ES。 
- 安装的JDK版本必须大于等于1.8.0。 
- Canal 1.1.4版本不支持ES 7.x版本。 - ES 7.x版本和ES Serverless应用的数据写入需使用Canal 1.1.5版本,ES 8.x版本请选择1.1.7版本。您也可以通过其他方式(例如Logstash、DTS)实现MySQL数据同步。 
- 在进行数据同步时支持自定义索引Mapping,但需保证Mapping中定义的字段(名称+类型)与MySQL中一致。 
- 本方案需要您自行保证Canal的可用性,避免出现业务不可用或故障。例如:当出现ECS重启,Canal异常退出等场景时如何继续同步数据等。 
- Canal Adapter不支持使用HTTPS协议连接阿里云ES实例。 
操作步骤
步骤一:准备MySQL数据源
进入RDS控制台,创建RDS MySQL数据库和表。具体操作请参见RDS MySQL快速入门。本文使用的建表语句如下。
-- create table
CREATE TABLE `es_test` (
    `id` bigint(32) NOT NULL,
    `name` text NOT NULL,
    `count` text NOT NULL,
    `color` text NOT NULL,
    PRIMARY KEY (`id`)
) ENGINE=InnoDB
DEFAULT CHARACTER SET=utf8;步骤二:创建索引
- 登录目标阿里云ES实例的Kibana控制台,具体操作请参见登录Kibana控制台。 说明- 本文以阿里云ES 6.7.0版本为例,其他版本操作可能略有差别,请以实际界面为准。 
- 在左侧导航栏,单击Dev Tools。
- 在Console中,执行以下命令创建索引。 - 以下示例创建的索引名称为es_test,包含count、id、name和color字段。 重要- mappings中的字段需要与步骤一:准备MySQL数据源中创建的字段(名称和类型)保持一致。 - PUT es_test?include_type_name=true { "settings" : { "index" : { "number_of_shards" : "5", "number_of_replicas" : "1" } }, "mappings" : { "_doc" : { "properties" : { "count": { "type": "text" }, "id": { "type": "integer" }, "name": { "type" : "text", "analyzer": "ik_smart" }, "color" : { "type" : "text" } } } } }- 创建成功后,返回如下结果。 - { "acknowledged" : true, "shards_acknowledged" : true, "index" : "es_test" }
步骤三:安装JDK
- 连接ECS实例。 - 具体操作请参见通过密码或密钥认证登录Linux实例。 说明- 本文档以普通用户权限为例。 
- 查看可用的JDK软件包列表。 - sudo yum search java | grep -i --color JDK
- 选择合适的版本,安装JDK。 - 本文选择java-1.8.0-openjdk-devel.x86_64。 - sudo yum install java-1.8.0-openjdk-devel.x86_64
- 配置环境变量。 - 打开etc文件夹下的profile文件。 - vim ~/.bash_profile
- 在文件内添加如下的环境变量。 - export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.362.b08-1.el7_9.x86_64 export CLASSPATH=.:$JAVA_HOME/jre/lib/rt.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar export PATH=$PATH:$JAVA_HOME/bin重要- JAVA_HOME需要替换为您JDK的安装路径,可通过 - find / -name 'java'命令查看。
- 按下Esc键,然后使用 - :wq保存文件并退出vi模式,随后执行以下命令使配置生效。- source ~/.bash_profile
 
- 执行以下命令,验证JDK是否安装成功。 - java -version- 显示如下结果说明JDK安装成功。 - openjdk version "1.8.0_362" OpenJDK Runtime Environment (build 1.8.0_362-b08) OpenJDK 64-Bit Server VM (build 25.362-b08, mixed mode)
步骤四:安装并启动Canal-server
- 下载Canal-server。 - 本文使用1.1.4版本。 - wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz说明- 目前Canal 1.1.5版本已支持ES 7.0版本,如果您使用的是ES 7.0,需要下载Canal 1.1.5版本。详细信息请参见Canal release note。 
- 下载Canal-server和Canal-adapter需要连接公网,请确保ECS实例可以访问公网。 
 
- 解压。 - tar -zxvf canal.deployer-1.1.4.tar.gz
- 修改 - conf/example/instance.properties文件。- vi conf/example/instance.properties - 配置项 - 说明 - canal.instance.master.address - 需要设置为<RDS MySQL数据库的内网地址>:<内网端口>,相关信息可在RDS MySQL实例的基本信息页面获取。例如rm-bp1u1xxxxxxxxx6ph.mysql.rds.aliyuncs.com:3306。 - canal.instance.dbUsername - RDS MySQL数据库的账号名称,可在实例的账号管理页面获取。 - canal.instance.dbPassword - RDS MySQL数据库的密码。 
- 按下Esc键,然后使用 - :wq命令保存文件并退出vi模式。
- 启动Canal-server,并查看日志。 - ./bin/startup.sh cat logs/canal/canal.log 
步骤五:安装并启动Canal-adapter
- 下载Canal-adapter。 - 本文使用1.1.4版本。 - wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.adapter-1.1.4.tar.gz说明- 目前Canal 1.1.5版本已支持ES 7.0版本,如果您使用的是ES 7.0,需要下载Canal 1.1.5版本。详细信息请参见Canal release note。 
- 下载Canal-server和Canal-adapter需要连接公网,请确保ECS实例可以访问公网。 
 
- 解压。 - tar -zxvf canal.adapter-1.1.4.tar.gz
- 修改 - conf/application.yml文件。- vi conf/application.yml - 配置项 - 说明 - canal.conf.canalServerHost - canalDeployer访问地址。保持默认(127.0.0.1:11111)即可。 - canal.conf.srcDataSources.defaultDS.url - 需要设置为jdbc:mysql://<RDS MySQL内网地址>:<内网端口>/<数据库名称>?useUnicode=true,相关信息可在RDS MySQL实例的基本信息页面获取。例如jdbc:mysql://rm-bp1xxxxxxxxxnd6ph.mysql.rds.aliyuncs.com:3306/elasticsearch?useUnicode=true。 - canal.conf.srcDataSources.defaultDS.username - RDS MySQL数据库的账号名称,可在RDS MySQL实例的账号管理页面获取。 - canal.conf.srcDataSources.defaultDS.password - RDS MySQL数据库的密码。 - canal.conf.canalAdapters.groups.outerAdapters.name - ES实例:无需操作 
- Serverless应用:定位到name:es的位置,将es改为es7。 
 - canal.conf.canalAdapters.groups.outerAdapters.hosts - 定位到name:es的位置,将hosts替换为<ES实例或Serverless应用的内网地址>:<内网端口>,相关信息可在ES实例或Serverless应用的基本信息页面获取。例如,es-cn-v64xxxxxxxxx3medp.elasticsearch.aliyuncs.com:9200或cj-******.public.cn-hangzhou.es-serverless.aliyuncs.com:9200。 - canal.conf.canalAdapters.groups.outerAdapters.mode - 必须设置为rest。 - canal.conf.canalAdapters.groups.outerAdapters.properties.security.auth - 需要设置为<ES实例的账号或Serverless应用的用户名>:<密码>。例如elastic:es_password或test-t53:password。 - canal.conf.canalAdapters.groups.outerAdapters.properties.cluster.name - ES实例的ID或Serverless的应用名称,可在ES实例或Serverless应用的基本信息页面获取。例如es-cn-v64xxxxxxxxx3medp。 
- 按下Esc键,然后使用 - :wq命令保存文件并退出vi模式。
- 同样的方式,修改 - conf/es/*.yml文件,定义MySQL数据到ES数据的映射字段。
- 启动Canal-adapter服务,并查看日志。 - ./bin/startup.sh cat logs/adapter/adapter.log说明- 本文以MySQL 5.7版本为例,如果您使用的是MySQL其它版本,需要将MySQL驱动器替换为相应的版本,否则可能会导致启动失败,详细信息请参见常见问题。 - 服务启动正常时,结果如下所示。  
步骤六:验证增量数据同步
- 在RDS MySQL数据库中,新增、修改或删除数据库中es_test表的数据。 - insert `ES`.`es_test`(`count`,`id`,`name`,`color`) values('11',2,'canal_test2','red');
- 登录目标阿里云ES实例的Kibana控制台,具体操作请参见登录Kibana控制台。 
- 在左侧导航栏,单击Dev Tools。
- 在Console中,执行以下命令查询同步成功的数据。 - GET /es_test/_search- 预期结果如下。  重要 重要- Canal同步的是增量数据,不会同步之前的存量数据。 
常见问题
Q:启动Canal-adapter时,adapter.log日志显示异常,如何解决?错误日志为:java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassCastException: com.alibaba.druid.pool.DruidDataSource cannot be cast to com.alibaba.druid.pool.DruidDataSource
at com.alibaba.otter.canal.client.adapter.es7x.ES7xAdapter.init(ES7xAdapter.java:54) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]
A:将canal.adapter-1.1.5\plugin下的client-adapter.es7x-1.1.5-jar-with-dependencies.jar替换为canal-1.1.5-alpha-2版本下的对应文件。
您可以参考Canal的issues解决,请参见Canal issues
以root权限用户为例,操作步骤如下:
- 下载canal-1.1.5-alpha-2版本。详细信息请参见Canal release note。 - wget https://github.com/alibaba/canal/releases/download/canal-1.1.5-alpha-2/canal.adapter-1.1.5-SNAPSHOT.tar.gz
- 解压文件。 - tar -zxvf canal.adapter-1.1.5-SNAPSHOT.tar.gz
- 拷贝canal-1.1.5-alpha-2 版本中plugin下的client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar放到canal.adapter-1.1.5\plugin目录下。 说明- 实际拷贝文件的目录需根据您创建的目录结构来。 - cp canal.adapter-1.1.5-SNAPSHOT/plugin/client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar canal/canal.adapter/plugin
- 删除canal.adapter-1.1.5\plugin下的client-adapter.es7x-1.1.5-jar-with-dependencies.jar。 - rm -rf client-adapter.es7x-1.1.5-jar-with-dependencies.jar
- 修改名字。 - mv client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar client-adapter.es7x-1.1.5-jar-with-dependencies.jar
Q:启动Canal-adapter时,adapter.log日志显示异常,如何解决?错误日志为java.sql.SQLException: Unknown system variable 'query_cache_size'
A:可能是由于Canal-adapter中自带的MySQL驱动版本与连接的MySQL数据库版本不一致导致,例如:使用的canal.adapter-1.1.4时,其自带的MySQL驱动器为mysql-connector-java-5.1.40.jar,在连接MySQL8时就会出现如上所述的异常信息。可更换Canal-adapter中的MySQL驱动器版本进行解决。
Q:使用Canal同步MySQL 8.0版本的数据,如何将MySQL驱动器版本替换成8.0?
A:以下操作步骤以root权限用户为例。
- 下载8.0版本的MySQL驱动器。 - wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-8.0.29.zip
- 解压。 - unzip mysql-connector-java-8.0.29.zip
- 拷贝文件到Canal-adapter的lib目录下。 - mv mysql-connector-java-8.0.29/mysql-connector-java-8.0.29.jar lib/
- 添加权限。 - chmod 777 lib/mysql-connector-java-8.0.29.jar chmod +st lib/mysql-connector-java-8.0.29.jar
- 删除5.x版本的驱动器。 - rm -rf lib/mysql-connector-java-5.1.40.jar
