一、迁移原理
Logstash 是一个开放源码的数据收集引擎,它被广泛用于从各种源头(如日志文件、系统指标、网络流量等)收集数据,进行过滤、解析和转换,然后将数据传输到存储和分析工具,如 Elasticsearch、Kibana、Grafana 等。同样的Logstash也支持Elasticsearch作为数据来源。
核心概念
数据收集(INPUTS):从各种源收集数据,包括Elasticsearch、文件、数据库、消息队列等。
数据处理(FILTERS):通过过滤器插件,解析和转换数据。
数据输出(OUTPUTS):将处理后的数据传输到目标系统,如 Elasticsearch、文件、数据库等。
输入插件(Inputs)
输入插件用于从不同的数据源收集数据。目前支持的输入插件种类非常丰富,包括但不限于:
- File: 从文件中读取数据,如日志文件。
- Beats: 接收来自 Beats 代理的数据。
- Kafka: 从 Kafka 消息队列中拉取数据。
- JDBC: 从数据库中拉取数据。
- HTTP: 接收通过 HTTP 请求发送的数据。
- Elasticsearch:从ES中拉取数据。
过滤器插件(Filters)
过滤器插件用于解析和转换数据,以便标准化数据格式,或根据需要进行数据处理。常见的过滤器插件包括:
- Grok: 强大的模式匹配和抽取工具,适用于解析结构化和半结构化文本数据。
- Date: 解析和规范化时间戳。
- Mutate: 修改事件数据(如添加字段、删除字段、替换值等)。
- GeoIP: 根据 IP 地址查找地理位置。
- JSON: 解析 JSON 数据。
输出插件(Outputs)
输出插件用于将处理后的数据传输到多个目标系统。支持的输出插件包括:
- Elasticsearch: 将数据发送到 Elasticsearch,常用于构建日志分析和搜索系统。
- File: 将数据写入文件。
- Kafka: 将数据发送到 Kafka。
- Graphite: 发送指标数据到 Graphite 。
- Database: 将数据写入数据库(如 MySQL、PostgreSQL)。
主要特性
灵活的输入/输出插件架构:支持多种输入源和输出目标,可以很轻松地进行扩展。
过滤和变换:丰富的过滤器插件,包括 grok、date、mutate 等,帮助解析和标准化数据。
实时处理:支持实时数据处理,提供低延迟的流处理。
轻松扩展:通过插件机制,可以轻松添加自定义输入、过滤和输出插件。
二、迁移流程
在进行数据迁移时,Logstash会自动创建索引,但是自动创建的索引可能与原索引存在差异,导致迁移前后数据的格式不一致。因此在数据迁移前,需要在阿里云Elasticsearch中通过python脚本手动创建目标索引,确保迁移前后索引数据完全一致。
数据迁移可以全量迁移或增量迁移。如果业务侧时刻存在写入更新,首次迁移时,需先全量迁移,再通过时间标识字段(或其他可标识增量的字段)进行增量迁移,否则迁移后新数据极易被旧数据覆盖。
元数据迁移
通过执行python脚本元数据迁移脚本实现目标Elasticsearch环境templates和indices的创建,其中index包含settings和mapping和alias。
全量数据迁移
Logstash 的配置文件使用简单且直观的语法。以下是一个全量数据迁移的配置示例,展示了如何从Elasticsearch中读取数据,解析和过滤内容,然后将结果发送到 Elasticsearch:
input{
elasticsearch{
# 源端ES地址。
hosts => ["http://<Elasticsearch ECS 内网IP>:9200"]
# 需要迁移的索引列表,多个索引以英文以逗号(,)分隔。
index => "*"
# 以下三项保持默认即可,包含线程数和迁移数据大小和Logstash JVM配置相关。
docinfo=>true
slices => 5
size => 5000
}
}
filter {
# 去掉一些Logstash自己加的字段。
mutate {
remove_field => ["@timestamp", "@version"]
}
# 不迁移名称以.开头的系统索引
if [@metadata][_index] =~ /^\./ {
drop { }
}
}
output{
elasticsearch{
# 目标端ES地址,可在阿里云Elasticsearch实例的基本信息页面获取。
hosts => ["http://<你的实例内网访问地址>.elasticsearch.aliyuncs.com:9200"]
# 安全集群配置登录用户名密码。
user => "elastic"
password => "<你的密码>"
# 目标端索引名称,以下配置表示索引与源端保持一致。
index => "%{[@metadata][_index]}"
# 保留原端的document_id(_id),添加此配置会显著的降低迁移性能。
document_id => "%{[@metadata][_id]}"
ilm_enabled => false
manage_template => false
}
}
增量数据迁移
增量数据迁移和全量数据迁移原理是完全一样的,主要区别在与input的数据的选取,以下input配置为例,通过query配置"range":{"@timestamp":{"gte":"now-5m","lte":"now/m"}}实现查询@timestamp时间戳为5分钟之内的数据,同时通过schedule配置实现每分钟调用一次任务,从而实现了增量同步的效果。
这样实现的缺点也很明显,timestamp字段很重要,如果没有类似timestamp时间戳的字段,或者相关字段更新不及时、不正确,就无法基于更新时间进行增量同步。
input{
elasticsearch{
# 源端ES地址。
hosts => ["http://<Elasticsearch ECS 内网IP>:9200"]
# 需要迁移的索引列表,多个索引使用英文逗号(,)分隔。
index => "*"
# 按时间范围查询增量数据,以下配置表示查询最近5分钟的数据。
query => '{"query":{"range":{"@timestamp":{"gte":"now-5m","lte":"now/m"}}}}'
# 定时任务,以下配置表示每分钟执行一次。
schedule => "* * * * *"
scroll => "5m"
docinfo=>true
size => 5000
}
}
注意:
Logstash和Elasticsearch内部记录的时间戳为UTC时间(例如关键字now),如果本地时间为北京时间(东八区),那么两者会存在8个小时的时区差,此时将UTC时间转化为北京时间,可使用公式:UTC+时区差=北京时间。上述例子中@timestamp填充的时间也是UTC时间,因此可以直接使用,如果实际数据的时间戳是北京时间,那在进行范围查询是需要考虑时间差。
通过Logstash控制时间字段实现增量数据的同步,需确保原索引中有可控制的时间字段,如果原索引中没有时间字段数据,可使用ingest pipeline指定_ingest.timestamp获取元数据值,从而引入@timestamp时间字段。
三、前置条件
由于阿里云Logstash实例部署在专有网络下,如果自建Elasticsearch集群与阿里云Logstash集群在同一专有网络下,可直接配置;如果不在,需要通过配置NAT网关实现与公网的连通,具体操作请参见配置NAT公网数据传输。
自建Elasticsearch集群所在的ECS的安全组不能限制Logstash集群的各节点IP(可在基本信息页面查看),并且需要开启9200端口。
四、风险以及注意事项
Elasticsearch input插件在执行类似批量导入操作时,默认读取完数据后,同步动作会自动关闭,但是阿里云Logstash需保证进程一直运行,关闭后将会重新启动进程,导致重复进行批量导入写数据的情况。设置长时间范围的定时任务可绕过写重复的情况,如每年3月5日13点20分触发任务执行,执行完第一次任务后停止管道运行,可避免重复写情况,同时要注意即使删除阿里云上的管道任务。
注意:
集群配置action.auto_create_index为true时,才能自动创建索引。
在Elasticsearch迁移的场景下,Logstash主要负责document的迁移,其他对象无法进行迁移,如mapping、setting、alias等。