通过Logstash迁移自建Elasticsearch数据

Logstash是Elasticsearch提供的一个开源数据处理工具,可以收集和处理日志数据,并将处理后的数据导入至目标数据库。如果您想要通过Logstash将自建Elasticsearch集群中的数据迁移至Lindorm搜索引擎,可以参考本文的操作步骤配置脚本和迁移任务。

前提条件

  • 自建Elasticsearch(简称ES)集群为7.0.0~7.10.1版本。

  • 已开通搜索引擎。如何开通,请参见开通指南

  • 已将客户端IP地址添加至Lindorm白名单。如何添加,请参见设置白名单

    说明

    本文使用的自建ES集群与Logstash服务均部署在阿里云ECS实例,为保证ECS实例与Lindorm实例之间的连通性,您需要将ECS的IP地址添加至Lindorm白名单。如何购买阿里云ECS实例,请参见自定义购买实例

迁移方案选择

您可以根据实际业务需求,结合ES索引的mappings,选择不同的方案进行数据迁移。

  • 全量数据迁移:适用于自建ES集群中的数据源索引没有文档的增、删、改等变化的场景。

  • 增量数据迁移:适用于自建的ES索引存在数据更新或是写入了新数据的场景。该迁移方案要求业务字段中必须包含表示数据更新时间的字段,且业务场景中不存在数据的删除。

  • 综合迁移方案:适用于业务数据中不包含数据更新时间字段,但业务场景存在数据更新(不包含删除)的场景。您需要修改业务代码,新增写入数据更新时间的逻辑,参考综合迁移方案进行数据迁移。

数据准备

本文使用的自建Elasticsearch集群基于阿里云ECS搭建,数据使用Rally提供的geonames数据集作为待迁移的数据,具体的数据导入方式请参见Run a Benchmark: Races。本文示例中Elasticsearch集群的索引(index)名为geonames

数据规模:文档(Document)数量为11,396,503,数据解压后占用空间3.3 GB。

说明

您也可以迁移自建ES集群中已有的数据。如果您需要向自建ES集群中写入数据,请参见Index API

步骤一:安装Logstash服务

安装部署Logstash服务的具体操作,请参见Logstash Reference

步骤二:创建搜索索引

在使用Logstash将Elasticsearch集群数据迁移到Lindorm搜索引擎前,需先在Lindorm中创建好用于存储迁移数据的目标索引。

重要

Logstash服务不会将源索引的settings等信息拷贝到目标索引中,因此,如果您希望目标索引具备与源索引相同的配置或映射规则,则需要在创建目标索引时进行配置,预先指定好settingsmappingsshard counts等参数。

本文使用geonames作为目标索引名称,创建索引时不指定settingsmappings参数。

curl -XPUT "http://<url>/geonames" -u <username>:<password>

参数说明

参数

说明

url

搜索引擎的Elasticsearch兼容连接地址。如何获取,请参见Elasticsearch兼容地址

重要
  • 如果应用部署在ECS实例,建议您通过专有网络访问Lindorm实例,可获得更高的安全性和更低的网络延迟。

  • 如果应用部署在本地,在通过公网连接Lindorm实例前,需在控制台开通公网地址。开通方式:在控制台的左侧导航栏,选择数据库连接,单击搜索引擎页签,在页签右上角单击开通公网地址

  • 通过专有网络访问Lindorm实例,url请填写Elasticsearch兼容地址对应的专有网络地址。通过公网访问Lindorm实例,url请填写Elasticsearch兼容地址对应的公网地址。

username

访问搜索引擎的用户名和密码。

默认用户名和密码的获取方式:在控制台的左侧导航栏,选择数据库连接,单击搜索引擎页签,在搜索引擎页签可获取。

password

说明

您可以在创建索引时指定与源索引不同的settingsmappings。但需注意,目标索引的mappings与源索引的mappings设置不能存在冲突,否则可能会导致数据迁移时无法写入目标索引中。更多说明,请参见Create index API

步骤三:数据迁移

全量数据迁移

如果您的自建ES集群中的数据源索引不再有任何变化,即没有文档的增、删、改,则可以通过全量数据迁移的方式将数据源索引中的所有文档迁移至Lindorm搜索引擎。

  1. 创建Logstash配置文件fulldata.conf,用于配置全量数据迁移任务。具体如下:

    input{
      elasticsearch{
        # 源ES地址
        hosts =>  ["http://<host>:<port>"]
        
        # 访问使用的用户名&密码
        user => "changeme"
        password => "changeme"
        
        # 源index名,支持逗号分隔的多个index名和模糊匹配
        index => "geonames"
      
        # 查询所有数据并同步输出源
        query => '{"query":{"match_all":{}}}'
    
        # 默认为false,不会读出_id等元数据信息
        docinfo=>true
      }
    }
    
    filter {
      # 去掉Logstash附加字段
      mutate {
        remove_field => ["@timestamp", "@version"]
      }
    }
    
    output{
      elasticsearch{
        # Lindorm连接地址
        hosts => ["http://<lindorm-address>"]
      
        # 访问使用的用户名&密码
        user => "changeme"
        password => "changeme"
      
        index => "geonames"
        # 指定数据写入时,保持id为原来的值,如无业务需要可以删去
        document_id => "%{[@metadata][_id]}"
    
        # 关闭Logstash自带的模板
        manage_template => false
      }
    }

    参数说明

    配置项

    参数

    说明

    input

    hosts

    自建Elasticsearch集群的IP和端口。

    user

    自建Elasticsearch集群的用户名。非必填项,请根据实际情况填写。

    password

    自建Elasticsearch集群的密码。非必填项,请根据实际情况填写。

    index

    自建Elasticsearch集群中待迁移的索引名。

    query

    迁移数据时使用的查询条件。示例代码中match_all表示查询索引中的所有文档,即将数据源索引中的所有文档都迁移到目标索引中。

    output

    hosts

    搜索引擎的Elasticsearch兼容连接地址。如何获取,请参见Elasticsearch兼容地址

    重要
    • 如果您的Logstash服务部署在ECS实例,建议您通过专有网络访问Lindorm实例,可获得更高的安全性和更低的网络延迟。

    • 如果您的Logstash服务部署在本地,在通过公网连接Lindorm实例前,需在控制台开通公网地址。开通方式:在控制台的左侧导航栏,选择数据库连接,单击搜索引擎页签,在页签右上角单击开通公网地址

    • 通过专有网络访问Lindorm实例,请填写Elasticsearch兼容地址对应的专有网络地址。通过公网访问Lindorm实例,请填写Elasticsearch兼容地址对应的公网地址。

    user

    访问搜索引擎的用户名和密码。

    默认用户名和密码的获取方式:在控制台的左侧导航栏,选择数据库连接,单击搜索引擎页签,在搜索引擎页签可获取。

    password

    index

    Lindorm搜索引擎中创建的索引名(目标索引)。

  2. 指定fulldata.conf为任务配置文件并启动Logstash服务,等待数据迁移完成。数据迁移完成后,Logstash服务会自动停止。

    cd logstash-7.10.0
    bin/logstash -f <fulldata.conf所在路径>

增量数据迁移

如果您自建的ES索引存在数据更新或写入了新数据,且没有数据被删除,则可以通过增量数据迁移的方式完成数据迁移任务。

建议您基于数据更新时间字段,使用滚动迁移的方式来完成数据迁移任务,在Logstash滚动迁移任务第一次执行时,会将历史数据一并迁移到目的端索引。

本文使用的Logstash增量数据迁移配置文件increment.conf和任务滚动迁移脚本如下。

  • 配置文件increment.conf

    input{
      elasticsearch{
        hosts =>  ["<自建ES集群的连接地址>:<自建ES集群的端口>"]
        user => "<自建ES集群的用户名>"
        password => "<自建ES集群的密码>"
        index => "<自建ES集群的数据源索引>"
        query => '{"query":{"range":{"updateTimestampField":{"gte":"${TMP_LAST}","lt":"${TMP_CURR}"}}}}'
        docinfo=>true
      }
    }
    filter {
      mutate {
        remove_field => ["@timestamp", "@version"]
      }
    }
    output{
      elasticsearch{
        hosts => ["http://<lindorm-address>"]
        user => "changeme"
        password => "changeme"
        index => "geonames"
        document_id => "%{[@metadata][_id]}"
        manage_template => false
      }
    }
  • 任务滚动迁移脚本。

    #!/bin/bash
    
    unset TMP_LAST
    unset TMP_CURR
    
    # logstash执行间隔,默认30s
    sleepInterval=30
    # 设置一个稍大于源端的refresh interval的值,单位s,此处默认源端是15s
    refreshInterval=16
    
    # 默认转化为ms,需要根据业务字段的单位调整
    export TMP_LAST=0
    i=1
    while true
    do
      echo "开始第 ${i} 次logstash迁移数据任务..."
    
      # 默认转化为ms,需要根据业务字段的单位调整
      export TMP_CURR=$((($(date +%s%N)/1000000) - ($refreshInterval * 1000)))
      <path-to-logstash>/bin/logstash -f <path-to-increment.conf>
    
      echo "完成第 ${i} 次logstash迁移数据任务..."
      echo "本次迁移数据的数据更新时间范围:${TMP_LAST} 到 ${TMP_CURR}"
      i=$(( $i + 1 ))
      export TMP_LAST=${TMP_CURR}
      sleep ${sleepInterval}
    done

    以上示例代码在increment.conf配置文件中使用了环境变量,结合滚动迁移脚本,持续迁移一段时间窗口内更新的数据。更多有关在Logstash任务配置中使用环境变量的说明,请参见Using environment variables

    重要

    由于无法保证数据写入请求到达Lindorm搜索引擎服务端的顺序,因此服务端处理请求的顺序也不确定。如果同时执行全量和增量数据迁移任务,可能会出现历史数据将更新后的数据覆盖的情况。因此,建议您先执行历史全量数据迁移任务,待任务执行结束后,再启动增量数据滚动迁移任务。

综合迁移方案

如果您的数据中不包含数据更新时间字段,那么您需要在数据写入的业务代码中添加写入数据更新时间的逻辑。对不包含数据更新时间字段的历史数据,应用历史数据迁移的任务配置,启动Logstash任务来完成历史数据的迁移,对包含数据更新时间字段的增量数据应用滚动迁移的方案

本文使用的Logstash历史数据迁移任务配置文件history.conf如下。

input{
  elasticsearch{
    hosts =>  ["http://<host>:<port>"]
    user => "changeme"
    password => "changeme"
    index => "geonames"
    query => '{"query":{"bool":{"must_not":{"exists":{"field":"updateTimeField"}}}}}'
    docinfo=>true
  }
}
filter {
  mutate {
    remove_field => ["@timestamp", "@version"]
  }
}
output{
  elasticsearch{
    hosts => ["http://<lindorm-address>"]
    user => "changeme"
    password => "changeme"
    index => "geonames"
    document_id => "%{[@metadata][_id]}"
    manage_template => false
  }
}

滚动迁移的示例代码请参见任务滚动迁移脚本

步骤四:检查迁移结果

您可以通过查询ES数据源索引和Lindorm目标索引中文档的数量是否一致,或根据最近一段时间内更新的数据是否一致来判断自建ES集群索引中的历史数据和增量数据是否已全部迁移至Lindorm搜索引擎。示例代码如下:

# 查看index详情
curl -XGET "<url>/_cat/indices?v" -u <username>:<password>

# 查看index近期更新数据
curl -XGET "<url>/<index>/_search" -u <username>:<password> -H'Content-Type:application/json' -d'{
  "query": {
    "bool": {
      "must": {
        "exists": {
          "field": "updateTimestampField"
        }
      }
    }
  },
  "sort": [
    {
      "updateTimestampField": {
        "order": "desc"
      }
    }
  ],
  "size": 20
}'