需要深入研究

Zookeeper的leader选举

2019-06-15  本文已影响102人  tracy_668

Leader选举是保证分布式数据一致性的关键所在。当Zookeeper集群中的一台服务器出现以下两种情况之一时,需要进入Leader选举。

服务器启动时期的Leader选举
若进行Leader选举,则至少需要两台机器,这里选取3台机器组成的服务器集群为例。在集群初始化阶段,当有一台服务器Server1启动时,其单独无法进行和完成Leader选举,当第二台服务器Server2启动时,此时两台机器可以相互通信,每台机器都试图找到Leader,于是进入Leader选举过程。选举过程如下

  1. 每个Server发出一个投票: 由于是初始情况,Server1和Server2都会将自己作为Leader服务器来进行投票,每次投票会包含所推举的服务器的myid和ZXID,使用(myid, ZXID)来表示,此时Server1的投票为(1, 0),Server2的投票为(2, 0),然后各自将这个投票发给集群中其他机器。

  2. 接收来自各个服务器的投票: 集群的每个服务器收到投票后,首先判断该投票的有效性,如检查是否是本轮投票、是否来自LOOKING状态的服务器。

  3. 处理投票 针对每一个投票,服务器都需要将别人的投票和自己的投票进行PK,PK规则如下:

    • 优先检查ZXID。ZXID比较大的服务器优先作为Leader。
    • 如果ZXID相同,那么就比较myid。myid较大的服务器作为Leader服务器。

对于Server1而言,它的投票是(1, 0),接收Server2的投票为(2, 0),首先会比较两者的ZXID,均为0,再比较myid,此时Server2的myid最大,于是更新自己的投票为(2, 0),然后重新投票,对于Server2而言,其无须更新自己的投票,只是再次向集群中所有机器发出上一次投票信息即可。

  1. 统计投票:每次投票后,服务器都会统计投票信息,判断是否已经有过半机器接受到相同的投票信息,对于Server1、Server2而言,都统计出集群中已经有两台机器接受了(2, 0)的投票信息,此时便认为已经选出了Leader。
  2. 改变服务器状态: 一旦确定了Leader,每个服务器就会更新自己的状态,如果是Follower,那么就变更为FOLLOWING,如果是Leader,就变更为LEADING。

还有一种情形是服务器运行时期的Leader选举

在Zookeeper运行期间,Leader与非Leader服务器各司其职,即便当有非Leader服务器宕机或新加入,此时也不会影响Leader,但是一旦Leader服务器挂了,那么整个集群将暂停对外服务,进入新一轮Leader选举,其过程和启动时期的Leader选举过程基本一致。假设正在运行的有Server1、Server2、Server3三台服务器,当前Leader是Server2,若某一时刻Leader挂了,此时便开始Leader选举。选举过程如下

  1. 变更状态。Leader挂后,余下的非Observer服务器都会讲自己的服务器状态变更为LOOKING,然后开始进入Leader选举过程。
  2. 每个Server会发出一个投票。在运行期间,每个服务器上的ZXID可能不同,此时假定Server1的ZXID为123,Server3的ZXID为122;在第一轮投票中,Server1和Server3都会投自己,产生投票(1, 123),(3, 122),然后各自将投票发送给集群中所有机器。
  3. 接收来自各个服务器的投票。与启动时过程相同。
  4. 处理投票。与启动时过程相同,此时,Server1将会成为Leader。
  5. 统计投票。与启动时过程相同。
  6. 改变服务器的状态。与启动时过程相同。

Leader选举算法分析

在3.4.0后的Zookeeper的版本只保留了TCP版本的FastLeaderElection选举算法。当一台机器进入Leader选举时,当前集群可能会处于以下两种状态

对于集群中已经存在Leader而言,此种情况一般都是某台机器启动得较晚,在其启动之前,集群已经在正常工作,对这种情况,该机器试图去选举Leader时,会被告知当前服务器的Leader信息,对于该机器而言,仅仅需要和Leader机器建立起连接,并进行状态同步即可。而在集群中不存在Leader情况下则会相对复杂,其步骤如下

  1. 第一次投票。无论哪种导致进行Leader选举,集群的所有机器都处于试图选举出一个Leader的状态,即LOOKING状态,LOOKING机器会向所有其他机器发送消息,该消息称为投票。投票中包含了SID(服务器的唯一标识)和ZXID(事务ID),(SID, ZXID)形式来标识一次投票信息。假定Zookeeper由5台机器组成,SID分别为1、2、3、4、5,ZXID分别为9、9、9、8、8,并且此时SID为2的机器是Leader机器,某一时刻,1、2所在机器出现故障,因此集群开始进行Leader选举。在第一次投票时,每台机器都会将自己作为投票对象,于是SID为3、4、5的机器投票情况分别为(3, 9),(4, 8), (5, 8)。

  2. 变更投票。每台机器发出投票后,也会收到其他机器的投票,每台机器会根据一定规则来处理收到的其他机器的投票,并以此来决定是否需要变更自己的投票,这个规则也是整个Leader选举算法的核心所在,其中术语描述如下

vote_sid:接收到的投票中所推举Leader服务器的SID。

vote_zxid:接收到的投票中所推举Leader服务器的ZXID。

 self_sid:当前服务器自己的SID。

self_zxid:当前服务器自己的ZXID。

每次对收到的投票的处理,都是对(vote_sid, vote_zxid)和(self_sid, self_zxid)对比的过程。

 规则一:如果vote_zxid大于self_zxid,就认可当前收到的投票,并再次将该投票发送出去。

 规则二:如果vote_zxid小于self_zxid,那么坚持自己的投票,不做任何变更。

  规则三:如果vote_zxid等于self_zxid,那么就对比两者的SID,如果vote_sid大于self_sid,那么就认可当前收到的投票,并再次将该投票发送出去。

规则四:如果vote_zxid等于self_zxid,并且vote_sid小于self_sid,那么坚持自己的投票,不做任何变更。

结合上面规则,给出下面的集群变更过程。


image.png
  1. 确定Leader。经过第二轮投票后,集群中的每台机器都会再次接收到其他机器的投票,然后开始统计投票,如果一台机器收到了超过半数的相同投票,那么这个投票对应的SID机器即为Leader。此时Server3将成为Leader。

由上面规则可知,通常那台服务器上的数据越新(ZXID会越大),其成为Leader的可能性越大,也就越能够保证数据的恢复。如果ZXID相同,则SID越大机会越大。

Leader选举实现细节

  1. 服务器状态: 服务器具有四种状态,分别是LOOKING、FOLLOWING、LEADING、OBSERVING。
  LOOKING:寻找Leader状态。当服务器处于该状态时,它会认为当前集群中没有Leader,因此需要进入Leader选举状态。
  FOLLOWING:跟随者状态。表明当前服务器角色是Follower。
  LEADING:领导者状态。表明当前服务器角色是Leader。
  OBSERVING:观察者状态。表明当前服务器角色是Observer。
  1. 投票数据结构
      每个投票中包含了两个最基本的信息,所推举服务器的SID和ZXID,投票(Vote)在Zookeeper中包含字段如下
   id:被推举的Leader的SID。

  zxid:被推举的Leader事务ID。

  electionEpoch:逻辑时钟,用来判断多个投票是否在同一轮选举周期中,该值在服务端是一个自增序列,每次进入新一轮的投票后,都会对该值进行加1操作。

  peerEpoch:被推举的Leader的epoch。

  state:当前服务器的状态。
  1. QuorumCnxManager:网络I/O
      每台服务器在启动的过程中,会启动一个QuorumPeerManager,负责各台服务器之间的底层Leader选举过程中的网络通信。
recvQueue:消息接收队列,用于存放那些从其他服务器接收到的消息。

queueSendMap:消息发送队列,用于保存那些待发送的消息,按照SID进行分组。

senderWorkerMap:发送器集合,每个SenderWorker消息发送器,都对应一台远程Zookeeper服务器,负责消息的发送,也按照SID进行分组。

lastMessageSent:最近发送过的消息,为每个SID保留最近发送过的一个消息。
  1. FastLeaderElection:选举算法核心
外部投票:特指其他服务器发来的投票。

内部投票:服务器自身当前的投票。

选举轮次:Zookeeper服务器Leader选举的轮次,即logicalclock。

PK:对内部投票和外部投票进行对比来确定是否需要变更内部投票。
sendqueue:选票发送队列,用于保存待发送的选票。

recvqueue:选票接收队列,用于保存接收到的外部投票。

WorkerReceiver:选票接收器。其会不断地从QuorumCnxManager中获取其他服务器发来的选举消息,并将其转换成一个选票,然后保存到recvqueue中,在选票接收过程中,如果发现该外部选票的选举轮次小于当前服务器的,那么忽略该外部投票,同时立即发送自己的内部投票。

WorkerSender:选票发送器,不断地从sendqueue中获取待发送的选票,并将其传递到底层QuorumCnxManager中。

算法核心

image.png

上图展示了FastLeaderElection模块是如何与底层网络I/O进行交互的。Leader选举的基本流程如下

    · 外部投票的选举轮次大于内部投票。若服务器自身的选举轮次落后于该外部投票对应服务器的选举轮次,那么就会立即更新自己的选举轮次(logicalclock),并且清空所有已经收到的投票,然后使用初始化的投票来进行PK以确定是否变更内部投票。最终再将内部投票发送出去。

    · 外部投票的选举轮次小于内部投票。若服务器接收的外选票的选举轮次落后于自身的选举轮次,那么Zookeeper就会直接忽略该外部投票,不做任何处理,并返回步骤4。

    · 外部投票的选举轮次等于内部投票。此时可以开始进行选票PK。
    · 若外部投票中推举的Leader服务器的选举轮次大于内部投票,那么需要变更投票。

    · 若选举轮次一致,那么就对比两者的ZXID,若外部投票的ZXID大,那么需要变更投票。

    · 若两者的ZXID一致,那么就对比两者的SID,若外部投票的SID大,那么就需要变更投票。

源码分析

每台服务器在启动的过程中,会启动一个 QuorumPeerManager,负责各台服务器之间的底层 Leader 选举过程中的网络通信。

  1. 初始化 QuorumCnxManager
 protected Election createElectionAlgorithm(int electionAlgorithm){
        Election le=null;
                
        //TODO: use a factory rather than a switch
        switch (electionAlgorithm) {
        case 0:
            le = new LeaderElection(this);
            break;
        case 1:
            le = new AuthFastLeaderElection(this);
            break;
        case 2:
            le = new AuthFastLeaderElection(this, true);
            break;
        case 3:
            qcm = createCnxnManager();
            QuorumCnxManager.Listener listener = qcm.listener;
            if(listener != null){
                listener.start();
                le = new FastLeaderElection(this, qcm);
            } else {
                LOG.error("Null listener when initializing cnx manager");
            }
            break;
        default:
            assert false;
        }
        return le;
    }

在 createElectionAlgorithm 会启动 QuorumCnxManager

public QuorumCnxManager(QuorumPeer self) {
    this.recvQueue = new ArrayBlockingQueue<Message>(RECV_CAPACITY);
    this.queueSendMap = new ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>();
    this.senderWorkerMap = new ConcurrentHashMap<Long, SendWorker>();
    this.lastMessageSent = new ConcurrentHashMap<Long, ByteBuffer>();
    
    String cnxToValue = System.getProperty("zookeeper.cnxTimeout");
    if(cnxToValue != null){
        this.cnxTO = Integer.parseInt(cnxToValue);
    }
    
    this.self = self;

    // Starts listener thread that waits for connection requests 
    listener = new Listener();
    listener.setName("QuorumPeerListener");
}

可以看到QuorumCnxManager 内部维护了一系列的队列,用来保存接收到的、待发送的消息以及消息的发送器,除接收队列以外,其他队列都按照 SID 分组形成队列集合。


recvQueue 消息接收队列,用于存放那些从其他服务器接收到的消息。
queueSendMap 消息发送队列,用于保存那些待发送的消息,按照 SID 进行分组。
senderWorkerMap 发送器集合,每个 SenderWorker 消息发送器,都对应一台远程 Zookeeper 服务器,负责消息的发送,也按照 SID 进行分组。
lastMessageSent 最近发送过的消息,为每个 SID 保留最近发送过的一个消息。

Listener: 可以看到 Listener 初始化了一个 ServerSocket,默认端口为 3888 进行底层 Leader 选举通信。

/**
     * Thread to listen on some port
     */
    public class Listener extends ZooKeeperThread {

        volatile ServerSocket ss = null;

        public Listener() {
            // During startup of thread, thread name will be overridden to
            // specific election address
            super("ListenerThread");
        }

        /**
         * Sleeps on accept().
         */
        @Override
        public void run() {
            int numRetries = 0;
            InetSocketAddress addr;
            while((!shutdown) && (numRetries < 3)){
                try {
                    ss = new ServerSocket();
                    ss.setReuseAddress(true);
                    if (listenOnAllIPs) {
                        int port = view.get(QuorumCnxManager.this.mySid)
                            .electionAddr.getPort();
                        addr = new InetSocketAddress(port);
                    } else {
                        addr = view.get(QuorumCnxManager.this.mySid)
                            .electionAddr;
                    }
                    LOG.info("My election bind port: " + addr.toString());
                    setName(view.get(QuorumCnxManager.this.mySid)
                            .electionAddr.toString());
                    ss.bind(addr);
                    while (!shutdown) {
                        Socket client = ss.accept();
                        setSockOpts(client);
                        LOG.info("Received connection request "
                                + client.getRemoteSocketAddress());

                        // Receive and handle the connection request
                        // asynchronously if the quorum sasl authentication is
                        // enabled. This is required because sasl server
                        // authentication process may take few seconds to finish,
                        // this may delay next peer connection requests.
                        if (quorumSaslAuthEnabled) {
                            receiveConnectionAsync(client);
                        } else {
                            receiveConnection(client);
                        }

                        numRetries = 0;
                    }
                } catch (IOException e) {
                    LOG.error("Exception while listening", e);
                    numRetries++;
                    try {
                        ss.close();
                        Thread.sleep(1000);
                    } catch (IOException ie) {
                        LOG.error("Error closing server socket", ie);
                    } catch (InterruptedException ie) {
                        LOG.error("Interrupted while sleeping. " +
                                  "Ignoring exception", ie);
                    }
                }
            }
            LOG.info("Leaving listener");
            if (!shutdown) {
                LOG.error("As I'm leaving the listener thread, "
                        + "I won't be able to participate in leader "
                        + "election any longer: "
                        + view.get(QuorumCnxManager.this.mySid).electionAddr);
            }
        }
        

为了避免两台机器之间重复地创建 TCP 连接,Zookeeper 只允许 SID 大的服务器主动和其他机器建立连接,否则断开连接。在接收到创建连接请求后,服务器通过对比自己和远程服务器的 SID 值来判断是否接收连接请求,如果当前服务器发现自己的 SID 更大,那么会断开当前连接,然后自己主动和远程服务器建立连接。一旦连接建立,就会根据远程服务器的 SID 来创建相应的消息发送器 SendWorker 和消息接收器 RecvWorker,并启动。每个 RecvWorker 只需要不断地从这个 TCP 连接中读取消息,并将其保存到 recvQueue 队列中。每个 SendWorker 只需要不断地从对应的消息发送队列中获取出一个消息发送即可,同时将这个消息放入 lastMessageSent 中。

 public void receiveConnection(final Socket sock) {
        DataInputStream din = null;
        try {
            din = new DataInputStream(
                    new BufferedInputStream(sock.getInputStream()));

            handleConnection(sock, din);
        } catch (IOException e) {
            LOG.error("Exception handling connection, addr: {}, closing server connection",
                     sock.getRemoteSocketAddress());
            closeSocket(sock);
        }
    }


    private void handleConnection(Socket sock, DataInputStream din)
            throws IOException {
        Long sid = null;
        try {
            // Read server id
            sid = din.readLong();
            if (sid < 0) { // this is not a server id but a protocol version (see ZOOKEEPER-1633)
                sid = din.readLong();

                // next comes the #bytes in the remainder of the message
                // note that 0 bytes is fine (old servers)
                int num_remaining_bytes = din.readInt();
                if (num_remaining_bytes < 0 || num_remaining_bytes > maxBuffer) {
                    LOG.error("Unreasonable buffer length: {}", num_remaining_bytes);
                    closeSocket(sock);
                    return;
                }
                byte[] b = new byte[num_remaining_bytes];

                // remove the remainder of the message from din
                int num_read = din.read(b);
                if (num_read != num_remaining_bytes) {
                    LOG.error("Read only " + num_read + " bytes out of " + num_remaining_bytes + " sent by server " + sid);
                }
            }
            if (sid == QuorumPeer.OBSERVER_ID) {
                /*
                 * Choose identifier at random. We need a value to identify
                 * the connection.
                 */
                sid = observerCounter.getAndDecrement();
                LOG.info("Setting arbitrary identifier to observer: " + sid);
            }
        } catch (IOException e) {
            closeSocket(sock);
            LOG.warn("Exception reading or writing challenge: " + e.toString());
            return;
        }

        // do authenticating learner
        LOG.debug("Authenticating learner server.id: {}", sid);
        authServer.authenticate(sock, din);

        //If wins the challenge, then close the new connection.
        if (sid < this.mySid) {
            /*
             * This replica might still believe that the connection to sid is
             * up, so we have to shut down the workers before trying to open a
             * new connection.
             */
            SendWorker sw = senderWorkerMap.get(sid);
            if (sw != null) {
                sw.finish();
            }

            /*
             * Now we start a new connection
             */
            LOG.debug("Create new connection to server: " + sid);
            closeSocket(sock);
            connectOne(sid);

            // Otherwise start worker threads to receive data.
        } else {
            SendWorker sw = new SendWorker(sock, sid);
            RecvWorker rw = new RecvWorker(sock, din, sid, sw);
            sw.setRecv(rw);

            SendWorker vsw = senderWorkerMap.get(sid);
            
            if(vsw != null)
                vsw.finish();
            
            senderWorkerMap.put(sid, sw);
            
            if (!queueSendMap.containsKey(sid)) {
                queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>(
                        SEND_CAPACITY));
            }
            
            sw.start();
            rw.start();
            
            return;
        }
    }

FastLeaderElection选举算法

    public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager){
        this.stop = false;
        this.manager = manager;
        starter(self, manager);
    }
 private void starter(QuorumPeer self, QuorumCnxManager manager) {
        this.self = self;
        proposedLeader = -1;
        proposedZxid = -1;

        sendqueue = new LinkedBlockingQueue<ToSend>();
        recvqueue = new LinkedBlockingQueue<Notification>();
        this.messenger = new Messenger(manager);
    }
 /**
         * Constructor of class Messenger.
         *
         * @param manager   Connection manager
         */
        Messenger(QuorumCnxManager manager) {
    // 1. WorkerSender 选票接收器,负责从 QuorumCnxManager 接收选票后保存到 recvqueue 中
            this.ws = new WorkerSender(manager);

            Thread t = new Thread(this.ws,
                    "WorkerSender[myid=" + self.getId() + "]");
            t.setDaemon(true);
            t.start();
 // 2. WorkerReceiver 选票发送器,负责从 sendqueue 中获取待发送的选票并传递给 QuorumCnxManager
            this.wr = new WorkerReceiver(manager);

            t = new Thread(this.wr,
                    "WorkerReceiver[myid=" + self.getId() + "]");
            t.setDaemon(true);
            t.start();
        }

在 FastLeaderElection 中有几个属性需要我们重点关注一下:

sendqueue 选票发送队列,用于保存待发送的选票。

recvqueue 选票接收队列,用于保存接收到的外部投票。

WorkerReceiver 选票接收器。其会不断地从 QuorumCnxManager 中获取其他服务器发来的选举消息,并将其转换成一个选票,然后保存到 recvqueue 中,在选票接收过程中,如果发现该外部选票的选举轮次小于当前服务器的,那么忽略该外部投票,同时立即发送自己的内部投票。

WorkerSender 选票发送器,不断地从 sendqueue 中获取待发送的选票,并将其传递到底层 QuorumCnxManager 中。

lookForLeader

    ...
  try {
                            roZkMgr.start();
                            setBCVote(null);
                            setCurrentVote(makeLEStrategy().lookForLeader());
                        }
 ...
public Vote lookForLeader() throws InterruptedException {
    // 省略...
    if (self.start_fle == 0) {
       self.start_fle = Time.currentElapsedTime();
    }
    try {
        HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
        HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
        int notTimeout = finalizeWait;

        // 1. 启动时先投自己一票并广播给其它服务器
        synchronized(this){
            logicalclock.incrementAndGet();
            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
        }
        sendNotifications();

        while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
            // 2. 获取其它服务器发送过来的选票
            Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);

            // 3. 如果没有选票,则先判断是否存在连接,如存在则是先投自己一票,如没则立即连接
            if(n == null){
                if(manager.haveDelivered()){
                    sendNotifications();
                } else {
                    manager.connectAll();
                }
                // 省略...
            } 
            // 4. 收到投票信息,根据 LOOKING、OBSERVING、FOLLOWING、LEADING 分别处理
            else if (self.getCurrentAndNextConfigVoters().contains(n.sid)) {
                switch (n.state) {
                // 5. LOOKING 时才会进行选举
                case LOOKING:
                    // 5.1 判断投票是否过时,如果自己过时就清除之前已经接收到的信息
                    if (n.electionEpoch > logicalclock.get()) {
                        logicalclock.set(n.electionEpoch);
                        recvset.clear();
                        // 重新发起投票,PK 一下:如果收到的票据大则更新票据,否则仍投自己一票
                        if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                            // 更新票据
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                        } else {
                            // 仍投自己一票
                            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
                        }
                        sendNotifications();
                    // 5.2 收到的票据过时则直接忽略
                    } else if (n.electionEpoch < logicalclock.get()) {
                        break;
                    // 5.3 epoch 相等则要 PK
                    } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                            proposedLeader, proposedZxid, proposedEpoch)) {
                        updateProposal(n.leader, n.zxid, n.peerEpoch);
                        sendNotifications();
                    }

                    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

                    // 5.4 统计谁的投票超过半数,就成为 Leader
                    if (termPredicate(recvset, new Vote(proposedLeader, proposedZxid,
                                    logicalclock.get(), proposedEpoch))) {

                        // 5.5 再等一会儿(200ms),看是否有新的投票
                        while((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
                            if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                    proposedLeader, proposedZxid, proposedEpoch)) {
                                recvqueue.put(n);
                                break;
                            }
                        }

                        // 5.6 如果没有发生新的投票,则结束选举过程则结束选举,修改状态为 LEADING
                        if (n == null) {
                            self.setPeerState((proposedLeader == self.getId()) ? ServerState.LEADING: learningState());
                            Vote endVote = new Vote(proposedLeader, proposedZxid, proposedEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }
                    break;
                // 6. OBSERVING 不能与投票
                case OBSERVING:
                    LOG.debug("Notification from observer: " + n.sid);
                    break;
                // 7. FOLLOWING、LEADING 说明已存在 Leader。
                //    可能在同一轮选举中,也可能是之前就存在的 Leader ,则不在同一轮选举中
                case FOLLOWING:
                case LEADING:
                    // 7.1 在同一轮选举中,则收集所有的选票放到 recvset 中
                    //     如有半数支持则更新状态退出选举
                    if(n.electionEpoch == logicalclock.get()) {
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                        if(termPredicate(recvset, new Vote(n.leader,
                                        n.zxid, n.electionEpoch, n.peerEpoch, n.state))
                                        && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                            self.setPeerState((n.leader == self.getId()) ?
                                    ServerState.LEADING: learningState());

                            Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }

                    // 7.2 如果收到的 logicalclock 与当前不相等,那说明在另一个选举中已经有了结果(Leader 已存在)
                    //     收集所有的选票到 outofelection 中,如有半数支持则更新状态退出选举
                    outofelection.put(n.sid, new Vote(n.leader, IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state));
                    if (termPredicate(outofelection, new Vote(n.leader,
                            IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state))
                            && checkLeader(outofelection, n.leader, IGNOREVALUE)) {
                        synchronized(this){
                            logicalclock.set(n.electionEpoch);
                            self.setPeerState((n.leader == self.getId()) ?
                                    ServerState.LEADING: learningState());
                        }
                        Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
                        leaveInstance(endVote);
                        return endVote;
                    }
                    break;
                default:
                    break;
                }
            } else {
                LOG.warn("Ignoring notification from non-cluster member " + n.sid);
            }
        }
        return null;
    } finally {
        // 省略...
    }
}

Leader 选举有两个函数需要重点关注一下,totalOrderPredicate() 对两张选票进行 PK,termPredicate() 判断投票是否可以结束了。

totalOrderPredicate(PK 选票)

// id(sid) zxid(事务id) epoch(选举轮数,每更新一次 Leader 自增 1)
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, 
        long curId, long curZxid, long curEpoch) {
    /*
     * 1- New epoch is higher
     * 2- New epoch is the same as current epoch, but new zxid is higher
     * 3- New epoch is the same as current epoch, new zxid is the same
     *  as current zxid, but server id is higher.
     */
    return ((newEpoch > curEpoch) ||
            ((newEpoch == curEpoch) &&
            ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
}

termPredicate(结束投票)

/ 票据占多数则结束选举
private boolean termPredicate(HashMap<Long, Vote> votes, Vote vote) {
    SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
    voteSet.addQuorumVerifier(self.getQuorumVerifier());
    if (self.getLastSeenQuorumVerifier() != null
            && self.getLastSeenQuorumVerifier().getVersion() > self
                    .getQuorumVerifier().getVersion()) {
        voteSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
    }

    // 将支持 vote 的票据放到 set 集合中(Set 可去重)
    for (Map.Entry<Long, Vote> entry : votes.entrySet()) {
        if (vote.equals(entry.getValue())) {
            voteSet.addAck(entry.getKey());
        }
    }
    
    // self.getQuorumVerifier().containsQuorum(set)
    return voteSet.hasAllQuorums();
}

上一篇下一篇

猜你喜欢

热点阅读