Zookeeper之Watch机制源码分析
Zookeeper的Watch机制提供了服务通知功能,包含客户端的注册和触发、服务端的注册和触发
![](https://img.haomeiwen.com/i7310356/8fe58e36f09fd1ed.png)
Zookeeper客户端:
通过ZKWatchManager管理
结构都是存储Map<String, Set<Watcher>>
例如:{"/abc",Set<Watcher>} 表示/abc节点下绑定了Watcher有哪些
1、临时的一次性的,用完即删除有dataWatches、existWatches、childWatches
2、永久性的有persistentWatches、persistentRecursiveWatches(表示递归的永久性Watcher)
Zookeeper服务端:
有dataWatches和childWatches
每个属性下面都有watchTable、watch2Paths。
服务端的Watcher表示当前客户端的实现类NioServerCnxn
watchTable:Map<String, Set<Watcher>>
例如:{"/abc",Set<Watcher>} 表示/abc节点下有哪些客户端含有Watcher
watch2Paths: Map<Watcher, Set<String>>
例如:{Watcher,Set<String>} 表示当前客户端下有那些路径下有Watcher
客户端注册
当调用自定义getData方法时
![](https://img.haomeiwen.com/i7310356/902b00363b5c8e58.png)
1、getData
这里并没有注册
org.apache.zookeeper.ZooKeeper#getData
![](https://img.haomeiwen.com/i7310356/e0bd5dd4dc7cdd1e.png)
①、把watcher和clientPath封装成DataWatchRegistration对象
org.apache.zookeeper.ZooKeeper.DataWatchRegistration#DataWatchRegistration
![](https://img.haomeiwen.com/i7310356/f112b417d1c3eed2.png)
org.apache.zookeeper.ZooKeeper.WatchRegistration#WatchRegistration
![](https://img.haomeiwen.com/i7310356/a2f963a88d9c8c54.png)
②、提交请求submitRequest
把封装的对象使用cnxn.submitRequest方法提交请求,最终把DataWatchRegistration wcb赋值给数据包Packet的watchRegistration属性
![](https://img.haomeiwen.com/i7310356/508f75b2cad7431b.png)
2、通过SendThread线程的readResponse读取响应数据
①、readResponse
org.apache.zookeeper.ClientCnxn.SendThread#readResponse
![](https://img.haomeiwen.com/i7310356/d20ac85e26f9a47f.png)
②、finishPacket
org.apache.zookeeper.ClientCnxn#finishPacket
![](https://img.haomeiwen.com/i7310356/2eab625f69fde621.png)
③、register
通过数据包的WatchRegistration属性值注册watch
org.apache.zookeeper.ZooKeeper.WatchRegistration#register
![](https://img.haomeiwen.com/i7310356/ec861f0154a686a4.png)
org.apache.zookeeper.ZooKeeper.DataWatchRegistration#getWatches
获取的是ZKWatchManager#dataWatches属性
![](https://img.haomeiwen.com/i7310356/9e765ac68d9bb74c.png)
最终把当前watch注册给了ZKWatchManager的dataWatches属性值。
服务端注册
服务端的注册是在请求处理链的最后一个请求处理类FinalRequestProcessor的processRequest方法来调用的
客户端是以zooKeeper.getData方法来测试的,所以直接到getData
org.apache.zookeeper.server.FinalRequestProcessor#processRequest
![](https://img.haomeiwen.com/i7310356/c7dee2dcf686a5af.png)
org.apache.zookeeper.server.FinalRequestProcessor#handleGetDataRequest
![](https://img.haomeiwen.com/i7310356/bb8b74401e56bd46.png)
org.apache.zookeeper.server.ZKDatabase#getData
![](https://img.haomeiwen.com/i7310356/9bc4e086f5543576.png)
org.apache.zookeeper.server.DataTree#getData
![](https://img.haomeiwen.com/i7310356/380b7c25b7659415.png)
org.apache.zookeeper.server.watch.WatchManager#addWatch
![](https://img.haomeiwen.com/i7310356/234712699133db09.png)
org.apache.zookeeper.server.watch.WatchManager#addWatch
①、watchTable
Map<String, Set<Watcher>> 表示一个路径有多少个客户端Watcher(NioServerCnxn对象)
![](https://img.haomeiwen.com/i7310356/c8034423acad16fe.png)
②、watch2Paths
Map<Watcher, Set<String>> 表示一个客户端Watcher(NioServerCnxn对象)有多少个路径
![](https://img.haomeiwen.com/i7310356/173e5a7a129c0f51.png)
dataWatches和childWatches都是添加watchTable、watch2Paths的属性值
dataWatches和childWatches是在DataTree创建对象时创建的,都属于IWatchManager对象
![](https://img.haomeiwen.com/i7310356/02eeb40ad2e1f072.png)
客户端触发
是在SendThread线程读取响应readResponse时
1、添加到waitingEvents队列
NOTIFICATION_XID = -1 是触发WATCHER_EVENT
①、readResponse
org.apache.zookeeper.ClientCnxn.SendThread#readResponse
![](https://img.haomeiwen.com/i7310356/d4255e3e150a5f46.png)
![](https://img.haomeiwen.com/i7310356/df39144671988bc8.png)
②、queueEvent
![](https://img.haomeiwen.com/i7310356/98ce305c8f5fea4c.png)
org.apache.zookeeper.ClientCnxn.EventThread#queueEvent
![](https://img.haomeiwen.com/i7310356/39e4758fdf604e56.png)
把当前响应事件添加到waitingEvents队列中
2、一次性Watch与持久Watch的区别
在添加waitingEvents队列之前,调用materialize方法根据路径查找客户端的Watch
org.apache.zookeeper.ZKWatchManager#materialize
![](https://img.haomeiwen.com/i7310356/b97e01b5aa13993d.png)
![](https://img.haomeiwen.com/i7310356/e2ec275dc33ecb57.png)
dataWatches把当前节点的watch给移除了,这就是为什么zooKeeper.getData方法的watch只能调用一次
而持久化节点并没有移除
![](https://img.haomeiwen.com/i7310356/a46664c20dfa5ed6.png)
3、调用watch的process方法
便可以调用到客户端自定义的监听watch的process方法
org.apache.zookeeper.ClientCnxn.EventThread#run
![](https://img.haomeiwen.com/i7310356/e182212d259ca18d.png)
org.apache.zookeeper.ClientCnxn.EventThread#processEvent
![](https://img.haomeiwen.com/i7310356/def13caca6c57edf.png)
服务端触发
服务端的触发也是在请求处理链的最后一个请求处理类FinalRequestProcessor的processRequest方法来调用的
1、processRequest
org.apache.zookeeper.server.FinalRequestProcessor#processRequest
![](https://img.haomeiwen.com/i7310356/087bc88e2d9274d2.png)
2、applyRequest
org.apache.zookeeper.server.FinalRequestProcessor#applyRequest
![](https://img.haomeiwen.com/i7310356/276ddbcafd33f8e9.png)
3、processTxn
org.apache.zookeeper.server.ZooKeeperServer#processTxn(org.apache.zookeeper.server.Request)
![](https://img.haomeiwen.com/i7310356/5fa610ec1751f0d1.png)
4、processTxnInDB
org.apache.zookeeper.server.ZooKeeperServer#processTxnInDB
![](https://img.haomeiwen.com/i7310356/d2415b819d026abf.png)
5、processTxn
org.apache.zookeeper.server.ZKDatabase#processTxn
![](https://img.haomeiwen.com/i7310356/4307a23ad03edcd9.png)
org.apache.zookeeper.server.DataTree#processTxn
![](https://img.haomeiwen.com/i7310356/3fdc3408c01323fc.png)
org.apache.zookeeper.server.DataTree#processTxn
![](https://img.haomeiwen.com/i7310356/a8a4b8d91a76d9bf.png)
org.apache.zookeeper.server.DataTree#processTxn
![](https://img.haomeiwen.com/i7310356/9899dd7eff6259a8.png)
6、setData
这里还是以/abc节点为例,开启一个客户端修改/abc的值。调用命令set /abc xxx。直接到setData
这里可以看到修改前、修改后的数据、路径、zxid等信息
org.apache.zookeeper.server.DataTree#setData
![](https://img.haomeiwen.com/i7310356/1b7d4a30f32bb8cc.png)
触发NodeDataChanged事件类型
![](https://img.haomeiwen.com/i7310356/37a9709669b1823a.png)
7、triggerWatch
org.apache.zookeeper.server.watch.WatchManager#triggerWatch
![](https://img.haomeiwen.com/i7310356/159599b1ae52102b.png)
org.apache.zookeeper.server.watch.WatchManager#triggerWatch
可以看到当前路径(/abc)下的上下文客户端对象
![](https://img.haomeiwen.com/i7310356/e19c4a4a0b32f0d8.png)
org.apache.zookeeper.server.watch.WatchManager#triggerWatch
这里调用了递归的文件,最终的效果就是查找当前路径下的客户端对象
![](https://img.haomeiwen.com/i7310356/33e01cb78a676350.png)
![](https://img.haomeiwen.com/i7310356/0f573be135f663f1.png)
遍历Watch对象的process方法
![](https://img.haomeiwen.com/i7310356/21d2b81cb5c5e470.png)
8、process
org.apache.zookeeper.server.NIOServerCnxn#process
![](https://img.haomeiwen.com/i7310356/039be2ce4ab08b8d.png)
响应头设置ClientCnxn.NOTIFICATION_XID,这里与客户端读取响应头相对应
9、sendResponse
发送响应数据到客户端
org.apache.zookeeper.server.NIOServerCnxn#sendResponse
![](https://img.haomeiwen.com/i7310356/b5109cf4e8b81e12.png)
EventType事件类型
None、NodeCreated(节点创建)、NodeDeleted(节点删除)、NodeDataChanged(节点数据修改)、NodeChildrenChanged(孩子节点改变)、DataWatchRemoved(监听器Watch移除)、ChildWatchRemoved(孩子监听器Watch移除)、PersistentWatchRemoved(持久化的监听器Watch移除)
![](https://img.haomeiwen.com/i7310356/be3b0058159caccd.png)
总结:
根据客户端使用不同的API触发不同的监听器Watch。
客户端的注册是在SendThread线程和触发是在EventThread线程
服务端的注册和触发都是在请求处理链FinalRequestProcessor的processRequest方法内