Zookeeper Watcher机制
ZooKeeper是用来协调(同步)分布式进程的服务,多个分布式进程通过ZooKeeper提供的API来操作共享的ZooKeeper内存数据对象ZNode来达成某种一致的行为或结果,这种模式本质上是基于状态共享的并发模型
。ZooKeeper实现这些分布式进程的状态(ZNode的Data、Children)共享时,基于性能的考虑采用了类似的异步非阻塞的主动通知模式即Watch机制,使得分布式进程之间的“共享状态通信”更加实时高效。注意,这种共享也需要zookeeper使得分布式进程能够顺序执行,保证结果的正确性,Zab协议使得ZooKeeper的内部修改状态操作直接是有序串行的。在此不讨论zab协议。
Zookeeper Watcher架构
Zookeeper Watcher流程
是客户端向服务端的某个节点路径上注册一个watcher,客户端同时会在本地watcherManager中存储特定的watcher,当发生节点数据或者节点子节点变化时,服务端会通知客户端节点变化信息,然后客户端收到通知后,会调用回调函数。
实现原理
Watcher接口
:客户端用来接收从 ZooKeeper 服务端发过来的消息并且同步地处理这些消息,如果要处理这个消息,需要为客户端注册一个 CallBack(回调)watcher对象。设计如下:
在 Watcher 接口里面,除了回调函数 process 以外,还包含 KeeperState 和 EventType 两个枚举类,分别代表了事件发生时ZooKeeper连接状态和事件的类型。
watcher通知状态和事件类型表
根据特定的事件,调用
process(WatchedEvent event)
方法对事件进行处理。
WatchedEvent
和 WatcherEvent
都表示的是同一个事物,都是对一个服务端事件的封装。不同的是,WatchedEvent 是一个逻辑事件,用于服务端和客户端程序执行过程中所需的逻辑对象,而 WatcherEvent 因为实现了序列化接口,因此可以用于网络传输。
服务端在线程 WatchedEvent 事件之后,会调用 getWrapper 方法将自己包装成一个可序列化的 WatcherEvent 事,以便通过网络传输到客户端。客户端在接收到服务端的这个事件对象后,首先会将 WatcherEvent 事件还原成一个 WatchedEvent 事件,并传递给 process 方法处理,回调方法 process 根据入参就能够解析出完整的服务端事件了。
客户端注册Watcher
涉及接口:
//创建zk客户端对象实例时注册,这个 Watcher 将作为整个 ZooKeeper 会话期间的默认 Watcher,
//会一直被保存在客户端 ZKWatchManager 的 defaultWatcher 里面,
//如果这个被创建的节点在其它时候被创建watcher并注册,则这个默认的watcher会被覆盖
//watcher触发一次就会失效,不管是创建节点时的 watcher 还是以后创建的 watcher.因为服务端每次触发之后就会删掉服务端的watcher
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
getChildren(String path, Watcher watcher)
//Boolean watch表示是否使用上下文中默认的watcher,即创建zk实例时设置的watcher
getChildren(String path, boolean watch)
getData(String path, boolean watch, Stat stat)
getData(String path, Watcher watcher, AsyncCallback.DataCallback cb, Object ctx)
exists(String path, boolean watch)
exists(String path, Watcher watcher)
客户端注册流程
在 ZooKeeper 中,Packet 是一个最小的通信协议单元,即数据包。Pakcet 用于进行客户端与服务端之间的网络传输,任何需要传输的对象都需要包装成一个 Packet 对象。在 ClientCnxn 中 WatchRegistration 也会被封装到 Pakcet 中,然后由 SendThread 线程调用 queuePacke 方法把 Packet 放入发送队列中等待客户端发送,这又是一个异步过程,分布式系统采用异步通信是一个普遍认同的观念。随后,SendThread 线程会通过 readResponse 方法接收来自服务端的响应,异步地调用 finishPacket 方法从 Packet 中取出对应的 Watcher 并注册到 ZKWatchManager 中去。
WatcherRegistation 除了 Header 和 request 两个属性被传递到了服务端,其他都没有到服务端,否则服务端就容易出现内存紧张甚至溢出的危险,因为数据量太大了。这就是 ZooKeeper 为什么适用于分布式环境的原因,它在网络中传输的是消息,而不是数据包实体。
服务端处理 Watcher 流程
对于注册 Watcher 请求
,FinalRequestProcessor 的 ProcessRequest 方法会判断当前请求是否需要注册 Watcher,如果为 true,就会将当前的 ServerCnxn 对象和数据节点路径传入 getData 方法中去。ServerCnxn 是一个 ZooKeeper 客户端和服务器之间的连接接口,代表了一个客户端和服务器的连接,我们后面讲到的 process 回调方法,实际上也是从这里回调的,所以可以把 ServerCnxn 看作是一个 Watcher 对象。数据节点的节点路径和 ServerCnxn 最终会被存储在 WatchManager 的 watchTable 和 watch2Paths 中。WatchManager 负责 Watcher 事件的触发,它是一个统称,在服务端 DataTree 会托管两个 WatchManager,分别是watchTable和 watch2Paths,分别对应数据变更 Watcher 和节点变更 Watcher。
当DataTree中节点数据内容或版本发生变化或节点变更时
,会调用相应方法去触发 WatchManager 的 triggerWatch 方法,该方法返回 ZNODE 的信息,自此进入到回调本地 process 的序列。
public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
WatchedEvent e = new WatchedEvent(type,
KeeperState.SyncConnected, path);
//将事件类型(EventType)、通知状态(WatchedEvent)、节点路径封装成一个 WatchedEvent 对象
HashSet<Watcher> watchers;
synchronized (this) {
//根据数据节点的节点路径从 watchTable 里面取出对应的 Watcher。如果没有找到 Watcher 对象,
//说明没有任何客户端在该数据节点上注册过 Watcher,直接退出。如果找打了 Watcher 就将其提取出来,
//同时会直接从 watchTable 和 watch2Paths 里删除 Watcher,即 Watcher 是一次性的,触发一次就失效了。
watchers = watchTable.remove(path);
for (Watcher w : watchers) {
HashSet<String> paths = watch2Paths.get(w);
}
}
for (Watcher w : watchers) {
if (supress != null && supress.contains(w)) {
continue;
}
//对于需要注册 Watcher 的请求,ZooKeeper 会把请求对应的ServerCnxn 作为一个 Watcher 存储,
//所以这里调用的 process 方法实质上是 ServerCnxn 的对应方法
w.process(e);
}
return watchers;
}
ServerCnxn 类代码
synchronized public void process(WatchedEvent event) {
ReplyHeader h = new ReplyHeader(-1, -1L, 0);
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
"Deliver event " + event + " to 0x"
+ Long.toHexString(this.sessionId)
+ " through " + this);
}
// Convert WatchedEvent to a type that can be sent over the wire
WatcherEvent e = event.getWrapper();
sendResponse(h, e, "notification");
}
客户端收到消息后,会调用 ClientCnxn 的 SendThread.readResponse 方法来进行统一处理,如清单所示。如果响应头 replyHdr 中标识的 Xid 为 02,表示是 ping,如果为-4,表示是验证包,如果是-1,表示这是一个通知类型的响应,然后进行反序列化、处理 chrootPath、还原 WatchedEvent、回调 Watcher 等步骤,其中回调 Watcher 步骤将 WacthedEvent 对象交给 EventThread 线程,在下一个轮询周期中进行 Watcher 回调。
Zookeeper Watcher特点
注册只能确保一次消费
无论是服务端还是客户端,一旦一个 Watcher 被触发,ZooKeeper 都会将其从相应的存储中移除。因此,开发人员在 Watcher 的使用上要记住的一点是需要反复注册。这样的设计有效地减轻了服务端的压力。如果注册一个 Watcher 之后一直有效,那么针对那些更新非常频繁的节点,服务端会不断地向客户端发送事件通知,这无论对于网络还是服务端性能的影响都非常大。
持久Watcher需要每次收到通知事件后重复注册。
客户端串行执行
客户端 Watcher 回调的过程是一个串行同步的过程,这为我们保证了顺序,同时,需要开发人员注意的一点是,千万不要因为一个 Watcher 的处理逻辑影响了整个客户端的 Watcher 回调。
轻量级设计
WatchedEvent 是 ZooKeeper 整个 Watcher 通知机制的最小通知单元,这个数据结构中只包含三部分的内容:通知状态、事件类型和节点路径。也就是说,Watcher 通知非常简单,只会告诉客户端发生了事件,而不会说明事件的具体内容。例如针对 NodeDataChanged 事件,ZooKeeper 的 Watcher 只会通知客户指定数据节点的数据内容发生了变更,而对于原始数据以及变更后的新数据都无法从这个事件中直接获取到,而是需要客户端主动重新去获取数据,这也是 ZooKeeper 的 Watcher 机制的一个非常重要的特性。另外,客户端向服务端注册 Watcher 的时候,并不会把客户端真实的 Watcher 对象传递到服务端,仅仅只是在客户端请求中使用 boolean 类型属性进行了标记,同时服务端也仅仅只是保存了当前连接的 ServerCnxn 对象。这样轻量级的 Watcher 机制设计,在网络开销和服务端内存开销上都是非常廉价的。
参考资料:
ZooKeeper Watcher机制
Apache ZooKeeper Watcher 机制源码解释
品味ZooKeeper之Watcher机制