本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。
在使用Logstash传输数据时,如果您需要通过合并字段来整合数据,例如将a字段和b字段合并为一个新的c字段,然后移除a字段和b字段,可以通过logstash-filter-mutate插件的多个模块实现。此插件为系统默认安装插件,无须再安装,且不支持卸载。本文介绍如何通过logstash-filter-mutate插件实现多字段合并。
背景信息
logstash-filter-mutate插件支持对事件中的字段进行重命名、删除、替换和修改操作。配置文件中的mutate按照下表中的顺序执行,详细信息请参见Mutate filter plugin。
模块 | 输入类型 |
coerce | hash |
rename | hash |
update | hash |
replace | hash |
convert | hash |
gsub | array |
uppercase | array |
capitalize | array |
lowercase | array |
strip | array |
remove_field | array |
split | hash |
join | hash |
merge | hash |
copy | hash |
前提条件
创建阿里云Elasticsearch实例。
具体操作,请参见创建阿里云Elasticsearch实例,本文以7.10版本实例为例。
开启目标Elasticsearch实例的自动创建索引功能。
创建阿里云Logstash实例,需要与Elasticsearch实例在同一专有网络下。
具体操作,请参见创建阿里云Logstash实例。
在源Elasticsearch实例中准备测试数据。
本文使用的测试数据如下。其中源索引的名称为yc_text,待合并的字段为app.name和message。
{ "took" : 2, "timed_out" : false, "_shards" : { "total" : 5, "successful" : 5, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : { "value" : 6, "relation" : "eq" }, "max_score" : 1.0, "hits" : [ { "_index" : "yc_text", "_type" : "_doc", "_id" : "HpIduH0BWiRrY8Azn65i", "_score" : 1.0, "_source" : { "app.name" : "app1", "annual_rate" : "31%", "describe" : "可以自助选择消息推送", "message" : "10000" } }, { "_index" : "yc_text", "_type" : "_doc", "_id" : "H5IduH0BWiRrY8Azn65i", "_score" : 1.0, "_source" : { "app.name" : "app2", "annual_rate" : "35%", "describe" : "每天收益到账消息推送", "message" : "10001" } }, { "_index" : "yc_text", "_type" : "_doc", "_id" : "IpIduH0BWiRrY8Azn65i", "_score" : 1.0, "_source" : { "app.name" : "app3", "annual rate" : "30", "describe" : "每天收益会消息推送", "message" : "10004" } }, { "_index" : "yc_text", "_type" : "_doc", "_id" : "IJIduH0BWiRrY8Azn65i", "_score" : 1.0, "_source" : { "app.name" : "app4", "annual_rate" : "38%", "describe" : "每天收益立即到账消息推送", "message" : "10002" } }, { "_index" : "yc_text", "_type" : "_doc", "_id" : "IZIduH0BWiRrY8Azn65i", "_score" : 1.0, "_source" : { "app.name" : "app5", "annual_rate" : "40%", "describe" : "每天收益到账消息推送", "message" : "10003" } }, { "_index" : "yc_text", "_type" : "_doc", "_id" : "I5IduH0BWiRrY8Azn65i", "_score" : 1.0, "_source" : { "app.name" : "app6", "annual_rate" : "33%", "describe" : "通过短信提示获取收益消息", "message" : "10005" } } ] } }
操作步骤
- 进入阿里云Elasticsearch控制台的Logstash页面。
- 进入目标实例。
- 在顶部菜单栏处,选择地域。
- 在Logstash实例中单击目标实例ID。
- 在左侧导航栏,单击管道管理。
- 单击创建管道。
在创建管道任务页面,输入管道ID并配置管道。
本文使用的管道配置是在同一个实例中实现的,如果源实例和目标实例不同,可以替换Logstash中的配置。
input { elasticsearch { hosts => ["http://es-cn-tl3264bqv001d****.elasticsearch.aliyuncs.com:9200"] user => "elastic" password => "your_password" index => "yc_text" docinfo => true } } filter { mutate { merge => { "message" => "app.name" } } mutate { rename => [ "message","anger" ] } mutate { remove_field => [ "app.name" ] } } output { elasticsearch { hosts => ["http://es-cn-tl3264bqv001d****.elasticsearch.aliyuncs.com:9200"] user => "elastic" password => "your_password" index => "yc_text_new" document_type => "%{[@metadata][_type]}" document_id => "%{[@metadata][_id]}" } }
管道配置的原理说明如下:
通过Logstash的filter.mutate.merge参数合并源索引yc_text中的app.name和message两个字段。合并后,Logstash会使用message字段存放原message和app.name两个字段的数据。
通过filter.mutate.rename参数将合并后的message字段重命名为anger。
前两步完成后,app.name字段还会继续存在。为了避免出现重复数据,管道配置中使用filter.mutate.remove_field将app.name字段移除。
最后将合并后的字段anger字段传输到yc_text_new索引中。
说明logstash-filter-mutate插件会按照优先级执行mutate块中的定义的操作,详细信息请参见本文的背景信息。但是您可以通过使用不同的mutate块来控制这个顺序,例如以上管道配置中使用了三个mutate块来控制执行顺序:rename、merge、remove_field。
更多管道配置说明,请参见通过配置文件管理管道和Logstash配置文件说明。
警告 配置完成后,需要保存并部署才能生效。保存并部署操作会触发实例重启,请在不影响业务的前提下,继续执行以下步骤。- 单击保存或者保存并部署。
- 保存:将管道信息保存在Logstash里并触发实例变更,配置不会生效。保存后,系统会返回管道管理页面。可在管道列表区域,单击操作列下的立即部署,触发实例重启,使配置生效。
- 保存并部署:保存并且部署后,会触发实例重启,使配置生效。
验证结果
登录目标阿里云Elasticsearch的Kibana控制台。
具体操作请参见登录Kibana控制台。
根据页面提示进入Kibana主页,单击右上角的Dev tools。
在Console页签,执行以下脚本,查询目标索引中的信息。
GET yc_text_new/_search { "query": { "match_all": {} } }
返回结果如下。
{ "took" : 1, "timed_out" : false, "_shards" : { "total" : 1, "successful" : 1, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : { "value" : 6, "relation" : "eq" }, "max_score" : 1.0, "hits" : [ { "_index" : "yc_text_new", "_type" : "_doc", "_id" : "H5IduH0BWiRrY8Azn65i", "_score" : 1.0, "_source" : { "describe" : "每天收益到账消息推送", "@version" : "1", "anger" : [ "10001", "app2" ], "@timestamp" : "2021-12-15T03:45:25.321Z", "annual_rate" : "35%" } }, { "_index" : "yc_text_new", "_type" : "_doc", "_id" : "IZIduH0BWiRrY8Azn65i", "_score" : 1.0, "_source" : { "describe" : "每天收益到账消息推送", "@version" : "1", "anger" : [ "10003", "app5" ], "@timestamp" : "2021-12-15T03:45:25.321Z", "annual_rate" : "40%" } }, { "_index" : "yc_text_new", "_type" : "_doc", "_id" : "I5IduH0BWiRrY8Azn65i", "_score" : 1.0, "_source" : { "describe" : "通过短信提示获取收益消息", "@version" : "1", "anger" : [ "10005", "app6" ], "@timestamp" : "2021-12-15T03:45:25.322Z", "annual_rate" : "33%" } }, { "_index" : "yc_text_new", "_type" : "_doc", "_id" : "HpIduH0BWiRrY8Azn65i", "_score" : 1.0, "_source" : { "describe" : "可以自助选择消息推送", "@version" : "1", "anger" : [ "10000", "app1" ], "@timestamp" : "2021-12-15T03:45:25.298Z", "annual_rate" : "31%" } }, { "_index" : "yc_text_new", "_type" : "_doc", "_id" : "IJIduH0BWiRrY8Azn65i", "_score" : 1.0, "_source" : { "describe" : "每天收益立即到账消息推送", "@version" : "1", "anger" : [ "10002", "app4" ], "@timestamp" : "2021-12-15T03:45:25.321Z", "annual_rate" : "38%" } }, { "_index" : "yc_text_new", "_type" : "_doc", "_id" : "IpIduH0BWiRrY8Azn65i", "_score" : 1.0, "_source" : { "describe" : "每天收益会消息推送", "@version" : "1", "anger" : [ "10004", "app3" ], "@timestamp" : "2021-12-15T03:45:25.321Z", "annual rate" : "30" } } ] } }
根据结果可以看到,新字段anger已经整合了旧字段message和app.name的数据,且app.name字段已被移除。