在进行Elasticsearch(ES)集群间数据迁移时,例如从自建ES迁移到阿里云ES,必须确保源端和目标端的索引结构保持一致,避免自动映射可能引入的数据丢失、格式错误及查询性能下降等问题,因此一般要求在数据迁移前手动创建目标索引,预先定义索引的映射和设置。本文介绍运用Python脚本预定义ES集群索引的映射和设置,并实现索引结构、索引模板和索引生命周期管理(ILM)在目标集群的写入。
前提条件
已创建两个阿里云ES实例。具体操作,请参见创建阿里云Elasticsearch实例。
本文以源端和目标端的ES版本均为7.10为例。
说明本文以ES 7.10版本为例提供相关的Python脚本示例。高版本ES在mapping构造上可能存在差异,其他版本需结合场景进行调整,如低版本多type结构在高版本已不支持,无法通过文档示例进行创建。
已创建ECS实例,并配置python环境,具体操作,请参考Linux系统实例快速入门。
本文以Python 3.6.8为例,其他版本结合对应版本接口的Requests模块进行调整。
已打通源端ES和目标端ES与ECS的网络,将ECS公网或私网IP地址,分别配置到源端ES和目标端ES的公网地址访问白名单或VPC私网访问白名单中。
说明生产环境注重数据安全性,建议您通过私网连通ECS与源端ES和目标端ES。
同步索引结构
同步索引结构(mappings)和设置(settings)下的主副本配置。
准备测试数据。
在源端ES中执行以下命令,创建索引。
PUT /product_info { "settings": { "number_of_shards": 3, "number_of_replicas": 1 }, "mappings": { "properties": { "productName": { "type": "text" }, "annual_rate":{ "type":"keyword" }, "describe": { "type": "text" } } } }
在ECS中执行以下脚本,同步索引结构和设置。
import requests from requests.auth import HTTPBasicAuth # 配置信息,按照实际环境调整 config = { # 源集群host 'old_cluster_host': 'es-cn-27a3jyyd2000770dd.public.elasticsearch.aliyuncs.com:9200', # 源集群用户名 'old_cluster_user': 'yourusername', # 源集群密码 'old_cluster_password': 'yourpassward', # 源集群http协议,可选 http/https 'old_cluster_protocol': 'http', # 目标集群host,可在阿里云Elasticsearch实例的基本信息页面获取。 'new_cluster_host': 'es-cn-o493kbics000cw4pt.elasticsearch.aliyuncs.com:9200', # 目标集群用户名 'new_cluster_user': 'yourusername', # 目标集群密码 'new_cluster_password': 'yourpassward', # 目标集群http协议,可选 http/https 'new_cluster_protocol': 'http', # 目标集群默认副本数 'default_replicas': 1, } # 通用的 HTTP 请求函数 def send_http_request(method, host, endpoint, username="", password="", params=None, json_body=None, protocol='http'): url = f"{protocol}://{host}{endpoint}" auth = (username, password) if username and password else None headers = {'Content-Type': 'application/json'} if method != 'GET' else None try: response = requests.request(method, url, auth=auth, params=params, json=json_body, headers=headers) response.raise_for_status() return response.json() except requests.HTTPError as e: # 打印错误信息 print(f"HTTP Error: {e.response.status_code} for {url}") print(e.response.text) except ValueError as e: # 如果响应不是 JSON 格式,打印错误并返回原始内容 print("Invalid JSON response:") print(response.text) raise # 获取所有索引列表 def get_indices(): endpoint = "/_cat/indices?format=json" indices_result = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol']) index_list = [index['index'] for index in indices_result if index['status'] == 'open'] return index_list # 获取索引的设置 def get_index_settings(index): endpoint = f"/{index}/_settings" index_settings = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol']) settings = index_settings[index]['settings']['index'] shards_replicas_settings = { 'number_of_shards': settings.get('number_of_shards'), 'number_of_replicas': config['default_replicas'] } return {'settings': shards_replicas_settings} # 获取索引的映射 def get_index_mapping(index): endpoint = f"/{index}/_mapping" index_mapping = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol']) return {'mappings': index_mapping[index]['mappings']} # 创建新索引 def create_index(old_index_name, new_index_name=""): if not new_index_name: new_index_name = old_index_name settings = get_index_settings(old_index_name) mappings = get_index_mapping(old_index_name) body = {**settings, **mappings} endpoint = f"/{new_index_name}" create_result = send_http_request('PUT', config['new_cluster_host'], endpoint, config['new_cluster_user'], config['new_cluster_password'], protocol=config['new_cluster_protocol'], json_body=body) print(f"Index {new_index_name} created with result: {create_result}") # 主函数 def main(): index_list = get_indices() for index in index_list: if not index.startswith("."): # 忽略系统索引 create_index(index) if __name__ == '__main__': main()
验证结果。
在目标端ES中执行以下命令,查看同步成功的索引。
GET _cat/indices/product_info
同步索引模板
准备测试数据。
在源端ES中执行以下命令,创建索引模板。
PUT _template/product { "index_patterns": ["product_*"], "settings": { "number_of_shards": 3, "number_of_replicas": 1 }, "mappings": { "properties": { "productName": { "type": "text" }, "annual_rate":{ "type":"keyword" }, "describe": { "type": "text" } } } }
在ECS中执行以下脚本,同步索引模板。
import requests from requests.auth import HTTPBasicAuth # 配置信息,按照实际环境调整 config = { # 源集群host 'old_cluster_host': 'es-cn-27a3jyyd2000770dd.public.elasticsearch.aliyuncs.com:9200', # 源集群用户名 'old_cluster_user': 'yourusername', # 源集群密码 'old_cluster_password': 'yourpassward', # 源集群http协议,可选 http/https 'old_cluster_protocol': 'http', # 目标集群host,可在阿里云Elasticsearch实例的基本信息页面获取。 'new_cluster_host': 'es-cn-o493kbics000cw4pt.elasticsearch.aliyuncs.com:9200', # 目标集群用户名 'new_cluster_user': 'yourusername', # 目标集群密码 'new_cluster_password': 'yourpassward', # 目标集群http协议,可选 http/https 'new_cluster_protocol': 'http', # 目标集群默认副本数 'default_replicas': 1, } # 通用的 HTTP 请求函数 def send_http_request(method, host, endpoint, username="", password="", params=None, json_body=None, protocol='http'): url = f"{protocol}://{host}{endpoint}" auth = (username, password) if username and password else None headers = {'Content-Type': 'application/json'} if method != 'GET' else None try: response = requests.request(method, url, auth=auth, params=params, json=json_body, headers=headers) response.raise_for_status() return response.json() except requests.HTTPError as e: # 打印错误信息 print(f"HTTP Error: {e.response.status_code} for {url}") print(e.response.text) except ValueError as e: # 如果响应不是 JSON 格式,打印错误并返回原始内容 print("Invalid JSON response:") print(response.text) raise # 获取源集群的所有索引模板 def get_index_templates(): endpoint = "/_template" templates_result = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol']) return templates_result # 创建目标集群的索引模板 def create_index_template(template_name, template_body): endpoint = f"/_template/{template_name}" create_result = send_http_request('PUT', config['new_cluster_host'], endpoint, config['new_cluster_user'], config['new_cluster_password'], protocol=config['new_cluster_protocol'], json_body=template_body) print(f"Template {template_name} created with result: {create_result}") # 主函数 def main(): # 同步索引模板 templates = get_index_templates() for template_name, template_body in templates.items(): create_index_template(template_name, template_body) if __name__ == '__main__': main()
验证结果。
在目标端ES中执行以下命令,查询目标模板的信息。
GET _template/product
同步索引生命周期管理(ILM)
准备测试数据。
在源端ES中执行以下命令,创建索引生命周期管理(ILM)。
PUT _ilm/policy/product { "policy": { "phases": { "hot": { "actions": { "rollover": { "max_size": "1GB", "max_age": "1d", "max_docs": 1000 } } }, "delete": { "min_age": "2h", "actions": { "delete": {} } } } } }
在ECS中执行以下脚本,同步索引生命周期管理(ILM)。
import requests from requests.auth import HTTPBasicAuth # 配置信息,按照实际环境调整 config = { # 源集群host 'old_cluster_host': 'es-cn-27a3jyyd2000770dd.public.elasticsearch.aliyuncs.com:9200', # 源集群用户名 'old_cluster_user': 'yourusername', # 源集群密码 'old_cluster_password': 'yourpassward', # 源集群http协议,可选 http/https 'old_cluster_protocol': 'http', # 目标集群host,可在阿里云Elasticsearch实例的基本信息页面获取。 'new_cluster_host': 'es-cn-o493kbics000cw4pt.elasticsearch.aliyuncs.com:9200', # 目标集群用户名 'new_cluster_user': 'yourusername', # 目标集群密码 'new_cluster_password': 'yourpassward', # 目标集群http协议,可选 http/https 'new_cluster_protocol': 'http', # 目标集群默认副本数 'default_replicas': 1, } # 通用的 HTTP 请求函数 def send_http_request(method, host, endpoint, username="", password="", params=None, json_body=None, protocol='http'): url = f"{protocol}://{host}{endpoint}" auth = (username, password) if username and password else None headers = {'Content-Type': 'application/json'} if method != 'GET' else None try: response = requests.request(method, url, auth=auth, params=params, json=json_body, headers=headers) response.raise_for_status() return response.json() except requests.HTTPError as e: # 打印错误信息 print(f"HTTP Error: {e.response.status_code} for {url}") print(e.response.text) except ValueError as e: # 如果响应不是 JSON 格式,打印错误并返回原始内容 print("Invalid JSON response:") print(response.text) raise # 获取源集群的所有索引ILM def get_ilm_polices(): endpoint = "/_ilm/policy" templates_result = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol']) return templates_result # 创建目标集群的索引ILM def create_ilm_policy(policy_name, policy_body): policy_body.pop('version', None) policy_body.pop('modified_date', None) policy_body.pop('modified_date_string', None) endpoint = f"/_ilm/policy/{policy_name}" create_result = send_http_request('PUT', config['new_cluster_host'], endpoint, config['new_cluster_user'], config['new_cluster_password'], protocol=config['new_cluster_protocol'], json_body=policy_body) print(f"Policy {policy_name} created with result: {create_result}") # 主函数 def main(): # 同步索引ILM policies = get_ilm_polices() for policy_name, policy_body in policies.items(): create_ilm_policy(policy_name, policy_body) if __name__ == '__main__': main()
验证结果。
在目标端ES中执行以下命令,查询索引生命周期管理(ILM)的信息。
GET _ilm/policy/product