An Analysis of the ZooKeeper Watcher Implementation

2017-06-28  huiwq1990

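Submitting a Watcher to the server

The overall flow: the client sends the node path together with a boolean watch flag to the server. The Watcher object itself stays on the client, wrapped in a WatchRegistration that travels with the request packet.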

org.apache.zookeeper.ZooKeeper#getChildren

    public void getChildren(final String path, Watcher watcher,
            ChildrenCallback cb, Object ctx)
    {
        final String clientPath = path;
        PathUtils.validatePath(clientPath);

        // Create a ChildWatchRegistration that binds the watcher to the client path
        WatchRegistration wcb = null;
        if (watcher != null) {
            wcb = new ChildWatchRegistration(watcher, clientPath);
        }

        final String serverPath = prependChroot(clientPath);

        RequestHeader h = new RequestHeader();
        h.setType(ZooDefs.OpCode.getChildren);
        GetChildrenRequest request = new GetChildrenRequest();
        request.setPath(serverPath);
        request.setWatch(watcher != null);
        GetChildrenResponse response = new GetChildrenResponse();
        // Build a Packet from the request and queue it for sending
        cnxn.queuePacket(h, new ReplyHeader(), request, response, cb,
                clientPath, serverPath, ctx, wcb);
    }

org.apache.zookeeper.ClientCnxn#queuePacket

    Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
            Record response, AsyncCallback cb, String clientPath,
            String serverPath, Object ctx, WatchRegistration watchRegistration)
    {
        Packet packet = null;
        synchronized (outgoingQueue) {
            // Wrap the watchRegistration in the Packet
            packet = new Packet(h, r, request, response, watchRegistration);
            packet.cb = cb;
            packet.ctx = ctx;
            packet.clientPath = clientPath;
            packet.serverPath = serverPath;
            if (!state.isAlive() || closing) {
                conLossPacket(packet);
            } else {
                if (h.getType() == OpCode.closeSession) {
                    closing = true;
                }
                outgoingQueue.add(packet);
            }
        }
        sendThread.getClientCnxnSocket().wakeupCnxn();
        return packet;
    }
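
The queuing step can be illustrated without ZooKeeper on the classpath. What follows is a minimal sketch, not the real ClientCnxn: SketchPacket, SketchCnxn, and the connectionLost field are hypothetical names, and the real client reports the failure through conLossPacket rather than a boolean. It only shows that a packet is rejected once the connection is dead or closing, that it is otherwise appended to the outgoing queue, and that the watch registration stays on the client while a plain flag is what the request carries.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-in for ClientCnxn.Packet with only the fields this sketch needs.
class SketchPacket {
    final String serverPath;
    final boolean watchFlag;         // the only watch information that goes into the request
    final Object watchRegistration;  // kept on the client, used when the reply arrives
    boolean connectionLost;          // assumption: marks a packet that failed before sending

    SketchPacket(String serverPath, Object watchRegistration) {
        this.serverPath = serverPath;
        this.watchRegistration = watchRegistration;
        this.watchFlag = watchRegistration != null;
    }
}

// Hypothetical stand-in for the queuing half of ClientCnxn.
class SketchCnxn {
    private final Deque<SketchPacket> outgoingQueue = new ArrayDeque<>();
    private boolean alive = true;
    private boolean closing = false;

    SketchPacket queuePacket(String serverPath, Object watchRegistration) {
        SketchPacket packet = new SketchPacket(serverPath, watchRegistration);
        synchronized (outgoingQueue) {
            if (!alive || closing) {
                // Fail fast instead of queuing a packet that can never be sent.
                packet.connectionLost = true;
            } else {
                outgoingQueue.add(packet);
            }
        }
        return packet;
    }

    void close() {
        synchronized (outgoingQueue) {
            closing = true;
        }
    }

    int pending() {
        synchronized (outgoingQueue) {
            return outgoingQueue.size();
        }
    }
}

class QueuePacketDemo {
    public static void main(String[] args) {
        SketchCnxn cnxn = new SketchCnxn();
        SketchPacket queued = cnxn.queuePacket("/app", new Object());
        cnxn.close();
        SketchPacket rejected = cnxn.queuePacket("/app", new Object());
        System.out.println("queued lost=" + queued.connectionLost
                + ", rejected lost=" + rejected.connectionLost
                + ", pending=" + cnxn.pending());
    }
}
```

In the real method the same lock also guards the closing flag, which is why a closeSession request flips it inside the synchronized block.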

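The Watcher manager

ZKWatchManager is the client-side watcher manager. It keeps track of the registered watchers as well as the default Watcher. Registered watchers are stored in three maps keyed by path: dataWatches, existWatches, and childWatches.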

org.apache.zookeeper.ZooKeeper.ZKWatchManager

    private static class ZKWatchManager implements ClientWatchManager {
        private final Map<String, Set<Watcher>> dataWatches =
            new HashMap<String, Set<Watcher>>();
        private final Map<String, Set<Watcher>> existWatches =
            new HashMap<String, Set<Watcher>>();
        private final Map<String, Set<Watcher>> childWatches =
            new HashMap<String, Set<Watcher>>();
        // Set when the ZooKeeper client is constructed, for example:
        // new ZooKeeper("localhost:2181", 2000, new DefaultWatcher());
        private volatile Watcher defaultWatcher;

Client-side Watcher registration


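When the reply for the request packet arrives, the client calls org.apache.zookeeper.ZooKeeper.WatchRegistration#register with the reply's return code. The watcher is recorded only if that code indicates success.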

        // Decide from the reply's return code whether the Watcher should be registered
        protected boolean shouldAddWatch(int rc) {
            return rc == 0;
        }
        // Override in org.apache.zookeeper.ZooKeeper.ChildWatchRegistration: child watches go into childWatches
        protected Map<String, Set<Watcher>> getWatches(int rc) {
            return watchManager.childWatches;
        }

        public void register(int rc) {
            if (shouldAddWatch(rc)) {
                Map<String, Set<Watcher>> watches = getWatches(rc);
                // Take the map's lock before modifying it
                synchronized(watches) {
                    // Look up the watchers already registered on this path
                    Set<Watcher> watchers = watches.get(clientPath);
                    if (watchers == null) {
                        watchers = new HashSet<Watcher>();
                        watches.put(clientPath, watchers);
                    }
                    watchers.add(watcher);
                }
            }
        }
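
The registration logic is small enough to reproduce in a self-contained sketch. RegisterSketch and its method names are hypothetical, and Runnable stands in for Watcher. The child case mirrors the code above. The exists case is included to show why getWatches takes the return code: as far as I recall from the 3.4.x sources, an exists call on a missing node (NONODE, -101) still registers its watcher, but in existWatches instead of dataWatches. Treat that second half as an assumption to check against your ZooKeeper version.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of WatchRegistration#register.
class RegisterSketch {
    static final int OK = 0;          // value of KeeperException.Code.OK
    static final int NO_NODE = -101;  // value of KeeperException.Code.NONODE

    final Map<String, Set<Runnable>> dataWatches = new HashMap<>();
    final Map<String, Set<Runnable>> existWatches = new HashMap<>();
    final Map<String, Set<Runnable>> childWatches = new HashMap<>();

    // getChildren: register only when the call succeeded.
    void registerChild(String path, Runnable watcher, int rc) {
        if (rc == OK) {
            add(childWatches, path, watcher);
        }
    }

    // exists: the return code selects the target map (assumed behavior, see lead-in).
    void registerExists(String path, Runnable watcher, int rc) {
        if (rc == OK) {
            add(dataWatches, path, watcher);
        } else if (rc == NO_NODE) {
            add(existWatches, path, watcher);
        }
    }

    private static void add(Map<String, Set<Runnable>> watches,
                            String path, Runnable watcher) {
        // Same locking discipline as the original: lock the map being modified.
        synchronized (watches) {
            watches.computeIfAbsent(path, k -> new HashSet<>()).add(watcher);
        }
    }
}

class RegisterDemo {
    public static void main(String[] args) {
        RegisterSketch sketch = new RegisterSketch();
        Runnable watcher = () -> System.out.println("event");
        sketch.registerChild("/app", watcher, RegisterSketch.OK);
        sketch.registerChild("/app", watcher, RegisterSketch.OK); // same watcher, stored once
        sketch.registerExists("/missing", watcher, RegisterSketch.NO_NODE);
        System.out.println("child watchers on /app: " + sketch.childWatches.get("/app").size());
        System.out.println("exist watch on /missing: " + sketch.existWatches.containsKey("/missing"));
    }
}
```

Because each path maps to a Set, registering the same Watcher instance twice on one path leaves a single entry, so it is notified once per event.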

How a Watcher is triggered

Why a Watcher has to be re-registered
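
The client-side maps suggest an answer. In the 3.4.x client, when an event arrives the watch manager removes the watcher set for that path from its map before handing it out for notification, so each registration fires at most once, and a client that wants further notifications has to set the watch again. Below is a minimal sketch under that assumption. OneShotWatches and fireChildrenChanged are hypothetical names, Runnable stands in for Watcher, and the persistent watches added in later ZooKeeper releases are not modeled.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of one-shot child watches on the client.
class OneShotWatches {
    private final Map<String, Set<Runnable>> childWatches = new HashMap<>();

    void register(String path, Runnable watcher) {
        synchronized (childWatches) {
            childWatches.computeIfAbsent(path, k -> new HashSet<>()).add(watcher);
        }
    }

    // Removes the watchers for the path, then notifies them.
    // Returns the number of watchers that were notified.
    int fireChildrenChanged(String path) {
        Set<Runnable> toNotify;
        synchronized (childWatches) {
            toNotify = childWatches.remove(path); // removal is what makes the watch one-shot
        }
        if (toNotify == null) {
            return 0;
        }
        for (Runnable watcher : toNotify) {
            watcher.run();
        }
        return toNotify.size();
    }
}

class OneShotDemo {
    public static void main(String[] args) {
        OneShotWatches manager = new OneShotWatches();
        manager.register("/app", () -> System.out.println("children of /app changed"));
        System.out.println("first event notified " + manager.fireChildrenChanged("/app"));
        System.out.println("second event notified " + manager.fireChildrenChanged("/app"));
    }
}
```

A watcher that wants to keep observing a path typically issues the read again, with itself as the watcher, from inside its callback. In this sketch that corresponds to calling register again before the next event.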

Server-side Watcher implementation

http://xkorey.iteye.com/blog/2204495
http://blog.jobbole.com/104833/
https://github.com/llohellohe/zookeeper
https://www.ibm.com/developerworks/cn/opensource/os-cn-apache-zookeeper-watcher/index.html
