
zk Source Code Reading 29: Network IO When Servers Elect a Leader: Qu

2017-08-03  Read by 322 people  赤子心_d709

Summary

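During startup, every server creates a QuorumCnxManager, which handles the low-level network communication between servers during Leader election.
That class, QuorumCnxManager, is the subject of this post.
Before reading the source in this section, it is worth reading the material listed under refer first. It is short and gives you a grasp of the basic concepts.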

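This section mainly covers

Inner classes
  SendWorker: the sender side of network IO; takes messages from the send queue and sends them to the machine with the matching sid
  Message: defines the message structure, consisting of a sid and a ByteBuffer message body
  RecvWorker: the receiver side of network IO
  Listener: listens on electionPort and waits for connections from other machines
Attributes
  recvQueue: the receive queue
  queueSendMap: the send queue for each sid
Functions
  Connection-related
  Producing and consuming for sender and recv
  Others
Thoughts and summary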

Inner classes

Inner classes of QuorumCnxManager

There are four inner classes:
SendWorker, Message, RecvWorker, and Listener

SendWorker

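This class is the "sender". It extends ZooKeeperThread, and its thread keeps taking messages from the send queue and sending them to the machine with the matching sid.
Attributes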

        Long sid;// sid of the target machine, not of the current machine
        Socket sock;
        RecvWorker recvWorker;// the RecvWorker for this sid
        volatile boolean running = true;
        DataOutputStream dout;

Main methods

Constructor

        SendWorker(Socket sock, Long sid) {
            super("SendWorker:" + sid);
            this.sid = sid;
            this.sock = sock;
            recvWorker = null;
            try {
                dout = new DataOutputStream(sock.getOutputStream());
            } catch (IOException e) {
                LOG.error("Unable to access socket output stream", e);
                closeSocket(sock);
                running = false;
            }
            LOG.debug("Address of remote peer: " + this.sid);
        }

run method

        @Override
        public void run() {
            threadCnt.incrementAndGet();// thread count + 1
            try {
                /**
                 * If there is nothing in the queue to send, then we
                 * send the lastMessage to ensure that the last message
                 * was received by the peer. The message could be dropped
                 * in case self or the peer shutdown their connection
                 * (and exit the thread) prior to reading/processing
                 * the last message. Duplicate messages are handled correctly
                 * by the peer.
                 *
                 * If the send queue is non-empty, then we have a recent
                 * message than that stored in lastMessage. To avoid sending
                 * stale message, we should send the message in the send queue.
                 */
                ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);// find the send queue for this sid
                if (bq == null || isSendQueueEmpty(bq)) {
                   ByteBuffer b = lastMessageSent.get(sid);// nothing to send, so resend the last message (the peer handles duplicates correctly)
                   if (b != null) {
                       LOG.debug("Attempting to send lastMessage to sid=" + sid);
                       send(b);// send
                   }
                }
            } catch (IOException e) {
                LOG.error("Failed to send last message. Shutting down thread.", e);
                this.finish();
            }
            
            try {
                while (running && !shutdown && sock != null) {

                    ByteBuffer b = null;
                    try {
                        ArrayBlockingQueue<ByteBuffer> bq = queueSendMap
                                .get(sid);
                        if (bq != null) {
                            b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS);// take a message from the send queue
                        } else {// no queue is registered in the map for this sid
                            LOG.error("No queue of incoming messages for " +
                                      "server " + sid);
                            break;
                        }

                        if(b != null){
                            lastMessageSent.put(sid, b);// record the most recently sent message
                            send(b);// send
                        }
                    } catch (InterruptedException e) {
                        LOG.warn("Interrupted while waiting for message on queue",
                                e);
                    }
                }
            } catch (Exception e) {
                LOG.warn("Exception when using channel: for id " + sid + " my id = " + 
                        self.getId() + " error = " + e);
            }
            this.finish();
            LOG.warn("Send worker leaving thread");
        }
    }


send method

        synchronized void send(ByteBuffer b) throws IOException {
            byte[] msgBytes = new byte[b.capacity()];
            try {
                b.position(0);
                b.get(msgBytes);
            } catch (BufferUnderflowException be) {
                LOG.error("BufferUnderflowException ", be);
                return;
            }
            dout.writeInt(b.capacity());
            dout.write(b.array());
            dout.flush();
        }

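The fields involved are covered below. One thing to note here:
When a SendWorker starts running and finds that the send queue for its target server is empty, it takes the most recently sent message from lastMessageSent and sends it again. This covers the case where the receiver shut down before receiving the message, or after receiving it but before processing it, so that the message was never handled properly. ZooKeeper also guarantees that the receiver handles duplicate messages correctly.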
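
The resend rule described above can be modeled in a few lines. This is only a single-threaded sketch under my own assumptions: the class ResendSketch, the wire list standing in for the socket, and the demo helper are all hypothetical, and only the names queueSendMap and lastMessageSent come from the source.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical single-threaded model of the start-up check in SendWorker#run; not ZooKeeper code.
final class ResendSketch {
    final Map<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap = new ConcurrentHashMap<>();
    final Map<Long, ByteBuffer> lastMessageSent = new ConcurrentHashMap<>();
    final List<String> wire = new ArrayList<>(); // stands in for the socket

    void send(ByteBuffer b) {
        wire.add(new String(b.array(), StandardCharsets.UTF_8));
    }

    // Resend the last message only if nothing newer is waiting in the queue.
    void onWorkerStart(long sid) {
        ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
        if (bq == null || bq.isEmpty()) {
            ByteBuffer last = lastMessageSent.get(sid);
            if (last != null) {
                send(last);
            }
        }
    }

    static ByteBuffer msg(String s) {
        return ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8));
    }

    // Returns what a freshly started worker would put on the wire before its main loop.
    static List<String> demo(boolean newerVoteQueued) {
        ResendSketch m = new ResendSketch();
        long sid = 2L;
        m.lastMessageSent.put(sid, msg("vote-1"));
        ArrayBlockingQueue<ByteBuffer> q = new ArrayBlockingQueue<>(1);
        if (newerVoteQueued) {
            q.add(msg("vote-2"));
        }
        m.queueSendMap.put(sid, q);
        m.onWorkerStart(sid);
        return m.wire;
    }

    public static void main(String[] args) {
        System.out.println(demo(false)); // [vote-1]
        System.out.println(demo(true));  // []
    }
}
```

When a newer vote is already queued, nothing is resent at start-up; the main loop will pick up the queued vote instead, which matches the "avoid sending stale message" remark in the source comment.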

Message

This class defines the structure of the messages passed between servers. The source is as follows:

    static public class Message {
        
        Message(ByteBuffer buffer, long sid) {
            this.buffer = buffer;
            this.sid = sid;
        }

        ByteBuffer buffer;
        long sid;
    }

sid is the sid of the sender of the message, and buffer is the message body.

RecvWorker

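This class is the "receiver". Like SendWorker, it extends ZooKeeperThread; its thread keeps reading data from network IO and putting it into the receive queue.
Attributes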

        Long sid;// sid of the source machine
        Socket sock;
        volatile boolean running = true;
        DataInputStream din;//input
        final SendWorker sw;

Main methods

Constructor

        RecvWorker(Socket sock, Long sid, SendWorker sw) {
            super("RecvWorker:" + sid);
            this.sid = sid;
            this.sock = sock;
            this.sw = sw;
            try {
                din = new DataInputStream(sock.getInputStream());
                // OK to wait until socket disconnects while reading.
                sock.setSoTimeout(0);
            } catch (IOException e) {
                LOG.error("Error while accessing socket for " + sid, e);
                closeSocket(sock);
                running = false;
            }
        }

run method

        @Override
        public void run() {
            threadCnt.incrementAndGet();
            try {
                while (running && !shutdown && sock != null) {
                    /**
                     * Reads the first int to determine the length of the
                     * message
                     */
                    int length = din.readInt();// read the length
                    if (length <= 0 || length > PACKETMAXSIZE) {
                        throw new IOException(
                                "Received packet with invalid packet: "
                                        + length);
                    }
                    /**
                     * Allocates a new ByteBuffer to receive the message
                     */
                    byte[] msgArray = new byte[length];
                    din.readFully(msgArray, 0, length);
                    ByteBuffer message = ByteBuffer.wrap(msgArray);// wrap the bytes in a ByteBuffer
                    addToRecvQueue(new Message(message.duplicate(), sid));// add to the receive queue
                }
            } catch (Exception e) {
                LOG.warn("Connection broken for id " + sid + ", my id = " + 
                        self.getId() + ", error = " , e);
            } finally {
                LOG.warn("Interrupting SendWorker");
                sw.finish();
                if (sock != null) {
                    closeSocket(sock);
                }
            }
        }
    }
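
SendWorker#send and RecvWorker#run together amount to a length-prefixed frame: a 4-byte int followed by that many payload bytes. The round trip can be tried without sockets, roughly as below. This is a sketch, not the ZooKeeper implementation: FramingSketch, writeFrame, readFrame and roundTrip are names made up here, in-memory streams replace the socket, and the PACKET_MAX_SIZE value is an assumption of this example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration of the [int length][payload] framing; not ZooKeeper code.
final class FramingSketch {
    static final int PACKET_MAX_SIZE = 1024 * 512; // assumed upper bound for this sketch

    // Writer side: length first, then the bytes, as in SendWorker#send.
    static void writeFrame(DataOutputStream out, ByteBuffer b) throws IOException {
        out.writeInt(b.capacity());
        out.write(b.array());
        out.flush();
    }

    // Reader side: validate the length, then read exactly that many bytes, as in RecvWorker#run.
    static ByteBuffer readFrame(DataInputStream in) throws IOException {
        int length = in.readInt();
        if (length <= 0 || length > PACKET_MAX_SIZE) {
            throw new IOException("Invalid packet length: " + length);
        }
        byte[] payload = new byte[length];
        in.readFully(payload, 0, length);
        return ByteBuffer.wrap(payload);
    }

    // Writes one frame to memory and reads it back.
    static String roundTrip(String text) {
        try {
            ByteArrayOutputStream sink = new ByteArrayOutputStream();
            writeFrame(new DataOutputStream(sink),
                    ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8)));
            DataInputStream in = new DataInputStream(
                    new ByteArrayInputStream(sink.toByteArray()));
            return new String(readFrame(in).array(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("vote")); // vote
    }
}
```

Because the reader rejects lengths that are zero or negative, an empty payload cannot be framed in this scheme.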

Listener

This class also extends ZooKeeperThread. It listens on electionPort and keeps accepting incoming connections.
The core of its run method is:

                     while (!shutdown) {
                        Socket client = ss.accept();
                        setSockOpts(client);
                        LOG.info("Received connection request "
                                + client.getRemoteSocketAddress());
                        receiveConnection(client);// handle each accepted connection
                        numRetries = 0;
                    }

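Summary of the inner classes

SendWorker and RecvWorker hold references to each other; the reason is given in the Thoughts section below.
RecvWorker is easier to understand than SendWorker.

Both have a sid attribute: when a machine connects to other machines, it keeps a separate RecvWorker and SendWorker per sid.
For example, if sid1 has connections to the other (n-1) servers, it has (n-1) RecvWorkers and (n-1) SendWorkers, one pair per sid.

Message wraps a message: it holds a sid and a ByteBuffer as the message body.

Listener listens on the electionPort configured for the local machine and keeps accepting incoming connections.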

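Attributes

The important attribute fields are summarized below

Attribute | Default | Notes
RECV_CAPACITY | 100 | Length of the receive queue
SEND_CAPACITY | 1 | Length of each send queue; the reason is given in Thoughts
ConcurrentHashMap<Long, SendWorker> senderWorkerMap | - | The SendWorker for each sid
ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap | - | Message send queues, keyed by each machine's sid
ConcurrentHashMap<Long, ByteBuffer> lastMessageSent | - | The last message sent to the machine with that sid
ArrayBlockingQueue<Message> recvQueue | - | The receive queue

These are fairly easy to understand.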

Functions

Functions for requesting and accepting connections

Requesting a connection

connectOne: connect to the server with a given sid

    synchronized void connectOne(long sid){// connect to the server with this sid
        if (senderWorkerMap.get(sid) == null){// no entry in the sender map yet
            InetSocketAddress electionAddr;
            if (self.quorumPeers.containsKey(sid)) {
                electionAddr = self.quorumPeers.get(sid).electionAddr;// election address of that sid, taken from the configuration
            } else {
                LOG.warn("Invalid server id: " + sid);
                return;
            }
            try {

                if (LOG.isDebugEnabled()) {
                    LOG.debug("Opening channel to server " + sid);
                }
                Socket sock = new Socket();
                setSockOpts(sock);
                sock.connect(self.getView().get(sid).electionAddr, cnxTO);// connect the socket to that address
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Connected to server " + sid);
                }
                initiateConnection(sock, sid);// initialize the connection
            } catch (UnresolvedAddressException e) {
                // Sun doesn't include the address that causes this
                // exception to be thrown, also UAE cannot be wrapped cleanly
                // so we log the exception in order to capture this critical
                // detail.
                LOG.warn("Cannot open channel to " + sid
                        + " at election address " + electionAddr, e);
                // Resolve hostname for this server in case the
                // underlying ip address has changed.
                if (self.getView().containsKey(sid)) {
                    self.getView().get(sid).recreateSocketAddresses();
                }
                throw e;
            } catch (IOException e) {
                LOG.warn("Cannot open channel to " + sid
                        + " at election address " + electionAddr,
                        e);
                // We can't really tell if the server is actually down or it failed
                // to connect to the server because the underlying IP address
                // changed. Resolve the hostname again just in case.
                if (self.getView().containsKey(sid)) {
                    self.getView().get(sid).recreateSocketAddresses();
                }
            }
        } else {
            LOG.debug("There is a connection already for server " + sid);
        }
    }

If senderWorkerMap has no entry for the sid, there is currently no connection, so one is opened. The core of it is the call to
initiateConnection

initiateConnection: initialize a connection

    /**
     * If this server has initiated the connection, then it gives up on the
     * connection if it loses challenge. Otherwise, it keeps the connection.
     */
    public boolean initiateConnection(Socket sock, Long sid) {// initialize the connection
        DataOutputStream dout = null;
        try {
            // Sending id and challenge
            dout = new DataOutputStream(sock.getOutputStream());
            dout.writeLong(self.getId());// send the local sid
            dout.flush();
        } catch (IOException e) {
            LOG.warn("Ignoring exception reading or writing challenge: ", e);
            closeSocket(sock);
            return false;
        }
        
        // If lost the challenge, then drop the new connection
        if (sid > self.getId()) {// only the larger sid may connect to the smaller one; if our sid is smaller, close
            LOG.info("Have smaller server identifier, so dropping the " +
                     "connection: (" + sid + ", " + self.getId() + ")");
            closeSocket(sock);
            // Otherwise proceed with the connection
        } else {// our sid is larger: set up the SendWorker and RecvWorker
            SendWorker sw = new SendWorker(sock, sid);
            RecvWorker rw = new RecvWorker(sock, sid, sw);// rw keeps a reference to sw
            sw.setRecv(rw);// sw keeps a reference to rw

            SendWorker vsw = senderWorkerMap.get(sid);
            
            if(vsw != null)
                vsw.finish();// finish the old SendWorker (vsw) for this sid
            
            senderWorkerMap.put(sid, sw);// register the new SendWorker (sw)
            if (!queueSendMap.containsKey(sid)) {
                queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>(
                        SEND_CAPACITY));
            }
            
            sw.start();
            rw.start();
            
            return true;    
            
        }
        return false;
    }

Accepting a connection

receiveConnection: accept an incoming connection

    public void receiveConnection(Socket sock) {// accept an incoming connection
        Long sid = null;
        
        try {
            // Read server id
            DataInputStream din = new DataInputStream(sock.getInputStream());
            sid = din.readLong();
            if (sid < 0) { // this is not a server id but a protocol version (see ZOOKEEPER-1633)
                sid = din.readLong();

                // next comes the #bytes in the remainder of the message
                // note that 0 bytes is fine (old servers)
                int num_remaining_bytes = din.readInt();
                if (num_remaining_bytes < 0 || num_remaining_bytes > maxBuffer) {
                    LOG.error("Unreasonable buffer length: {}", num_remaining_bytes);
                    closeSocket(sock);
                    return;
                }
                byte[] b = new byte[num_remaining_bytes];

                // remove the remainder of the message from din
                int num_read = din.read(b);
                if (num_read != num_remaining_bytes) {
                    LOG.error("Read only " + num_read + " bytes out of " + num_remaining_bytes + " sent by server " + sid);
                }
            }
            if (sid == QuorumPeer.OBSERVER_ID) {
                /*
                 * Choose identifier at random. We need a value to identify
                 * the connection.
                 */
                
                sid = observerCounter--;
                LOG.info("Setting arbitrary identifier to observer: " + sid);
            }
        } catch (IOException e) {
            closeSocket(sock);
            LOG.warn("Exception reading or writing challenge: " + e.toString());
            return;
        }
        
        //If wins the challenge, then close the new connection.
        if (sid < self.getId()) {// our id is larger: close this connection (it was opened by a smaller sid toward a larger one) and connect to that sid ourselves
            /*
             * This replica might still believe that the connection to sid is
             * up, so we have to shut down the workers before trying to open a
             * new connection.
             */
            SendWorker sw = senderWorkerMap.get(sid);
            if (sw != null) {
                sw.finish();
            }

            /*
             * Now we start a new connection
             */
            LOG.debug("Create new connection to server: " + sid);
            closeSocket(sock);
            connectOne(sid);

            // Otherwise start worker threads to receive data.
        } else {
            SendWorker sw = new SendWorker(sock, sid);
            RecvWorker rw = new RecvWorker(sock, sid, sw);
            sw.setRecv(rw);

            SendWorker vsw = senderWorkerMap.get(sid);
            
            if(vsw != null)
                vsw.finish();
            
            senderWorkerMap.put(sid, sw);
            
            if (!queueSendMap.containsKey(sid)) {
                queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>(
                        SEND_CAPACITY));
            }
            
            sw.start();
            rw.start();
            
            return;
        }
    }
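
The first part of receiveConnection is a small handshake parser: the first long is either the peer's sid or, if negative, a protocol version followed by the sid and a byte count. A stream-only sketch of that parsing is shown below. It is an illustration under assumptions: HandshakeSketch, readSid and parse are hypothetical names, the marker value -1L and the MAX_BUFFER cap are chosen for this example only, the observer case is left out, and readFully is used here where the source uses read.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical model of the sid handshake read in receiveConnection; not ZooKeeper code.
final class HandshakeSketch {
    static final int MAX_BUFFER = 2048; // assumed cap on the trailing bytes

    static long readSid(DataInputStream din) throws IOException {
        long sid = din.readLong();
        if (sid < 0) {
            // A negative first long is a protocol version, so the real sid follows.
            sid = din.readLong();
            int remaining = din.readInt();
            if (remaining < 0 || remaining > MAX_BUFFER) {
                throw new IOException("Unreasonable buffer length: " + remaining);
            }
            byte[] rest = new byte[remaining];
            din.readFully(rest); // skip the remainder of the handshake message
        }
        return sid;
    }

    // Builds a handshake in memory (old or new style) and parses it back.
    static long parse(boolean withVersion, long sid) {
        try {
            ByteArrayOutputStream sink = new ByteArrayOutputStream();
            DataOutputStream dout = new DataOutputStream(sink);
            if (withVersion) {
                dout.writeLong(-1L); // hypothetical negative version marker
                dout.writeLong(sid);
                dout.writeInt(0);    // no trailing bytes
            } else {
                dout.writeLong(sid);
            }
            dout.flush();
            return readSid(new DataInputStream(
                    new ByteArrayInputStream(sink.toByteArray())));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(parse(false, 3L)); // 3
        System.out.println(parse(true, 3L));  // 3
    }
}
```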

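There is a concept here: winning and losing the challenge.
To guarantee that each pair of servers has only one socket, ZooKeeper only lets the server with the larger sid initiate a connection to the server with the smaller sid; a connection in the other direction is dropped.
When initiating a connection, the local sid must be the larger one; then the SendWorker and RecvWorker are constructed and their threads started. Otherwise the socket is closed.
When accepting a connection, the local sid must be the smaller one; then the SendWorker and RecvWorker are constructed and their threads started. Otherwise the socket is closed and the local server connects to the peer itself.
This is also analyzed in Thoughts.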
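
The two comparisons can be pulled out into pure functions to see how they complement each other. This is a minimal sketch; ChallengeSketch, keepOutgoing and keepIncoming are names invented here, and only the comparisons themselves are taken from initiateConnection and receiveConnection.

```java
// Hypothetical restatement of the sid comparisons; not ZooKeeper code.
final class ChallengeSketch {
    // Initiator side (initiateConnection): the socket is dropped when the peer's sid is larger.
    static boolean keepOutgoing(long myId, long peerSid) {
        return !(peerSid > myId);
    }

    // Acceptor side (receiveConnection): the socket is dropped when the peer's sid is smaller.
    static boolean keepIncoming(long myId, long peerSid) {
        return !(peerSid < myId);
    }

    public static void main(String[] args) {
        // Server 3 connects to server 1: both ends keep the socket.
        System.out.println(keepOutgoing(3, 1) + " " + keepIncoming(1, 3)); // true true
        // Server 1 connects to server 3: both ends drop it, and 3 then calls connectOne(1).
        System.out.println(keepOutgoing(1, 3) + " " + keepIncoming(3, 1)); // false false
    }
}
```

Both ends reach the same verdict about a given socket without exchanging anything beyond the initiator's sid.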

Functions for producing and consuming on the sender side

Producing: adding to the sender queue

    private void addToSendQueue(ArrayBlockingQueue<ByteBuffer> queue,
          ByteBuffer buffer) {// the send queue has length 1; if it is full, remove the old element, then add
        if (queue.remainingCapacity() == 0) {
            try {
                queue.remove();
            } catch (NoSuchElementException ne) {
                // element could be removed by poll()
                LOG.debug("Trying to remove from an empty " +
                        "Queue. Ignoring exception " + ne);
            }
        }
        try {
            queue.add(buffer);
        } catch (IllegalStateException ie) {
            // This should never happen
            LOG.error("Unable to insert an element in the queue " + ie);
        }
    }

Every sender queue has length 1. To avoid sending stale data, the old element is removed before the new one is added.
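
The effect of the remove-then-add pattern on a queue of capacity 1 is that only the newest element survives. A small sketch, assuming a single producer; LatestOnlyQueueSketch, addEvictingOldest and latestAfter are hypothetical names used only for this illustration:

```java
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical illustration of the remove-then-add pattern; not ZooKeeper code.
final class LatestOnlyQueueSketch {
    static <T> void addEvictingOldest(ArrayBlockingQueue<T> queue, T item) {
        if (queue.remainingCapacity() == 0) {
            try {
                queue.remove(); // drop the oldest (stale) element
            } catch (NoSuchElementException e) {
                // a consumer drained the queue in the meantime; nothing to drop
            }
        }
        queue.add(item); // with one producer there is room now
    }

    // Enqueues the votes in order into a capacity-1 queue and returns what is left.
    static String latestAfter(String... votes) {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        for (String vote : votes) {
            addEvictingOldest(queue, vote);
        }
        return queue.peek();
    }

    public static void main(String[] args) {
        System.out.println(latestAfter("vote-1", "vote-2", "vote-3")); // vote-3
    }
}
```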

Consuming the sender queue

    private ByteBuffer pollSendQueue(ArrayBlockingQueue<ByteBuffer> queue,
          long timeout, TimeUnit unit) throws InterruptedException {
       return queue.poll(timeout, unit);
    }

Functions for producing and consuming on the recv side

Producing into the recv queue

    public void addToRecvQueue(Message msg) {
        synchronized(recvQLock) {
            if (recvQueue.remainingCapacity() == 0) {
                try {
                    recvQueue.remove();
                } catch (NoSuchElementException ne) {
                    // element could be removed by poll()
                     LOG.debug("Trying to remove from an empty " +
                         "recvQueue. Ignoring exception " + ne);
                }
            }
            try {
                recvQueue.add(msg);// add to the receive queue
            } catch (IllegalStateException ie) {
                // This should never happen
                LOG.error("Unable to insert element in the recvQueue " + ie);
            }
        }
    }

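It is not clear to me why, when the queue is full, the element at the head has to be removed before adding. The queue holds at most 100 elements.

Consuming the recv queue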

    public Message pollRecvQueue(long timeout, TimeUnit unit)
       throws InterruptedException {
       return recvQueue.poll(timeout, unit);
    }

Other functions

toSend

Adds a message to the recv queue or to a send queue depending on the sid; in other words, it is the producer for both queues

    public void toSend(Long sid, ByteBuffer b) {
        /*
         * If sending message to myself, then simply enqueue it (loopback).
         */
        if (self.getId() == sid) {// sending to ourselves: add to the recv queue
             b.position(0);
             addToRecvQueue(new Message(b.duplicate(), sid));
            /*
             * Otherwise send to the corresponding thread to send.
             */
        } else {
             /*
              * Start a new connection if doesn't have one already.
              */
             if (!queueSendMap.containsKey(sid)) {// the send map has no entry for this sid
                 ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(
                         SEND_CAPACITY);// blocking queue of length 1
                 queueSendMap.put(sid, bq);
                 addToSendQueue(bq, b);

             } else {
                 ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
                 if(bq != null){
                     addToSendQueue(bq, b);
                 } else {
                     LOG.error("No queue for server " + sid);
                 }
             }
             connectOne(sid);// establish a connection to this sid
                
        }
    }

haveDelivered

Whether messages have been delivered

    /**
     * Check if all queues are empty, indicating that all messages have been delivered.
     */
    boolean haveDelivered() {// returns true as soon as one queue is empty, which does not match the Javadoc
        for (ArrayBlockingQueue<ByteBuffer> queue : queueSendMap.values()) {
            LOG.debug("Queue size: " + queue.size());
            if (queue.size() == 0) {
                return true;
            }
        }

        return false;
    }

A complaint here: the code and the comment disagree. This is explained in Thoughts.

Connecting to every sid in queueSendMap

    public void connectAll(){// connect to every machine we have messages for
        long sid;
        for(Enumeration<Long> en = queueSendMap.keys();// every sid recorded in queueSendMap
            en.hasMoreElements();){
            sid = en.nextElement();
            connectOne(sid);
        }      
    }

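Thoughts

A tricky detail: there is only one connection between each pair of servers

Think of n servers that all need a connection to each other.
Picture n points joined by undirected edges, and write [sidn,sid1] for a connection opened by sidn to sid1.
Then [sid1,sidn] is unnecessary, so n*(n-1)/2 edges are enough.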
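
The edge count is easy to check by enumerating every ordered pair and keeping only those where the initiator has the larger sid. A short sketch, assuming sids 1..n; ConnectionCountSketch and countConnections are hypothetical names:

```java
// Hypothetical check of the edge count; not ZooKeeper code.
final class ConnectionCountSketch {
    // Counts the connections that survive when only the larger sid may initiate.
    static int countConnections(int n) {
        int count = 0;
        for (int from = 1; from <= n; from++) {
            for (int to = 1; to <= n; to++) {
                if (from > to) {
                    count++; // [from,to] is kept; [to,from] is never established
                }
            }
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println(countConnections(3)); // 3
        System.out.println(countConnections(5)); // 10
    }
}
```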

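The code stipulates that, under normal conditions, only the server with the larger sid opens a connection to the server with the smaller sid.
Where this shows up:

When sending a connection request, if the peer's sid is larger than ours, we only send our own sid (a single long) and then close the socket
QuorumCnxManager#initiateConnection

When receiving a connection request, if the peer's sid is smaller than ours, we close the socket and then connect to that sid ourselves
QuorumCnxManager#receiveConnection

In other words, when a server with a smaller sid sends a connection request to one with a larger sid, it is only telling the peer
"your sid is larger, you connect to me"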

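Initializing SendWorker and RecvWorker
How is it guaranteed that both ends of a connection such as [sidn,sid1] initialize their two workers?
When the connection request is sent, the side with the larger sid, sidn, initializes its two workers
When the connection request is received, the side with the smaller sid, sid1, initializes its two workers
The code for this is in the same two functions as above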

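Why RecvWorker and SendWorker keep references to each other

In the code this matters when finishing:
SendWorker#finish calls the finish of its RecvWorker
The finally block of RecvWorker#run calls SendWorker#finish

Also, senderWorkerMap is of type final ConcurrentHashMap<Long, SendWorker>,
and there is no corresponding map for RecvWorker,
so the reason is that the SendWorker is looked up by sid and calling its finish method then stops both workers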
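
The cascade can be modeled with two small classes. This is a sketch of the idea only: WorkerPairSketch, Sender, Receiver, onReadLoopExit and the two demo methods are hypothetical, and the real workers are threads that also close sockets, which is omitted here.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the mutual references between the two workers; not ZooKeeper code.
final class WorkerPairSketch {
    static final class Sender {
        volatile boolean running = true;
        Receiver receiver;

        void finish() {
            if (!running) {
                return; // already finished; this also stops any ping-pong between the two
            }
            running = false;
            if (receiver != null) {
                receiver.finish();
            }
        }
    }

    static final class Receiver {
        volatile boolean running = true;
        final Sender sender;

        Receiver(Sender sender) {
            this.sender = sender;
        }

        void finish() {
            running = false;
        }

        // Stands in for the finally block of the receive loop.
        void onReadLoopExit() {
            sender.finish();
        }
    }

    // Only senders are indexed by sid; the receiver is reached through its sender.
    static boolean bothStoppedWhenSenderFinishes() {
        ConcurrentHashMap<Long, Sender> senderWorkerMap = new ConcurrentHashMap<>();
        Sender s = new Sender();
        Receiver r = new Receiver(s);
        s.receiver = r;
        senderWorkerMap.put(2L, s);
        senderWorkerMap.get(2L).finish();
        return !s.running && !r.running;
    }

    static boolean bothStoppedWhenReceiverExits() {
        Sender s = new Sender();
        Receiver r = new Receiver(s);
        s.receiver = r;
        r.onReadLoopExit();
        return !s.running && !r.running;
    }

    public static void main(String[] args) {
        System.out.println(bothStoppedWhenSenderFinishes()); // true
        System.out.println(bothStoppedWhenReceiverExits());  // true
    }
}
```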

Why the send queue has length 1 and evicts the old element when it is full

Length 1: QuorumCnxManager#SEND_CAPACITY
Eviction: QuorumCnxManager#addToSendQueue

See the comment on SEND_CAPACITY

// Initialized to 1 to prevent sending
// stale notifications to peers

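Because these are leader election votes, there is a special requirement: if a new vote is produced before the previous one has been sent, the old vote is obsolete and does not need to be sent at all.

Call order for producing, consuming, and sending on the send queue

Producing
QuorumCnxManager#toSend
QuorumCnxManager#addToSendQueue

Consuming
QuorumCnxManager.SendWorker#run
QuorumCnxManager#pollSendQueue

Sending
QuorumCnxManager.SendWorker#run
SendWorker#send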
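The meaning of queueSendMap

final ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap;// message send queues, keyed by each machine's sid

To stress the point: the key is a sid, and an entry means the current machine has either
established contact with that sid or has something to send to it

In the code shown here, queueSendMap is only modified with put; there is no remove or similar call.
The puts almost all look like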

    queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>(
                        SEND_CAPACITY));

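QuorumCnxManager#haveDelivered: the comment and the code disagree

The source was shown earlier.
The code and the comment disagree here. addToSendQueue, which adds to the send queue, is called only from QuorumCnxManager#toSend.

toSend calls addToSendQueue
The later post on FastLeaderElection#lookForLeader shows that toSend is the entry point for establishing connections, that is, the caller adds an entry to the send queue for the sid in queueSendMap.
In haveDelivered, if the queue for any one sid has length 0, the item in that send queue has been consumed, that is, sent out. That is why the method is called "haveDelivered"; it is the comment that is wrong.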
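
The gap between the comment ("all queues are empty") and the code (one empty queue is enough) shows up as soon as one peer has been served and another has not. A minimal sketch; HaveDeliveredSketch, anyQueueEmpty, allQueuesEmpty and sample are hypothetical names, and String stands in for ByteBuffer:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical comparison of the two readings of haveDelivered; not ZooKeeper code.
final class HaveDeliveredSketch {
    // What the code does: true as soon as one queue is empty.
    static boolean anyQueueEmpty(Map<Long, ArrayBlockingQueue<String>> queues) {
        for (ArrayBlockingQueue<String> queue : queues.values()) {
            if (queue.size() == 0) {
                return true;
            }
        }
        return false;
    }

    // What the comment describes: true only if every queue is empty.
    static boolean allQueuesEmpty(Map<Long, ArrayBlockingQueue<String>> queues) {
        for (ArrayBlockingQueue<String> queue : queues.values()) {
            if (queue.size() != 0) {
                return false;
            }
        }
        return true;
    }

    // One peer whose vote was already taken off the queue, one whose vote is still waiting.
    static Map<Long, ArrayBlockingQueue<String>> sample() {
        Map<Long, ArrayBlockingQueue<String>> queues = new HashMap<>();
        queues.put(1L, new ArrayBlockingQueue<>(1));
        ArrayBlockingQueue<String> pending = new ArrayBlockingQueue<>(1);
        pending.add("vote");
        queues.put(2L, pending);
        return queues;
    }

    public static void main(String[] args) {
        System.out.println(anyQueueEmpty(sample()));  // true
        System.out.println(allQueuesEmpty(sample())); // false
    }
}
```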

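Open questions

When the SendWorker send queue is empty, the last sent message is sent again

This is said to solve the problem of "the receiver going down before receiving the message, or after receiving it".
Then why not resend the second-to-last message, or the third-to-last?

QuorumCnxManager#addToRecvQueue drops the element at the head when the receive queue is full

Why? Does it not matter that a message is lost?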

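Summary

This class is the scheduler for network IO
SendWorker and RecvWorker run as two threads that keep sending and receiving messages
Listener listens for incoming connections
The main attribute, queueSendMap, records which sids the server is in contact with and what the send queue for each one is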

refer

http://www.cnblogs.com/leesf456/p/6107600.html Part 3, QuorumCnxManager: network I/O
《从Paxos到Zookeeper》 7.6.3
