通过Reindex API迁移自建Elasticsearch数据

Reindex API是Elasticsearch提供的一种用于将数据从原有的索引迁移至目标索引的接口。如果您需要迁移数据,且源索引的数据不需要进行复杂的转换和处理,您可以按照本文提供的步骤,使用Reindex API将数据从自建Elasticsearch集群迁移至Lindorm搜索引擎。

前提条件

  • 自建Elasticsearch(简称ES)集群为7.0.0~7.10.1版本。

  • 已开通搜索引擎。如何开通,请参见开通指南

  • 如果自建Elasticsearch集群中启用了安全能力,那么需要保证具备以下安全权限:

    • 对数据源(data stream,index或index alias)具备读index权限。

    • 对数据目的(data stream,index或index alias)具备写index权限。

  • 已将客户端IP地址添加至Lindorm白名单。如何添加,请参见设置白名单

    重要

    本文以自建ES集群部署在阿里云ECS实例为例。您需要注意以下内容:

    • 为保证ECS实例与Lindorm实例之间的连通性,您需要将ECS的IP地址添加至Lindorm白名单。如何购买阿里云ECS实例,请参见自定义购买实例

    • 阿里云ECS实例与Lindorm实例需部署在同一个专有网络VPC下,且ECS安全组中已添加相关规则允许Lindorm实例访问。具体操作,请参见添加安全组规则。如果您需要跨网络进行数据迁移,请参见通过Logstash迁移自建Elasticsearch数据

迁移方案选择

您可以根据实际业务需求,结合ES索引的mappings,选择不同的方案进行数据迁移。

  • 全量数据迁移:适用于自建ES集群中的数据源索引没有文档的增、删、改等变化的场景。

  • 增量数据迁移:适用于自建的ES索引存在数据更新或是写入了新数据的场景。该迁移方案要求业务字段中必须包含表示数据更新时间的字段,且业务场景中不存在数据的删除

  • 综合迁移方案(全量+增量迁移):适用于业务数据中不包含数据更新时间字段,但业务场景存在数据更新(不包含删除)的场景。您需要修改业务代码,新增写入数据更新时间的逻辑,参考综合迁移方案进行数据迁移。

数据准备

说明

本文使用的自建ES集群没有历史数据,因此以新导入的geonames数据集作为历史数据,介绍数据迁移步骤。如果您的自建ES集群已存在数据,可跳过该步骤,直接创建目标索引。

本文使用Rally提供的geonames数据集作为待迁移的数据,具体的数据导入方式请参考Run a Benchmark: Races,本文示例中的索引(index)名均为geonames

数据规模:文档(Document)数量11,396,503,数据解压后占用空间3.3 GB。

步骤一:创建索引

说明

您也可以迁移自建ES集群中已有的数据。如果您需要向自建ES集群中写入数据,请参见Index API

在通过Reindex API迁移自建ES集群中的数据到Lindorm搜索引擎之前,需要先在Lindorm搜索引擎上创建好作为导入目标的索引。

本文使用geonames作为索引的名称,创建索引时不指定settingsmappings参数。创建语句如下:

curl -XPUT "http://<url>/geonames" -u <username>:<password>

参数说明

参数

说明

url

搜索引擎的Elasticsearch兼容连接地址。如何获取,请参见Elasticsearch兼容地址

重要
  • 如果应用部署在ECS实例,建议您通过专有网络访问Lindorm实例,可获得更高的安全性和更低的网络延迟。

  • 如果应用部署在本地,在通过公网连接Lindorm实例前,需在控制台开通公网地址。开通方式:在控制台的左侧导航栏,选择数据库连接,单击搜索引擎页签,在页签右上角单击开通公网地址

  • 通过专有网络访问Lindorm实例,url请填写Elasticsearch兼容地址对应的专有网络地址。通过公网访问Lindorm实例,url请填写Elasticsearch兼容地址对应的公网地址。

username

访问搜索引擎的用户名和密码。

默认用户名和密码的获取方式:在控制台的左侧导航栏,选择数据库连接,单击搜索引擎页签,在搜索引擎页签可获取。

password

步骤二:迁移数据

全量数据迁移

如果您的自建ES集群中的数据源索引不再有任何变化,即没有文档的增、删、改,则可以通过全量数据迁移的方式将数据源索引中的所有文档迁移至Lindorm搜索引擎。通过在Lindorm搜索引擎上执行Reindex请求,即可触发全量数据迁移任务。

curl -XPOST "http://<Lindorm搜索引擎的连接地址>/_reindex" -u <username>:<password> -H'Content-Type:application/json' -d'{
  "source": {
    "remote": {
      "host": "http://<host>:<port>",
      "username": "changeme",
      "password": "changeme"
    },
    "index": "geonames",
    "query": {
      "match_all": {}
    }
  },
  "dest": {
    "index": "geonames"
  }
}'

参数说明

配置项

参数

说明

source

host

自建ES集群的IP和端口。

username

自建ES集群的用户名。非必填项,请根据实际情况填写。

password

自建ES集群的密码。非必填项,请根据实际情况填写。

index

自建ES集群中创建的索引名(数据源索引)。可以指定自建ES集群中的data stream或index alias,也可以指定多个索引名称表示从多个索引中迁移数据。

说明

指定多个索引名称时,需使用半角逗号(,)分隔。

query

match_all表示查询索引中的所有文档,即将数据源索引中的所有文档都迁移到目标索引中。

dest

index

Lindorm搜索引擎中创建的索引名(目标索引)。

如果您需要异步执行Reindex请求,可以指定wait_for_completion参数的值为false,如下所示:

curl -XPOST "http://<Lindorm搜索引擎的连接地址>/_reindex?wait_for_completion=false" -u <username>:<password>

Lindorm搜索引擎将返回一个字符串,表示Reindex请求处理对应的任务ID,您可以通过Task management API查看任务的执行情况。更多Reindex API使用详情,请参见Reindex API

增量数据迁移

如果您自建的ES索引存在数据更新或是写入了新数据,且没有数据被删除,则可以通过增量数据迁移的方式完成数据迁移任务。

建议您基于数据更新时间字段,仅使用滚动迁移的方式来完成数据迁移任务,在滚动迁移任务第一次执行时,会将历史数据一并迁移至Lindorm搜索引擎中。

滚动迁移示例代码如下:

#!/bin/bash
# 滚动迁移脚本

# Reindex执行间隔,默认30s
sleepInterval=30
# 设置一个稍大于源端的refresh interval的值,单位s,此处默认源端是15s
refreshInterval=16

# 默认转化为ms,需要根据业务字段的单位调整
lastReindexFinish=0
i=1
while true
do
  echo "开始第 ${i} 次Reindex任务..."
  # 排除数据写入源端,refresh之前查不到的影响
  # 默认转化为ms,需要根据业务字段的单位调整
  currReindexStart=$((($(date +%s%N)/1000000) - (${refreshInterval} * 1000)))
  curl -XPOST "http://<目的Lindorm搜索连接地址>/_reindex" -u <目的Lindorm搜索用户名>:<目的Lindorm搜索密码> -H'Content-Type: application/json' -d'{
    "source": {
      "remote": {
        "host": "<源ES集群连接地址>",
        "username": "<源ES集群用户名>",
        "password": "<源ES集群密码>"
      },
      "index": "<源ES集群数据源索引>",
      "query": {
        "range" : {
          "<数据更新时间字段名>" : {
            "lt" : '${currReindexStart}',
            "gte" : '${lastReindexFinish}'
          }
        }
      }
    },
    "dest": {
      "index": "<目的Lindorm搜索数据目的索引>"
    }
  }'
  echo "完成第 ${i} 次Reindex任务..."
  echo "本次迁移数据的数据更新时间范围:${lastReindexFinish} 到 ${currReindexStart}"
  i=$(( $i + 1 ))
  lastReindexFinish=${currReindexStart}
  sleep ${sleepInterval}
done

综合迁移方案

如果您的历史数据中不包含数据更新时间字段,那么您需要在您数据写入的业务代码中添加写入数据更新时间的逻辑。对不包含数据更新时间字段的历史数据,使用历史数据迁移脚本进行迁移,对包含数据更新时间字段的增量数据使用滚动迁移方案进行迁移。

当存在数据更新时,建议在历史数据迁移结束后,再启动增量数据迁移,避免通过增量数据迁移写入的新版本数据被历史数据迁移写入的老版本数据覆盖。

历史数据迁移脚本示例如下:

#!/bin/bash
# 历史数据全量迁移脚本

# wait_for_completion可以修改为false,然后通过task management api查看进度
curl -XPOST "http://<Lindorm搜索引擎的连接地址>/_reindex?wait_for_completion=true&pretty" -u <Lindorm搜索引擎的用户名>:<Lindorm搜索引擎的密码> -H'Content-Type:application/json' -d'{
  "source": {
    "remote": {
      "host": "<自建ES集群的连接地址>",
      "username": "<自建ES集群的用户名>",
      "password": "<自建ES集群的密码>"
    },
    "index": "<自建ES集群的数据源索引>",
    "query": {
      "bool": {
        "must_not": {
          "exists": {
            "field": "<数据更新时间字段名>"
          }
        }
      }
    }
  },
  "dest": {
      "index": "<Lindorm搜索引擎中创建的目标索引>"
  }
}'

滚动迁移的示例代码请参见滚动迁移示例代码

步骤三:检查迁移结果

您可以通过查询ES数据源索引和Lindorm目标索引中文档的数量是否一致,或根据最近一段时间内更新的数据是否一致来判断自建ES集群索引中的历史数据和增量数据是否已全部迁移至Lindorm搜索引擎。本文使用的数据查询请求如下。

# 查看index详情
curl -XGET "<url>/_cat/indices?v" -u <username>:<password>

# 查看index近期更新数据
curl -XGET "<url>/<index>/_search" -u <username>:<password> -H'Content-Type:application/json' -d'{
  "query": {
    "bool": {
      "must": {
        "exists": {
          "field": "updateTimestampField"
        }
      }
    }
  },
  "sort": [
    {
      "updateTimestampField": {
        "order": "desc"
      }
    }
  ],
  "size": 20
}'