在使用Logstash传输数据时,在某些业务使用场景中,您可能需要切分源端数据并提取到字段中再写入目标端Elasticsearch集群。例如,源端Logs日志中存在以竖线(|
)分隔的数据,此时您可以通过Logstash按照|
切分数据并提取到字段中,再输出到目标端Elasticsearch集群。本文介绍如何通过Logstash切分数据并提取到字段中。
背景信息
logstash-filter-mutate插件(过滤器插件)支持对事件中的字段进行切分、重命名、删除、替换和修改等操作,详细信息请参见Mutate filter plugin。所有的过滤器插件都支持以下常见的可选配置项,详细信息请参见Common Option。
配置项 | 输入类型 |
---|---|
add_field | hash |
add_tag | array |
enable_metric | boolean |
id | string |
periodic_flush | boolean |
remove_field | array |
remove_tag | array |
前提条件
您已完成以下操作:
- 创建阿里云Elasticsearch实例。
具体操作,请参见创建阿里云Elasticsearch实例,本文以7.10版本实例为例。
- 开启目标Elasticsearch实例的自动创建索引功能。
- 创建阿里云Logstash实例,需要与Elasticsearch实例在同一专有网络下。
具体操作,请参见创建阿里云Logstash实例。
- 准备测试数据。本文以Beats采集的Logs中的某一条数据为例,关于Beats采集数据的详细信息请参见采集ECS服务日志。如下测试数据中的LogMessage以
|
分隔,并存在多个||||
特殊符号。本文使用Logstash按照|
切分数据,被切分的字段分别写入到对应的字段中:mobile、appName、type、timestamp、status、code、component、cid、serviceId、serviceName、serviceType、param,最后将字段输出到目标端Elasticsearch集群中。LogMessage: |1390000****|jop|byORP|2022-04-18T14:18:16.633|/log/cms/send|200|pluginNums=0,pluginStatus=0||||||
操作步骤
- 进入阿里云Elasticsearch控制台的Logstash页面。
- 进入目标实例。
- 在顶部菜单栏处,选择地域。
- 在Logstash实例中单击目标实例ID。
- 在左侧导航栏,单击管道管理。
- 单击创建管道。
- 在创建管道任务页面,输入管道ID并配置管道。本文使用的管道配置如下。
input { beats { port => 8001 } } filter { mutate { gsub => ["message","\|","| "] split => ["message","|"] add_field => { "mobile" => "%{[message][1]}" "appName" => "%{[message][2]}" "type" => "%{[message][3]}" "timestamp" => "%{[message][4]}" "status" => "%{[message][5]}" "code" => "%{[message][6]}" "component" => "%{[message][7]}" "cid" => "%{[message][8]}" "serviceId" => "%{[message][9]}" "serviceName" => "%{[message][10]}" "serviceType" => "%{[message][11]}" "param" => "%{[message][12]}" } } mutate { strip => ["mobile","appName","type","timestamp","status","code","component","cid","serviceId","serviceName","serviceType","param"] } } output { elasticsearch { index => "<yourIndexName>" hosts => ["es-cn-7mz2mu1zp0006****.elasticsearch.aliyuncs.com:9200"] user => "elastic" password => "<yourPassword>" } }
重要- input.beat.port为Beats采集日志输入到当前Logstash管道的端口,需要使用8000~9000范围内的端口。
- 以上管道配置中的
gsub => ["message","\|","| "]
,第二个|
后有一个空格。 - 以上管道配置中的index、hosts和password参数值需要替换为您实际业务的索引名称、Elasticsearch集群的访问地址和集群的elastic账号对应的密码。
以上管道配置的原理说明如下:- 通过Logstash的filter.mutate.gsub参数,使用正则表达式
\|
去匹配LogMessage中的|
,将|
替换为|
(即|
+空格)。替换后的效果如下。LogMessage: | 1390000****| jop| byORP| 2022-04-18T14:18:16.633| /log/cms/send| 200| pluginNums=0,pluginStatus=0| | | | | |
- 通过filter.mutate.split参数将LogMessage按照
|
进行切分。 - 通过filter.mutate.add_field参数添加字段,即将切分后的LogMessage一一添加到对应的字段中。添加后的效果如下。
"mobile":" 1390000****", "appName":" jop", "type":" byORP", "timestamp":" 2022-04-18T14:18:16.633", "status":" /log/cms/sen", "code":" 200", "component":" pluginNums=0,pluginStatus=0", "cid":" ", "serviceId":" ", "serviceName":" ", "serviceType":" ", "param":" "
- 通过filter.mutate.strip参数去除字段空格。由于添加后的每个字段前面都有一个空格,因此需要去除这些空格。
更多管道配置说明,请参见通过配置文件管理管道和Logstash配置文件说明。
警告 配置完成后,需要保存并部署才能生效。保存并部署操作会触发实例重启,请在不影响业务的前提下,继续执行以下步骤。 - 单击保存或者保存并部署。
- 保存:将管道信息保存在Logstash里并触发实例变更,配置不会生效。保存后,系统会返回管道管理页面。可在管道列表区域,单击操作列下的立即部署,触发实例重启,使配置生效。
- 保存并部署:保存并且部署后,会触发实例重启,使配置生效。
验证结果
- 登录目标阿里云Elasticsearch实例的Kibana控制台,根据页面提示进入Kibana主页。登录Kibana控制台的具体操作,请参见登录Kibana控制台。说明 本文以阿里云Elasticsearch 7.10.0版本为例,其他版本操作可能略有差别,请以实际界面为准。
- 单击右上角的Dev tools。
- 在Console中,执行以下脚本,查询目标索引中的信息。
GET <yourIndexName>/_search { "query": { "match_all": {} } }
说明 <yourIndexName>需要与管道配置中的index参数值保持一致。预期结果如下。
{ "took" : 1, "timed_out" : false, "_shards" : { "total" : 1, "successful" : 1, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : { "value" : 1, "relation" : "eq" }, "max_score" : 1.0, "hits" : [ { "_index" : "<yourIndexName>", "_type" : "_doc", "_id" : "Lb1UWoAB-6Zo6en4luDi", "_score" : 1.0, "_source" : { "mobile" : "1390000****", "appName" : "jop", "type" : "byORP", "timestamp" : "2022-04-18T14:18:16.633", "status" : "/log/cms/sen", "code" : "200", "component" : "pluginNums=0,pluginStatus=0", "cid" : "", "serviceId" : "", "serviceName" : "", "serviceType" : "", "param" : "" } } ] } }