品味Zookeeper之选举及数据一致性

2018-03-09  本文已影响0人  dandan的微笑

品味Zookeeper之选举及数据一致性

本文思维导图

image

前言

为了高可用和数据安全起见,zk集群一般都是由几个节点构成(由n/2+1,投票机制决定,肯定是奇数个节点)。多节点证明它们之间肯定会有数据的通信,同时,为了能够使zk集群对外是透明的,一个整体对外提供服务,那么客户端访问zk服务器的数据肯定是要数据同步,也即数据一致性

zk集群是Leader/Follower模式来保证数据同步的。整个集群同一时刻只能有一个Leader,其他都是Follower或Observer。Leader是通过选举选出来的,这里涉及到ZAB协议(原子消息广播协议)。

1.ZAB协议

1.1 概念理解

为了更好理解下文,先说ZAB协议,它是选举过程和数据写入过程的基石。ZAB的核心是定义会改变zk服务器数据状态的事务请求的处理方式。

ZAB的理解:所有事务请求是由一个全局唯一的服务器来协调处理,这个的服务器就是Leader服务器,
其它服务器都是Follower服务器或Observer服务器。Leader服务器负责将一个客户端的请求转换成那个一个事务Proposalͧ(提议),将该Proposal分发给集群中所有的Follower服务器。然后Leader服务器需要等待所有Follower服务器的应答,当Leader服务器收到超过半数的Follower服务器进行了明确的应答后,Leader会再次向所有的Follower服务器分发Commit消息,要求其将前一个Proposal进行提交。

注意事务提议这个词,就类似 人大代表大会提议 ,提议就代表会有应答,之间有通信。因此在zk的ZAB协议为了可靠性和可用性,会有投票应答等操作来保证整个zk集群的正常运行。

总的来说就是,涉及到客户端对zk集群数据改变的行为都先由Leader统一响应,然后再把请求转换为事务转发给其他所有的Follower,Follower应答并处理事务,最后再反馈。如果客户端只是读请求,那么zk集群所有的节点都可以响应这个请求。

1.2 ZAB协议三个阶段

下面会逐点分析,但是在这之前先来了解了解zookeeper服务器的知识吧。

2.Zookeeper服务器

2.1 zk服务器角色

整个zk集群的角色作用如下图:

image

2.2 zk服务器状态

zk服务器的状态是随着机器的变化而变化的。比如Leader宕机了,服务器状态就变为LOOKING,通过选举后,某机器成为Leader,服务器状态就转换为LEADING。其他情况类似。

2.3 zk服务器通信

集群嘛,节点之间肯定是要通信的。zokeeper通信有两个特点:

3.选举机制

3.1 选举算法

从zookeeper开始发布以来,选举的算法也慢慢优化。现在为了可靠性和高可用,从3.4.0版本开始zookeeper只支持基于TcpFastLeaderElection选举协议。

FastLeaderElection选举协议使用TCP实现Leader投票选举算法。它使用了类对象quorumcnxmanager管理连接。该算法是基于推送的,可以通过调节参数来改变选举的过程。第一,finalizewait决定等到决定Leader的时间。这是Leader选举算法的一部分。
final static int finalizeWait = 200;(选举Leader过程的进程时间)
final static int maxNotificationInterval = 60000;(通知检查选中Leader的时间间隔)
final static int IGNOREVALUE = -1;
这里先不详细分析,下面3.5 选举算法源码分析及举栗子才分析。

3.2 何时触发选举

选举Leader不是随时选举的,毕竟选举有产生大量的通信,造成网络IO的消耗。因此下面情况才会出现选举:

3.3 如何成为Leader

3.4 重要的zxid

由3.3知道zxid是判断能否成为Leader的条件之一,它代表服务器的数据版本的新旧程度。

zxid由两部分构成:主进程周期epoch和事务单调递增的计数器。zxid是一个64位的数,高32位代表主进程周期epoch,低32位代表事务单调递增的计数器

主进程周期epoch也叫epoch,是选举的轮次,每选举一次就递增1。事务单调递增的计数器在每次选举完成之后就会从0开始。

如果是比较数据新旧的话,直接比较就可以了。因为如果是主进程周期越大,即高32位越大,那么低32位就不用再看了。如果主进程周期一致,低32位越大,整个zxid就越大。所以直接比较整个64位就可以了,不必高32位于高32位对比,低32位与低32位比较。

3.5 选举算法源码分析及举栗子

3.5.1 举栗子

zookeeper选举有两种情况:

选主原则如下(在选举时,对比次序是从上往下)

同时,在选举的时候是投票方式进行的,除主进程周期外,投票格式为(myid,zxid)。

第一种情况,比较容易理解,下面以3台机器为例子。

第二种情况,已5台机器为例子。

接下来就是对新Leader节点的检查,数据同步,广播,对外提供服务。

3.5.1 选举算法源码分析

选举算法的全部代码在FastLeaderElection类中。其他的lookForLeader函数是选举Leader的入口函数。

//每一轮选举就会增大一次逻辑时钟,同时更新事务
synchronized(this){
    logicalclock++;
    updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}

//一直循环选举直到找到leader,这里把打印和不相关的都删除了,方便分析。

while ((self.getPeerState() == ServerState.LOOKING) &&
        (!stop)){

    //从通知队列拉取一个投票通知
    Notification n = recvqueue.poll(notTimeout,
            TimeUnit.MILLISECONDS);

    if(n == null){
        //看是否选举时通知发送/接收超时
        int tmpTimeOut = notTimeout*2;
        notTimeout = (tmpTimeOut < maxNotificationInterval?
                tmpTimeOut : maxNotificationInterval);
    }
    else if(self.getVotingView().containsKey(n.sid)) {
        switch (n.state) {
        case LOOKING://只有zk服务器状态为LOOKING时才会进行选举
            // If notification > current, replace and send messages out
            if (n.electionEpoch > logicalclock) {
                //如果选举时的逻辑时钟大于发送通知来源的机器的逻辑时钟,就把对方的修改为自己的。
                logicalclock = n.electionEpoch;
                recvset.clear();
                //并统计票数,如果能成为leader就更新事务
                if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                        getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                    updateProposal(n.leader, n.zxid, n.peerEpoch);
                } else {
                    //否者更新事务为对方的投票信息
                    updateProposal(getInitId(),
                            getInitLastLoggedZxid(),
                            getPeerEpoch());
                }
                sendNotifications();
            } else if (n.electionEpoch < logicalclock) {
                //如果通知来演的机器的逻辑时钟比本次我的选举时钟低,直接返回,什么都不做。因为对方没机会成为leader
                if(LOG.isDebugEnabled()){
                    LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                            + Long.toHexString(n.electionEpoch)
                            + ", logicalclock=0x" + Long.toHexString(logicalclock));
                }
                break;
            } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                    proposedLeader, proposedZxid, proposedEpoch)) {
                //如果Epoch一样,就看zxid的比较。不过还是会更新事务和回传通知
                updateProposal(n.leader, n.zxid, n.peerEpoch);
                sendNotifications();
            }

            //把所有接收到的投票信息都放到recvset集合
            recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

            //统计谁的投票超过半数,就成为leader
            if (termPredicate(recvset,
                    new Vote(proposedLeader, proposedZxid,
                            logicalclock, proposedEpoch))) {

                //验证一下,被选举的leader是否有变化,就是看符不符合
                while((n = recvqueue.poll(finalizeWait,
                        TimeUnit.MILLISECONDS)) != null){
                    if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                            proposedLeader, proposedZxid, proposedEpoch)){
                        //符合就放进recvqueue集合
                        recvqueue.put(n);
                        break;
                    }
                }

                //改变选举为leader的机器的状态为LEADING
                if (n == null) {
                    self.setPeerState((proposedLeader == self.getId()) ?
                            ServerState.LEADING: learningState());

                    Vote endVote = new Vote(proposedLeader,
                            proposedZxid, proposedEpoch);
                    leaveInstance(endVote);
                    return endVote;
                }
            }
            break;
        case FOLLOWING:
        case LEADING:
            //在同一轮选举中,判断所有的通知,并确认自己是leader
            if(n.electionEpoch == logicalclock){
                recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                if(termPredicate(recvset, new Vote(n.leader,
                                n.zxid, n.electionEpoch, n.peerEpoch, n.state))
                                && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                    self.setPeerState((n.leader == self.getId()) ?
                            ServerState.LEADING: learningState());

                    Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
                    leaveInstance(endVote);
                    return endVote;
                }
            }
            //在对外提供服务前,先广播一次自己是leader的消息给所有follower,让大家认同我为leader。
            outofelection.put(n.sid, new Vote(n.leader, n.zxid,
                    n.electionEpoch, n.peerEpoch, n.state));
            if (termPredicate(outofelection, new Vote(n.leader,
                    n.zxid, n.electionEpoch, n.peerEpoch, n.state))
                    && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                synchronized(this){
                    logicalclock = n.electionEpoch;
                    self.setPeerState((n.leader == self.getId()) ?
                            ServerState.LEADING: learningState());
                }
                Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
                leaveInstance(endVote);
                return endVote;
            }
            break;
        }
    } 
}

比较重要的子函数有以下这些:


protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
    LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" +
            Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid));
    if(self.getQuorumVerifier().getWeight(newId) == 0){
        return false;
    }
    //按照这样的顺序比较优先:Epoch > Zxid > myid
    return ((newEpoch > curEpoch) || 
            ((newEpoch == curEpoch) &&
            ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
}

private boolean termPredicate(
        HashMap<Long, Vote> votes,
        Vote vote) {

    HashSet<Long> set = new HashSet<Long>();
    for (Map.Entry<Long,Vote> entry : votes.entrySet()) {
        if (vote.equals(entry.getValue())){
            set.add(entry.getKey());
        }
    }
    return self.getQuorumVerifier().containsQuorum(set);
}

Messenger(QuorumCnxManager manager) {
    this.ws = new WorkerSender(manager);
    Thread t = new Thread(this.ws,
            "WorkerSender[myid=" + self.getId() + "]");
    t.setDaemon(true);
    t.start();

    this.wr = new WorkerReceiver(manager);
    t = new Thread(this.wr,
            "WorkerReceiver[myid=" + self.getId() + "]");
    t.setDaemon(true);
    t.start();
}

其他细节不多说了,主要是sendqueue和recvqueue队列存放待发送投票通知和接收投票通知,WorkerSender和WorkerReceiver两条线程用于投票的通信,QuorumCnxManager manager用于真正和其他机器的tcp连接维护管理,Messenger是整个投票通信的管理者。

3.数据同步机制

3.1 同步准备

完成选举之后,为了数据一致性,需要进行数据同步流程。

3.1.1 Leader准备
3.1.2 Follower准备

3.2 同步初始化

同步初始化涉及到三个东西:minCommittedLog、maxCommittedLog、zxid
– minCommittedLog:最小的事务日志id,即zxid没有被快照存储的日志文件的第一条,每次快照存储
完,会重新生成一个事务日志文件
– maxCommittedLog:事务日志中最大的事务,即zxid

4.数据同步场景

不同的数据同步算法适用不同的场景。

5.广播流程

6.小结

在zookeeper中,除了watcher机制,会话管理,最重要的就是选举了。它是zookeeper集群的核心,也是广泛应用在商业中的前提。洋洋洒洒一大篇,可能存在一些不足,后面更加深入理解再来补充吧。

上一篇下一篇

猜你喜欢

热点阅读