全部产品
弹性计算 会员服务 网络 安全 移动云 数加·大数据分析及展现 数加·大数据应用 管理与监控 云通信 阿里云办公 培训与认证 更多
存储与CDN 数据库 域名与网站(万网) 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网 智能硬件

自建ES迁移

更新时间:2018-04-12 22:06:34

前提条件

自建Elasticsearch迁移至阿里云Elasticsearch:

  • 自建ES所在ECS必需是VPC网络(不支持 Classiclink 方式打通的ECS),且必须与阿里云ES,处于同一个 VPC 下。
  • 可以使用中控机器(或者任意一台机器)执行该脚本,前提是该机器可以同时访问新老ES集群的 9200 端口。
  • VPC安全组不能限制 IP白名单,并且需要开启 9200 端口。
  • 可以在执行脚本的机器上通过 curl -XGET http://<host>:9200 验证。

执行流程

  1. 索引创建
  2. 数据迁移

索引创建

新集群要提前创建索引,或者新集群可以使用动态创建索引和动态映射(不建议)功能。

附件中提供了python脚本(indiceCreate.py),可以拉取所有的索引并创建,只设置分片数0副本,其余需要单独调整。

  1. // 获取集群所有的索引信息,没有权限的话,可以去掉"-u user:pass",oldClusterHost为老集群的host,注意替换
  2. curl -u user:pass -XGET http://oldClusterHost/_cat/indices | awk '{print $3}'
  3. // 获取某个索引的setting和mapping,注意替换indexName为要查询的索引名
  4. curl -u user:pass -XGET http://oldClusterHost/indexName/_settings,_mapping?pretty=true
  5. // 创建索引,其中newClusterHost是新集群的host,testindex是要创建的索引名,testtype是要创建的type
  6. curl -u user:pass -XPUT http://<newClusterHost>/<testindex> -d '{
  7. "testindex" : {
  8. "settings" : {
  9. "number_of_shards" : "1",
  10. "number_of_replicas" : "1"
  11. }
  12. },
  13. "mappings" : {
  14. "testtype" : {
  15. "properties" : {
  16. "uid" : {
  17. "type" : "long"
  18. },
  19. "name" : {
  20. "type" : "text"
  21. },
  22. "create_time" : {
  23. "type" : "long"
  24. }
  25. }
  26. }
  27. }
  28. }
  29. }'

数据迁移

为保证数据迁移结束后一致性,需要上游业务 停止老集群的写操作,读服务可以正常进行。迁移完毕后,直接切换到新集群进行读写。如不停止写操作可能会存在数据最终的不一致!

注意

  • 使用下述方案迁移时,如果是通过IP + Port访问老集群,则必须在新集群的yml中配置 reindex 白名单(为老集群的 IP 地址):
    reindex.remote.whitelist:1.1.1.1:9200,1.2.*.*:*

  • 如果使用域名访问,则不允许通过 http://host:port/path 这种(带path)形式访问。

1. 数据量小

使用reindex.sh脚本。

  1. #!/bin/bash
  2. # file:reindex.sh
  3. indexName="你的索引名"
  4. newClusterUser="新集群用户名"
  5. newClusterPass="新集群密码"
  6. newClusterHost="新集群host"
  7. oldClusterUser="老集群用户名"
  8. oldClusterPass="老集群密码"
  9. oldClusterHost="老集群host"
  10. curl -u ${newClusterUser}:${newClusterPass} -XPOST "http://${newClusterHost}/_reindex?pretty" -H "Content-Type: application/json" -d'{
  11. "source": {
  12. "remote": {
  13. "host": "'${oldClusterHost}'",
  14. "username": "'${oldClusterUser}'",
  15. "password": "'${oldClusterPass}'"
  16. },
  17. "index": "'${indexName}'",
  18. "query": {
  19. "match_all": {}
  20. }
  21. },
  22. "dest": {
  23. "index": "'${indexName}'"
  24. }
  25. }'

2. 数据量大、无删除操作、有更新时间

数据量较大且无删除操作时,可以使用滚动迁移的方式,减小停止写服务的时间。滚动迁移需要有一个类似于更新时间的字段代表新数据的写时序。可以在数据迁移完成后,再停止写服务,快速更新一次。即可切换到新集群,恢复读写。

  1. #!/bin/bash
  2. # file: circleReindex.sh
  3. # CONTROLLING STARTUP:
  4. # 这是通过reindex操作远程重建索引的脚本。要求:
  5. # 1. 新集群已经创建完索引,或者支持自动创建和动态映射。
  6. # 2. 新集群必须在yml里配置IP白名单 reindex.remote.whitelist:172.16.123.*:9200
  7. # 3. host必须是[scheme]://[host]:[port]
  8. USAGE="Usage: sh circleReindex.sh <count>
  9. count: 执行次数,多次(负数为循环)增量执行或者单次执行
  10. Example:
  11. sh circleReindex.sh 1
  12. sh circleReindex.sh 5
  13. sh circleReindex.sh -1"
  14. indexName="你的索引名"
  15. newClusterUser="新集群用户名"
  16. newClusterPass="新集群密码"
  17. oldClusterUser="老集群用户名"
  18. oldClusterPass="老集群密码"
  19. ## http://myescluster.com
  20. newClusterHost="新集群host"
  21. ## http://10.37.1.1:9200
  22. oldClusterHost="老集群host"
  23. timeField="更新时间字段"
  24. reindexTimes=0
  25. lastTimestamp=0
  26. curTimestamp=`date +%s`
  27. hasError=false
  28. function reIndexOP() {
  29. reindexTimes=$[${reindexTimes} + 1]
  30. curTimestamp=`date +%s`
  31. ret=`curl -u ${newClusterUser}:${newClusterPass} -XPOST "${newClusterHost}/_reindex?pretty" -H "Content-Type: application/json" -d '{
  32. "source": {
  33. "remote": {
  34. "host": "'${oldClusterHost}'",
  35. "username": "'${oldClusterUser}'",
  36. "password": "'${oldClusterPass}'"
  37. },
  38. "index": "'${indexName}'",
  39. "query": {
  40. "range" : {
  41. "'${timeField}'" : {
  42. "gte" : '${lastTimestamp}',
  43. "lt" : '${curTimestamp}'
  44. }
  45. }
  46. }
  47. },
  48. "dest": {
  49. "index": "'${indexName}'"
  50. }
  51. }'`
  52. lastTimestamp=${curTimestamp}
  53. echo "第${reindexTimes}次reIndex,本次更新截止时间 ${lastTimestamp} 结果:${ret}"
  54. if [[ ${ret} == *error* ]]; then
  55. hasError=true
  56. echo "本次执行异常,中断后续执行操作~~,请检查"
  57. fi
  58. }
  59. function start() {
  60. ## 负数就不停循环执行
  61. if [[ $1 -lt 0 ]]; then
  62. while :
  63. do
  64. reIndexOP
  65. done
  66. elif [[ $1 -gt 0 ]]; then
  67. k=0
  68. while [[ k -lt $1 ]] && [[ ${hasError} == false ]]; do
  69. reIndexOP
  70. let ++k
  71. done
  72. fi
  73. }
  74. ## main
  75. if [ $# -lt 1 ]; then
  76. echo "$USAGE"
  77. exit 1
  78. fi
  79. echo "开始执行索引 ${indexName} 的 ReIndex操作"
  80. start $1
  81. echo "总共执行 ${reindexTimes} 次 reIndex 操作"

3. 数据量大、无删除操作、无更新时间

当数据量较大,且索引的 mapping 中没有定义更新时间字段时,需要由上游业务修改代码添加更新时间字段。添加完成后可以先将历史数据迁移完,然后再使用上述的第2种方案。

下面是迁移没有更新时间字段的老数据脚本。

  1. #!/bin/bash
  2. # file:miss.sh
  3. indexName="你的索引名"
  4. newClusterUser="新集群用户名"
  5. newClusterPass="新集群密码"
  6. newClusterHost="新集群host"
  7. oldClusterUser="老集群用户名"
  8. oldClusterPass="老集群密码"
  9. oldClusterHost="老集群host"
  10. timeField="updatetime"
  11. curl -u ${newClusterUser}:${newClusterPass} -XPOST "http://${newClusterHost}/_reindex?pretty" -H "Content-Type: application/json" -d '{
  12. "source": {
  13. "remote": {
  14. "host": "'${oldClusterHost}'",
  15. "username": "'${oldClusterUser}'",
  16. "password": "'${oldClusterPass}'"
  17. },
  18. "index": "'${indexName}'",
  19. "query": {
  20. "bool": {
  21. "must_not": {
  22. "exists": {
  23. "field": "'${timeField}'"
  24. }
  25. }
  26. }
  27. }
  28. },
  29. "dest": {
  30. "index": "'${indexName}'"
  31. }
  32. }'

4. 不停止写服务

敬请期待。

注意

如果单索引数据量比较大,可以在迁移前,将目的索引的副本数设置为0,刷新时间为-1。待数据迁移完成后,再更改回来,这样可以加快数据同步速度。

  1. // 设置副本数为0,不刷新
  2. curl -u user:password -XPUT 'http://<host:port>/indexName/_settings' -d' {
  3. "number_of_replicas" : 1,
  4. "refresh_interval" : "-1"
  5. }'
  6. // 设置副本数为1,刷新时间1s(1s是默认值)
  7. curl -u user:password -XPUT 'http://<host:port>/indexName/_settings' -d' {
  8. "number_of_replicas" : 1,
  9. "refresh_interval" : "1s"
  10. }'

批量创建老集群索引

下面是一个在新集群中,批量创建老集群索引的python脚本。该脚本默认新创建的索引副本数为0

  1. #!/usr/bin/python
  2. # -*- coding: UTF-8 -*-
  3. # 文件名:indiceCreate.py
  4. import sys
  5. import base64
  6. import time
  7. import httplib
  8. import json
  9. ## 老集群host(ip+port)
  10. oldClusterHost = "old-cluster.com"
  11. ## 老集群用户名,可为空
  12. oldClusterUserName = "old-username"
  13. ## 老集群密码,可为空
  14. oldClusterPassword = "old-password"
  15. ## 新集群host(ip+port)
  16. newClusterHost = "new-cluster.com"
  17. ## 新集群用户名,可为空
  18. newClusterUser = "new-username"
  19. ## 新集群密码,可为空
  20. newClusterPassword = "new-password"
  21. DEFAULT_REPLICAS = 0
  22. def httpRequest(method, host, endpoint, params="", username="", password=""):
  23. conn = httplib.HTTPConnection(host)
  24. headers = {}
  25. if (username != "") :
  26. 'Hello {name}, your age is {age} !'.format(name = 'Tom', age = '20')
  27. base64string = base64.encodestring('{username}:{password}'.format(username = username, password = password)).replace('\n', '')
  28. headers["Authorization"] = "Basic %s" % base64string;
  29. if "GET" == method:
  30. headers["Content-Type"] = "application/x-www-form-urlencoded"
  31. conn.request(method=method, url=endpoint, headers=headers)
  32. else :
  33. headers["Content-Type"] = "application/json"
  34. conn.request(method=method, url=endpoint, body=params, headers=headers)
  35. response = conn.getresponse()
  36. res = response.read()
  37. return res
  38. def httpGet(host, endpoint, username="", password=""):
  39. return httpRequest("GET", host, endpoint, "", username, password)
  40. def httpPost(host, endpoint, params, username="", password=""):
  41. return httpRequest("POST", host, endpoint, params, username, password)
  42. def httpPut(host, endpoint, params, username="", password=""):
  43. return httpRequest("PUT", host, endpoint, params, username, password)
  44. def getIndices(host, username="", password=""):
  45. endpoint = "/_cat/indices"
  46. indicesResult = httpGet(oldClusterHost, endpoint, oldClusterUserName, oldClusterPassword)
  47. indicesList = indicesResult.split("\n")
  48. indexList = []
  49. for indices in indicesList:
  50. if (indices.find("open") > 0):
  51. indexList.append(indices.split()[2])
  52. return indexList
  53. def getSettings(index, host, username="", password=""):
  54. endpoint = "/" + index + "/_settings"
  55. indexSettings = httpGet(host, endpoint, username, password)
  56. print index + " 原始settings如下:\n" + indexSettings
  57. settingsDict = json.loads(indexSettings)
  58. ## 分片数默认和老集群索引保持一致
  59. number_of_shards = settingsDict[index]["settings"]["index"]["number_of_shards"]
  60. ## 副本数默认为0
  61. number_of_replicas = DEFAULT_REPLICAS
  62. newSetting = "\"settings\": {\"number_of_shards\": %s, \"number_of_replicas\": %s}" % (number_of_shards, number_of_replicas)
  63. return newSetting
  64. def getMapping(index, host, username="", password=""):
  65. endpoint = "/" + index + "/_mapping"
  66. indexMapping = httpGet(host, endpoint, username, password)
  67. print index + " 原始mapping如下:\n" + indexMapping
  68. mappingDict = json.loads(indexMapping)
  69. mappings = json.dumps(mappingDict[index]["mappings"])
  70. newMapping = "\"mappings\" : " + mappings
  71. return newMapping
  72. def createIndexStatement(oldIndexName):
  73. settingStr = getSettings(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)
  74. mappingStr = getMapping(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)
  75. createstatement = "{\n" + str(settingStr) + ",\n" + str(mappingStr) + "\n}"
  76. return createstatement
  77. def createIndex(oldIndexName, newIndexName=""):
  78. if (newIndexName == "") :
  79. newIndexName = oldIndexName
  80. createstatement = createIndexStatement(oldIndexName)
  81. print "新索引 " + newIndexName + " 的setting和mapping如下:\n" + createstatement
  82. endpoint = "/" + newIndexName
  83. createResult = httpPut(newClusterHost, endpoint, createstatement, newClusterUser, newClusterPassword)
  84. print "新索引 " + newIndexName + " 创建结果:" + createResult
  85. ## main
  86. indexList = getIndices(oldClusterHost, oldClusterUserName, oldClusterPassword)
  87. systemIndex = []
  88. for index in indexList:
  89. if (index.startswith(".")):
  90. systemIndex.append(index)
  91. else :
  92. createIndex(index, index)
  93. if (len(systemIndex) > 0) :
  94. for index in systemIndex:
  95. print index + " 或许是系统索引,不会重新创建,如有需要,请单独处理~"

备注

使用 logstash 做数据迁移,请参考 Logstash迁移Elasticsearch数据方法解读

本文导读目录