如果您需要将自建Elasticsearch中的全量或增量数据迁移至阿里云Elasticsearch,可通过在ECS中自建Logstash,并通过Logstash的管道配置功能实现。本文在ECS上部署自建Elasticsearch和自建Logstash,并通过Logstash将自建Elasticsearch中的数据全量迁移至阿里云Elasticsearch。
注意事项
自建Logstash所在的ECS需要与阿里云Elasticsearch集群在同一专有网络下,同时该Logstash需要能够同时访问源Elasticsearch集群(自建)和目标Elasticsearch集群(阿里云)。
数据迁移可以全量迁移或增量迁移。如果业务侧时刻存在写入更新,首次迁移时,需先全量迁移,再通过时间标识字段(或其他可标识增量的字段)进行增量迁移,否则迁移后新数据极易被旧数据覆盖。如果已有全量数据,可以只通过标识字段实现增量数据迁移。
操作流程
开通阿里云Elasticsearch服务,在ECS服务器部署自建Elasticsearch、准备待迁移数据和部署自建Logstash。
在ECS服务器运行Python脚本迁移索引元数据。
通过Logstash管道配置功能,将自建Elasticsearch中的全量数据迁移至阿里云Elasticsearch中。
步骤一:准备环境与实例
创建阿里云Elasticsearch实例。
具体操作请参见创建阿里云Elasticsearch实例。本文使用的测试环境如下。
环境项
环境信息
地域
华东1(杭州)。
版本
通用商业版7.10.0。
实例规格配置
3个可用区、3个数据节点、单节点4核CPU、16 GB内存、100 GB ESSD云盘。
创建ECS实例,用于部署自建Elasticsearch、自建Kibana和自建Logstash。
具体操作请参见自定义购买实例。本文使用的测试环境如下。
环境项
环境信息
地域
华东1(杭州)。
实例规格
4 vCPU 16 GiB内存。
镜像
公共镜像、CentOS 7.9 64位。
存储
系统盘、ESSD云盘、100 GiB。
网络
与阿里云Elasticsearch相同的专有网络,选中分配公网IPv4地址,并按使用流量计费,带宽峰值为100 Mbps。
安全组
入方向添加5601端口(即Kibana端口),在授权对象中添加您客户端的IP地址。
重要如果您的客户端处在家庭网络或公司局域网中,您需要在授权对象中添加局域网的公网出口IP地址,而非客户端机器的IP地址。建议您通过浏览器访问cip.cc查询。
您也可以在授权对象中添加0.0.0.0/0,表示允许所有IPv4地址访问ECS实例。此配置会导致ECS实例完全暴露在公网中,增加安全风险,生产环境尽量避免。
部署自建Elasticsearch。
本文使用的自建Elasticsearch版本为7.6.2,1个数据节点,具体操作步骤如下:
连接ECS服务器。
具体操作,请参见通过密码或密钥认证登录Linux实例。
使用root用户权限创建elastic用户。
useradd elastic
设置elastic用户的密码。
passwd elastic
系统将提示您输入和确认elastic用户的密码。
将root用户切换为elastic用户。
su -l elastic
下载Elasticsearch软件安装包并解压缩。
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.6.2-linux-x86_64.tar.gz tar -zvxf elasticsearch-7.6.2-linux-x86_64.tar.gz
启动Elasticsearch。
进入Elasticsearch的安装目录下,启动服务。
cd elasticsearch-7.6.2 ./bin/elasticsearch -d
验证Elasticsearch服务是否正常运行。
cd ~ curl localhost:9200
正常情况下,返回结果中会显示Elasticsearch版本号和
"You Know, for Search"
。
部署自建Kibana,并准备测试数据。
本文使用的自建Kibana版本为7.6.2,1个数据节点,具体操作步骤如下:
连接ECS服务器。
具体操作请参见通过密码或密钥认证登录Linux实例。
说明本文档以普通用户权限为例。
下载Kibana软件安装包并解压缩。
wget https://artifacts.elastic.co/downloads/kibana/kibana-7.6.2-linux-x86_64.tar.gz tar -zvxf kibana-7.6.2-linux-x86_64.tar.gz
修改Kibana配置文件config/kibana.yml,增加
server.host: "0.0.0.0"
配置,允许通过公网IP访问Kibana。进入Kibana安装目录,修改kibana.yml。
cd kibana-7.6.2-linux-x86_64 vi config/kibana.yml
使用非root用户启用Kibana。
sudo nohup ./bin/kibana &
登录Kibana控制台,添加示例数据。
通过公网IP地址登录Kibana控制台。
公网IP地址为:http://<ECS服务器的公网IP地址>:5601/app/kibana#/home。
在Kibana控制台首页,单击Try our sample data。
在Sample data页签,单击日志示例数据模块下的Add data,添加对应数据。
部署自建Logstash。
本文使用的Logstash版本为7.10.0,1个节点,具体操作步骤如下:
连接ECS服务器。
具体操作请参见通过密码或密钥认证登录Linux实例。
说明本文档以普通用户权限为例。
回到根目录,下载Logstash软件安装包并解压缩。
cd ~ wget https://artifacts.elastic.co/downloads/logstash/logstash-7.10.0-linux-x86_64.tar.gz tar -zvxf logstash-7.10.0-linux-x86_64.tar.gz
修改Logstash的堆内存使用。
Logstash默认的堆内存为1 GB,您需要根据ECS规格配置合适的内存大小,加快集群数据的迁移效率。
进入Logstash的安装目录下,修改Logstash配置文件config/jvm.options,增加-Xms8g和-Xmx8g。
cd logstash-7.10.0 sudo vi config/jvm.options
修改Logstash批量写入记录条数。
每批量写入5~15 MB数据,可以加快集群数据的迁移效率。
修改Logstash配置文件config/pipelines.yml,将每批量写入记录条数pipeline.batch.size从125改为5000。
vi config/pipelines.yml
验证Logstash功能。
通过控制台输入输出收集数据。
bin/logstash -e 'input { stdin { } } output { stdout {} }'
在控制台中输入"Hello world!"。
正常情况下,控制台会输出"Hello world!"。
(可选)步骤二:迁移索引元数据(设置和映射)
在进行数据迁移时,Logstash会帮助您自动创建索引,但是自动创建的索引可能与您待迁移的索引存在差异,导致迁移前后数据的格式不一致。因此建议您在数据迁移前,在阿里云Elasticsearch中手动创建目标索引,确保迁移前后索引数据完全一致。
您可以通过Python脚本创建目标索引,具体操作步骤如下:
连接ECS服务器。
具体操作请参见通过密码或密钥认证登录Linux实例。
说明本文档以普通用户权限为例。
创建并打开Python脚本文件(本文以indiceCreate.py为例)。
sudo vi indiceCreate.py
修改Python脚本文件,拷贝以下代码(注意修改集群的连接地址、用户名和密码)。
#!/usr/bin/python # -*- coding: UTF-8 -*- # 文件名:indiceCreate.py import sys import base64 import time import httplib import json ## 源集群host。 oldClusterHost = "localhost:9200" ## 源集群用户名,可为空。 oldClusterUserName = "elastic" ## 源集群密码,可为空。 oldClusterPassword = "xxxxxx" ## 目标集群host,可在阿里云Elasticsearch实例的基本信息页面获取。 newClusterHost = "es-cn-zvp2m4bko0009****.elasticsearch.aliyuncs.com:9200" ## 目标集群用户名。 newClusterUser = "elastic" ## 目标集群密码。 newClusterPassword = "xxxxxx" DEFAULT_REPLICAS = 0 def httpRequest(method, host, endpoint, params="", username="", password=""): conn = httplib.HTTPConnection(host) headers = {} if (username != "") : 'Hello {name}, your age is {age} !'.format(name = 'Tom', age = '20') base64string = base64.encodestring('{username}:{password}'.format(username = username, password = password)).replace('\n', '') headers["Authorization"] = "Basic %s" % base64string; if "GET" == method: headers["Content-Type"] = "application/x-www-form-urlencoded" conn.request(method=method, url=endpoint, headers=headers) else : headers["Content-Type"] = "application/json" conn.request(method=method, url=endpoint, body=params, headers=headers) response = conn.getresponse() res = response.read() return res def httpGet(host, endpoint, username="", password=""): return httpRequest("GET", host, endpoint, "", username, password) def httpPost(host, endpoint, params, username="", password=""): return httpRequest("POST", host, endpoint, params, username, password) def httpPut(host, endpoint, params, username="", password=""): return httpRequest("PUT", host, endpoint, params, username, password) def getIndices(host, username="", password=""): endpoint = "/_cat/indices" indicesResult = httpGet(oldClusterHost, endpoint, oldClusterUserName, oldClusterPassword) indicesList = indicesResult.split("\n") indexList = [] for indices in indicesList: if (indices.find("open") > 0): indexList.append(indices.split()[2]) return indexList def getSettings(index, host, username="", password=""): endpoint = "/" + index + "/_settings" indexSettings = httpGet(host, endpoint, username, password) print (index + " 原始settings如下:\n" + indexSettings) settingsDict = json.loads(indexSettings) ## 分片数默认和源集群索引保持一致。 number_of_shards = settingsDict[index]["settings"]["index"]["number_of_shards"] ## 副本数默认为0。 number_of_replicas = DEFAULT_REPLICAS newSetting = "\"settings\": {\"number_of_shards\": %s, \"number_of_replicas\": %s}" % (number_of_shards, number_of_replicas) return newSetting def getMapping(index, host, username="", password=""): endpoint = "/" + index + "/_mapping" indexMapping = httpGet(host, endpoint, username, password) print (index + " 原始mapping如下:\n" + indexMapping) mappingDict = json.loads(indexMapping) mappings = json.dumps(mappingDict[index]["mappings"]) newMapping = "\"mappings\" : " + mappings return newMapping def createIndexStatement(oldIndexName): settingStr = getSettings(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword) mappingStr = getMapping(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword) createstatement = "{\n" + str(settingStr) + ",\n" + str(mappingStr) + "\n}" return createstatement def createIndex(oldIndexName, newIndexName=""): if (newIndexName == "") : newIndexName = oldIndexName createstatement = createIndexStatement(oldIndexName) print ("新索引 " + newIndexName + " 的setting和mapping如下:\n" + createstatement) endpoint = "/" + newIndexName createResult = httpPut(newClusterHost, endpoint, createstatement, newClusterUser, newClusterPassword) print ("新索引 " + newIndexName + " 创建结果:" + createResult) ## main indexList = getIndices(oldClusterHost, oldClusterUserName, oldClusterPassword) systemIndex = [] for index in indexList: if (index.startswith(".")): systemIndex.append(index) else : createIndex(index, index) if (len(systemIndex) > 0) : for index in systemIndex: print (index + " 或许是系统索引,不会重新创建,如有需要,请单独处理~")
执行Python脚本,创建目标索引。
sudo /usr/bin/python indiceCreate.py
参见登录Kibana控制台,登录目标集群的Kibana控制台,查看已创建的索引。
GET /_cat/indices?v
步骤三:迁移全量数据
连接ECS服务器。
具体操作请参见通过密码或密钥认证登录Linux实例。
在config目录下,创建并打开Logstash配置文件。
cd logstash-7.10.0/config vi es2es_all.conf
参考以下配置,修改Logstash配置文件。
说明8.5版本Logstash的配置参数有所调整,本文同时列出了7.10.0版本和8.5.1版本Logstash的配置示例。
为了保证迁移数据的准确性,建议您创建多个Logstash管道配置文件,分批次迁移数据,每个Logstash迁移部分数据。
7.10.0版本
input{ elasticsearch{ # 源端ES地址。 hosts => ["http://localhost:9200"] # 安全集群配置登录用户名密码。 user => "xxxxxx" password => "xxxxxx" # 需要迁移的索引列表,多个索引以英文以逗号(,)分隔。 index => "kibana_sample_data_*" # 以下三项保持默认即可,包含线程数和迁移数据大小和Logstash JVM配置相关。 docinfo=>true slices => 5 size => 5000 } } filter { # 去掉一些Logstash自己加的字段。 mutate { remove_field => ["@timestamp", "@version"] } } output{ elasticsearch{ # 目标端ES地址,可在阿里云Elasticsearch实例的基本信息页面获取。 hosts => ["http://es-cn-zvp2m4bko0009****.elasticsearch.aliyuncs.com:9200"] # 安全集群配置登录用户名密码。 user => "elastic" password => "xxxxxx" # 目标端索引名称,以下配置表示索引与源端保持一致。 index => "%{[@metadata][_index]}" # 目标端索引type,以下配置表示索引类型与源端保持一致。 document_type => "%{[@metadata][_type]}" # 目标端数据的id,如果不需要保留原id,可以删除以下这行,删除后性能会更好。 document_id => "%{[@metadata][_id]}" ilm_enabled => false manage_template => false } }
8.5.1版本
input{ elasticsearch{ # 源端ES地址。 hosts => ["http://es-cn-uqm3811160002***.elasticsearch.aliyuncs.com:9200"] # 安全集群配置登录用户名密码。 user => "elastic" password => "" # 需要迁移的索引列表,多个索引以英文以逗号(,)分隔。 index => "test_ecommerce" # 以下三项保持默认即可,包含线程数和迁移数据大小和Logstash JVM配置相关。 docinfo => true size => 10000 docinfo_target => "[@metadata]" } } filter { # 去掉一些Logstash自己加的字段。 mutate { remove_field => ["@timestamp","@version"] } } output{ elasticsearch{ # 目标端ES地址,可在阿里云Elasticsearch实例的基本信息页面获取。 hosts => ["http://es-cn-nwy38aixp0001****.elasticsearch.aliyuncs.com:9200"] # 安全集群配置登录用户名密码。 user => "elastic" password => "" # 目标端索引名称,以下配置表示索引与源端保持一致。 index => "%{[@metadata][_index]}" # 目标端数据的id,如果不需要保留原id,可以删除以下这行,删除后性能会更好。 document_id => "%{[@metadata][_id]}" ilm_enabled => false manage_template => false } }
Elasticsearch input插件可以根据配置的查询语句,从Elasticsearch集群读取文档数据,适用于批量导入测试日志等操作。默认读取完数据后,同步动作会自动关闭,而阿里云Logstash需保证进程一直运行,关闭后将会重新启动进程,导致某些单一任务场景(如logstash input es)存在重复写数据的情况。设置长时间范围的定时任务可绕过写重复的情况,如每年3月5日13点20分触发任务执行,执行完第一次任务后停止管道运行,可避免重复写情况。可以通过cron语法配合schedule参数实现,详情请参见Logstash官网Scheduling介绍。
例如,设置3月5日13点20分执行任务:
schedule => "20 13 5 3 *"
进入Logstash目录。
cd ~/logstash-7.10.0
启动Logstash全量迁移任务。
nohup bin/logstash -f config/es2es_all.conf >/dev/null 2>&1 &
步骤四:迁移增量数据
连接ECS服务器,在config目录下,创建并打开Logstash增量配置文件。
cd config vi es2es_kibana_sample_data_logs.conf
说明本文档以普通用户权限为例。
参考以下配置,修改Logstash配置文件。
7.10.0版本配置示例如下。
说明8.5版本Logstash的配置参数有所调整,需要去掉
document_type => "%{[@metadata][_type]}"
。按如下脚本修改Logstash配置文件后,开启Logstash定时任务即可触发增量迁移。
input{ elasticsearch{ # 源端ES地址。 hosts => ["http://localhost:9200"] # 安全集群配置登录用户名密码。 user => "xxxxxx" password => "xxxxxx" # 需要迁移的索引列表,多个索引使用英文逗号(,)分隔。 index => "kibana_sample_data_logs" # 按时间范围查询增量数据,以下配置表示查询最近5分钟的数据。 query => '{"query":{"range":{"@timestamp":{"gte":"now-5m","lte":"now/m"}}}}' # 定时任务,以下配置表示每分钟执行一次。 schedule => "* * * * *" scroll => "5m" docinfo=>true size => 5000 } } filter { # 去掉一些Logstash自己加的字段. mutate { remove_field => ["@timestamp", "@version"] } } output{ elasticsearch{ # 目标端ES地址,可在阿里云Elasticsearch实例的基本信息页面获取。 hosts => ["http://es-cn-zvp2m4bko0009****.elasticsearch.aliyuncs.com:9200"] # 安全集群配置登录用户名密码. user => "elastic" password => "xxxxxx" # 目标端索引名称,以下配置表示索引与源端保持一致。 index => "%{[@metadata][_index]}" # 目标端索引type,以下配置表示索引类型与源端保持一致。 document_type => "%{[@metadata][_type]}" # 目标端数据的id,如果不需要保留原id,可以删除以下这行,删除后性能会更好。 document_id => "%{[@metadata][_id]}" ilm_enabled => false manage_template => false } }
重要Logstash记录的时间戳为UTC时间,如果您的本地时间为北京时间(东八区),那么两者会存在8个小时的时区差,此时将UTC时间转化为北京时间,可使用公式:UTC+时区差=北京时间。例如,以上示例中通过源端索引中的@timestamp字段进行range范围过滤查询获取增量数据,并在对应的时间上+8h转换为北京时间。
通过Logstash控制时间字段实现增量数据的同步,需确保原索引中有可控制的时间字段,如果原索引中没有时间字段数据,可使用ingest pipeline指定_ingest.timestamp获取元数据值,从而引入@timestamp时间字段。
进入Logstash目录。
cd ~/logstash-7.10.0
启动Logstash增量迁移任务。
sudo nohup bin/logstash -f config/es2es_kibana_sample_data_logs.conf >/dev/null 2>&1 &
在目标端Elasticsearch集群的Kibana中,查询最近更新的记录,验证增量数据是否同步。
以下示例的查询条件为:索引名称为kibana_sample_data_logs、最近时间范围为5分钟。
GET kibana_sample_data_logs/_search { "query": { "range": { "@timestamp": { "gte": "now-5m", "lte": "now/m" } } }, "sort": [ { "@timestamp": { "order": "desc" } } ] }
步骤五:查看数据迁移结果
查看是否完成全量迁移。
查看自建Elasticsearch集群的索引和数据量信息。
GET _cat/indices?v
结果如下。
全量迁移前,查看阿里云Elasticsearch集群的索引和数据量信息。
全量迁移后,查看阿里云Elasticsearch集群索引和数据量信息。
正常情况下,返回的记录条数应该与自建Elasticsearch集群一致。
查看是否完成增量迁移。
查看自建Elasticsearch集群的最近更新记录。
GET kibana_sample_data_logs/_search { "query": { "range": { "@timestamp": { "gte": "now-5m", "lte": "now/m" } } }, "sort": [ { "@timestamp": { "order": "desc" } } ] }
返回结果如下。
增量迁移完成后,使用同样命令查看阿里云Elasticsearch集群最近的更新记录。正常情况下,阿里云Elasticsearch集群的更新记录会与自建集群一致。