zookeeper的原理-leader选举

2020-05-15  本文已影响0人  剑道_7ffc

Zookeeper 的一致性

Zookeeper 的来源

通过算法来解决分布式环境下多个节点中选举出一个leader。

Zookeeper 的一致性是什么情况?

根据zab协议的同步流程,数据同步是采用过半提交策略,意味着它是最终一致性,而不是强一致性。
zookeeper 是一个顺序一致性模型。

什么是顺序一致性呢

下图是弱一致性


image.png

下图是更强的一直性保证,若如果 B1 得到的 x 的值为 1,那么 C1 看到的值也一定是 1


image.png
顺序一致性是针对单个操作,单个数据对象。属于 CAP 中 C这个范畴。一个数据被更新后,能够立马被后续的读操作读到。
zookeeper 的顺序一致性实现是缩水版,zookeeper可以在执行读取操作前调用sync来解决弱一致性的问题(客户端Az执行了更新操作,客户端B可能读取是旧值,也可以是新值);zookeeper 基于 zxid 以及阻塞队列的方式来实现请求的顺序一致性,若客户端刚开始从最新的follower读取,从而获取了zxid,若又向旧的follower连接,则连接失败,原因是客户端的zxid大于服务端的zxid。因此zookeeper保证的是时间轴的一致性。

Single System Image

client 只要连接过一次 zookeeper,就不会有历史的倒退。

leader 选举的原理

leader选举的两个阶段,一是服务器启动时的leader选举,二是运行过程中leader宕机导致的leader选举。

重要的参数

服务器 ID(myId)

编号越大在选举算法的权重越大。

事务id(zxid)

值越大说明数据越新,在选举算法的权重越大。

逻辑时钟(epoch – logicalclock)

投票的次数,每投完一次票,则加1

选举状态

LOOKING,竞选状态。
FOLLOWING,随从状态,同步 leader 状态,参与投票。
OBSERVING,观察状态,同步 leader 状态,不参与投票。
LEADING,领导者状态。

服务器启动时的 leader 选举

1 每个服务器处于LOOKING,发起一个投票(epoch,ZXID,myId),并把投票发送到其他机器。
2 接受其他机器的投票,并检验有效性如检查是否是本轮投票等
3 处理投票:将别人的投票和本机器的投票进行比较,比较顺序是epoch,ZXID和myid,大的是leader。
4 统计投票:判断是否已有过半的相同的投票信息
5 改变服务器状态:一旦确定leader则更新自己的服务器状态,leader更新为LEADING,follow更新为FOLLOWING。u若没有则重新发起投票。

运行过程中的 leader 选举

1 变更状态:将剩余非Observer服务器的状态变为LOOKING.
2 剩余的其他流程和启动时一样。

leader 选举的源码分析

入口

通过server.sh的来启动QuorumPeerMain的main方法

选举逻辑

正常启动的流程图
image.png
核心源码

1 org.apache.zookeeper.server.quorum.FastLeaderElection#lookForLeader

public Vote lookForLeader() throws InterruptedException {
    try {
        //接收到的票据的集合
        HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();

        //
        HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();

        int notTimeout = finalizeWait;

        synchronized(this){
            //逻辑时钟->epoch
            logicalclock.incrementAndGet();
            //proposal
            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
        }

        LOG.info("New election. My id =  " + self.getId() +
                ", proposed zxid=0x" + Long.toHexString(proposedZxid));
        sendNotifications();//我要广播自己的票据

        /*
         * Loop in which we exchange notifications until we find a leader
         */

        //接收到了票据
        while ((self.getPeerState() == ServerState.LOOKING) &&
                (!stop)){
            /*
             * Remove next notification from queue, times out after 2 times
             * the termination time
             */
            //recvqueue是从网络上接收到的其他机器的Notification
            Notification n = recvqueue.poll(notTimeout,
                    TimeUnit.MILLISECONDS);

            /*
             * Sends more notifications if haven't received enough.
             * Otherwise processes new notification.
             */
            if(n == null){
                if(manager.haveDelivered()){
                    sendNotifications();
                } else {
                    manager.connectAll();//重新连接集群中的所有节点
                }

                /*
                 * Exponential backoff
                 */
                int tmpTimeOut = notTimeout*2;
                notTimeout = (tmpTimeOut < maxNotificationInterval?
                        tmpTimeOut : maxNotificationInterval);
                LOG.info("Notification time out: " + notTimeout);
            }

            else if(validVoter(n.sid) && validVoter(n.leader)) {//判断是否是一个有效的票据
                /*
                 * Only proceed if the vote comes from a replica in the
                 * voting view for a replica in the voting view.
                 */
                switch (n.state) {
                case LOOKING: //第一次进入到这个case
                    // If notification > current, replace and send messages out
                    if (n.electionEpoch > logicalclock.get()) { //
                        logicalclock.set(n.electionEpoch);
                        recvset.clear();//清空
                        //收到票据之后,当前的server要听谁的。
                        //可能是听server1的、也可能是听server2,也可能是听server3
                        //zab  leader选举算法
                        if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                            //把自己的票据更新成对方的票据,那么下一次,发送的票据就是新的票据
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                        } else {
                            //收到的票据小于当前的节点的票据,下一次发送票据,仍然发送自己的
                            updateProposal(getInitId(),
                                    getInitLastLoggedZxid(),
                                    getPeerEpoch());
                        }
                        //继续发送通知
                        sendNotifications();
                    } else if (n.electionEpoch < logicalclock.get()) { //说明当前的数据已经过期了
                        if(LOG.isDebugEnabled()){
                            LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                                    + Long.toHexString(n.electionEpoch)
                                    + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
                        }
                        break;
                    } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                            proposedLeader, proposedZxid, proposedEpoch)) {
                        updateProposal(n.leader, n.zxid, n.peerEpoch);
                        sendNotifications();
                    }

                    if(LOG.isDebugEnabled()){
                        LOG.debug("Adding vote: from=" + n.sid +
                                ", proposed leader=" + n.leader +
                                ", proposed zxid=0x" + Long.toHexString(n.zxid) +
                                ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
                    }

                    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                    //决断时刻(当前节点的更新后的vote信息,和recvset集合中的票据进行归纳,)
                    if (termPredicate(recvset,
                            new Vote(proposedLeader, proposedZxid,
                                    logicalclock.get(), proposedEpoch))) {

                        // Verify if there is any change in the proposed leader
                        while((n = recvqueue.poll(finalizeWait,
                                TimeUnit.MILLISECONDS)) != null){
                            if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                    proposedLeader, proposedZxid, proposedEpoch)){
                                recvqueue.put(n);
                                break;
                            }
                        }

                        /*
                         * This predicate is true once we don't read any new
                         * relevant message from the reception queue
                         */
                        if (n == null) {
                            self.setPeerState((proposedLeader == self.getId()) ?
                                    ServerState.LEADING: learningState());

                            Vote endVote = new Vote(proposedLeader,
                                                    proposedZxid,
                                                    logicalclock.get(),
                                                    proposedEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }
                    break;
        }
        return null;
    }
}

2 org.apache.zookeeper.server.quorum.FastLeaderElection#totalOrderPredicate

    protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
        LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" +
                Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid));
        if(self.getQuorumVerifier().getWeight(newId) == 0){
            return false;
        }
        
        /*
         *
         * We return true if one of the following three cases hold:
         * 1- New epoch is higher
         * 2- New epoch is the same as current epoch, but new zxid is higher
         * 3- New epoch is the same as current epoch, new zxid is the same
         *  as current zxid, but server id is higher.
         */
        
        return ((newEpoch > curEpoch) || 
                ((newEpoch == curEpoch) &&
                ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
    }

3 org.apache.zookeeper.server.quorum.FastLeaderElection#sendNotifications

    private void sendNotifications() {
        for (QuorumServer server : self.getVotingView().values()) {
            long sid = server.id;

            ToSend notmsg = new ToSend(ToSend.mType.notification,
                    proposedLeader, //myid
                    proposedZxid, //zxid
                    logicalclock.get(),//epoch
                    QuorumPeer.ServerState.LOOKING,//
                    sid, //myid
                    proposedEpoch); //发起票据epoch
            if(LOG.isDebugEnabled()){
                LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x"  +
                      Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock.get())  +
                      " (n.round), " + sid + " (recipient), " + self.getId() +
                      " (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");
            }
            sendqueue.offer(notmsg); //阻塞队列,  线程->生产者消费者模式
        }
    }

4 org.apache.zookeeper.server.quorum.FastLeaderElection#termPredicate

    protected boolean termPredicate(
            HashMap<Long, Vote> votes,
            Vote vote) {

        HashSet<Long> set = new HashSet<Long>();

        /*
         * First make the views consistent. Sometimes peers will have
         * different zxids for a server depending on timing.
         *
         */
        for (Map.Entry<Long,Vote> entry : votes.entrySet()) {
            if (vote.equals(entry.getValue())){ //对票据进行归纳
                set.add(entry.getKey()); //如果存在2票,set里面是不是有2个?
            }
        }

        return self.getQuorumVerifier().containsQuorum(set); //验证
    }

5 org.apache.zookeeper.server.quorum.flexible.QuorumMaj#containsQuorum

    public boolean containsQuorum(Set<Long> set){
        return (set.size() > half); //已经归纳的票据是否大于half .2>1  -> leader选举、 数据同步
    }

消息传输

通信流程图
image.png
image.png

选举完成的处理逻辑

FOLLOWING

1 流程图


image.png

2 核心代码方法
org.apache.zookeeper.server.quorum.Follower#followLeader

LEADING

1 核心代码方法
org.apache.zookeeper.server.quorum.Leader#lead

上一篇下一篇

猜你喜欢

热点阅读