当您需要将自建Elasticsearch中的全量或增量数据迁移至阿里云Elasticsearch时,可通过在ECS中自建Logstash,并通过Logstash的管道配置功能实现。本文为您介绍具体的实现方法。
背景信息

- 在ECS服务器部署自建Elasticsearch并准备待迁移的数据。
- 开通阿里云Elasticsearch服务。
- 在ECS服务器运行Python脚本迁移索引元数据。
- 部署Logstash,并通过Logstash管道配置功能,将自建Elasticsearch中的全量或增量数据迁移至阿里云Elasticsearch中。
注意事项
- 本文在阿里云ECS上部署自建Logstash,该Logstash所在的ECS需要与阿里云Elasticsearch集群在同一专有网络下,同时该Logstash需要能够同时访问源Elasticsearch集群(自建)和目标Elasticsearch集群(阿里云)。
- 数据迁移可以全量迁移或增量迁移,首次迁移都是全量迁移,后续写入数据选择增量迁移,增量迁移需要索引有时间戳字段。
操作流程
步骤一:准备环境与实例
- 创建阿里云Elasticsearch实例。具体操作请参见创建阿里云Elasticsearch实例。本文使用的测试环境如下。
环境项 环境信息 地域 华东1(杭州)。 版本 通用商业版7.10.0。 实例规格配置 3个可用区、3个数据节点、单节点4核CPU、16 GB内存、100 GB ESSD云盘。 - 创建ECS实例,用于部署自建Elasticsearch、自建Kibana和自建Logstash。具体操作请参见使用向导创建实例。本文使用的测试环境如下。
环境项 环境信息 地域 华东1(杭州)。 实例规格 4 vCPU 16 GiB内存。 镜像 公共镜像、CentOS、7.9 64位。 存储 系统盘、ESSD云盘、100 GiB。 网络 与阿里云Elasticsearch相同的专有网络,选中分配公网IPv4地址,并按使用流量计费,带宽峰值为100 Mbps。 安全组 入方向添加5601端口(即Kibana端口),在授权对象中添加您客户端的IP地址。 重要- 如果您的客户端处在家庭网络或公司局域网中,您需要在授权对象中添加局域网的公网出口IP地址,而非客户端机器的IP地址。建议您通过浏览器访问https://myip.ipip.net查询。
- 您也可以在授权对象中添加0.0.0.0/0,表示允许所有IPv4地址访问ECS实例。此配置会导致ECS实例完全暴露在公网中,增加安全风险,生产环境尽量避免。
- 部署自建Elasticsearch。本文使用的自建Elasticsearch版本为7.6.2,1个数据节点,具体操作步骤如下:
- 连接ECS服务器。具体操作请参见通过密码或密钥认证登录Linux实例。
- 创建elastic用户。
useradd elastic passwd <your_password>
- root用户切换为elastic用户。
su -l elastic
- 下载Elasticsearch软件安装包并解压缩。
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.6.2-linux-x86_64.tar.gz tar -zvxf elasticsearch-7.6.2-linux-x86_64.tar.gz
- 使用非root用户启动Elasticsearch。进入Elasticsearch的安装目录下,启动服务。
cd elasticsearch-7.6.2 ./bin/elasticsearch -d
- 验证Elasticsearch服务是否正常运行。
cd ~ curl localhost:9200
正常情况下,返回结果中会显示Elasticsearch版本号和"You Know, for Search"。
- 连接ECS服务器。
- 部署自建Kibana,并准备测试数据。本文使用的自建Kibana版本为7.6.2,1个数据节点,具体操作步骤如下:
- 连接ECS服务器。具体操作请参见通过密码或密钥认证登录Linux实例。
- 下载Kibana软件安装包并解压缩。
wget https://artifacts.elastic.co/downloads/kibana/kibana-7.6.2-linux-x86_64.tar.gz tar -zvxf kibana-7.6.2-linux-x86_64.tar.gz
- 修改Kibana配置文件config/kibana.yml,增加
server.host: "0.0.0.0"
配置,允许通过公网IP访问Kibana。进入Kibana安装目录,修改kibana.yml。cd kibana-7.6.2-linux-x86_64 vi config/kibana.yml
- 使用非root用户启用Kibana。
nohup ./bin/kibana &
- 登录Kibana控制台,添加示例数据。
- 通过公网IP地址登录Kibana控制台。
公网IP地址为:http://<ECS服务器的公网IP地址>:5601/app/kibana#/home。
- 在Kibana控制台首页,单击Try our sample data。
- 在Sample data页签,单击日志示例数据模块下的Add data,添加对应数据。
- 通过公网IP地址登录Kibana控制台。
- 连接ECS服务器。
- 部署自建Logstash。本文使用的Logstash版本为7.10.0,1个节点,具体操作步骤如下:
- 连接ECS服务器。具体操作请参见通过密码或密钥认证登录Linux实例。
- 回到跟目录,下载Logstash软件安装包并解压缩。
cd ~ wget https://artifacts.elastic.co/downloads/logstash/logstash-7.10.0-linux-x86_64.tar.gz tar -zvxf logstash-7.10.0-linux-x86_64.tar.gz
- 修改Logstash的堆内存使用。Logstash默认的堆内存为1 GB,您需要根据ECS规格配置合适的内存大小,加快集群数据的迁移效率。进入Logstash的安装目录下,修改Logstash配置文件config/jvm.options,增加-Xms8g和-Xmx8g。
cd logstash-7.10.0 vi config/jvm.options
- 修改Logstash批量写入记录条数。每批量写入5~15 MB数据,可以加快集群数据的迁移效率。
修改Logstash配置文件config/pipelines.yml,将每批量写入记录条数pipeline.batch.size从125改为5000。
vi config/pipelines.yml
- 验证Logstash功能。
- 通过控制台输入输出收集数据。
bin/logstash -e 'input { stdin { } } output { stdout {} }'
- 在控制台中输入"Hello world!"。
正常情况下,控制台会输出"Hello world!"。
- 通过控制台输入输出收集数据。
- 连接ECS服务器。
步骤二:迁移索引元数据(设置和映射)
在进行数据迁移时,Logstash会帮助您自动创建索引,但是自动创建的索引可能与您待迁移的索引存在差异,导致迁移前后数据的格式不一致。因此建议您在数据迁移前,在阿里云Elasticsearch中手动创建目标索引,确保迁移前后索引数据完全一致。
您可以通过Python脚本创建目标索引,具体操作步骤如下:
- 连接ECS服务器。具体操作请参见通过密码或密钥认证登录Linux实例。
- 创建并打开Python脚本文件(本文以indiceCreate.py为例)。
vi indiceCreate.py
- 修改Python脚本文件,拷贝以下代码(注意修改集群的连接地址、用户名和密码)。
#!/usr/bin/python # -*- coding: UTF-8 -*- # 文件名:indiceCreate.py import sys import base64 import time import httplib import json ## 源集群host。 oldClusterHost = "localhost:9200" ## 源集群用户名,可为空。 oldClusterUserName = "elastic" ## 源集群密码,可为空。 oldClusterPassword = "xxxxxx" ## 目标集群host,可在阿里云Elasticsearch实例的基本信息页面获取。 newClusterHost = "es-cn-zvp2m4bko0009****.elasticsearch.aliyuncs.com:9200" ## 目标集群用户名。 newClusterUser = "elastic" ## 目标集群密码。 newClusterPassword = "xxxxxx" DEFAULT_REPLICAS = 0 def httpRequest(method, host, endpoint, params="", username="", password=""): conn = httplib.HTTPConnection(host) headers = {} if (username != "") : 'Hello {name}, your age is {age} !'.format(name = 'Tom', age = '20') base64string = base64.encodestring('{username}:{password}'.format(username = username, password = password)).replace('\n', '') headers["Authorization"] = "Basic %s" % base64string; if "GET" == method: headers["Content-Type"] = "application/x-www-form-urlencoded" conn.request(method=method, url=endpoint, headers=headers) else : headers["Content-Type"] = "application/json" conn.request(method=method, url=endpoint, body=params, headers=headers) response = conn.getresponse() res = response.read() return res def httpGet(host, endpoint, username="", password=""): return httpRequest("GET", host, endpoint, "", username, password) def httpPost(host, endpoint, params, username="", password=""): return httpRequest("POST", host, endpoint, params, username, password) def httpPut(host, endpoint, params, username="", password=""): return httpRequest("PUT", host, endpoint, params, username, password) def getIndices(host, username="", password=""): endpoint = "/_cat/indices" indicesResult = httpGet(oldClusterHost, endpoint, oldClusterUserName, oldClusterPassword) indicesList = indicesResult.split("\n") indexList = [] for indices in indicesList: if (indices.find("open") > 0): indexList.append(indices.split()[2]) return indexList def getSettings(index, host, username="", password=""): endpoint = "/" + index + "/_settings" indexSettings = httpGet(host, endpoint, username, password) print (index + " 原始settings如下:\n" + indexSettings) settingsDict = json.loads(indexSettings) ## 分片数默认和源集群索引保持一致。 number_of_shards = settingsDict[index]["settings"]["index"]["number_of_shards"] ## 副本数默认为0。 number_of_replicas = DEFAULT_REPLICAS newSetting = "\"settings\": {\"number_of_shards\": %s, \"number_of_replicas\": %s}" % (number_of_shards, number_of_replicas) return newSetting def getMapping(index, host, username="", password=""): endpoint = "/" + index + "/_mapping" indexMapping = httpGet(host, endpoint, username, password) print (index + " 原始mapping如下:\n" + indexMapping) mappingDict = json.loads(indexMapping) mappings = json.dumps(mappingDict[index]["mappings"]) newMapping = "\"mappings\" : " + mappings return newMapping def createIndexStatement(oldIndexName): settingStr = getSettings(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword) mappingStr = getMapping(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword) createstatement = "{\n" + str(settingStr) + ",\n" + str(mappingStr) + "\n}" return createstatement def createIndex(oldIndexName, newIndexName=""): if (newIndexName == "") : newIndexName = oldIndexName createstatement = createIndexStatement(oldIndexName) print ("新索引 " + newIndexName + " 的setting和mapping如下:\n" + createstatement) endpoint = "/" + newIndexName createResult = httpPut(newClusterHost, endpoint, createstatement, newClusterUser, newClusterPassword) print ("新索引 " + newIndexName + " 创建结果:" + createResult) ## main indexList = getIndices(oldClusterHost, oldClusterUserName, oldClusterPassword) systemIndex = [] for index in indexList: if (index.startswith(".")): systemIndex.append(index) else : createIndex(index, index) if (len(systemIndex) > 0) : for index in systemIndex: print (index + " 或许是系统索引,不会重新创建,如有需要,请单独处理~")
- 执行Python脚本,创建目标索引。
/usr/bin/python indiceCreate.py
- 参见登录Kibana控制台,登录目标集群的Kibana控制台,查看已创建的索引。
GET /_cat/indices?v
步骤三:迁移全量数据
- 连接ECS服务器。具体操作请参见通过密码或密钥认证登录Linux实例。
- 在config目录下,创建并打开Logstash配置文件。
cd logstash-7.10.0/config vi es2es_all.conf
- 参考以下配置,修改Logstash配置文件。
input{ elasticsearch{ # 源端ES地址。 hosts => ["http://localhost:9200"] # 安全集群配置登录用户名密码。 user => "xxxxxx" password => "xxxxxx" # 需要迁移的索引列表,多个索引以英文以逗号(,)分隔。 index => "kibana_sample_data_*" # 以下三项保持默认即可,包含线程数和迁移数据大小和Logstash JVM配置相关。 docinfo=>true slices => 5 size => 5000 } } filter { # 去掉一些Logstash自己加的字段。 mutate { remove_field => ["@timestamp", "@version"] } } output{ elasticsearch{ # 目标端ES地址,可在阿里云Elasticsearch实例的基本信息页面获取。 hosts => ["http://es-cn-zvp2m4bko0009****.elasticsearch.aliyuncs.com:9200"] # 安全集群配置登录用户名密码。 user => "elastic" password => "xxxxxx" # 目标端索引名称,以下配置表示索引与源端保持一致。 index => "%{[@metadata][_index]}" # 目标端索引type,以下配置表示索引类型与源端保持一致。 document_type => "%{[@metadata][_type]}" # 目标端数据的id,如果不需要保留原id,可以删除以下这行,删除后性能会更好。 document_id => "%{[@metadata][_id]}" ilm_enabled => false manage_template => false } }
说明 为了保证迁移数据的准确性,建议您创建多个Logstash管道配置文件,分批次迁移数据,每个Logstash迁移部分数据。 - 启动Logstash全量迁移任务。
cd ../ nohup bin/logstash -f config/es2es_all.conf >/dev/null 2>&1 &
步骤四:迁移增量数据
- 连接ECS服务器,在config目录下,创建并打开Logstash增量配置文件。
cd config vi es2es_kibana_sample_data_logs.conf
- 参考以下配置,修改Logstash配置文件。开启Logstash定时任务即可触发增量迁移,配置示例如下。
input{ elasticsearch{ # 源端ES地址。 hosts => ["http://localhost:9200"] # 安全集群配置登录用户名密码。 user => "xxxxxx" password => "xxxxxx" # 需要迁移的索引列表,多个索引使用英文逗号(,)分隔。 index => "kibana_sample_data_logs" # 按时间范围查询增量数据,以下配置表示查询最近5分钟的数据。 query => '{"query":{"range":{"@timestamp":{"gte":"now-5m","lte":"now/m"}}}}' # 定时任务,以下配置表示每分钟执行一次。 schedule => "* * * * *" scroll => "5m" docinfo=>true size => 5000 } } filter { # 去掉一些Logstash自己加的字段. mutate { remove_field => ["@timestamp", "@version"] } } output{ elasticsearch{ # 目标端ES地址,可在阿里云Elasticsearch实例的基本信息页面获取。 hosts => ["http://es-cn-zvp2m4bko0009****.elasticsearch.aliyuncs.com:9200"] # 安全集群配置登录用户名密码. user => "elastic" password => "xxxxxx" # 目标端索引名称,以下配置表示索引与源端保持一致。 index => "%{[@metadata][_index]}" # 目标端索引type,以下配置表示索引类型与源端保持一致。 document_type => "%{[@metadata][_type]}" # 目标端数据的id,如果不需要保留原id,可以删除以下这行,删除后性能会更好。 document_id => "%{[@metadata][_id]}" ilm_enabled => false manage_template => false } }
重要- Logstash记录的时间戳为UTC时间,如果您的本地时间为北京时间(东八区),那么两者会存在8个小时的时区差,此时将UTC时间转化为北京时间,可使用公式:UTC+时区差=北京时间。例如,以上示例中通过源端索引中的@timestamp字段进行range范围过滤查询获取增量数据,并在对应的时间上+8h转换为北京时间。
- 通过Logstash控制时间字段实现增量数据的同步,需确保原索引中有可控制的时间字段,如果原索引中没有时间字段数据,可使用ingest pipeline指定_ingest.timestamp获取元数据值,从而引入@timestamp时间字段。
- 启动Logstash增量迁移任务。
nohup bin/logstash -f config/es2es_kibana_sample_data_logs.conf >/dev/null 2>&1 &
- 在目标端Elasticsearch集群的Kibana中,查询最近更新的记录,验证增量数据是否同步。以下示例的查询条件为:索引名称为kibana_sample_data_logs、最近时间范围为5分钟。
GET kibana_sample_data_logs/_search { "query": { "range": { "@timestamp": { "gte": "now-5m", "lte": "now/m" } } }, "sort": [ { "@timestamp": { "order": "desc" } } ] }
步骤五:查看数据迁移结果
- 查看是否完成全量迁移。
- 查看自建Elasticsearch集群的索引和数据量信息。
GET _cat/indices?v
结果如下。 - 全量迁移前,查看阿里云Elasticsearch集群的索引和数据量信息。
- 全量迁移后,查看阿里云Elasticsearch集群索引和数据量信息。正常情况下,返回的记录条数应该与自建Elasticsearch集群一致。
- 查看自建Elasticsearch集群的索引和数据量信息。
- 查看是否完成增量迁移。查看自建Elasticsearch集群的最近更新记录。
GET kibana_sample_data_logs/_search { "query": { "range": { "@timestamp": { "gte": "now-5m", "lte": "now/m" } } }, "sort": [ { "@timestamp": { "order": "desc" } } ] }
返回结果如下。增量迁移完成后,使用同样命令查看阿里云Elasticsearch集群最近的更新记录。正常情况下,阿里云Elasticsearch集群的更新记录会与自建集群一致。