通过Logstash实现多字段数据整合

重要

本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。

在使用Logstash传输数据时,如果您需要通过合并字段来整合数据,例如将a字段和b字段合并为一个新的c字段,然后移除a字段和b字段,可以通过logstash-filter-mutate插件的多个模块实现。此插件为系统默认安装插件,无须再安装,且不支持卸载。本文介绍如何通过logstash-filter-mutate插件实现多字段合并。

背景信息

logstash-filter-mutate插件支持对事件中的字段进行重命名、删除、替换和修改操作。配置文件中的mutate按照下表中的顺序执行,详细信息请参见Mutate filter plugin

模块

输入类型

coerce

hash

rename

hash

update

hash

replace

hash

convert

hash

gsub

array

uppercase

array

capitalize

array

lowercase

array

strip

array

remove_field

array

split

hash

join

hash

merge

hash

copy

hash

前提条件

  • 创建阿里云Elasticsearch实例。

    具体操作,请参见创建阿里云Elasticsearch实例,本文以7.10版本实例为例。

  • 开启目标Elasticsearch实例的自动创建索引功能。

    具体操作请参见配置YML参数

    说明

    自动创建的索引可能不符合您的预期,不建议开启,本文仅供测试。在实际业务中,建议您先在目标Elasticsearch实例中创建索引,再通过Logstash传输数据。创建索引的具体操作请参见快速入门

  • 创建阿里云Logstash实例,需要与Elasticsearch实例在同一专有网络下。

    具体操作,请参见创建阿里云Logstash实例

  • 在源Elasticsearch实例中准备测试数据。

    本文使用的测试数据如下。其中源索引的名称为yc_text,待合并的字段为app.namemessage

    {
      "took" : 2,
      "timed_out" : false,
      "_shards" : {
        "total" : 5,
        "successful" : 5,
        "skipped" : 0,
        "failed" : 0
      },
      "hits" : {
        "total" : {
          "value" : 6,
          "relation" : "eq"
        },
        "max_score" : 1.0,
        "hits" : [
          {
            "_index" : "yc_text",
            "_type" : "_doc",
            "_id" : "HpIduH0BWiRrY8Azn65i",
            "_score" : 1.0,
            "_source" : {
              "app.name" : "app1",
              "annual_rate" : "31%",
              "describe" : "可以自助选择消息推送",
              "message" : "10000"
            }
          },
          {
            "_index" : "yc_text",
            "_type" : "_doc",
            "_id" : "H5IduH0BWiRrY8Azn65i",
            "_score" : 1.0,
            "_source" : {
              "app.name" : "app2",
              "annual_rate" : "35%",
              "describe" : "每天收益到账消息推送",
              "message" : "10001"
            }
          },
          {
            "_index" : "yc_text",
            "_type" : "_doc",
            "_id" : "IpIduH0BWiRrY8Azn65i",
            "_score" : 1.0,
            "_source" : {
              "app.name" : "app3",
              "annual rate" : "30",
              "describe" : "每天收益会消息推送",
              "message" : "10004"
            }
          },
          {
            "_index" : "yc_text",
            "_type" : "_doc",
            "_id" : "IJIduH0BWiRrY8Azn65i",
            "_score" : 1.0,
            "_source" : {
              "app.name" : "app4",
              "annual_rate" : "38%",
              "describe" : "每天收益立即到账消息推送",
              "message" : "10002"
            }
          },
          {
            "_index" : "yc_text",
            "_type" : "_doc",
            "_id" : "IZIduH0BWiRrY8Azn65i",
            "_score" : 1.0,
            "_source" : {
              "app.name" : "app5",
              "annual_rate" : "40%",
              "describe" : "每天收益到账消息推送",
              "message" : "10003"
            }
          },
          {
            "_index" : "yc_text",
            "_type" : "_doc",
            "_id" : "I5IduH0BWiRrY8Azn65i",
            "_score" : 1.0,
            "_source" : {
              "app.name" : "app6",
              "annual_rate" : "33%",
              "describe" : "通过短信提示获取收益消息",
              "message" : "10005"
            }
          }
        ]
      }
    }

操作步骤

  1. 进入阿里云Elasticsearch控制台的Logstash页面
  2. 进入目标实例。
    1. 在顶部菜单栏处,选择地域。
    2. Logstash实例中单击目标实例ID。
  3. 在左侧导航栏,单击管道管理
  4. 单击创建管道
  5. 创建管道任务页面,输入管道ID并配置管道。

    本文使用的管道配置是在同一个实例中实现的,如果源实例和目标实例不同,可以替换Logstash中的配置。

    input {
        elasticsearch {
            hosts => ["http://es-cn-tl3264bqv001d****.elasticsearch.aliyuncs.com:9200"]
            user => "elastic"
            password => "your_password"
            index => "yc_text"
            docinfo => true
        }
    }
    filter {
        mutate {
            merge =>  { "message" => "app.name" }
        }
    
        mutate {
            rename => [ "message","anger" ]
        }
    
        mutate {
            remove_field => [ "app.name" ]
        }
    }
    output {
        elasticsearch {
            hosts => ["http://es-cn-tl3264bqv001d****.elasticsearch.aliyuncs.com:9200"]
            user => "elastic"
            password => "your_password"
            index => "yc_text_new"
            document_type => "%{[@metadata][_type]}"
            document_id => "%{[@metadata][_id]}"
        }
    }
                            

    管道配置的原理说明如下:

    1. 通过Logstashfilter.mutate.merge参数合并源索引yc_text中的app.namemessage两个字段。合并后,Logstash会使用message字段存放原messageapp.name两个字段的数据。

    2. 通过filter.mutate.rename参数将合并后的message字段重命名为anger

    3. 前两步完成后,app.name字段还会继续存在。为了避免出现重复数据,管道配置中使用filter.mutate.remove_fieldapp.name字段移除。

    4. 最后将合并后的字段anger字段传输到yc_text_new索引中。

    说明

    logstash-filter-mutate插件会按照优先级执行mutate块中的定义的操作,详细信息请参见本文的背景信息。但是您可以通过使用不同的mutate块来控制这个顺序,例如以上管道配置中使用了三个mutate块来控制执行顺序:renamemergeremove_field

    更多管道配置说明,请参见通过配置文件管理管道Logstash配置文件说明

    警告 配置完成后,需要保存并部署才能生效。保存并部署操作会触发实例重启,请在不影响业务的前提下,继续执行以下步骤。
  6. 单击保存或者保存并部署
    • 保存:将管道信息保存在Logstash里并触发实例变更,配置不会生效。保存后,系统会返回管道管理页面。可在管道列表区域,单击操作列下的立即部署,触发实例重启,使配置生效。
    • 保存并部署:保存并且部署后,会触发实例重启,使配置生效。

验证结果

  1. 登录目标阿里云ElasticsearchKibana控制台。

    具体操作请参见登录Kibana控制台

  2. 根据页面提示进入Kibana主页,单击右上角的Dev tools

  3. Console页签,执行以下脚本,查询目标索引中的信息。

    GET yc_text_new/_search
    {
      "query": {
        "match_all": {}
      }
    }

    返回结果如下。

    {
      "took" : 1,
      "timed_out" : false,
      "_shards" : {
        "total" : 1,
        "successful" : 1,
        "skipped" : 0,
        "failed" : 0
      },
      "hits" : {
        "total" : {
          "value" : 6,
          "relation" : "eq"
        },
        "max_score" : 1.0,
        "hits" : [
          {
            "_index" : "yc_text_new",
            "_type" : "_doc",
            "_id" : "H5IduH0BWiRrY8Azn65i",
            "_score" : 1.0,
            "_source" : {
              "describe" : "每天收益到账消息推送",
              "@version" : "1",
              "anger" : [
                "10001",
                "app2"
              ],
              "@timestamp" : "2021-12-15T03:45:25.321Z",
              "annual_rate" : "35%"
            }
          },
          {
            "_index" : "yc_text_new",
            "_type" : "_doc",
            "_id" : "IZIduH0BWiRrY8Azn65i",
            "_score" : 1.0,
            "_source" : {
              "describe" : "每天收益到账消息推送",
              "@version" : "1",
              "anger" : [
                "10003",
                "app5"
              ],
              "@timestamp" : "2021-12-15T03:45:25.321Z",
              "annual_rate" : "40%"
            }
          },
          {
            "_index" : "yc_text_new",
            "_type" : "_doc",
            "_id" : "I5IduH0BWiRrY8Azn65i",
            "_score" : 1.0,
            "_source" : {
              "describe" : "通过短信提示获取收益消息",
              "@version" : "1",
              "anger" : [
                "10005",
                "app6"
              ],
              "@timestamp" : "2021-12-15T03:45:25.322Z",
              "annual_rate" : "33%"
            }
          },
          {
            "_index" : "yc_text_new",
            "_type" : "_doc",
            "_id" : "HpIduH0BWiRrY8Azn65i",
            "_score" : 1.0,
            "_source" : {
              "describe" : "可以自助选择消息推送",
              "@version" : "1",
              "anger" : [
                "10000",
                "app1"
              ],
              "@timestamp" : "2021-12-15T03:45:25.298Z",
              "annual_rate" : "31%"
            }
          },
          {
            "_index" : "yc_text_new",
            "_type" : "_doc",
            "_id" : "IJIduH0BWiRrY8Azn65i",
            "_score" : 1.0,
            "_source" : {
              "describe" : "每天收益立即到账消息推送",
              "@version" : "1",
              "anger" : [
                "10002",
                "app4"
              ],
              "@timestamp" : "2021-12-15T03:45:25.321Z",
              "annual_rate" : "38%"
            }
          },
          {
            "_index" : "yc_text_new",
            "_type" : "_doc",
            "_id" : "IpIduH0BWiRrY8Azn65i",
            "_score" : 1.0,
            "_source" : {
              "describe" : "每天收益会消息推送",
              "@version" : "1",
              "anger" : [
                "10004",
                "app3"
              ],
              "@timestamp" : "2021-12-15T03:45:25.321Z",
              "annual rate" : "30"
            }
          }
        ]
      }
    }

    根据结果可以看到,新字段anger已经整合了旧字段messageapp.name的数据,且app.name字段已被移除。