Logstash是Elasticsearch提供的一个开源数据处理工具,可以收集和处理日志数据,并将处理后的数据导入至目标数据库。如果您想要通过Logstash将自建Elasticsearch集群中的数据迁移至Lindorm搜索引擎,可以参考本文的操作步骤配置脚本和迁移任务。
前提条件
迁移方案选择
您可以根据实际业务需求,结合ES索引的mappings,选择不同的方案进行数据迁移。
数据准备
本文使用的自建Elasticsearch集群基于阿里云ECS搭建,数据使用Rally提供的geonames数据集作为待迁移的数据,具体的数据导入方式请参见Run a Benchmark: Races。本文示例中Elasticsearch集群的索引(index)名为geonames
。
数据规模:文档(Document)数量为11,396,503,数据解压后占用空间3.3 GB。
您也可以迁移自建ES集群中已有的数据。如果您需要向自建ES集群中写入数据,请参见Index API。
步骤一:安装Logstash服务
安装部署Logstash服务的具体操作,请参见Logstash Reference。
步骤二:创建搜索索引
在使用Logstash将Elasticsearch集群数据迁移到Lindorm搜索引擎前,需先在Lindorm中创建好用于存储迁移数据的目标索引。
Logstash服务不会将源索引的settings等信息拷贝到目标索引中,因此,如果您希望目标索引具备与源索引相同的配置或映射规则,则需要在创建目标索引时进行配置,预先指定好settings,mappings,shard counts等参数。
本文使用geonames
作为目标索引名称,创建索引时不指定settings和mappings参数。
curl -XPUT "http://<url>/geonames" -u <username>:<password>
参数说明
参数 | 说明 |
url | 搜索引擎的Elasticsearch兼容连接地址。如何获取,请参见Elasticsearch兼容地址。 重要
|
username | 访问搜索引擎的用户名和密码。 默认用户名和密码的获取方式:在控制台的左侧导航栏,选择数据库连接,单击搜索引擎页签,在搜索引擎页签可获取。 |
password |
您可以在创建索引时指定与源索引不同的settings和mappings。但需注意,目标索引的mappings与源索引的mappings设置不能存在冲突,否则可能会导致数据迁移时无法写入目标索引中。更多说明,请参见Create index API。
步骤三:数据迁移
全量数据迁移
如果您的自建ES集群中的数据源索引不再有任何变化,即没有文档的增、删、改,则可以通过全量数据迁移的方式将数据源索引中的所有文档迁移至Lindorm搜索引擎。
创建Logstash配置文件
fulldata.conf
,用于配置全量数据迁移任务。具体如下:input{ elasticsearch{ # 源ES地址 hosts => ["http://<host>:<port>"] # 访问使用的用户名&密码 user => "changeme" password => "changeme" # 源index名,支持逗号分隔的多个index名和模糊匹配 index => "geonames" # 查询所有数据并同步输出源 query => '{"query":{"match_all":{}}}' # 默认为false,不会读出_id等元数据信息 docinfo=>true } } filter { # 去掉Logstash附加字段 mutate { remove_field => ["@timestamp", "@version"] } } output{ elasticsearch{ # Lindorm连接地址 hosts => ["http://<lindorm-address>"] # 访问使用的用户名&密码 user => "changeme" password => "changeme" index => "geonames" # 指定数据写入时,保持id为原来的值,如无业务需要可以删去 document_id => "%{[@metadata][_id]}" # 关闭Logstash自带的模板 manage_template => false } }
参数说明
配置项
参数
说明
input
hosts
自建Elasticsearch集群的IP和端口。
user
自建Elasticsearch集群的用户名。非必填项,请根据实际情况填写。
password
自建Elasticsearch集群的密码。非必填项,请根据实际情况填写。
index
自建Elasticsearch集群中待迁移的索引名。
query
迁移数据时使用的查询条件。示例代码中match_all表示查询索引中的所有文档,即将数据源索引中的所有文档都迁移到目标索引中。
output
hosts
搜索引擎的Elasticsearch兼容连接地址。如何获取,请参见Elasticsearch兼容地址。
重要如果您的Logstash服务部署在ECS实例,建议您通过专有网络访问Lindorm实例,可获得更高的安全性和更低的网络延迟。
如果您的Logstash服务部署在本地,在通过公网连接Lindorm实例前,需在控制台开通公网地址。开通方式:在控制台的左侧导航栏,选择数据库连接,单击搜索引擎页签,在页签右上角单击开通公网地址。
通过专有网络访问Lindorm实例,请填写Elasticsearch兼容地址对应的专有网络地址。通过公网访问Lindorm实例,请填写Elasticsearch兼容地址对应的公网地址。
user
访问搜索引擎的用户名和密码。
默认用户名和密码的获取方式:在控制台的左侧导航栏,选择数据库连接,单击搜索引擎页签,在搜索引擎页签可获取。
password
index
Lindorm搜索引擎中创建的索引名(目标索引)。
指定
fulldata.conf
为任务配置文件并启动Logstash服务,等待数据迁移完成。数据迁移完成后,Logstash服务会自动停止。cd logstash-7.10.0 bin/logstash -f <fulldata.conf所在路径>
增量数据迁移
如果您自建的ES索引存在数据更新或写入了新数据,且没有数据被删除,则可以通过增量数据迁移的方式完成数据迁移任务。
建议您基于数据更新时间字段,使用滚动迁移的方式来完成数据迁移任务,在Logstash滚动迁移任务第一次执行时,会将历史数据一并迁移到目的端索引。
本文使用的Logstash增量数据迁移配置文件increment.conf
和任务滚动迁移脚本如下。
配置文件
increment.conf
。input{ elasticsearch{ hosts => ["<自建ES集群的连接地址>:<自建ES集群的端口>"] user => "<自建ES集群的用户名>" password => "<自建ES集群的密码>" index => "<自建ES集群的数据源索引>" query => '{"query":{"range":{"updateTimestampField":{"gte":"${TMP_LAST}","lt":"${TMP_CURR}"}}}}' docinfo=>true } } filter { mutate { remove_field => ["@timestamp", "@version"] } } output{ elasticsearch{ hosts => ["http://<lindorm-address>"] user => "changeme" password => "changeme" index => "geonames" document_id => "%{[@metadata][_id]}" manage_template => false } }
任务滚动迁移脚本。
#!/bin/bash unset TMP_LAST unset TMP_CURR # logstash执行间隔,默认30s sleepInterval=30 # 设置一个稍大于源端的refresh interval的值,单位s,此处默认源端是15s refreshInterval=16 # 默认转化为ms,需要根据业务字段的单位调整 export TMP_LAST=0 i=1 while true do echo "开始第 ${i} 次logstash迁移数据任务..." # 默认转化为ms,需要根据业务字段的单位调整 export TMP_CURR=$((($(date +%s%N)/1000000) - ($refreshInterval * 1000))) <path-to-logstash>/bin/logstash -f <path-to-increment.conf> echo "完成第 ${i} 次logstash迁移数据任务..." echo "本次迁移数据的数据更新时间范围:${TMP_LAST} 到 ${TMP_CURR}" i=$(( $i + 1 )) export TMP_LAST=${TMP_CURR} sleep ${sleepInterval} done
以上示例代码在
increment.conf
配置文件中使用了环境变量,结合滚动迁移脚本,持续迁移一段时间窗口内更新的数据。更多有关在Logstash任务配置中使用环境变量的说明,请参见Using environment variables。重要由于无法保证数据写入请求到达Lindorm搜索引擎服务端的顺序,因此服务端处理请求的顺序也不确定。如果同时执行全量和增量数据迁移任务,可能会出现历史数据将更新后的数据覆盖的情况。因此,建议您先执行历史全量数据迁移任务,待任务执行结束后,再启动增量数据滚动迁移任务。
综合迁移方案
如果您的数据中不包含数据更新时间字段,那么您需要在数据写入的业务代码中添加写入数据更新时间的逻辑。对不包含数据更新时间字段的历史数据,应用历史数据迁移的任务配置,启动Logstash任务来完成历史数据的迁移,对包含数据更新时间字段的增量数据应用滚动迁移的方案。
本文使用的Logstash历史数据迁移任务配置文件history.conf
如下。
input{
elasticsearch{
hosts => ["http://<host>:<port>"]
user => "changeme"
password => "changeme"
index => "geonames"
query => '{"query":{"bool":{"must_not":{"exists":{"field":"updateTimeField"}}}}}'
docinfo=>true
}
}
filter {
mutate {
remove_field => ["@timestamp", "@version"]
}
}
output{
elasticsearch{
hosts => ["http://<lindorm-address>"]
user => "changeme"
password => "changeme"
index => "geonames"
document_id => "%{[@metadata][_id]}"
manage_template => false
}
}
滚动迁移的示例代码请参见任务滚动迁移脚本。
步骤四:检查迁移结果
您可以通过查询ES数据源索引和Lindorm目标索引中文档的数量是否一致,或根据最近一段时间内更新的数据是否一致来判断自建ES集群索引中的历史数据和增量数据是否已全部迁移至Lindorm搜索引擎。示例代码如下:
# 查看index详情
curl -XGET "<url>/_cat/indices?v" -u <username>:<password>
# 查看index近期更新数据
curl -XGET "<url>/<index>/_search" -u <username>:<password> -H'Content-Type:application/json' -d'{
"query": {
"bool": {
"must": {
"exists": {
"field": "updateTimestampField"
}
}
}
},
"sort": [
{
"updateTimestampField": {
"order": "desc"
}
}
],
"size": 20
}'