mongo(十五:Change Stream)
Change Stream即变更流,是MongoDB向应用发布数据变更的一种方式。即当数据库中有任何数据发生变化,应用端都可以得到通知。我们可以将其理解为在应用中执行的触发器。至于应用想得到什么数据,以什么形式得到数据,则可以通过聚合框架加以过滤和转换。
当然Change Stream与传统数据库的触发器还是有一定差别的
image.png
MongoDB复制集大致是如何工作的
1,应用通过驱动向数据库发起写入请求;
2,在同一个事务中,MongoDB完成oplog和集合的修改;
3,oplog被其他从节点拉走;
4,从节点应用得到的oplog,同样在一个事务中完成对oplog和集合的修改;
整个同步过程是依赖于oplog来进行的。也就是说oplog实际上已经包含了我们需要的所有变更数据。如果观测oplog的变化,是否就能够得到所有变更的数据了呢?对,change stream正是基于这个原理实现的。
Change Stream 是基于 oplog 实现的。它在 oplog 上开启一个 tailable cursor 来追踪所有复制集上的变更操作,最终调用应用中定义的回调函数。
被追踪的变更事件主要包括:
• insert/update/delete:插入、更新、删除;
• drop:集合被删除;
• rename:集合被重命名;
• dropDatabase:数据库被删除;
• invalidate:drop/rename/dropDatabase 将导致 invalidate 被触发,并关闭 change stream;
Change Stream 只推送已经在大多数节点上提交的变更操作。即“可重复读”的变更。
这个验证是通过 {readConcern: “majority”} 实现的。因此:
• 未开启 majority readConcern 的集群无法使用 Change Stream;
• 当集群无法满足 {w: “majority”} 时,不会触发 Change Stream(例如 PSA 架构中的 S 因故障宕机)。
Change Stream 变更过滤
如果只对某些类型的变更事件感兴趣,可以使用使用聚合管道的过滤步骤过滤事件。
var cs = db.collection.watch([{$match: {operationType: {$in: ['insert', 'delete']}}}])
Change Stream 故障恢复
想要从上次中断的地方继续获取变更流,只需要保留上次变更通知中的 _id 即可。
var cs = db.collection.watch([], {resumeAfter: <_id>})
即可从上一条通知中断处继续获取后续的变更通知。
接下来简单实践下,首先搭建好复制集,在主节点开启watch,此时插入数据将会被监听到
image.png往复制集删除节点是 rs.remove("XXX"),添加节点是rs.add("XXX“),现保持复制集有5个节点,1主3从1仲裁
https://docs.mongodb.com/manual/tutorial/convert-secondary-into-arbiter/
此时启动四个节点(1主2从1仲裁)
在主节点执行watch的时候可以正常监听
此时只启动三个节点(1主1从1仲裁)
执行db.test.insert({count:1},{writeConcern:{w:"majority",j:true,wtimeout:3000}})会发现超时
调整一下setDefaultRWConcern把writeConcern写为1
https://docs.mongodb.com/manual/reference/command/setDefaultRWConcern/
在主节点执行watch(插入成功但是watch没监听到)
insert.png
watch.png
结论:只有大多数节点有数据,才会触发Change Stream
注意事项
• Change Stream 依赖于 oplog,因此中断时间不可超过 oplog 回收的最大时间窗;(oplog 是循环使用的,具体可看之前的文章)
• 在执行 update 操作时,如果只更新了部分数据,那么 Change Stream 通知的也是增量部分;
• 同理,删除数据时通知的仅是删除数据的 _id
基于spring-data 实现Change Stream https://my.oschina.net/dushougudu/blog/4819773