AWS OpenSearch 迁移至阿里云 Elasticsearch 指南

更新时间:
复制为 MD 格式

本文介绍了从 AWS OpenSearch 迁移至阿里云 Elasticsearch 的技术方案,涵盖数据迁移方法及关键配置步骤。

1. 产品背景

OpenSearch 是一个分布式、RESTful 的开源搜索与分析引擎,由 AWS 基于 Elasticsearch 7.10.2 和 Kibana 7.10.2(Apache 2.0 许可证下的最后一个版本)独立开发。

2021 年,Elastic 公司将 Elasticsearch 与 Kibana 的开源许可证从 Apache 2.0 变更为 SSPL(Server Side Public License)及 Elastic 自有许可证。为保障用户持续使用完全开源、无商业限制的搜索服务,AWS 创建了 OpenSearch 项目。

2. 产品选型建议

  • 推荐目标版本:阿里云 Elasticsearch 7.10.0 或更高兼容版本(如 7.10.x)。

  • 选型依据:OpenSearch 起源于 Elasticsearch 7.10.2,选择 7.10.0+ 可最大程度保障功能兼容性、降低迁移风险。

3. 迁移方案概览

根据业务需求和数据规模,可采用以下迁移策略:

(可选)索引迁移

在进行数据迁移时,Logstash会自动创建索引,但是自动创建的索引可能与待迁移的索引存在差异,导致迁移前后数据的格式不一致。因此建议在数据迁移前,在阿里云Elasticsearch中手动创建目标索引,确保迁移前后索引数据完全一致。可参考以下文档:

通过Logstash将自建Elasticsearch数据全量或增量迁移至阿里云

全量数据迁移

适用于首次将历史数据从 AWS OpenSearch 同步至阿里云 Elasticsearch。

  • 通过 Logstash OSS 数据管道全量同步数据,利用 _id 幂等写入避免重复。

增量数据迁移

用于在全量迁移完成后,持续同步新增或更新的数据,保障业务连续性。

  • 依赖索引中包含时间字段(如 create_time);

  • 通过 Logstash OSS 定时任务实现分钟级增量拉取,利用 _id 幂等写入避免重复。

反向数据迁移

适用于容灾演练、双活架构或回滚测试等场景。

  • 仅需配置增量同步任务;

  • 使用 Logstash OSS 从阿里云 Elasticsearch 拉取数据并写入 AWS OpenSearch。

4. 使用 Logstash OSS 进行数据迁移

4.1 环境准备

下载并解压 Logstash OSS(含 OpenSearch 插件)

wget https://artifacts.opensearch.org/logstash/logstash-oss-with-opensearch-output-plugin-7.16.3-linux-x64.tar.gz
tar -zxvf logstash-oss-with-opensearch-output-plugin-7.16.3-linux-x64.tar.gz
cd logstash-7.16.3

配置 Ruby 源(提升国内插件安装速度)

cd logstash-7.16.3
sed -i 's#source "https://rubygems.org"#source "https://gems.ruby-china.com"#' Gemfile

安装 OpenSearch Input 插件

./bin/logstash-plugin install --preserve logstash-input-opensearch
性能调优参考通过Logstash将自建Elasticsearch数据全量或增量迁移至阿里云

4.2 HTTPS 证书配置(源端启用 HTTPS 时)

若 AWS OpenSearch 启用了 HTTPS 认证,需信任 Amazon 根证书:

# 下载 Amazon Root CA 1
curl -o AmazonRootCA1.pem https://www.amazontrust.com/repository/AmazonRootCA1.pem

# 安装到系统证书库(适用于 CentOS/RHEL)
sudo cp AmazonRootCA1.pem /etc/pki/ca-trust/source/anchors/
sudo update-ca-trust extract

5. 全量数据迁移

5.1 配置文件:full_to_es.conf

input {
  opensearch {
    hosts => ["https://search-ti4gov533osxxxx.us-east-1.es.amazonaws.com:443"]
    user => "xxxxxx"
    password => "xxxxxx"
    index => "kibana_sample_data_*"
    docinfo => true
    slices => 5
    size => 5000
    query => '{ "query": { "match_all": {} } }'
  }
}

filter {
  mutate {
    remove_field => ["@timestamp", "@version"]
  }
}

output {
  elasticsearch {
    hosts => ["http://es-cn-zvp2m4bko0009****.elasticsearch.aliyuncs.com:9200"]
    user => "elastic"
    password => "xxxxxx"
    index => "%{[@metadata][_index]}"
    document_id => "%{[@metadata][_id]}"
    ilm_enabled => false
    manage_template => false
  }
}

5.2 启动迁移任务

nohup bin/logstash -f move/full_to_es.conf >/dev/null 2>&1 &

5.3 验证迁移结果

在 Kibana Dev Tools 中执行:

GET _cat/indices?v

6. 增量数据迁移

前提条件:索引必须包含可排序的时间字段(如 create_time)。若未包含,请参见第 8 节配置 Ingest Pipeline 自动注入。

6.1 配置文件:inc_to_es.conf

input {
  opensearch {
    hosts => ["https://search-ti4gov533osxxxx.us-east-1.es.amazonaws.com:443"]
    user => "xxxxxx"
    password => "xxxxxx"
    index => "kibana_sample_data_logs"
    query => '{"query":{"range":{"create_time":{"gte":"now-5m","lte":"now/m"}}}}'
    schedule => "* * * * *"   # 每分钟执行一次
    scroll => "5m"
    docinfo => true
    size => 5000
  }
}

filter {
  mutate {
    remove_field => ["@timestamp", "@version"]
  }
}

output {
  elasticsearch {
    hosts => ["http://es-cn-zvp2m4bko0009****.elasticsearch.aliyuncs.com:9200"]
    user => "elastic"
    password => "xxxxxx"
    index => "%{[@metadata][_index]}"
    document_id => "%{[@metadata][_id]}"       # 保证幂等写入
    ilm_enabled => false
    manage_template => false
  }
}

6.2 启动增量任务

nohup bin/logstash -f move/inc_to_es.conf >/dev/null 2>&1 &

7. 反向数据迁移

反向数据迁移主要用于应急回切场景,仅配置增量任务。

7.1 配置文件:inc_to_opensearch.conf

input {
  elasticsearch {
    hosts => ["http://es-cn-7js4ji2fxxxxxx.elasticsearch.aliyuncs.com:9200"]
    user => "elastic"
    password => "xxxx"
    index => "kibana_sample_data_logs"
    query => '{"query":{"range":{"create_time":{"gte":"now-5m","lte":"now/m"}}}}'
    schedule => "* * * * *"
    scroll => "5m"
    docinfo => true
    size => 5000
  }
}

filter {
  mutate {
    remove_field => ["@timestamp", "@version"]
  }
}

output {
  opensearch {
    hosts => ["https://search-ti4gov533osigxxxxx.us-east-1.es.amazonaws.com:443"]
    user => "xxxxx"
    password => "xxxxx"
    index => "%{[@metadata][_index]}"
    document_id => "%{[@metadata][_id]}"
    manage_template => false
  }
}

7.2 启动任务

nohup bin/logstash -f move/inc_to_opensearch.conf >/dev/null 2>&1 &

8. 如何引入创建时间字段

为支持增量迁移,建议在数据写入阶段自动注入 create_time 字段。

8.1 创建 Ingest Pipeline

在 OpenSearch 或 Elasticsearch 中执行:

PUT _ingest/pipeline/set_ingest_time_pipeline
{
  "description": "自动设置 create_time 为数据摄入时间",
  "processors": [
    {
      "set": {
        "field": "create_time",
        "value": "{{_ingest.timestamp}}",
        "if": "ctx.create_time == null"
      }
    }
  ]
}

8.2 写入时指定 Pipeline(Java SDK 示例)

private static final String PIPELINE_ID = "set_ingest_time_pipeline";

IndexOperation<OpenSearchModel> indexOp = new IndexOperation.Builder<OpenSearchModel>()
    .index(INDEX_NAME)
    .id(openSearchModel.getUniqueId(...))
    .document(openSearchModel)
    .pipeline(PIPELINE_ID)  // 关键:指定 pipeline
    .build();

8.3 读取时忽略 create_time 字段

为避免反序列化冲突,在实体类中屏蔽该字段:

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;

@JsonIgnoreProperties({"create_time"})
public class OpenSearchModel {
    // 业务字段定义
}
影响说明:需在应用代码中适配写入与读取逻辑,确保兼容性。

9. 数据校验

迁移完成后,建议使用 阿里云 Cloud Migration Hub (CMH) 提供的数据一致性校验能力,验证源端与目标端数据完整性。

数据校验