通过Logstash修改字段名

重要

本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。

在某些业务的使用场景下,您可能需要对索引的一些字段进行重命名。例如,使用DataWorks在阿里云Elasticsearch(简称ES)集群间迁移数据时,由于源集群数据中包含了特殊符号(例如@),而DataWorks不支持特殊符号,因此需要修改字段名(去掉特殊符号)后再进行数据迁移。本文介绍如何通过Logstash修改字段名。

背景信息

您可以通过两种方式修改字段名:

  • 使用Logstash的filter,对字段进行重命名。本文采用此方式。

    本文以去除源索引字段的@符号为例进行演示,将源索引的@ctxt_user_info字段,使用Logstash的filter在目标索引中重命名为ctxt_user_info字段。

  • 使用Reindex迁移时,对字段进行重命名。

前提条件

  • 已创建阿里云Elasticsearch实例。源索引和目标索引可以在同一个ES实例中,也可以在不同的ES实例中,本文以在同一个7.10版本ES实例中为例。具体操作,请参见创建阿里云Elasticsearch实例

  • 已创建阿里云Logstash实例。Logstash实例需要与ES实例在同一专有网络下。具体操作,请参见创建阿里云Logstash实例

准备测试数据

  1. 登录阿里云ES实例的Kibana控制台。

    具体操作,请参见登录Kibana控制台

  2. 在Kibana控制台,选择菜单.png > Management > Dev Tools

  3. Console页签,执行以下代码,创建源索引product_info

    PUT /product_info
    {
        "settings": {
            "number_of_shards": 5,
            "number_of_replicas": 1
        },
        "mappings": {
            "properties": {
                "ctxt_user_info": {
                    "type": "text",
                    "fields": {
                        "keyword": {
                            "type": "keyword",
                            "ignore_above": 256
                        }
                    }
                }
            }
        }
    }
  4. 执行以下代码,在源索引中插入示例数据。

    POST /product_info/_doc/_bulk
    {"index":{}}
    {"@ctxt_user_info":"test1"}
    {"index":{}}
    {"@ctxt_user_info":"test1"}
  5. 执行以下代码,查询源索引中的示例数据。

    GET /product_info/_search

    返回结果如下,可以看到源索引中字段@ctxt_user_info中包含特殊符号@

    {
      "took" : 16,
      "timed_out" : false,
      "_shards" : {
        "total" : 5,
        "successful" : 5,
        "skipped" : 0,
        "failed" : 0
      },
      "hits" : {
        "total" : {
          "value" : 2,
          "relation" : "eq"
        },
        "max_score" : 1.0,
        "hits" : [
          {
            "_index" : "product_info",
            "_type" : "_doc",
            "_id" : "3BDMUZMBh7mRlA4aS0Nq",
            "_score" : 1.0,
            "_routing" : "74873",
            "_source" : {
              "@ctxt_user_info" : "test1"
            }
          },
          {
            "_index" : "product_info",
            "_type" : "_doc",
            "_id" : "3RDMUZMBh7mRlA4aS0Nq",
            "_score" : 1.0,
            "_routing" : "74873",
            "_source" : {
              "@ctxt_user_info" : "test1"
            }
          }
        ]
      }
    }

操作步骤

借助Logstash将ES索引中的字段重命名包括以下几个步骤:

  1. 在ES实例中创建目标索引,以便接收源索引中的数据。

  2. 配置Logstash管道:将源索引中的@ctxt_user_info字段,使用Logstash的filter重命名为ctxt_user_info字段,并在目标索引中输出。

  3. 在目标索引中验证ctxt_user_info字段中的特殊符号已去除。

步骤一:(可选)创建目标索引

如果您开启了ES实例的自动创建索引功能,可忽略此步骤。但自动创建的索引可能不符合您的预期,不建议您开启自动创建索引功能。

在阿里云ES实例的Kibana控制台的Dev tools中执行以下代码,创建目标索引product_info2

PUT /product_info2
{
    "settings": {
        "number_of_shards": 5,
        "number_of_replicas": 1
    },
    "mappings": {
        "properties": {
            "ctxt_user_info": {
                "type": "text",
                "fields": {
                    "keyword": {
                        "type": "keyword",
                        "ignore_above": 256
                    }
                }
            }
        }
    }
}

步骤二:创建并配置Logstash管道

  1. 进入阿里云Elasticsearch控制台的Logstash页面
  2. 进入目标实例。
    1. 在顶部菜单栏处,选择地域。
    2. Logstash实例中单击目标实例ID。
  3. 在左侧导航栏,单击管道管理

  4. 单击创建管道

  5. 创建管道任务页面,输入管道ID并配置管道。

    本文使用的管道配置如下。

    input {
        elasticsearch {
            hosts => ["http://es-cn-tl32gid**********.elasticsearch.aliyuncs.com:9200"]
            user => "elastic"
            password => "your_password"
            index => "product_info"
            docinfo => true
        }
    }
    filter {
        mutate {
            rename => { "@ctxt_user_info" => "ctxt_user_info" }
        }
    }
    output {
        elasticsearch {
            hosts => ["http://es-cn-tl32gid**********.elasticsearch.aliyuncs.com:9200"]
            user => "elastic"
            password => "your_password"
            index => "product_info2"
            document_type => "%{[@metadata][_type]}"
            document_id => "%{[@metadata][_id]}"
        }
    }
                            

    以上管道配置中,通过Logstash的filter.mutate.rename参数实现索引字段的重命名。

    更多管道配置说明,请参见通过配置文件管理管道Logstash配置文件说明

    警告

    配置完成后,需要保存并部署才能生效。保存并部署操作会触发实例重启,请在不影响业务的前提下,继续执行以下步骤。

  6. 单击下一步

  7. 配置管道参数。

  8. 单击保存或者保存并部署

    • 保存:将管道信息保存在Logstash里并触发实例变更,配置不会生效。保存后,系统会返回管道管理页面。可在管道列表区域,单击操作列下的立即部署,触发实例重启,使配置生效。

    • 保存并部署:保存并且部署后,会触发实例重启,使配置生效。

步骤三:验证结果

在阿里云ES实例的Kibana控制台的Dev tools中执行以下代码,查询目标索引product_info2中的数据。

GET product_info2/_search

返回结果如下。根据结果可以看到,源索引字段@ctxt_user_info中的@已经去除,索引字段被重命名为ctxt_user_info

{
  "took" : 4,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 6,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "product_info2",
        "_type" : "_doc",
        "_id" : "r5N7fn0BKQKHRO31rK6C",
        "_score" : 1.0,
        "_source" : {
          "@timestamp" : "2021-12-03T04:14:26.872Z",
          "@version" : "1",
          "ctxt_user_info" : "test1"
        }
      },
      {
        "_index" : "product_info2",
        "_type" : "_doc",
        "_id" : "rpN7fn0BKQKHRO31rK6C",
        "_score" : 1.0,
        "_source" : {
          "@timestamp" : "2021-12-03T04:14:26.871Z",
          "@version" : "1",
          "ctxt_user_info" : "test2"
        }
      }
    ]
  }
}