通过脚本实现索引结构、模板或ILM的同步

在进行Elasticsearch(ES)集群间数据迁移时,例如从自建ES迁移到阿里云ES,必须确保源端和目标端的索引结构保持一致,避免自动映射可能引入的数据丢失、格式错误及查询性能下降等问题,因此一般要求在数据迁移前手动创建目标索引,预先定义索引的映射和设置。本文介绍运用Python脚本预定义ES集群索引的映射和设置,并实现索引结构、索引模板和索引生命周期管理(ILM)在目标集群的写入。

前提条件

  1. 已创建两个阿里云ES实例。具体操作,请参见创建阿里云Elasticsearch实例

    本文以源端和目标端的ES版本均为7.10为例。

    说明

    本文以ES 7.10版本为例提供相关的Python脚本示例。高版本ES在mapping构造上可能存在差异,其他版本需结合场景进行调整,如低版本多type结构在高版本已不支持,无法通过文档示例进行创建。

  2. 已创建ECS实例,并配置python环境,具体操作,请参考Linux系统实例快速入门

    本文以Python 3.6.8为例,其他版本结合对应版本接口的Requests模块进行调整。

  3. 已打通源端ES和目标端ES与ECS的网络,将ECS公网或私网IP地址,分别配置到源端ES和目标端ES的公网地址访问白名单或VPC私网访问白名单中。

    说明

    生产环境注重数据安全性,建议您通过私网连通ECS与源端ES和目标端ES。

同步索引结构

同步索引结构(mappings)和设置(settings)下的主副本配置。

  1. 准备测试数据。

    在源端ES中执行以下命令,创建索引。

    PUT /product_info
    {
      "settings": {
        "number_of_shards": 3,
        "number_of_replicas": 1
      },
      "mappings": {
          "properties": {
            "productName": {
              "type": "text"
            },
            "annual_rate":{
              "type":"keyword"
            },
            "describe": {
              "type": "text"
            }
        }
      }
    }
  2. 在ECS中执行以下脚本,同步索引结构和设置。

    import requests
    from requests.auth import HTTPBasicAuth
    
    # 配置信息,按照实际环境调整
    config = {
        # 源集群host
        'old_cluster_host': 'es-cn-27a3jyyd2000770dd.public.elasticsearch.aliyuncs.com:9200',
        # 源集群用户名
        'old_cluster_user': 'yourusername',
        # 源集群密码
        'old_cluster_password': 'yourpassward',
        # 源集群http协议,可选 http/https
        'old_cluster_protocol': 'http',
        # 目标集群host,可在阿里云Elasticsearch实例的基本信息页面获取。
        'new_cluster_host': 'es-cn-o493kbics000cw4pt.elasticsearch.aliyuncs.com:9200',
        # 目标集群用户名
        'new_cluster_user': 'yourusername',
        # 目标集群密码
        'new_cluster_password': 'yourpassward',
        # 目标集群http协议,可选 http/https
        'new_cluster_protocol': 'http',
        # 目标集群默认副本数
        'default_replicas': 1,
    }
    
    # 通用的 HTTP 请求函数
    def send_http_request(method, host, endpoint, username="", password="", params=None, json_body=None, protocol='http'):
        url = f"{protocol}://{host}{endpoint}"
        auth = (username, password) if username and password else None
        headers = {'Content-Type': 'application/json'} if method != 'GET' else None
        try:
            response = requests.request(method, url, auth=auth, params=params, json=json_body, headers=headers)
            response.raise_for_status()
            return response.json()
        except requests.HTTPError as e:
            # 打印错误信息
            print(f"HTTP Error: {e.response.status_code} for {url}")
            print(e.response.text)
        except ValueError as e:
            # 如果响应不是 JSON 格式,打印错误并返回原始内容
            print("Invalid JSON response:")
            print(response.text)
            raise
    
    # 获取所有索引列表
    def get_indices():
        endpoint = "/_cat/indices?format=json"
        indices_result = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol'])
        index_list = [index['index'] for index in indices_result if index['status'] == 'open']
        return index_list
    
    # 获取索引的设置
    def get_index_settings(index):
        endpoint = f"/{index}/_settings"
        index_settings = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol'])
        settings = index_settings[index]['settings']['index']
        shards_replicas_settings = {
            'number_of_shards': settings.get('number_of_shards'),
            'number_of_replicas': config['default_replicas']
        }
        return {'settings': shards_replicas_settings}
    
    # 获取索引的映射
    def get_index_mapping(index):
        endpoint = f"/{index}/_mapping"
        index_mapping = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol'])
        return {'mappings': index_mapping[index]['mappings']}
    
    # 创建新索引
    def create_index(old_index_name, new_index_name=""):
        if not new_index_name:
            new_index_name = old_index_name
    
        settings = get_index_settings(old_index_name)
        mappings = get_index_mapping(old_index_name)
        body = {**settings, **mappings}
    
        endpoint = f"/{new_index_name}"
        create_result = send_http_request('PUT', config['new_cluster_host'], endpoint, config['new_cluster_user'], config['new_cluster_password'], protocol=config['new_cluster_protocol'], json_body=body)
    
        print(f"Index {new_index_name} created with result: {create_result}")
    
    # 主函数
    def main():
        index_list = get_indices()
        for index in index_list:
            if not index.startswith("."):  # 忽略系统索引
                create_index(index)
    
    
    if __name__ == '__main__':
        main()
    
  3. 验证结果。

    在目标端ES中执行以下命令,查看同步成功的索引。

    GET _cat/indices/product_info

同步索引模板

  1. 准备测试数据。

    在源端ES中执行以下命令,创建索引模板。

    PUT _template/product
    {
      "index_patterns": ["product_*"],
      "settings": {
        "number_of_shards": 3,
        "number_of_replicas": 1
      },
      "mappings": {
        "properties": {
            "productName": {
              "type": "text"
            },
            "annual_rate":{
              "type":"keyword"
            },
            "describe": {
              "type": "text"
            }
        }
      }
    }
  2. 在ECS中执行以下脚本,同步索引模板。

    import requests
    from requests.auth import HTTPBasicAuth
    
    # 配置信息,按照实际环境调整
    config = {
        # 源集群host
        'old_cluster_host': 'es-cn-27a3jyyd2000770dd.public.elasticsearch.aliyuncs.com:9200',
        # 源集群用户名
        'old_cluster_user': 'yourusername',
        # 源集群密码
        'old_cluster_password': 'yourpassward',
        # 源集群http协议,可选 http/https
        'old_cluster_protocol': 'http',
        # 目标集群host,可在阿里云Elasticsearch实例的基本信息页面获取。
        'new_cluster_host': 'es-cn-o493kbics000cw4pt.elasticsearch.aliyuncs.com:9200',
        # 目标集群用户名
        'new_cluster_user': 'yourusername',
        # 目标集群密码
        'new_cluster_password': 'yourpassward',
        # 目标集群http协议,可选 http/https
        'new_cluster_protocol': 'http',
        # 目标集群默认副本数
        'default_replicas': 1,
    }
    
    # 通用的 HTTP 请求函数
    def send_http_request(method, host, endpoint, username="", password="", params=None, json_body=None, protocol='http'):
        url = f"{protocol}://{host}{endpoint}"
        auth = (username, password) if username and password else None
        headers = {'Content-Type': 'application/json'} if method != 'GET' else None
        try:
            response = requests.request(method, url, auth=auth, params=params, json=json_body, headers=headers)
            response.raise_for_status()
            return response.json()
        except requests.HTTPError as e:
            # 打印错误信息
            print(f"HTTP Error: {e.response.status_code} for {url}")
            print(e.response.text)
        except ValueError as e:
            # 如果响应不是 JSON 格式,打印错误并返回原始内容
            print("Invalid JSON response:")
            print(response.text)
            raise
    
    # 获取源集群的所有索引模板
    def get_index_templates():
        endpoint = "/_template"
        templates_result = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol'])
        return templates_result
    
    # 创建目标集群的索引模板
    def create_index_template(template_name, template_body):
        endpoint = f"/_template/{template_name}"
        create_result = send_http_request('PUT', config['new_cluster_host'], endpoint, config['new_cluster_user'], config['new_cluster_password'], protocol=config['new_cluster_protocol'], json_body=template_body)
        print(f"Template {template_name} created with result: {create_result}")
    
    # 主函数
    def main():
    
        # 同步索引模板
        templates = get_index_templates()
        for template_name, template_body in templates.items():
            create_index_template(template_name, template_body)
    
    if __name__ == '__main__':
        main()
  3. 验证结果。

    在目标端ES中执行以下命令,查询目标模板的信息。

    GET _template/product

同步索引生命周期管理(ILM)

  1. 准备测试数据。

    在源端ES中执行以下命令,创建索引生命周期管理(ILM)。

    PUT _ilm/policy/product
    {
      "policy": {
        "phases": {
          "hot": {
            "actions": {
              "rollover": {
                "max_size": "1GB",
                "max_age": "1d",
                "max_docs": 1000
              }
            }
          },
          "delete": {
            "min_age": "2h",
            "actions": {
              "delete": {}
            }
          }
        }
      }
    }
  1. 在ECS中执行以下脚本,同步索引生命周期管理(ILM)。

    import requests
    from requests.auth import HTTPBasicAuth
    
    # 配置信息,按照实际环境调整
    config = {
        # 源集群host
        'old_cluster_host': 'es-cn-27a3jyyd2000770dd.public.elasticsearch.aliyuncs.com:9200',
        # 源集群用户名
        'old_cluster_user': 'yourusername',
        # 源集群密码
        'old_cluster_password': 'yourpassward',
        # 源集群http协议,可选 http/https
        'old_cluster_protocol': 'http',
        # 目标集群host,可在阿里云Elasticsearch实例的基本信息页面获取。
        'new_cluster_host': 'es-cn-o493kbics000cw4pt.elasticsearch.aliyuncs.com:9200',
        # 目标集群用户名
        'new_cluster_user': 'yourusername',
        # 目标集群密码
        'new_cluster_password': 'yourpassward',
        # 目标集群http协议,可选 http/https
        'new_cluster_protocol': 'http',
        # 目标集群默认副本数
        'default_replicas': 1,
    }
    
    # 通用的 HTTP 请求函数
    def send_http_request(method, host, endpoint, username="", password="", params=None, json_body=None, protocol='http'):
        url = f"{protocol}://{host}{endpoint}"
        auth = (username, password) if username and password else None
        headers = {'Content-Type': 'application/json'} if method != 'GET' else None
        try:
            response = requests.request(method, url, auth=auth, params=params, json=json_body, headers=headers)
            response.raise_for_status()
            return response.json()
        except requests.HTTPError as e:
            # 打印错误信息
            print(f"HTTP Error: {e.response.status_code} for {url}")
            print(e.response.text)
        except ValueError as e:
            # 如果响应不是 JSON 格式,打印错误并返回原始内容
            print("Invalid JSON response:")
            print(response.text)
            raise
    
    # 获取源集群的所有索引ILM
    def get_ilm_polices():
        endpoint = "/_ilm/policy"
        templates_result = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol'])
        return templates_result
    
    # 创建目标集群的索引ILM
    def create_ilm_policy(policy_name, policy_body):
        policy_body.pop('version', None)
        policy_body.pop('modified_date', None)
        policy_body.pop('modified_date_string', None)
    
        endpoint = f"/_ilm/policy/{policy_name}"
        create_result = send_http_request('PUT', config['new_cluster_host'], endpoint, config['new_cluster_user'], config['new_cluster_password'], protocol=config['new_cluster_protocol'], json_body=policy_body)
        print(f"Policy {policy_name} created with result: {create_result}")
    
    # 主函数
    def main():
    
        # 同步索引ILM
        policies = get_ilm_polices()
        for policy_name, policy_body in policies.items():
            create_ilm_policy(policy_name, policy_body)
    if __name__ == '__main__':
        main()
    
  2. 验证结果。

    在目标端ES中执行以下命令,查询索引生命周期管理(ILM)的信息。

    GET _ilm/policy/product