使用MongoDB变更流(Change Stream)实时捕获数据变更

重要

本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。

当您需要实时响应数据库中的数据变更时,可以通过订阅变更MongoDB变更流(Change Streams)解决。本文将为您介绍MongoDB中变更流(Change Stream)相关的内容、使用方法及最佳实践。

什么是变更流(Change Stream)

变更流将数据库的变更事件转化为实时流。客户端可订阅此流,在数据插入、更新或删除时立即收到通知。典型应用场景包括:

  • 跨集群数据同步: 实现MongoDB集群间的增量数据复制。

  • 操作审计: 跟踪高风险操作(如删除数据库或集合)。

  • 事件驱动架构: 将变更推送到下游系统进行实时分析、缓存更新或通知。

版本演进历史

版本

更新说明

MongoDB 3.6

  • 首次发布。

  • 仅支持订阅集合(Collection)维度。

  • events类型有限。

  • 支持故障恢复。

  • 支持查看变更后的视图(Post-image)。

MongoDB 4.0

  • 支持订阅库(Database)以及集群(Cluster)维度。

  • 支持dropdropDatabaserename事件。

  • resumeToken格式从BinData变更为Hex

MongoDB 4.2

  • 支持$set$unset等更多pipeline操作符。

  • 新增startAfter选项,用于按时间点启动监听功能。

  • 修改事件的_id字段,变更流将抛出异常。

  • 移除对{readConcern: majority}的依赖。

MongoDB 5.1

  • 提升部分聚合框架下stage的执行效率。

  • 提升资源利用效率。

MongoDB 5.3

  • 支持在Chunk迁移期间过滤对孤立文档的更新。

MongoDB 6.0

  • 支持查看变更前的视图(Pre-image)。

  • 支持createcreateIndexesmodifyshardCollectionDDL语句,需要指定showExpandedEvents:true来支持;更多信息,请参见Change Events

  • Change Events新增wallTime字段,时间戳支持多种转换和展示算子(包括$toDate$tsSeconds$tsIncrement)以方便业务消费。

MongoDB 7.0

  • 支持超大变更事件(>16MB),通过全新的$changeStreamSplitLargeEvent算子来将超大变更事件进行切分。

  • change events支持refineCollectionShardKey以及reshardCollection事件。

MongoDB 8.0

  • $queryStats命令增强了跟change stream相关的指标。

  • movePrimary命令不再会使得建立了change stream的表产生非法事件了,即change stream可以正常连续处理movePrimary命令引起的数据迁移了。

使用限制

实例类型为副本集实例或分片集群实例。

配置变更流

监听更多的DDL事件

前提条件

MongoDB 6.0及以上版本(升级指南)。

操作步骤

  1. 使用 mongosh 连接数据库

  2. 执行命令watch,设置 showExpandedEvents: true

    // mongo shell or mongosh v1.x
    cursor = db.getSiblingDB("test").watch([],
      {
        showExpandedEvents: true       // 启用 更多DDL 事件监听
      }
    );
    cursor.next();

结果验证

  1. 在新的SQL窗口执行变更SQL,如db.createCollection("myCollection1")

  2. 观察原SQL窗口输出上述执行SQL相关信息。

    image

开启前镜像(Pre-image)

MongoDBpre-image是指文档在修改或删除前的完整状态快照,用于记录数据变更前的原始值。

前提条件

MongoDB 6.0及以上版本(升级指南)。

操作步骤

  1. 开启数据库维度前镜像 :

    db.adminCommand({
      setClusterParameter: {
        changeStreamOptions: {
          preAndPostImages: { expireAfterSeconds: "off" } // "off" 表示使用oplog保留时间
        }
      }
    })
  2. 开启集合维度前镜像 :

    说明

    若需为某数据库内所有集合启用前镜像,则必须先开启数据库维度前镜像,然后在该库的每个集合上单独启用集合级前镜像功能。

    修改现有集合

    db.runCommand({
      collMod: "myCollection",
      changeStreamPreAndPostImages: { enabled: true }
    })

    创建新集合时指定

    db.createCollection("myCollection", { changeStreamPreAndPostImages: { enabled: true }})
  3. 建立变更流监听 (指定Pre-image选项):

    // 在目标集合上建立监听
    cursor = db.getCollection("myCollection").watch([],
      {
        fullDocument: 'required', // 或 'whenAvailable'
        fullDocumentBeforeChange: 'required' // 或 'whenAvailable'
      }
    )
    cursor.next();
    • required: 服务端必须返回Pre/Post-image,否则报错。

    • whenAvailable: 服务端尽力返回,不保证一定有。

结果验证

  1. 检查数据库维度的开关是否已开启:

    db.adminCommand( { getClusterParameter: "changeStreamOptions" } )

    开启成功后输出如下信息:image

  2. 检查集合配置:

    db.getCollectionInfos({name: "myCollection"}) // 或 db.runCommand({listCollections: 1})

    预期输出:在返回的文档中找到类似 "options" : { "changeStreamPreAndPostImages" : { "enabled" : true } } 的字段。 

    image

  3. 在另一个mongosh窗口更新 myCollection 中的文档。

  4. 观察 cursor 返回的事件,应包含 fullDocumentBeforeChange 字段(变更前文档)。

    image

更多信息参考Change Streams with Document Pre- and Post-Images

开启后镜像(Post-image)

MongoDBPost-image是变更发生后文档的最新完整状态快照,用于记录变更后的完整文档内容。

前提条件

MongoDB 3.6及以上版本(升级指南)。

操作步骤

执行watch命令时设置fullDocument: 'updateLookup'

cursor = db.getSiblingDB("test").myCollection.watch([], 
  {
    fullDocument: 'updateLookup'
  }
);
cursor.next();

结果验证

  1. 在另一个mongosh窗口新增或更新 myCollection 中的文档。

  2. 观察 cursor 返回的事件,应包含 fullDocument 字段(变更后文档)。image

说明

返回的完整文档可能为空或者不是point-in-time的。比如:

  • 当连续对同一个文档进行多次更新时,第一个updatechange events可能返回的是最近update操作完成后的文档内容;

  • 当一个文档更新后立即删除时,其对应的change events中因查不到变更后的文档内容而展示空的fullDocument字段。

更多信息参考Lookup Full Document for Update Operations

处理超大变更事件 (>16MB)

前提条件

MongoDB 7.0及以上版本(升级指南)。

操作步骤 

在 watch() 的管道中包含 $changeStreamSplitLargeEvent 阶段:

myChangeStreamCursor = db.myCollection.watch(
  [ { $changeStreamSplitLargeEvent: {} } ], // 添加切分阶段
  {
    fullDocument: "required",
    fullDocumentBeforeChange: "required"
  }
)

结果验证

  • 执行一个会导致变更事件 >16MB 的操作(如更新一个包含超大数组的文档)。

  • 观察返回的事件流会被拆分成多个连续的fragment事件,最终以一个fragment事件结束。

降低前镜像存储开销

默认Pre-imageoplog过期。可设置更短的过期时间以节省空间:

警告

设置过短的 expireAfterSeconds 且下游消费速度不足时,可能导致 ChangeStreamHistoryLost 错误(因 Pre-image 过早过期),详情可参考Change Streams with Document Pre- and Post-Images

db.adminCommand({
  setClusterParameter: {
    changeStreamOptions: {
      preAndPostImages: { expireAfterSeconds: 100 } // 单位:秒
    }
  }
})

变更流最佳实践 

  1. 谨慎开启前后镜像:

    • 启用 fullDocumentBeforeChange (Pre-image) 和 fullDocument (Post-image) 会增加存储开销 (config.system.preimages 表) 和请求延迟。

    • 仅在业务确实需要文档变更前后完整内容时开启。

  2. 分片集群部署要点:

    • 始终在 mongos 上建立变更流监听,以保证全局事件顺序。

    • 高写入负载下,变更流可能成为瓶颈(mongos 需排序合并分片事件)。

    • 分片间写入不均衡(如设计不佳的分片键)会显著增加变更流延迟。

  3. 避免 updateLookup

    • updateLookup会为每个更新事件执行单独的findOne查询,效率低下。

    • 在分片集群中,moveChunk操作会进一步加剧updateLookup的延迟问题。

  4. 防范变更流中断:

    • ⚠️以下场景会导致变更流游标失效 (operationType: "invalidate") 或错误。

      • 下游消费滞后:消费速度慢于事件产生速度,导致resumeToken超出 oplog 窗口。

      • 无效 resumeToken使用过旧的resumeToken恢复,其对应时间点已不在 oplog 中。

      • 故障转移影响:Failover 后新主节点的 oplog 可能不包含原resumeToken

      • 元数据变更:droprenamedropDatabase操作可能会触发invalidate事件。

      • Pre-image 过期:expireAfterSeconds 设置过短且消费慢导致 Pre-image 丢失。

    • 应对方案:

      • 监控变更流延迟。

      • 确保 oplog 窗口足够大。

      • 设计健壮的错误处理和恢复逻辑(捕获invalidate事件,记录最后有效resumeToken,重建监听)。

      • 设置合理的expireAfterSeconds

  5. 作用域选择策略:

    • 单一变更流 vs 多个集合级变更流:

      • 单一流 (库/实例级):资源开销较小(单线程拉取 oplog),但需下游自行过滤分发事件。高事件量下 mongos 可能成为瓶颈。

      • 多个集合级流:可利用服务端过滤减少网络传输,并发性更好。但过多流会增加 oplog 读取争抢和资源消耗。

    • 建议: 根据业务负载(事件量、集合数量)进行测试,选择最优方案。通常,对少量高活跃度集合使用独立流,对大量低活跃度集合使用库/实例级流配合下游过滤。