本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。
当您需要实时响应数据库中的数据变更时,可以通过订阅变更MongoDB变更流(Change Streams)解决。本文将为您介绍MongoDB中变更流(Change Stream)相关的内容、使用方法及最佳实践。
什么是变更流(Change Stream)
变更流将数据库的变更事件转化为实时流。客户端可订阅此流,在数据插入、更新或删除时立即收到通知。典型应用场景包括:
跨集群数据同步: 实现MongoDB集群间的增量数据复制。
操作审计: 跟踪高风险操作(如删除数据库或集合)。
事件驱动架构: 将变更推送到下游系统进行实时分析、缓存更新或通知。
使用限制
实例类型为副本集实例或分片集群实例。
配置变更流
监听更多的DDL事件
前提条件
MongoDB 6.0及以上版本(升级指南)。
操作步骤
使用 mongosh 连接数据库。
执行命令
watch
,设置showExpandedEvents: true
:// mongo shell or mongosh v1.x cursor = db.getSiblingDB("test").watch([], { showExpandedEvents: true // 启用 更多DDL 事件监听 } ); cursor.next();
结果验证
在新的SQL窗口执行变更SQL,如
db.createCollection("myCollection1")
。观察原SQL窗口输出上述执行SQL相关信息。
开启前镜像(Pre-image)
MongoDB的pre-image是指文档在修改或删除前的完整状态快照,用于记录数据变更前的原始值。
前提条件
MongoDB 6.0及以上版本(升级指南)。
操作步骤
开启数据库维度前镜像 :
db.adminCommand({ setClusterParameter: { changeStreamOptions: { preAndPostImages: { expireAfterSeconds: "off" } // "off" 表示使用oplog保留时间 } } })
开启集合维度前镜像 :
说明若需为某数据库内所有集合启用前镜像,则必须先开启数据库维度前镜像,然后在该库的每个集合上单独启用集合级前镜像功能。
修改现有集合
db.runCommand({ collMod: "myCollection", changeStreamPreAndPostImages: { enabled: true } })
创建新集合时指定
db.createCollection("myCollection", { changeStreamPreAndPostImages: { enabled: true }})
建立变更流监听 (指定Pre-image选项):
// 在目标集合上建立监听 cursor = db.getCollection("myCollection").watch([], { fullDocument: 'required', // 或 'whenAvailable' fullDocumentBeforeChange: 'required' // 或 'whenAvailable' } ) cursor.next();
required
: 服务端必须返回Pre/Post-image,否则报错。whenAvailable
: 服务端尽力返回,不保证一定有。
结果验证
检查数据库维度的开关是否已开启:
db.adminCommand( { getClusterParameter: "changeStreamOptions" } )
开启成功后输出如下信息:
检查集合配置:
db.getCollectionInfos({name: "myCollection"}) // 或 db.runCommand({listCollections: 1})
预期输出:在返回的文档中找到类似
"options" : { "changeStreamPreAndPostImages" : { "enabled" : true } }
的字段。在另一个mongosh窗口更新
myCollection
中的文档。观察
cursor
返回的事件,应包含fullDocumentBeforeChange
字段(变更前文档)。
开启后镜像(Post-image)
MongoDB的Post-image是变更发生后文档的最新完整状态快照,用于记录变更后的完整文档内容。
前提条件
MongoDB 3.6及以上版本(升级指南)。
操作步骤
执行watch命令时设置fullDocument: 'updateLookup'
。
cursor = db.getSiblingDB("test").myCollection.watch([],
{
fullDocument: 'updateLookup'
}
);
cursor.next();
结果验证
在另一个mongosh窗口新增或更新
myCollection
中的文档。观察
cursor
返回的事件,应包含fullDocument
字段(变更后文档)。
返回的完整文档可能为空或者不是point-in-time的。比如:
当连续对同一个文档进行多次更新时,第一个update的change events可能返回的是最近update操作完成后的文档内容;
当一个文档更新后立即删除时,其对应的change events中因查不到变更后的文档内容而展示空的fullDocument字段。
处理超大变更事件 (>16MB)
前提条件
MongoDB 7.0及以上版本(升级指南)。
操作步骤
在 watch()
的管道中包含 $changeStreamSplitLargeEvent
阶段:
myChangeStreamCursor = db.myCollection.watch(
[ { $changeStreamSplitLargeEvent: {} } ], // 添加切分阶段
{
fullDocument: "required",
fullDocumentBeforeChange: "required"
}
)
结果验证
执行一个会导致变更事件 >16MB 的操作(如更新一个包含超大数组的文档)。
观察返回的事件流会被拆分成多个连续的
fragment
事件,最终以一个fragment
事件结束。
降低前镜像存储开销
默认Pre-image随oplog过期。可设置更短的过期时间以节省空间:
设置过短的 expireAfterSeconds
且下游消费速度不足时,可能导致 ChangeStreamHistoryLost
错误(因 Pre-image 过早过期),详情可参考Change Streams with Document Pre- and Post-Images。
db.adminCommand({
setClusterParameter: {
changeStreamOptions: {
preAndPostImages: { expireAfterSeconds: 100 } // 单位:秒
}
}
})
变更流最佳实践
谨慎开启前后镜像:
启用
fullDocumentBeforeChange
(Pre-image) 和fullDocument
(Post-image) 会增加存储开销 (config.system.preimages
表) 和请求延迟。仅在业务确实需要文档变更前后完整内容时开启。
分片集群部署要点:
始终在
mongos
上建立变更流监听,以保证全局事件顺序。高写入负载下,变更流可能成为瓶颈(
mongos
需排序合并分片事件)。分片间写入不均衡(如设计不佳的分片键)会显著增加变更流延迟。
避免
updateLookup
:updateLookup
会为每个更新事件执行单独的findOne
查询,效率低下。在分片集群中,
moveChunk
操作会进一步加剧updateLookup
的延迟问题。
防范变更流中断:
⚠️以下场景会导致变更流游标失效 (
operationType: "invalidate"
) 或错误。下游消费滞后:消费速度慢于事件产生速度,导致
resumeToken
超出 oplog 窗口。无效
resumeToken
:使用过旧的resumeToken
恢复,其对应时间点已不在 oplog 中。故障转移影响:Failover 后新主节点的 oplog 可能不包含原
resumeToken
。元数据变更:
drop
、rename
、dropDatabase
操作可能会触发invalidate
事件。Pre-image 过期:
expireAfterSeconds
设置过短且消费慢导致 Pre-image 丢失。
应对方案:
监控变更流延迟。
确保 oplog 窗口足够大。
设计健壮的错误处理和恢复逻辑(捕获
invalidate
事件,记录最后有效resumeToken
,重建监听)。设置合理的
expireAfterSeconds
。
作用域选择策略:
单一变更流 vs 多个集合级变更流:
单一流 (库/实例级):资源开销较小(单线程拉取 oplog),但需下游自行过滤分发事件。高事件量下
mongos
可能成为瓶颈。多个集合级流:可利用服务端过滤减少网络传输,并发性更好。但过多流会增加
oplog
读取争抢和资源消耗。
建议: 根据业务负载(事件量、集合数量)进行测试,选择最优方案。通常,对少量高活跃度集合使用独立流,对大量低活跃度集合使用库/实例级流配合下游过滤。