ElasticSearch LogStash 迁移

一、迁移原理

Logstash 是一个开放源码的数据收集引擎,它被广泛用于从各种源头(如日志文件、系统指标、网络流量等)收集数据,进行过滤、解析和转换,然后将数据传输到存储和分析工具,如 Elasticsearch、Kibana、Grafana 等。同样的Logstash也支持Elasticsearch作为数据来源。

核心概念

  • 数据收集(INPUTS):从各种源收集数据,包括Elasticsearch、文件、数据库、消息队列等。

  • 数据处理(FILTERS):通过过滤器插件,解析和转换数据。

  • 数据输出(OUTPUTS):将处理后的数据传输到目标系统,如 Elasticsearch、文件、数据库等。

输入插件(Inputs)

输入插件用于从不同的数据源收集数据。目前支持的输入插件种类非常丰富,包括但不限于:

- File: 从文件中读取数据,如日志文件。
- Beats: 接收来自 Beats 代理的数据。
- Kafka: 从 Kafka 消息队列中拉取数据。
- JDBC: 从数据库中拉取数据。
- HTTP: 接收通过 HTTP 请求发送的数据。
- Elasticsearch:从ES中拉取数据。
过滤器插件(Filters)

过滤器插件用于解析和转换数据,以便标准化数据格式,或根据需要进行数据处理。常见的过滤器插件包括:

- Grok: 强大的模式匹配和抽取工具,适用于解析结构化和半结构化文本数据。
- Date: 解析和规范化时间戳。
- Mutate: 修改事件数据(如添加字段、删除字段、替换值等)。
- GeoIP: 根据 IP 地址查找地理位置。
- JSON: 解析 JSON 数据。
输出插件(Outputs)

输出插件用于将处理后的数据传输到多个目标系统。支持的输出插件包括:

- Elasticsearch: 将数据发送到 Elasticsearch,常用于构建日志分析和搜索系统。
- File: 将数据写入文件。
- Kafka: 将数据发送到 Kafka。
- Graphite: 发送指标数据到 Graphite 。
- Database: 将数据写入数据库(如 MySQL、PostgreSQL)。

主要特性

  1. 灵活的输入/输出插件架构:支持多种输入源和输出目标,可以很轻松地进行扩展。

  2. 过滤和变换:丰富的过滤器插件,包括 grok、date、mutate 等,帮助解析和标准化数据。

  3. 实时处理:支持实时数据处理,提供低延迟的流处理。

  4. 轻松扩展:通过插件机制,可以轻松添加自定义输入、过滤和输出插件。

二、迁移流程

在进行数据迁移时,Logstash会自动创建索引,但是自动创建的索引可能与原索引存在差异,导致迁移前后数据的格式不一致。因此在数据迁移前,需要在阿里云Elasticsearch中通过python脚本手动创建目标索引,确保迁移前后索引数据完全一致。

数据迁移可以全量迁移或增量迁移。如果业务侧时刻存在写入更新,首次迁移时,需先全量迁移,再通过时间标识字段(或其他可标识增量的字段)进行增量迁移,否则迁移后新数据极易被旧数据覆盖。

元数据迁移

通过执行python脚本元数据迁移脚本实现目标Elasticsearch环境templatesindices的创建,其中index包含settingsmappingalias。

全量数据迁移

Logstash 的配置文件使用简单且直观的语法。以下是一个全量数据迁移的配置示例,展示了如何从Elasticsearch中读取数据,解析和过滤内容,然后将结果发送到 Elasticsearch:

input{
    elasticsearch{
        # 源端ES地址。
        hosts =>  ["http://<Elasticsearch ECS 内网IP>:9200"]
        # 需要迁移的索引列表,多个索引以英文以逗号(,)分隔。
        index => "*"
        # 以下三项保持默认即可,包含线程数和迁移数据大小和Logstash JVM配置相关。
        docinfo=>true
        slices => 5
        size => 5000
    }
}

filter {
  # 去掉一些Logstash自己加的字段。
  mutate {
    remove_field => ["@timestamp", "@version"]
  }
  # 不迁移名称以.开头的系统索引
  if [@metadata][_index] =~ /^\./ {
    drop { }
  }
}

output{
    elasticsearch{
        # 目标端ES地址,可在阿里云Elasticsearch实例的基本信息页面获取。
        hosts => ["http://<你的实例内网访问地址>.elasticsearch.aliyuncs.com:9200"]
        # 安全集群配置登录用户名密码。
        user => "elastic"
        password => "<你的密码>"
        # 目标端索引名称,以下配置表示索引与源端保持一致。
        index => "%{[@metadata][_index]}"
        # 保留原端的document_id(_id),添加此配置会显著的降低迁移性能。
        document_id => "%{[@metadata][_id]}"
        ilm_enabled => false
        manage_template => false
    }
}
增量数据迁移

增量数据迁移和全量数据迁移原理是完全一样的,主要区别在与input的数据的选取,以下input配置为例,通过query配置"range":{"@timestamp":{"gte":"now-5m","lte":"now/m"}}实现查询@timestamp时间戳为5分钟之内的数据,同时通过schedule配置实现每分钟调用一次任务,从而实现了增量同步的效果。

这样实现的缺点也很明显,timestamp字段很重要,如果没有类似timestamp时间戳的字段,或者相关字段更新不及时、不正确,就无法基于更新时间进行增量同步。

input{
    elasticsearch{
        # 源端ES地址。
        hosts =>  ["http://<Elasticsearch ECS 内网IP>:9200"]
        # 需要迁移的索引列表,多个索引使用英文逗号(,)分隔。
        index => "*"
        # 按时间范围查询增量数据,以下配置表示查询最近5分钟的数据。
        query => '{"query":{"range":{"@timestamp":{"gte":"now-5m","lte":"now/m"}}}}'
        # 定时任务,以下配置表示每分钟执行一次。
        schedule => "* * * * *"
        scroll => "5m"
        docinfo=>true
        size => 5000
    }
}

注意:

  • LogstashElasticsearch内部记录的时间戳为UTC时间(例如关键字now),如果本地时间为北京时间(东八区),那么两者会存在8个小时的时区差,此时将UTC时间转化为北京时间,可使用公式:UTC+时区差=北京时间。上述例子中@timestamp填充的时间也是UTC时间,因此可以直接使用,如果实际数据的时间戳是北京时间,那在进行范围查询是需要考虑时间差。

  • 通过Logstash控制时间字段实现增量数据的同步,需确保原索引中有可控制的时间字段,如果原索引中没有时间字段数据,可使用ingest pipeline指定_ingest.timestamp获取元数据值,从而引入@timestamp时间字段。

三、前置条件

  • 由于阿里云Logstash实例部署在专有网络下,如果自建Elasticsearch集群与阿里云Logstash集群在同一专有网络下,可直接配置;如果不在,需要通过配置NAT网关实现与公网的连通,具体操作请参见配置NAT公网数据传输

  • 自建Elasticsearch集群所在的ECS的安全组不能限制Logstash集群的各节点IP(可在基本信息页面查看),并且需要开启9200端口。

四、风险以及注意事项

  • Elasticsearch input插件在执行类似批量导入操作时,默认读取完数据后,同步动作会自动关闭,但是阿里云Logstash需保证进程一直运行,关闭后将会重新启动进程,导致重复进行批量导入写数据的情况。设置长时间范围的定时任务可绕过写重复的情况,如每年351320分触发任务执行,执行完第一次任务后停止管道运行,可避免重复写情况,同时要注意即使删除阿里云上的管道任务。

注意:

  • 集群配置action.auto_create_indextrue时,才能自动创建索引。

  • Elasticsearch迁移的场景下,Logstash主要负责document的迁移,其他对象无法进行迁移,如mapping、setting、alias等。