本文介绍了从 AWS OpenSearch 迁移至阿里云 Elasticsearch 的技术方案,涵盖数据迁移方法及关键配置步骤。
1. 产品背景
OpenSearch 是一个分布式、RESTful 的开源搜索与分析引擎,由 AWS 基于 Elasticsearch 7.10.2 和 Kibana 7.10.2(Apache 2.0 许可证下的最后一个版本)独立开发。
2021 年,Elastic 公司将 Elasticsearch 与 Kibana 的开源许可证从 Apache 2.0 变更为 SSPL(Server Side Public License)及 Elastic 自有许可证。为保障用户持续使用完全开源、无商业限制的搜索服务,AWS 创建了 OpenSearch 项目。
2. 产品选型建议
推荐目标版本:阿里云 Elasticsearch 7.10.0 或更高兼容版本(如 7.10.x)。
选型依据:OpenSearch 起源于 Elasticsearch 7.10.2,选择 7.10.0+ 可最大程度保障功能兼容性、降低迁移风险。
3. 迁移方案概览
根据业务需求和数据规模,可采用以下迁移策略:
(可选)索引迁移
在进行数据迁移时,Logstash会自动创建索引,但是自动创建的索引可能与待迁移的索引存在差异,导致迁移前后数据的格式不一致。因此建议在数据迁移前,在阿里云Elasticsearch中手动创建目标索引,确保迁移前后索引数据完全一致。可参考以下文档:
全量数据迁移
适用于首次将历史数据从 AWS OpenSearch 同步至阿里云 Elasticsearch。
通过 Logstash OSS 数据管道全量同步数据,利用
_id幂等写入避免重复。
增量数据迁移
用于在全量迁移完成后,持续同步新增或更新的数据,保障业务连续性。
依赖索引中包含时间字段(如
create_time);通过 Logstash OSS 定时任务实现分钟级增量拉取,利用
_id幂等写入避免重复。
反向数据迁移
适用于容灾演练、双活架构或回滚测试等场景。
仅需配置增量同步任务;
使用 Logstash OSS 从阿里云 Elasticsearch 拉取数据并写入 AWS OpenSearch。
4. 使用 Logstash OSS 进行数据迁移
4.1 环境准备
下载并解压 Logstash OSS(含 OpenSearch 插件)
wget https://artifacts.opensearch.org/logstash/logstash-oss-with-opensearch-output-plugin-7.16.3-linux-x64.tar.gz
tar -zxvf logstash-oss-with-opensearch-output-plugin-7.16.3-linux-x64.tar.gz
cd logstash-7.16.3配置 Ruby 源(提升国内插件安装速度)
cd logstash-7.16.3
sed -i 's#source "https://rubygems.org"#source "https://gems.ruby-china.com"#' Gemfile安装 OpenSearch Input 插件
./bin/logstash-plugin install --preserve logstash-input-opensearch性能调优参考:通过Logstash将自建Elasticsearch数据全量或增量迁移至阿里云
4.2 HTTPS 证书配置(源端启用 HTTPS 时)
若 AWS OpenSearch 启用了 HTTPS 认证,需信任 Amazon 根证书:
# 下载 Amazon Root CA 1
curl -o AmazonRootCA1.pem https://www.amazontrust.com/repository/AmazonRootCA1.pem
# 安装到系统证书库(适用于 CentOS/RHEL)
sudo cp AmazonRootCA1.pem /etc/pki/ca-trust/source/anchors/
sudo update-ca-trust extract5. 全量数据迁移
5.1 配置文件:full_to_es.conf
input {
opensearch {
hosts => ["https://search-ti4gov533osxxxx.us-east-1.es.amazonaws.com:443"]
user => "xxxxxx"
password => "xxxxxx"
index => "kibana_sample_data_*"
docinfo => true
slices => 5
size => 5000
query => '{ "query": { "match_all": {} } }'
}
}
filter {
mutate {
remove_field => ["@timestamp", "@version"]
}
}
output {
elasticsearch {
hosts => ["http://es-cn-zvp2m4bko0009****.elasticsearch.aliyuncs.com:9200"]
user => "elastic"
password => "xxxxxx"
index => "%{[@metadata][_index]}"
document_id => "%{[@metadata][_id]}"
ilm_enabled => false
manage_template => false
}
}
5.2 启动迁移任务
nohup bin/logstash -f move/full_to_es.conf >/dev/null 2>&1 &5.3 验证迁移结果
在 Kibana Dev Tools 中执行:
GET _cat/indices?v6. 增量数据迁移
前提条件:索引必须包含可排序的时间字段(如 create_time)。若未包含,请参见第 8 节配置 Ingest Pipeline 自动注入。6.1 配置文件:inc_to_es.conf
input {
opensearch {
hosts => ["https://search-ti4gov533osxxxx.us-east-1.es.amazonaws.com:443"]
user => "xxxxxx"
password => "xxxxxx"
index => "kibana_sample_data_logs"
query => '{"query":{"range":{"create_time":{"gte":"now-5m","lte":"now/m"}}}}'
schedule => "* * * * *" # 每分钟执行一次
scroll => "5m"
docinfo => true
size => 5000
}
}
filter {
mutate {
remove_field => ["@timestamp", "@version"]
}
}
output {
elasticsearch {
hosts => ["http://es-cn-zvp2m4bko0009****.elasticsearch.aliyuncs.com:9200"]
user => "elastic"
password => "xxxxxx"
index => "%{[@metadata][_index]}"
document_id => "%{[@metadata][_id]}" # 保证幂等写入
ilm_enabled => false
manage_template => false
}
}
6.2 启动增量任务
nohup bin/logstash -f move/inc_to_es.conf >/dev/null 2>&1 &7. 反向数据迁移
反向数据迁移主要用于应急回切场景,仅配置增量任务。
7.1 配置文件:inc_to_opensearch.conf
input {
elasticsearch {
hosts => ["http://es-cn-7js4ji2fxxxxxx.elasticsearch.aliyuncs.com:9200"]
user => "elastic"
password => "xxxx"
index => "kibana_sample_data_logs"
query => '{"query":{"range":{"create_time":{"gte":"now-5m","lte":"now/m"}}}}'
schedule => "* * * * *"
scroll => "5m"
docinfo => true
size => 5000
}
}
filter {
mutate {
remove_field => ["@timestamp", "@version"]
}
}
output {
opensearch {
hosts => ["https://search-ti4gov533osigxxxxx.us-east-1.es.amazonaws.com:443"]
user => "xxxxx"
password => "xxxxx"
index => "%{[@metadata][_index]}"
document_id => "%{[@metadata][_id]}"
manage_template => false
}
}
7.2 启动任务
nohup bin/logstash -f move/inc_to_opensearch.conf >/dev/null 2>&1 &8. 如何引入创建时间字段
为支持增量迁移,建议在数据写入阶段自动注入 create_time 字段。
8.1 创建 Ingest Pipeline
在 OpenSearch 或 Elasticsearch 中执行:
PUT _ingest/pipeline/set_ingest_time_pipeline
{
"description": "自动设置 create_time 为数据摄入时间",
"processors": [
{
"set": {
"field": "create_time",
"value": "{{_ingest.timestamp}}",
"if": "ctx.create_time == null"
}
}
]
}
8.2 写入时指定 Pipeline(Java SDK 示例)
private static final String PIPELINE_ID = "set_ingest_time_pipeline";
IndexOperation<OpenSearchModel> indexOp = new IndexOperation.Builder<OpenSearchModel>()
.index(INDEX_NAME)
.id(openSearchModel.getUniqueId(...))
.document(openSearchModel)
.pipeline(PIPELINE_ID) // 关键:指定 pipeline
.build();
8.3 读取时忽略 create_time 字段
为避免反序列化冲突,在实体类中屏蔽该字段:
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
@JsonIgnoreProperties({"create_time"})
public class OpenSearchModel {
// 业务字段定义
}
影响说明:需在应用代码中适配写入与读取逻辑,确保兼容性。
9. 数据校验
迁移完成后,建议使用 阿里云 Cloud Migration Hub (CMH) 提供的数据一致性校验能力,验证源端与目标端数据完整性。