zookeeper源码分析

zookeeper源码分析(2)-客户端启动流程

2019-02-11  本文已影响0人  Monica2333

zookeeper原生客户端启动流程

客户端整体结构如下:


核心组件
ZooKeeper
客户端的入口,负责启动整个客户端。持有ClientCnxnZKWatchManager的实例,提供了客户端对节点操作的方法。
ZKWatchManager
watcher机制是zookeeper的一大特性,实现了分布式数据变化对客户端的实时通知功能,需要服务端和客户端的共同实现,可参考Zookeeper Watcher机制。包括对节点数据内容,子节点变化和默认none类型事件的三种watch.
ClientCnxn
管理客户端的socket i/o,包含两个核心线程。SendThread负责与服务端之间的读写网络通信和心跳维持。
EventThread主要对经过SendThread反序列化后的具体请求根据类型的不同,具体处理,同时也会负责对请求事件的watch回调。此外ClientCnxn含有HostProvider对象,能够在会话未过期前与当前服务端连接中断时,自动选择一个可用的服务端重建连接。将在初始化过程中详细介绍。
StaticHostProvider
zookeeper对HostProvider接口的简单实现,持有服务端的地址列表,可通过轮询的方式获得服务端的所有地址。HostProvider接口定义如下:
public interface HostProvider {
//返回服务器地址的总大小
    public int size();

    /**
     * The next host to try to connect to.
     * 
     * For a spinDelay of 0 there should be no wait.
     * 必须返回已被解析的InetSocketAddress对象

     */
    public InetSocketAddress next(long spinDelay);

    /**
     * Notify the HostProvider of a successful connection.
     * The HostProvider may use this notification to reset it's inner state.
     */
    public void onConnected();
}

在对StaticHostProvider的初始化时,会将解析出来的InetSocketAddress对象随机打散,防止多个客户端均连接到一台服务器上

 public StaticHostProvider(Collection<InetSocketAddress> serverAddresses)
            throws UnknownHostException {
      ·············省略List<InetSocketAddress> serverAddresses的构造过程·····················
        Collections.shuffle(this.serverAddresses);
    }

主要看下StaticHostProvider.next的实现

//记录当前客户端奕尝试连接完所有服务端的位置
private int lastIndex = -1;
//记录当前客户端尝试连接的服务端位置
private int currentIndex = -1;

 public InetSocketAddress next(long spinDelay) {
        ++currentIndex;
        if (currentIndex == serverAddresses.size()) {
            currentIndex = 0;
        }
        if (currentIndex == lastIndex && spinDelay > 0) {
            try {
                Thread.sleep(spinDelay);
            } catch (InterruptedException e) {
                LOG.warn("Unexpected exception", e);
            }
        } else if (lastIndex == -1) {
            // We don't want to sleep on the first ever connect attempt.
            lastIndex = 0;
        }

        return serverAddresses.get(currentIndex);
    }

实现逻辑就是从位于第一个的服务器开始,挨个向后获取下一个可尝试连接的服务器地址,如果走了一轮,则在下一轮获取第一个服务器前,需要睡眠spinDelayms,因为这种情况很可能服务器集群出现了某种故障。流程示意图如下:


环形地址列表队列

初始化过程
直接初始化一个ZooKeeper客户端对象,就是客户端的启动入口。如:

ZooKeeper zooKeeper = new ZooKeeper(ZK_SERVER, ZK_CONNECTION_TIMEOUT, null);

最终构造方法为:

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
            boolean canBeReadOnly)
        throws IOException
    {
//设置默认defaultWatcher 
//private final ZKWatchManager watchManager = new ZKWatchManager();
        watchManager.defaultWatcher = watcher;

        ConnectStringParser connectStringParser = new ConnectStringParser(
                connectString);
        HostProvider hostProvider = new StaticHostProvider(
                connectStringParser.getServerAddresses());
        cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
                hostProvider, sessionTimeout, this, watchManager,
                getClientCnxnSocket(), canBeReadOnly);
        cnxn.start();
    }

流程为:
1.设置默认defaultWatcher
主要是用来对事件类型为EventType.None类型的回调。
2.初始化StaticHostProvider
主要是将服务端地址列表字符串解析为List<InetSocketAddress> serverAddresses,可供socket连接使用
3.初始化ClientCnxn
首先看下ClientCnxn的主要成员变量

//Packet:封装了客户端一次请求或服务端一次响应的完整数据
//已经发送但是等待服务端响应的packet集合
    private final LinkedList<Packet> pendingQueue = new LinkedList<Packet>();

    //需要发送的packet集合
    private final LinkedList<Packet> outgoingQueue = new LinkedList<Packet>();
//建立连接的超时时间
    private int connectTimeout;
 //服务端认为的下一次会话过期的具体时间
    private volatile int negotiatedSessionTimeout;
//客户端认为的最大会话超时时间,默认为sessionTimeout * 2 / 3
    private int readTimeout;
//会话的超时时间
    private final int sessionTimeout;
    private long sessionId;
    private byte sessionPasswd[] = new byte[16];
//客户端的命名空间,客户端所有的数据节点的路径都会默认在这层路径下创建。可通过`connectString`参数
//传入,如`127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a`,则`chrootPath=/app/a
final String chrootPath;
//用于记录客户端请求发起的先后顺序,没发送一个packet出去加1
    private int xid = 1;
//客户端连接状态,面向与服务端的连接状态
    private volatile States state = States.NOT_CONNECTED;
    final SendThread sendThread;
    final EventThread eventThread;
  private final ZooKeeper zooKeeper;
    private final ClientWatchManager watcher;
private final HostProvider hostProvider;

构造方法为:

public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
            ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, boolean canBeReadOnly)
            throws IOException {
        this(chrootPath, hostProvider, sessionTimeout, zooKeeper, watcher,
             clientCnxnSocket, 0, new byte[16], canBeReadOnly);
    }

public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
            ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
            long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
        this.zooKeeper = zooKeeper;
        this.watcher = watcher;
        this.sessionId = sessionId;
        this.sessionPasswd = sessionPasswd;
        this.sessionTimeout = sessionTimeout;
        this.hostProvider = hostProvider;
        this.chrootPath = chrootPath;

        connectTimeout = sessionTimeout / hostProvider.size();

        readTimeout = sessionTimeout * 2 / 3;
//是否为只读客户端,默认不是
        readOnly = canBeReadOnly;

        sendThread = new SendThread(clientCnxnSocket);
        eventThread = new EventThread();

    }

初始化sendThread的构造方法为:

SendThread(ClientCnxnSocket clientCnxnSocket) {
            super(makeThreadName("-SendThread()"));
            state = States.CONNECTING;
            this.clientCnxnSocket = clientCnxnSocket;
            setDaemon(true);
        }

主要是将当前客户端连接状态置为States.CONNECTING,并初始化了ClientCnxnSocket

ClientCnxnSocket :负责实现更底层与服务端的socket io,默认实现为ClientCnxnSocketNIO,即NIO方式,它的成员变量为:

//socket是否未初始化
protected boolean initialized;
//已经发送的请求数量
    protected long sentCount = 0;
//已经接收的响应数量
    protected long recvCount = 0;
//最后一次接收响应的时间
    protected long lastHeard;
//最后一次发送请求的时间
    protected long lastSend;
//记录当下的时间
    protected long now;
    protected ClientCnxn.SendThread sendThread;

//输出日志和异常信息时使用
    protected long sessionId;
//NIO相关
private final Selector selector = Selector.open();
 private SelectionKey sockKey;
/**
     * This buffer is only used to read the length of the incoming message.
     */
    protected final ByteBuffer lenBuffer = ByteBuffer.allocateDirect(4);

    /**
     * After the length is read, a new incomingBuffer is allocated in
     * readLength() to receive the full message.
     */
    protected ByteBuffer incomingBuffer = lenBuffer;

ClientCnxnSocket底层梳理IO的方法为doTransport,将在后续介绍。

eventThread:事件处理线程,同样为守护线程,主要成员变量为:

//待处理事件的集合
private final LinkedBlockingQueue<Object> waitingEvents =
            new LinkedBlockingQueue<Object>();

        /** This is really the queued session state until the event
         * thread actually processes the event and hands it to the watcher.
         * But for all intents and purposes this is the state.
         */
//面向事件处理
        private volatile KeeperState sessionState = KeeperState.Disconnected;

       private volatile boolean wasKilled = false;
       private volatile boolean isRunning = false;

4.启动ClientCnxn

public void start() {
        sendThread.start();
        eventThread.start();
    }

可以看到分别启动了sendThread和eventThread线程
sendThread.run

public void run() {
//初始化clientCnxnSocket.sendThread和clientCnxnSocket.sessionId,后者此时为0
            clientCnxnSocket.introduce(this,sessionId);
//更新clientCnxnSocket.now为当前时间
            clientCnxnSocket.updateNow();
//更新clientCnxnSocket.lastSend和clientCnxnSocket.lastHeard为当前时间
            clientCnxnSocket.updateLastSendAndHeard();
            int to;
            long lastPingRwServer = System.currentTimeMillis();
            final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
            while (state.isAlive()) {
//连接状态不为States.CLOSED 或 States. AUTH_FAILED
                try {
//当clientCnxnSocket.sockKey == null时,说明底层连接已和服务端断开
                    if (!clientCnxnSocket.isConnected()) {
                        if(!isFirstConnect){
//不是第一次连接且连接断开
                            try {
                                Thread.sleep(r.nextInt(1000));
                            } catch (InterruptedException e) {
                                LOG.warn("Unexpected exception", e);
                            }
                        }
                        // don't re-establish connection if we are closing
                        if (closing || !state.isAlive()) {
                            break;
                        }
//非客户端主动关闭或认证失败,连接断开时客户端会重新连接服务端
//也是第一次连接的入口
                        startConnect();
                        clientCnxnSocket.updateLastSendAndHeard();
                    }

                    if (state.isConnected()) {
                           ······················省略sasl认证··················
                        }
                        to = readTimeout - clientCnxnSocket.getIdleRecv();
                    } else {
//重置连接可用时间
                        to = connectTimeout - clientCnxnSocket.getIdleRecv();
                    }
                    
                    if (to <= 0) {
                        String warnInfo;
                        warnInfo = "Client session timed out, have not heard from server in "
                            + clientCnxnSocket.getIdleRecv()
                            + "ms"
                            + " for sessionid 0x"
                            + Long.toHexString(sessionId);
                        LOG.warn(warnInfo);
                        throw new SessionTimeoutException(warnInfo);
                    }
              
                  ·····················省略定时心跳发送和检测···················
                    clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
                } catch (Throwable e) {
                    if (closing) {
                        if (LOG.isDebugEnabled()) {
                            // closing so this is expected
                            LOG.debug("An exception was thrown while closing send thread for session 0x"
                                    + Long.toHexString(getSessionId())
                                    + " : " + e.getMessage());
                        }
                        break;
                    } else {
                        // this is ugly, you have a better way speak up
                        if (e instanceof SessionExpiredException) {
                            LOG.info(e.getMessage() + ", closing socket connection");
                        } else if (e instanceof SessionTimeoutException) {
                            LOG.info(e.getMessage() + RETRY_CONN_MSG);
                        } else if (e instanceof EndOfStreamException) {
                            LOG.info(e.getMessage() + RETRY_CONN_MSG);
                        } else if (e instanceof RWServerFoundException) {
                            LOG.info(e.getMessage());
                        } else {
                            LOG.warn(
                                    "Session 0x"
                                            + Long.toHexString(getSessionId())
                                            + " for server "
                                            + clientCnxnSocket.getRemoteSocketAddress()
                                            + ", unexpected error"
                                            + RETRY_CONN_MSG, e);
                        }
//说明线程出现异常,清空pendingQueue和outgoingQueue,需要等待客户端重新连接
                        cleanup();
                        if (state.isAlive()) {
                            eventThread.queueEvent(new WatchedEvent(
                                    Event.EventType.None,
                                    Event.KeeperState.Disconnected,
                                    null));
                        }
                        clientCnxnSocket.updateNow();
                        clientCnxnSocket.updateLastSendAndHeard();
                    }
                }
            }
            cleanup();
            clientCnxnSocket.close();
            if (state.isAlive()) {
                eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
                        Event.KeeperState.Disconnected, null));
            }
            ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
                    "SendThread exited loop for session: 0x"
                           + Long.toHexString(getSessionId()));
        }

第一次创建连接的方法为sendThread.startConnect

 private void startConnect() throws IOException {
            state = States.CONNECTING;

            InetSocketAddress addr;
            if (rwServerAddress != null) {
                addr = rwServerAddress;
                rwServerAddress = null;
            } else {
                addr = hostProvider.next(1000);
            }
··············省略zookeeper Sasl认证校验的过程,如果失败会发送EventType.None、KeeperState.AuthFailed的事件···············
            clientCnxnSocket.connect(addr);
        }

实际建立连接的方法为clientCnxnSocket.connect

 void connect(InetSocketAddress addr) throws IOException {
        SocketChannel sock = createSock();
        try {
           registerAndConnect(sock, addr);
        } catch (IOException e) {
            LOG.error("Unable to open socket to " + addr);
            sock.close();
            throw e;
        }
        initialized = false;
// Reset incomingBuffer
        lenBuffer.clear();
        incomingBuffer = lenBuffer;
    }

void registerAndConnect(SocketChannel sock, InetSocketAddress addr) 
    throws IOException {
        sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
//因为SocketChannel sock是非阻塞的,此时并没有马上建立真正的连接就会返回true
        boolean immediateConnect = sock.connect(addr);
        if (immediateConnect) {
            sendThread.primeConnection();
        }
    }

当连接返回时,会调用sendThread.primeConnection构建发送队列的packet

 void primeConnection() throws IOException {
            isFirstConnect = false;
            long sessId = (seenRwServerBefore) ? sessionId : 0;
            ConnectRequest conReq = new ConnectRequest(0, lastZxid,
                    sessionTimeout, sessId, sessionPasswd);
            synchronized (outgoingQueue) {
                // We add backwards since we are pushing into the front
                // Only send if there's a pending watch
                // TODO: here we have the only remaining use of zooKeeper in
                // this class. It's to be eliminated!
                if (!disableAutoWatchReset) {
                    List<String> dataWatches = zooKeeper.getDataWatches();
                    List<String> existWatches = zooKeeper.getExistWatches();
                    List<String> childWatches = zooKeeper.getChildWatches();
                    if (!dataWatches.isEmpty()
                                || !existWatches.isEmpty() || !childWatches.isEmpty()) {

                        Iterator<String> dataWatchesIter = prependChroot(dataWatches).iterator();
                        Iterator<String> existWatchesIter = prependChroot(existWatches).iterator();
                        Iterator<String> childWatchesIter = prependChroot(childWatches).iterator();
                        long setWatchesLastZxid = lastZxid;

                        while (dataWatchesIter.hasNext()
                                       || existWatchesIter.hasNext() || childWatchesIter.hasNext()) {
                            List<String> dataWatchesBatch = new ArrayList<String>();
                            List<String> existWatchesBatch = new ArrayList<String>();
                            List<String> childWatchesBatch = new ArrayList<String>();
                            int batchLength = 0;

                            // Note, we may exceed our max length by a bit when we add the last
                            // watch in the batch. This isn't ideal, but it makes the code simpler.
                            while (batchLength < SET_WATCHES_MAX_LENGTH) {
                                final String watch;
                                if (dataWatchesIter.hasNext()) {
                                    watch = dataWatchesIter.next();
                                    dataWatchesBatch.add(watch);
                                } else if (existWatchesIter.hasNext()) {
                                    watch = existWatchesIter.next();
                                    existWatchesBatch.add(watch);
                                } else if (childWatchesIter.hasNext()) {
                                    watch = childWatchesIter.next();
                                    childWatchesBatch.add(watch);
                                } else {
                                    break;
                                }
                                batchLength += watch.length();
                            }

                            SetWatches sw = new SetWatches(setWatchesLastZxid,
                                    dataWatchesBatch,
                                    existWatchesBatch,
                                    childWatchesBatch);
                            RequestHeader h = new RequestHeader();
                            h.setType(ZooDefs.OpCode.setWatches);
                            h.setXid(-8);
                            Packet packet = new Packet(h, new ReplyHeader(), sw, null, null);
                            outgoingQueue.addFirst(packet);
                        }
                    }
                }

                for (AuthData id : authInfo) {
                    outgoingQueue.addFirst(new Packet(new RequestHeader(-4,
                            OpCode.auth), null, new AuthPacket(0, id.scheme,
                            id.data), null, null));
                }
                outgoingQueue.addFirst(new Packet(null, null, conReq,
                            null, null, readOnly));
            }
//只注册SelectionKey.OP_READ 和 SelectionKey.OP_WRITE事件
            clientCnxnSocket.enableReadWriteOnly();
           
        }

注册SelectionKey.OP_READ 和 SelectionKey.OP_WRITE事件到selector上
发送队列outgoingQueue从头到尾添加的packet的请求内容依次为:ConnectRequest ->AuthPacket(如果需要认证的话)->SetWatches (ps:当前watch信息,是需要服务端响应的请求,所以每次成功建立连接都需要将当前客户端的watch信息发送给连接的服务端)
之后run方法会调用clientCnxnSocket.doTransport,也是处理整个IO的方法

void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,
                     ClientCnxn cnxn)
            throws IOException, InterruptedException {
        selector.select(waitTimeOut);
        Set<SelectionKey> selected;
        synchronized (this) {
            selected = selector.selectedKeys();
        }
        // Everything below and until we get back to the select is
        // non blocking, so time is effectively a constant. That is
        // Why we just have to do this once, here
        updateNow();
        for (SelectionKey k : selected) {
            SocketChannel sc = ((SocketChannel) k.channel());
            if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
                if (sc.finishConnect()) {
                    updateLastSendAndHeard();
                    sendThread.primeConnection();
                }
            } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
//
                doIO(pendingQueue, outgoingQueue, cnxn);
            }
        }
        if (sendThread.getZkState().isConnected()) {
            synchronized(outgoingQueue) {
                if (findSendablePacket(outgoingQueue,
                        cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
                    enableWrite();
                }
            }
        }
        selected.clear();
    }

此时selector上注册的事件为读写事件,将会调用doIO

 void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
      throws InterruptedException, IOException {
        SocketChannel sock = (SocketChannel) sockKey.channel();
        if (sock == null) {
            throw new IOException("Socket is null!");
        }
        if (sockKey.isReadable()) {
            int rc = sock.read(incomingBuffer);
            if (rc < 0) {
                throw new EndOfStreamException(
                        "Unable to read additional data from server sessionid 0x"
                                + Long.toHexString(sessionId)
                                + ", likely server has closed socket");
            }
            if (!incomingBuffer.hasRemaining()) {
                incomingBuffer.flip();
                if (incomingBuffer == lenBuffer) {
                    recvCount++;
                    readLength();
                } else if (!initialized) {
//第一次连接调到此处
                    readConnectResult();
                    enableRead();
                    if (findSendablePacket(outgoingQueue,
                            cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
                        // Since SASL authentication has completed (if client is configured to do so),
                        // outgoing packets waiting in the outgoingQueue can now be sent.
                        enableWrite();
                    }
                    lenBuffer.clear();
                    incomingBuffer = lenBuffer;
                    updateLastHeard();
                    initialized = true;
                } else {
                    sendThread.readResponse(incomingBuffer);
                    lenBuffer.clear();
                    incomingBuffer = lenBuffer;
                    updateLastHeard();
                }
            }
        if (sockKey.isWritable()) {
            synchronized(outgoingQueue) {
                Packet p = findSendablePacket(outgoingQueue,
                        cnxn.sendThread.clientTunneledAuthenticationInProgress());

                if (p != null) {
                    updateLastSend();
                    // If we already started writing p, p.bb will already exist
                    if (p.bb == null) {
                        if ((p.requestHeader != null) &&
                                (p.requestHeader.getType() != OpCode.ping) &&
                                (p.requestHeader.getType() != OpCode.auth)) {
                            p.requestHeader.setXid(cnxn.getXid());
                        }
                        p.createBB();
                    }

                    sock.write(p.bb);
                    if (!p.bb.hasRemaining()) {
                        sentCount++;
                        outgoingQueue.removeFirstOccurrence(p);
                        if (p.requestHeader != null
                                && p.requestHeader.getType() != OpCode.ping
                                && p.requestHeader.getType() != OpCode.auth) {
                            synchronized (pendingQueue) {
                                pendingQueue.add(p);
                            }
                        }
                    }
                }
                if (outgoingQueue.isEmpty()) {
                    // No more packets to send: turn off write interest flag.
                    // Will be turned on later by a later call to enableWrite(),
                    // from within ZooKeeperSaslClient (if client is configured
                    // to attempt SASL authentication), or in either doIO() or
                    // in doTransport() if not.
                    disableWrite();
                } else if (!initialized && p != null && !p.bb.hasRemaining()) {
                    // On initial connection, write the complete connect request
                    // packet, but then disable further writes until after
                    // receiving a successful connection response.  If the
                    // session is expired, then the server sends the expiration
                    // response and immediately closes its end of the socket.  If
                    // the client is simultaneously writing on its end, then the
                    // TCP stack may choose to abort with RST, in which case the
                    // client would never receive the session expired event.  See
                    // http://docs.oracle.com/javase/6/docs/technotes/guides/net/articles/connection_release.html
                    disableWrite();
                } else {
                    // Just in case
                    enableWrite();
                }
            }
        }
        }

首先会触发写事件sockKey.isWritable(),(因为写缓冲区可写但此时服务端并不会发送数据到客户端)将从outgoingQueue取出第一个ConnectRequest 的packet发送出去,然后取消写事件,等待服务端的响应
当除发读事件sockKey.isReadable()时,读取服务端的连接响应数据readConnectResult

void readConnectResult() throws IOException {

        ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
        BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
        ConnectResponse conRsp = new ConnectResponse();
        conRsp.deserialize(bbia, "connect");

        // read "is read-only" flag
        boolean isRO = false;
        try {
            isRO = bbia.readBool("readOnly");
        } catch (IOException e) {
            // this is ok -- just a packet from an old server which
            // doesn't contain readOnly field
            LOG.warn("Connected to an old server; r-o mode will be unavailable");
        }

        this.sessionId = conRsp.getSessionId();
        sendThread.onConnected(conRsp.getTimeOut(), this.sessionId,
                conRsp.getPasswd(), isRO);
    }

会将响应数据反序列化为ConnectResponse conRsp,从中读取sessionId ,并调用连接回调函数sendThread.onConnected

void onConnected(int _negotiatedSessionTimeout, long _sessionId,
                byte[] _sessionPasswd, boolean isRO) throws IOException {
            negotiatedSessionTimeout = _negotiatedSessionTimeout;
            if (negotiatedSessionTimeout <= 0) {
                state = States.CLOSED;

                eventThread.queueEvent(new WatchedEvent(
                        Watcher.Event.EventType.None,
                        Watcher.Event.KeeperState.Expired, null));
                eventThread.queueEventOfDeath();

                throw new SessionExpiredException(warnInfo);
            }
            if (!readOnly && isRO) {
                LOG.error("Read/write client got connected to read-only server");
            }
            readTimeout = negotiatedSessionTimeout * 2 / 3;
            connectTimeout = negotiatedSessionTimeout / hostProvider.size();
//lastIndex = currentIndex
            hostProvider.onConnected();
            sessionId = _sessionId;
            sessionPasswd = _sessionPasswd;
            state = (isRO) ?
                    States.CONNECTEDREADONLY : States.CONNECTED;
            seenRwServerBefore |= !isRO;
            
            KeeperState eventState = (isRO) ?
                    KeeperState.ConnectedReadOnly : KeeperState.SyncConnected;
            eventThread.queueEvent(new WatchedEvent(
                    Watcher.Event.EventType.None,
                    eventState, null));
        }

1.根据服务端认可的下次会话过期时间,重置客户端的negotiatedSessionTimeout,readTimeoutconnectTimeout
2.回调hostProvider,重置环形地址列表的指针lastIndex
3.设置sessionId ,sessionPasswd
4.根据是否只读,设置连接状态state= States.CONNECTEDREADONLY或 States.CONNECTED
5.发送连接状态事件给eventThread

接下来当一直是连接状态的时候,sendThread就开始不停的处理读写请求数据和定时发送心跳,可参考会话管理
eventThread.run

 public void run() {
           try {
              isRunning = true;
              while (true) {
                 Object event = waitingEvents.take();
                 if (event == eventOfDeath) {
                    wasKilled = true;
                 } else {
                    processEvent(event);
                 }
                 if (wasKilled)
                    synchronized (waitingEvents) {
                       if (waitingEvents.isEmpty()) {
                          isRunning = false;
                          break;
                       }
                    }
              }
           } catch (InterruptedException e) {
              LOG.error("Event thread exiting due to interruption", e);
           }

            LOG.info("EventThread shut down for session: 0x{}",
                     Long.toHexString(getSessionId()));
        }

主要流程为:
1.读取请求事件Object event = waitingEvents.take()
2.处理请求事件processEvent(event),再此先不展开事件具体处理。

参考资料:ZooKeeper的Java客户端使用

感谢您的阅读,我是Monica23334 || Monica2333 。立下每周写一篇原创文章flag的小姐姐,关注我并期待打脸吧~

上一篇下一篇

猜你喜欢

热点阅读