文档

通过Logstash迁移自建Elasticsearch数据

更新时间:

Logstash是Elasticsearch提供的一个开源数据处理工具,可以收集和处理日志数据,并将处理后的数据导入至目标数据库。如果您想要通过Logstash将自建Elasticsearch集群中的数据迁移至Lindorm搜索引擎,可以参考本文的操作步骤配置脚本和迁移任务。

前提条件

  • 自建Elasticsearch(简称ES)集群为7.0.0~7.10.1版本。

  • 已开通搜索引擎(Elasticsearch兼容版)。如何开通,请参见开通指南(Elasticsearch兼容版本)

  • 已将客户端IP地址添加至Lindorm白名单。如何添加,请参见设置白名单

    说明

    本文使用的自建ES集群与Logstash服务均部署在阿里云ECS实例,为保证ECS实例与Lindorm实例之间的连通性,您需要将ECS的IP地址添加至Lindorm白名单。如何购买阿里云ECS实例,请参见自定义购买实例

数据准备

本文使用的自建Elasticsearch集群基于阿里云ECS搭建,数据使用Rally提供的geonames数据集作为待迁移的数据,具体的数据导入方式请参见Run a Benchmark: Races。本文示例中Elasticsearch集群的索引(index)名为geonames

数据规模:文档(Document)数量为11,396,503,数据解压后占用空间3.3 GB。

说明

您也可以迁移自建ES集群中已有的数据。如果您需要向自建ES集群中写入数据,请参见Index API

步骤一:安装Logstash服务

安装部署Logstash服务的具体操作,请参见Logstash Reference

说明

因版本兼容性问题,目前暂不支持通过阿里云Logstash来进行数据迁移。如果您的自建Logstash服务在访问Lindorm搜索引擎时抛出异常,且异常信息为The interface you are trying to access is restricted.,请联系Lindorm技术支持(钉钉号:s0s3eg3)。

步骤二:创建搜索索引

在使用Logstash将Elasticsearch集群数据迁移到Lindorm搜索引擎前,需先在Lindorm中创建好用于存储迁移数据的目标索引。

重要

Logstash服务不会将源索引的settings等信息拷贝到目标索引中,因此,如果您希望目标索引具备与源索引相同的配置或映射规则,则需要在创建目标索引时进行配置,预先指定好settingsmappingsshard counts等参数。

本文使用geonames作为目标索引名称,创建索引时不指定settingsmappings参数。

curl -XPUT "http://<url>/geonames" -u <username>:<password>

参数说明

参数

说明

url

搜索引擎(Elasticsearch兼容版本)的Elasticsearch兼容连接地址。如何获取,请参见Elasticsearch兼容版本

重要
  • 如果应用部署在ECS实例,建议您通过专有网络访问Lindorm实例,可获得更高的安全性和更低的网络延迟。

  • 如果应用部署在本地,在通过公网连接Lindorm实例前,需在控制台开通公网地址。开通方式:在控制台的左侧导航栏,选择数据库连接,单击搜索引擎页签,在页签右上角单击开通公网地址

  • 通过专有网络访问Lindorm实例,url请填写Elasticsearch兼容地址对应的专有网络地址。通过公网访问Lindorm实例,url请填写Elasticsearch兼容地址对应的公网地址。

username

访问搜索引擎(Elasticsearch兼容版本)的用户名和密码。

默认用户名和密码的获取方式:在控制台的左侧导航栏,选择数据库连接,单击搜索引擎页签,在搜索引擎页签可获取。

password

说明

您可以在创建索引时指定与源索引不同的settingsmappings。但需注意,目标索引的mappings与源索引的mappings设置不能存在冲突,否则可能会导致数据迁移时无法写入目标索引中。更多说明,请参见Create index API

步骤三:数据迁移

全量数据迁移

如果您的自建ES集群中的数据源索引不再有任何变化,即没有文档的增、删、改,则可以通过全量数据迁移的方式将数据源索引中的所有文档迁移至Lindorm搜索引擎。

  1. 创建Logstash配置文件fulldata.conf,用于配置全量数据迁移任务。具体如下:

    input{
      elasticsearch{
        # 源ES地址
        hosts =>  ["http://<host>:<port>"]
        
        # 访问使用的用户名&密码
        user => "changeme"
        password => "changeme"
        
        # 源index名,支持逗号分隔的多个index名和模糊匹配
        index => "geonames"
      
        # 查询所有数据并同步输出源
        query => '{"query":{"match_all":{}}}'
    
        # 默认为false,不会读出_id等元数据信息
        docinfo=>true
      }
    }
    
    filter {
      # 去掉Logstash附加字段
      mutate {
        remove_field => ["@timestamp", "@version"]
      }
    }
    
    output{
      elasticsearch{
        # Lindorm连接地址
        hosts => ["http://<lindorm-address>"]
      
        # 访问使用的用户名&密码
        user => "changeme"
        password => "changeme"
      
        index => "geonames"
        # 指定数据写入时,保持id为原来的值,如无业务需要可以删去
        document_id => "%{[@metadata][_id]}"
    
        # 关闭Logstash自带的模板
        manage_template => false
      }
    }

    参数说明

    配置项

    参数

    说明

    input

    hosts

    自建Elasticsearch集群的IP和端口。

    user

    自建Elasticsearch集群的用户名。非必填项,请根据实际情况填写。

    password

    自建Elasticsearch集群的密码。非必填项,请根据实际情况填写。

    index

    自建Elasticsearch集群中待迁移的索引名。

    query

    迁移数据时使用的查询条件。示例代码中match_all表示查询索引中的所有文档,即将数据源索引中的所有文档都迁移到目标索引中。

    output

    hosts

    搜索引擎(Elasticsearch兼容版本)的Elasticsearch兼容连接地址。如何获取,请参见Elasticsearch兼容版本

    重要
    • 如果您的Logstash服务部署在ECS实例,建议您通过专有网络访问Lindorm实例,可获得更高的安全性和更低的网络延迟。

    • 如果您的Logstash服务部署在本地,在通过公网连接Lindorm实例前,需在控制台开通公网地址。开通方式:在控制台的左侧导航栏,选择数据库连接,单击搜索引擎页签,在页签右上角单击开通公网地址

    • 通过专有网络访问Lindorm实例,请填写Elasticsearch兼容地址对应的专有网络地址。通过公网访问Lindorm实例,请填写Elasticsearch兼容地址对应的公网地址。

    user

    访问搜索引擎(Elasticsearch兼容版本)的用户名和密码。

    默认用户名和密码的获取方式:在控制台的左侧导航栏,选择数据库连接,单击搜索引擎页签,在搜索引擎页签可获取。

    password

    index

    Lindorm搜索引擎中创建的索引名(目标索引)。

  2. 指定fulldata.conf为任务配置文件并启动Logstash服务,等待数据迁移完成。数据迁移完成后,Logstash服务会自动停止。

    cd logstash-7.10.0
    bin/logstash -f <fulldata.conf所在路径>

增量数据迁移

如果您自建的ES索引存在数据更新或写入了新数据,且没有数据被删除,则可以通过增量数据迁移的方式完成数据迁移任务。您可以根据ES索引的mapping参数设置,选择不同的方案进行数据迁移。

  • 如果您的数据中不包含数据更新时间字段,那么您需要在您数据写入的业务代码中添加写入数据更新时间的逻辑。对不包含数据更新时间字段的历史数据应用全量数据迁移方案,对包含数据更新时间字段的增量数据应用滚动迁移的方案。本文使用的Logstash增量数据迁移任务配置文件history.conf如下:

    input{
      elasticsearch{
        hosts =>  ["http://<host>:<port>"]
        user => "changeme"
        password => "changeme"
        index => "geonames"
        query => '{"query":{"bool":{"must_not":{"exists":{"field":"updateTimeField"}}}}}'
        docinfo=>true
      }
    }
    filter {
      mutate {
        remove_field => ["@timestamp", "@version"]
      }
    }
    output{
      elasticsearch{
        hosts => ["http://<lindorm-address>"]
        user => "changeme"
        password => "changeme"
        index => "geonames"
        document_id => "%{[@metadata][_id]}"
        manage_template => false
      }
    }
  • 如果您的数据包含数据更新时间字段,则可以基于数据更新时间字段,使用滚动迁移的方式来完成数据迁移任务。本文使用的Logstash增量数据迁移配置文件increment.conf和任务滚动迁移脚本如下。

    • 配置文件increment.conf

      input{
        elasticsearch{
          hosts =>  ["<自建ES集群的连接地址>:<自建ES集群的端口>"]
          user => "<自建ES集群的用户名>"
          password => "<自建ES集群的密码>"
          index => "<自建ES集群的数据源索引>"
          query => '{"query":{"range":{"updateTimestampField":{"gte":"${TMP_LAST}","lt":"${TMP_CURR}"}}}}'
          docinfo=>true
        }
      }
      filter {
        mutate {
          remove_field => ["@timestamp", "@version"]
        }
      }
      output{
        elasticsearch{
          hosts => ["http://<lindorm-address>"]
          user => "changeme"
          password => "changeme"
          index => "geonames"
          document_id => "%{[@metadata][_id]}"
          manage_template => false
        }
      }
    • 任务滚动迁移脚本。

      #!/bin/bash
      
      unset TMP_LAST
      unset TMP_CURR
      
      # 滚动迁移执行间隔,默认60s
      sleepInterval=60
      
      # 默认转化为毫秒(ms),需要根据业务字段的单位进行调整
      export TMP_LAST=0
      i=1
      while true
      do
        echo "开始第 ${i} 次Logstash迁移数据任务..."
      
        # 默认转化为毫秒(ms),需要根据业务字段的单位进行调整
        export TMP_CURR=$(($(date +%s%N)/1000000))
        <path-to-logstash>/bin/logstash -f <path-to-increment.conf>
      
        echo "完成第 ${i} 次Logstash迁移数据任务..."
        echo "本次迁移数据的数据更新时间范围:${TMP_LAST} 到 ${TMP_CURR}"
        i=$(( $i + 1 ))
        export TMP_LAST=${TMP_CURR}
        sleep ${sleepInterval}
      done

      以上示例代码在increment.conf配置文件中使用了环境变量,结合滚动迁移脚本,持续迁移一段时间窗口内更新的数据。更多有关在Logstash任务配置中使用环境变量的说明,请参见Using environment variables

      重要

      由于无法保证数据写入请求到达Lindorm搜索引擎服务端的顺序,因此服务端处理请求的顺序也不确定。如果同时执行全量和增量数据迁移任务,可能会出现历史数据将更新后的数据覆盖的情况。因此,建议您先执行历史全量数据迁移任务,待任务执行结束后,再启动增量数据滚动迁移任务。

步骤四:检查迁移结果

您可以通过查询ES数据源索引和Lindorm目标索引中文档的数量是否一致,或根据最近一段时间内更新的数据是否一致来判断自建ES集群索引中的历史数据和增量数据是否已全部迁移至Lindorm搜索引擎。示例代码如下:

# 查看index详情
curl -XGET "<url>/_cat/indices?v" -u <username>:<password>

# 查看index近期更新数据
curl -XGET "<url>/<index>/_search" -u <username>:<password> -H'Content-Type:application/json' -d'{
  "query": {
    "bool": {
      "must": {
        "exists": {
          "field": "updateTimestampField"
        }
      }
    }
  },
  "sort": [
    {
      "updateTimestampField": {
        "order": "desc"
      }
    }
  ],
  "size": 20
}'
  • 本页导读 (1)
文档反馈