如果您需要将MySQL中的增量数据实时同步至阿里云Elasticsearch(简称ES)实例或ES Serverless应用,且您对数据同步的实时性要求较高,可以通过Canal来实现。
背景信息
Canal是阿里巴巴集团提供的一个开源产品,能够通过解析数据库的增量日志,提供增量数据的订阅和消费功能。Canal的功能原理及详细说明请参见Canal。使用Canal模拟成MySQL的Slave,实时接收MySQL的增量数据binlog,然后通过RESTful API将数据写入到阿里云ES实例或ES Serverless应用中,适用于对数据同步的实时性要求较高的场景。
前提条件
已创建RDS MySQL实例、阿里云ES实例、阿里云ECS实例。建议您在同一专有网络下创建相关实例。
已创建RDS MySQL实例。具体操作请参见创建RDS MySQL实例。本文以MySQL 5.7版本为例。
已创建阿里云ES实例或ES Serverless应用。具体操作请参见创建阿里云Elasticsearch实例、创建Serverless应用。本文以阿里云ES 6.7内核增强版为例。
说明通过canal将数据写入到ES实例或Serverless应用中,需将阿里云ECS实例的IP地址加入ES实例或Serverless应用中。具体操作,请参见配置ES实例公网或私网访问白名单、配置Serverless应用公网访问白名单。
已创建阿里云ECS实例。用于部署Canal-server和Canal-adapter。具体操作请参见自定义购买实例。本文创建的实例的镜像为CentOS 7.6 64位。
使用限制
本方案仅支持将MySQL增量数据同步至阿里云ES。
安装的JDK版本必须大于等于1.8.0。
Canal 1.1.4版本不支持ES 7.x版本。
ES 7.x版本和ES Serverless应用的数据写入需使用Canal 1.1.5版本,ES 8.x版本请选择1.1.7版本。您也可以通过其他方式(例如Logstash、DTS)实现MySQL数据同步。
在进行数据同步时支持自定义索引Mapping,但需保证Mapping中定义的字段(名称+类型)与MySQL中一致。
本方案需要您自行保证Canal的可用性,避免出现业务不可用或故障。例如:当出现ECS重启,Canal异常退出等场景时如何继续同步数据等。
Canal Adapter不支持使用HTTPS协议连接阿里云ES实例。
操作步骤
步骤一:准备MySQL数据源
进入RDS控制台,创建RDS MySQL数据库和表。具体操作请参见RDS MySQL快速入门。本文使用的建表语句如下。
-- create table
CREATE TABLE `es_test` (
`id` bigint(32) NOT NULL,
`name` text NOT NULL,
`count` text NOT NULL,
`color` text NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB
DEFAULT CHARACTER SET=utf8;
步骤二:创建索引
登录目标阿里云ES实例的Kibana控制台,具体操作请参见登录Kibana控制台。
说明本文以阿里云ES 6.7.0版本为例,其他版本操作可能略有差别,请以实际界面为准。
- 在左侧导航栏,单击Dev Tools。
在Console中,执行以下命令创建索引。
以下示例创建的索引名称为es_test,包含count、id、name和color字段。
重要mappings中的字段需要与步骤一:准备MySQL数据源中创建的字段(名称和类型)保持一致。
PUT es_test?include_type_name=true { "settings" : { "index" : { "number_of_shards" : "5", "number_of_replicas" : "1" } }, "mappings" : { "_doc" : { "properties" : { "count": { "type": "text" }, "id": { "type": "integer" }, "name": { "type" : "text", "analyzer": "ik_smart" }, "color" : { "type" : "text" } } } } }
创建成功后,返回如下结果。
{ "acknowledged" : true, "shards_acknowledged" : true, "index" : "es_test" }
步骤三:安装JDK
连接ECS实例。
具体操作请参见通过密码或密钥认证登录Linux实例。
说明本文档以普通用户权限为例。
查看可用的JDK软件包列表。
sudo yum search java | grep -i --color JDK
选择合适的版本,安装JDK。
本文选择java-1.8.0-openjdk-devel.x86_64。
sudo yum install java-1.8.0-openjdk-devel.x86_64
配置环境变量。
打开etc文件夹下的profile文件。
vim ~/.bash_profile
在文件内添加如下的环境变量。
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.362.b08-1.el7_9.x86_64 export CLASSPATH=.:$JAVA_HOME/jre/lib/rt.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar export PATH=$PATH:$JAVA_HOME/bin
重要JAVA_HOME需要替换为您JDK的安装路径,可通过
find / -name 'java'
命令查看。按下Esc键,然后使用
:wq
保存文件并退出vi模式,随后执行以下命令使配置生效。source ~/.bash_profile
执行以下命令,验证JDK是否安装成功。
java -version
显示如下结果说明JDK安装成功。
openjdk version "1.8.0_362" OpenJDK Runtime Environment (build 1.8.0_362-b08) OpenJDK 64-Bit Server VM (build 25.362-b08, mixed mode)
步骤四:安装并启动Canal-server
下载Canal-server。
本文使用1.1.4版本。
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
说明目前Canal 1.1.5版本已支持ES 7.0版本,如果您使用的是ES 7.0,需要下载Canal 1.1.5版本。详细信息请参见Canal release note。
下载Canal-server和Canal-adapter需要连接公网,请确保ECS实例可以访问公网。
解压。
tar -zxvf canal.deployer-1.1.4.tar.gz
修改
conf/example/instance.properties
文件。vi conf/example/instance.properties
配置项
说明
canal.instance.master.address
需要设置为<RDS MySQL数据库的内网地址>:<内网端口>,相关信息可在RDS MySQL实例的基本信息页面获取。例如rm-bp1u1xxxxxxxxx6ph.mysql.rds.aliyuncs.com:3306。
canal.instance.dbUsername
RDS MySQL数据库的账号名称,可在实例的账号管理页面获取。
canal.instance.dbPassword
RDS MySQL数据库的密码。
按下Esc键,然后使用
:wq
命令保存文件并退出vi模式。启动Canal-server,并查看日志。
./bin/startup.sh cat logs/canal/canal.log
步骤五:安装并启动Canal-adapter
下载Canal-adapter。
本文使用1.1.4版本。
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.adapter-1.1.4.tar.gz
说明目前Canal 1.1.5版本已支持ES 7.0版本,如果您使用的是ES 7.0,需要下载Canal 1.1.5版本。详细信息请参见Canal release note。
下载Canal-server和Canal-adapter需要连接公网,请确保ECS实例可以访问公网。
解压。
tar -zxvf canal.adapter-1.1.4.tar.gz
修改
conf/application.yml
文件。vi conf/application.yml
配置项
说明
canal.conf.canalServerHost
canalDeployer访问地址。保持默认(127.0.0.1:11111)即可。
canal.conf.srcDataSources.defaultDS.url
需要设置为jdbc:mysql://<RDS MySQL内网地址>:<内网端口>/<数据库名称>?useUnicode=true,相关信息可在RDS MySQL实例的基本信息页面获取。例如jdbc:mysql://rm-bp1xxxxxxxxxnd6ph.mysql.rds.aliyuncs.com:3306/elasticsearch?useUnicode=true。
canal.conf.srcDataSources.defaultDS.username
RDS MySQL数据库的账号名称,可在RDS MySQL实例的账号管理页面获取。
canal.conf.srcDataSources.defaultDS.password
RDS MySQL数据库的密码。
canal.conf.canalAdapters.groups.outerAdapters.name
ES实例:无需操作
Serverless应用:定位到name:es的位置,将es改为es7。
canal.conf.canalAdapters.groups.outerAdapters.hosts
定位到name:es的位置,将hosts替换为<ES实例或Serverless应用的内网地址>:<内网端口>,相关信息可在ES实例或Serverless应用的基本信息页面获取。例如,es-cn-v64xxxxxxxxx3medp.elasticsearch.aliyuncs.com:9200或cj-******.public.cn-hangzhou.es-serverless.aliyuncs.com:9200。
canal.conf.canalAdapters.groups.outerAdapters.mode
必须设置为rest。
canal.conf.canalAdapters.groups.outerAdapters.properties.security.auth
需要设置为<ES实例的账号或Serverless应用的用户名>:<密码>。例如elastic:es_password或test-t53:password。
canal.conf.canalAdapters.groups.outerAdapters.properties.cluster.name
ES实例的ID或Serverless的应用名称,可在ES实例或Serverless应用的基本信息页面获取。例如es-cn-v64xxxxxxxxx3medp。
按下Esc键,然后使用
:wq
命令保存文件并退出vi模式。同样的方式,修改
conf/es/*.yml
文件,定义MySQL数据到ES数据的映射字段。启动Canal-adapter服务,并查看日志。
./bin/startup.sh cat logs/adapter/adapter.log
说明本文以MySQL 5.7版本为例,如果您使用的是MySQL其它版本,需要将MySQL驱动器替换为相应的版本,否则可能会导致启动失败,详细信息请参见常见问题。
服务启动正常时,结果如下所示。
步骤六:验证增量数据同步
在RDS MySQL数据库中,新增、修改或删除数据库中es_test表的数据。
insert `ES`.`es_test`(`count`,`id`,`name`,`color`) values('11',2,'canal_test2','red');
登录目标阿里云ES实例的Kibana控制台,具体操作请参见登录Kibana控制台。
- 在左侧导航栏,单击Dev Tools。
在Console中,执行以下命令查询同步成功的数据。
GET /es_test/_search
预期结果如下。
重要Canal同步的是增量数据,不会同步之前的存量数据。
常见问题
Q:启动Canal-adapter时,adapter.log日志显示异常,如何解决?错误日志为:java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassCastException: com.alibaba.druid.pool.DruidDataSource cannot be cast to com.alibaba.druid.pool.DruidDataSource
at com.alibaba.otter.canal.client.adapter.es7x.ES7xAdapter.init(ES7xAdapter.java:54) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]
A:将canal.adapter-1.1.5\plugin下的client-adapter.es7x-1.1.5-jar-with-dependencies.jar替换为canal-1.1.5-alpha-2版本下的对应文件。
您可以参考Canal的issues解决,请参见Canal issues
以root权限用户为例,操作步骤如下:
下载canal-1.1.5-alpha-2版本。详细信息请参见Canal release note。
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5-alpha-2/canal.adapter-1.1.5-SNAPSHOT.tar.gz
解压文件。
tar -zxvf canal.adapter-1.1.5-SNAPSHOT.tar.gz
拷贝canal-1.1.5-alpha-2 版本中plugin下的client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar放到canal.adapter-1.1.5\plugin目录下。
说明实际拷贝文件的目录需根据您创建的目录结构来。
cp canal.adapter-1.1.5-SNAPSHOT/plugin/client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar canal/canal.adapter/plugin
删除canal.adapter-1.1.5\plugin下的client-adapter.es7x-1.1.5-jar-with-dependencies.jar。
rm -rf client-adapter.es7x-1.1.5-jar-with-dependencies.jar
修改名字。
mv client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar client-adapter.es7x-1.1.5-jar-with-dependencies.jar
Q:启动Canal-adapter时,adapter.log日志显示异常,如何解决?错误日志为java.sql.SQLException: Unknown system variable 'query_cache_size'
A:可能是由于Canal-adapter中自带的MySQL驱动版本与连接的MySQL数据库版本不一致导致,例如:使用的canal.adapter-1.1.4时,其自带的MySQL驱动器为mysql-connector-java-5.1.40.jar,在连接MySQL8时就会出现如上所述的异常信息。可更换Canal-adapter中的MySQL驱动器版本进行解决。
Q:使用Canal同步MySQL 8.0版本的数据,如何将MySQL驱动器版本替换成8.0?
A:以下操作步骤以root权限用户为例。
下载8.0版本的MySQL驱动器。
wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-8.0.29.zip
解压。
unzip mysql-connector-java-8.0.29.zip
拷贝文件到Canal-adapter的lib目录下。
mv mysql-connector-java-8.0.29/mysql-connector-java-8.0.29.jar lib/
添加权限。
chmod 777 lib/mysql-connector-java-8.0.29.jar chmod +st lib/mysql-connector-java-8.0.29.jar
删除5.x版本的驱动器。
rm -rf lib/mysql-connector-java-5.1.40.jar