通过Monstache实时同步MongoDB数据至Elasticsearch

当您的业务数据存储在MongoDB中,并且需要进行语义分析和大图展示时,可借助阿里云Elasticsearch实现全文搜索、语义分析、可视化展示等。本文介绍如何通过MonstacheMongoDB数据实时同步至阿里云Elasticsearch,并对数据进行分析及展示。

背景信息

本文以解析及统计热门电影数据为例,提供的解决方案可以帮助您完成以下需求:

  • 通过Monstache快速同步及订阅全量或增量数据。

  • MongoDB数据实时同步至高版本Elasticsearch。

  • 解读Monstache常用配置参数,应用于更多的业务场景。

方案优势

  • MongoDB、阿里云ElasticsearchMonstache服务部署在专有网络VPC(Virtual Private Cloud)内,所有数据私网通信,高速且安全。

  • Monstache基于MongoDBoplog实现实时数据同步及订阅,支持MongoDB与高版本Elasticsearch之间的数据同步,同时支持MongoDB的变更流和聚合管道功能,并且拥有丰富的特性

  • Monstache不仅支持软删除和硬删除,还支持数据库删除和集合删除,能够确保Elasticsearch端实时与源端数据保持一致。

操作流程

  1. 步骤一:环境准备

    准备同一专有网络下的阿里云MongoDB实例、阿里云Elasticsearch实例和ECS实例。其中ECS实例用来安装Monstache。

    说明

    请准备版本兼容的Monstache工具、阿里云ElasticsearchMongoDB实例,版本兼容性详情请参见Monstache version

  2. 步骤二:搭建Monstache环境

    ECS实例中安装Monstache,用来将MongoDB中的数据同步至阿里云Elasticsearch。安装前需要先配置Go环境变量。

  3. 步骤三:配置实时同步任务

    修改默认的Monstache配置文件,在配置文件中指定MongoDBElasticsearch的访问地址、待同步的集合、Elasticsearch的用户名和密码等参数。配置完成后,运行Monstache服务,即可将MongoDB中的数据实时同步至阿里云Elasticsearch中。

  4. 步骤四:验证数据同步结果

    分别在MongoDB数据库中添加、更新、删除数据,验证数据是否实时同步。

  5. 步骤五:通过Kibana分析并展示数据

    Kibana控制台中,分析数据并使用Pie图展示分析结果。

步骤一:环境准备

  1. 创建阿里云Elasticsearch实例,并开启实例的自动创建索引功能。

    具体操作步骤请参见创建阿里云Elasticsearch实例配置YML参数。本文使用的实例版本为通用商业版6.7。

  2. 创建阿里云MongoDB实例,并准备测试数据。

    具体操作步骤请参见MongoDB快速入门。本文以4.2版本的副本集MongoDB实例为例,部分数据如下。测试数据

    重要

    MongoDB实例必须是副本集或分片集架构,不支持单节点架构。

  3. 创建ECS实例。

    具体操作步骤请参见自定义购买实例。该ECS实例用来安装Monstache,需要与阿里云Elasticsearch实例在同一专有网络下,且选择Linux操作系统。

步骤二:搭建Monstache环境

  1. 连接ECS实例。

    具体操作请参见通过密码或密钥认证登录Linux实例

    说明

    本文档以普通用户权限为例。

  2. 安装Go,并配置环境变量。

    说明

    由于Monstache数据同步依赖于Go语言,因此需要先在ECS中准备Go环境。

    1. 下载Go安装包并解压。

      wget https://dl.google.com/go/go1.14.4.linux-amd64.tar.gz
      tar -xzf go1.14.4.linux-amd64.tar.gz
    2. 配置环境变量。

      使用vim ~/.bash_profile命令打开环境变量配置文件,并将如下内容写入该文件中。其中GOPROXY用来指定阿里云Go模块代理。

      export GOROOT=/home/test1/go
      export GOPATH=/home/go/
      export PATH=$PATH:$GOROOT/bin:$GOPATH/bin
      export GOPROXY=https://mirrors.aliyun.com/goproxy/
    3. 应用环境变量配置。

      source ~/.bash_profile
  3. 安装Monstache。

    1. Git库中下载安装包。

      git clone https://github.com/rwynn/monstache.git
      说明

      如果出现git: command not found的错误提示,需要先执行sudo yum install -y git命令安装Git。

    2. 进入monstache目录。

      cd monstache
    3. 切换版本。

      本文以rel5版本为例。

      git checkout rel5
    4. 安装Monstache。

      sudo go install
    5. 查看Monstache版本。

      monstache -v

      执行成功后,预期结果如下。

      5.5.5

步骤三:配置实时同步任务

Monstache配置使用TOML格式,默认情况下,Monstache会使用默认端口连接本地主机上的ElasticsearchMongoDB,并追踪MongoDB oplog。在Monstache运行期间,MongoDB的任何更改都会同步到Elasticsearch中。

由于本文使用阿里云MongoDBElasticsearch,并且需要指定同步对象(mydb数据库中的hotmoviescol集合),因此要修改默认的Monstache配置文件。修改方式如下:

  1. 进入Monstache安装目录,创建并编辑配置文件。

    cd monstache
    vim config.toml
  2. 参考以下示例,修改配置文件。

    简单的配置示例如下,详细配置请参见Monstache Usage

    # connection settings
    
    # connect to MongoDB using the following URL
    mongo-url = "mongodb://<your_mongodb_user>:<your_mongodb_password>@dds-bp1aadcc629******.mongodb.rds.aliyuncs.com:3717"
    # connect to the Elasticsearch REST API at the following node URLs
    elasticsearch-urls = ["http://es-cn-mp91kzb8m00******.elasticsearch.aliyuncs.com:9200"]
    
    # frequently required settings
    
    # if you need to seed an index from a collection and not just listen and sync changes events
    # you can copy entire collections or views from MongoDB to Elasticsearch
    direct-read-namespaces = ["mydb.hotmovies","mydb.col"]
    
    # if you want to use MongoDB change streams instead of legacy oplog tailing use change-stream-namespaces
    # change streams require at least MongoDB API 3.6+
    # if you have MongoDB 4+ you can listen for changes to an entire database or entire deployment
    # in this case you usually don't need regexes in your config to filter collections unless you target the deployment.
    # to listen to an entire db use only the database name.  For a deployment use an empty string.
    #change-stream-namespaces = ["mydb.col"]
    
    # additional settings
    
    # if you don't want to listen for changes to all collections in MongoDB but only a few
    # e.g. only listen for inserts, updates, deletes, and drops from mydb.mycollection
    # this setting does not initiate a copy, it is only a filter on the change event listener
    #namespace-regex = '^mydb\.col$'
    # compress requests to Elasticsearch
    #gzip = true
    # generate indexing statistics
    #stats = true
    # index statistics into Elasticsearch
    #index-stats = true
    # use the following PEM file for connections to MongoDB
    #mongo-pem-file = "/path/to/mongoCert.pem"
    # disable PEM validation
    #mongo-validate-pem-file = false
    # use the following user name for Elasticsearch basic auth
    elasticsearch-user = "elastic"
    # use the following password for Elasticsearch basic auth
    elasticsearch-password = "<your_es_password>"
    # use 4 go routines concurrently pushing documents to Elasticsearch
    elasticsearch-max-conns = 4
    # use the following PEM file to connections to Elasticsearch
    #elasticsearch-pem-file = "/path/to/elasticCert.pem"
    # validate connections to Elasticsearch
    #elastic-validate-pem-file = true
    # propagate dropped collections in MongoDB as index deletes in Elasticsearch
    dropped-collections = true
    # propagate dropped databases in MongoDB as index deletes in Elasticsearch
    dropped-databases = true
    # do not start processing at the beginning of the MongoDB oplog
    # if you set the replay to true you may see version conflict messages
    # in the log if you had synced previously. This just means that you are replaying old docs which are already
    # in Elasticsearch with a newer version. Elasticsearch is preventing the old docs from overwriting new ones.
    #replay = false
    # resume processing from a timestamp saved in a previous run
    resume = true
    # do not validate that progress timestamps have been saved
    #resume-write-unsafe = false
    # override the name under which resume state is saved
    #resume-name = "default"
    # use a custom resume strategy (tokens) instead of the default strategy (timestamps)
    # tokens work with MongoDB API 3.6+ while timestamps work only with MongoDB API 4.0+
    resume-strategy = 0
    # exclude documents whose namespace matches the following pattern
    #namespace-exclude-regex = '^mydb\.ignorecollection$'
    # turn on indexing of GridFS file content
    #index-files = true
    # turn on search result highlighting of GridFS content
    #file-highlighting = true
    # index GridFS files inserted into the following collections
    #file-namespaces = ["users.fs.files"]
    # print detailed information including request traces
    verbose = true
    # enable clustering mode
    cluster-name = 'es-cn-mp91kzb8m00******'
    # do not exit after full-sync, rather continue tailing the oplog
    #exit-after-direct-reads = false
    [[mapping]]
    namespace = "mydb.hotmovies"
    index = "hotmovies"
    type = "movies"
    
    [[mapping]]
    namespace = "mydb.col"
    index = "mydbcol"
    type = "collection"

    参数

    说明

    mongo-url

    MongoDB实例的主节点访问地址,可在实例的基本信息页面获取。其中<your_mongodb_user>为您使用的MongoDB实例的数据库账号,<your_mongodb_password>为对应数据库账号的密码。

    获取前需配置MongoDB实例的白名单,即在白名单中添加安装MonstacheECS实例的内网IP地址,详情请参见设置白名单

    elasticsearch-urls

    阿里云Elasticsearch实例的访问地址,格式为http://<阿里云Elasticsearch实例的私网地址>:9200。阿里云Elasticsearch实例的私网地址可在实例的基本信息页面获取,详情请参见查看实例的基本信息

    direct-read-namespaces

    指定待同步的集合,详情请参见direct-read-namespaces。本文同步的数据集为mydb数据库下的hotmoviescol集合。

    change-stream-namespaces

    如果要使用MongoDB变更流功能,需要指定此参数。启用此参数后,oplog追踪会被设置为无效,详情请参见change-stream-namespaces

    namespace-regex

    通过正则表达式指定需要监听的集合。此设置可以用来监控符合正则表达式的集合中数据的变化。

    elasticsearch-user

    访问阿里云Elasticsearch实例的用户名,默认为elastic。

    重要

    实际业务中不建议使用elastic用户,这样会降低系统安全性。建议使用自建用户,并给予自建用户分配相应的角色和权限,详情请参见通过Elasticsearch X-Pack角色管理实现用户权限管控

    elasticsearch-password

    对应用户的密码。elastic用户的密码在创建实例时指定,如果忘记可进行重置,重置密码的注意事项和操作步骤请参见重置实例访问密码

    elasticsearch-max-conns

    定义连接Elasticsearch的线程数。默认为4,即使用4Go线程同时将数据同步到Elasticsearch。

    dropped-collections

    默认为true,表示当删除MongoDB集合时,会同时删除Elasticsearch中对应的索引。

    dropped-databases

    默认为true,表示当删除MongoDB数据库时,会同时删除Elasticsearch中对应的索引。

    resume

    默认为false。设置为true,Monstache会将已成功同步到ElasticsearchMongoDB操作的时间戳写入monstache.monstache集合中。当Monstache因为意外停止时,可通过该时间戳恢复同步任务,避免数据丢失。如果指定了cluster-name,该参数将自动开启,详情请参见resume

    resume-strategy

    指定恢复策略。仅当resumetrue时生效,详情请参见resume-strategy

    verbose

    默认为false,表示不启用调试日志。

    cluster-name

    指定集群名称。指定后,Monstache将进入高可用模式,集群名称相同的进程将进行协调,详情请参见cluster-name

    mapping

    指定Elasticsearch索引映射。默认情况下,数据从MongoDB同步到Elasticsearch时,索引会自动映射为数据库名.集合名。如果需要修改索引名称,可通过该参数设置,详情请参见Index Mapping

    说明

    Monstache支持丰富的参数配置,以上配置仅使用了部分参数完成数据实时同步,如果您有更复杂的同步需求,请参见Monstache configAdvanced进行配置。

  3. 运行Monstache。

    monstache -f config.toml
    说明

    通过-f参数,您可以显式运行Monstache,系统会打印所有调试日志(包括对Elasticsearch的请求追踪)。

步骤四:验证数据同步结果

  1. 分别进入MongoDBDMS控制台和阿里云Elasticsearch实例的Kibana控制台,查看同步前后对应文档的数量。

    说明
    • MongoDB

      db.hotmovies.find().count()

      预期结果如下。

      [
      10000
      ]
    • 阿里云Elasticsearch

      GET hotmovies/_count

      预期结果如下。通过以下结果可以看到同步前后的文档的数量都为10000条。

      {
        "count" : 10000,
        "_shards" : {
          "total" : 5,
          "successful" : 5,
          "skipped" : 0,
          "failed" : 0
        }
      }
  2. MongoDB数据库中插入数据,查看该数据是否同步到阿里云Elasticsearch实例中。

    • MongoDB

      db.hotmovies.insert({id: 11003,title: "乘风破浪的程序媛",overview: "一群IT高智商女人,如何打破传统逆序IT精英",original_language:"cn",release_date:"2020-06-17",popularity:67.654,vote_count:65487,vote_average:9.9})
      db.hotmovies.insert({id: 11004,title: "英姿飒爽的程序猿",overview: "一群IT高智商man,如何打破传统逆序IT精英",original_language:"cn",release_date:"2020-06-15",popularity:77.654,vote_count:85487,vote_average:11.9})
    • 阿里云Elasticsearch

      GET hotmovies/_search
      {
        "query": {
          "bool": {
            "should": [
              {"term":{"id":"11003"}},
              {"term":{"id":"11004"}}
            ]
          }
        }
      }

      预期结果如下。插入数据

  3. MongoDB数据库中更新数据,查看阿里云Elasticsearch实例中对应的数据是否会同步更新。

    • MongoDB

      db.hotmovies.update({'title':'乘风破浪的程序媛'},{$set:{'title':'美女小姐姐'}})
    • 阿里云Elasticsearch

      GET hotmovies/_search
      {
        "query": {
          "match": {
            "id":"11003"
          }
        }
      }

      预期结果如下。更新数据返回结果

  4. MongoDB数据库中删除数据,查看阿里云Elasticsearch实例中对应的数据是否会同步删除。

    • MongoDB

      db.hotmovies.remove({id: 11003})
      db.hotmovies.remove({id: 11004})
    • 阿里云Elasticsearch

      GET hotmovies/_search
      {
        "query": {
          "bool": {
            "should": [
              {"term":{"id":"11003"}},
              {"term":{"id":"11004"}}
            ]
          }
        }
      }

      预期结果如下。删除数据返回结果

步骤五:通过Kibana分析并展示数据

  1. 登录目标阿里云Elasticsearch实例的Kibana控制台,根据页面提示进入Kibana主页。
    登录Kibana控制台的具体操作,请参见登录Kibana控制台
    说明 本文以阿里云Elasticsearch 6.7.0版本为例,其他版本操作可能略有差别,请以实际界面为准。
  2. 创建索引模式。

    创建索引模式
    1. 在左侧导航栏,单击Management
    2. Kibana区域,单击Index Patterns
    3. 单击Create index pattern
    4. 输入Index pattern名称,单击Next step
    5. Time Filter field name中,选择时间过滤器字段名(本文选择I don't want to use the Time Filter)。

    6. 单击Create index pattern
  3. 配置Kibana大图。

    本文以配置最受欢迎的Top10电影的Pie图为例,操作步骤如下:

    1. 在左侧导航栏,单击Visualize

    2. 在搜索框右侧,单击+

    3. New Visualization对话框中,单击Pie

      创建Pie图
    4. 单击hotmovies索引模式。

      单击索引模式
    5. 按照下图配置MetricsBuckets

      Pie图配置
    6. 单击运行图标图标,应用配置,查看数据展示结果。

      Pie图展示结果

常见问题

  • 问题

    阿里云Elasticsearch实例开启高可用、高并发功能后,数据有丢失现象,如何排查?

  • 解决方案

    查看阿里云Elasticsearch集群的整体情况是否正常:

    • 正常:需要排查Monstache服务的问题,详细信息请参见Monstache官网

    • 不正常:参见常见问题,排查阿里云Elasticsearch集群的问题。同时降低并发数,观察数据是否正常。