Zookeeper Watcher实现分析
2017-06-28 本文已影响0人
huiwq1990
Watcher向服务器提交
整体流程是将Watcher与节点信息发送到服务器。
org.apache.zookeeper.ZooKeeper#getChildren
public void getChildren(final String path, Watcher watcher,
ChildrenCallback cb, Object ctx)
{
final String clientPath = path;
PathUtils.validatePath(clientPath);
// 创建ChildWatchRegistration
WatchRegistration wcb = null;
if (watcher != null) {
wcb = new ChildWatchRegistration(watcher, clientPath);
}
final String serverPath = prependChroot(clientPath);
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.getChildren);
GetChildrenRequest request = new GetChildrenRequest();
request.setPath(serverPath);
request.setWatch(watcher != null);
GetChildrenResponse response = new GetChildrenResponse();
// 创建package
cnxn.queuePacket(h, new ReplyHeader(), request, response, cb,
clientPath, serverPath, ctx, wcb);
}
org.apache.zookeeper.ClientCnxn#queuePacket
Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
Record response, AsyncCallback cb, String clientPath,
String serverPath, Object ctx, WatchRegistration watchRegistration)
{
Packet packet = null;
synchronized (outgoingQueue) {
// 将watchRegistration封装到package中
packet = new Packet(h, r, request, response, watchRegistration);
packet.cb = cb;
packet.ctx = ctx;
packet.clientPath = clientPath;
packet.serverPath = serverPath;
if (!state.isAlive() || closing) {
conLossPacket(packet);
} else {
if (h.getType() == OpCode.closeSession) {
closing = true;
}
outgoingQueue.add(packet);
}
}
sendThread.getClientCnxnSocket().wakeupCnxn();
return packet;
}
Wacher管理器
ZKWatchManager是客户端watcher管理器,负责跟踪多种watcher及默认的Watcher。Watcher被分为dataWatches,existWatches,childWatches。
org.apache.zookeeper.ZooKeeper.ZKWatchManager
private static class ZKWatchManager implements ClientWatchManager {
private final Map<String, Set<Watcher>> dataWatches =
new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> existWatches =
new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> childWatches =
new HashMap<String, Set<Watcher>>();
// zk初始化的时候会设置这个值
//new ZooKeeper("localhost:2181", 2000, new DefualtWatcher());
private volatile Watcher defaultWatcher;
Watcher Client注册机制
image.png当请求包成功返回后,触发org.apache.zookeeper.ZooKeeper.WatchRegistration#register方法。
// 根据packet的返回值决定是否要注册Watcher
protected boolean shouldAddWatch(int rc) {
return rc == 0;
}
// ChildWatcher org.apache.zookeeper.ZooKeeper.ChildWatchRegistration
protected Map<String, Set<Watcher>> getWatches(int rc) {
return watchManager.childWatches;
}
public void register(int rc) {
if (shouldAddWatch(rc)) {
Map<String, Set<Watcher>> watches = getWatches(rc);
// 需要加锁
synchronized(watches) {
// 获取这个目录的watcher
Set<Watcher> watchers = watches.get(clientPath);
if (watchers == null) {
watchers = new HashSet<Watcher>();
watches.put(clientPath, watchers);
}
watchers.add(watcher);
}
}
}
Watcher怎么触发
Watcher为什么重新注册
Watcher服务端实现
http://xkorey.iteye.com/blog/2204495
http://blog.jobbole.com/104833/
https://github.com/llohellohe/zookeeper
https://www.ibm.com/developerworks/cn/opensource/os-cn-apache-zookeeper-watcher/index.html