JavaIT技术篇

深入理解ZooKeeper

2020-04-25  本文已影响0人  挪威的senlin

 什么是ZooKeeper?ZooKeeper是一个分布式、开源的分布式应用协作服务。它提供了一组简单的原语,分布式应用程序可以在这些原语的基础上实现更高级别的服务,用于同步、配置维护、组命名,提供简单易用的接口给用户使用。
 本文将介绍ZooKeeper的基本概念、使用场景、ZAB协议和请求处理。

ZooKeeper的概念和基础

1. 服务架构

image.png
 ZooKeeper本身是一个高可用的服务,ZooKeeper集群中服务器有三种角色leader、follower、observer,leader提供读和写的服务,follower只提供读的服务和参与leader选举,observer和follower的区别是observer不参与选举。

2. 数据结构

image.png
 ZooKeeper的数据结构类似于一个文件系统。存放一个个的数据节点(Znode),Znode的属性有永久(persistent)、临时(ephemeral)和有序(sequential)。

3. 监控与通知
 Watcher是Zookeeper中很重要的机制。客户端通过对znode创建watcher当节点发生变化的时候(节点删除、数据更改、子节点变化等),ZooKeeper将会通知注册Watcher的客户端节点已经变更。
 监听事件有推和拉的形式,所谓推就是事件触发之后服务器向客户端推送数据,而拉就是客户端轮询服务器检查事件是否触发。而ZooKeeper采用推和拉结合的形式,事件触发之后,服务器给客户端推送事件(不包含事件的内容,只有发生了什么事件),客户端收到通知之后去服务器拉去最新的数据,采用这种方式每次每次通知只需要传输少量数据就行了,减少I/O压力。需要注意的是ZooKeeper在事件通知之后会将Watcher给删除,为了继续监听,客户端必须在每次通知后设置一个新的Watcher。

4. 会话(Session)
 在对ZooKeeper集合执行任何请求前,一个客户端必须先与服务建立会话。客户端与服务器将会建立一个TCP的长连接,第一次建立连接的时候也是Session开始的时候,客户端与服务器通过这个连接发送心跳监控彼此存活的状态。客户端可以设置会话超时时间sessionTimeout,在集群模式下,客户端和ZooKeeper服务器断开连接之后,只要间隔时间不超过sessionTimeout之前建立的会话依然有效。

5. 应用
 因为ZooKeeper自身的分布式一致性和特殊的数据结构,可以使用ZooKeeper解决很多分布式系统的问题,比如数据的发布订阅、分布式锁、Master选举、分布式协调等功能。

. ZooKeeper内部原理

ZAB

 要理解ZooKeeper就必须要先理解ZAB,Zookeeper Atomic Broadcast(Zookeeper原子广播协议)。Zab协议包括两个模式:恢复(recovery)和广播(broadcast)。当服务启动或者在Leader故障以后,Zab过渡到恢复模式。在一个Leader出现并且有多数服务器与它进行同步后,恢复模式结束转为广播模式。同步包括保证Leader和新的服务器保持一致的状态。
ZAB协议保证了几个原则:

  1. 可靠交付(Reliable delivery):如果一个消息m在一台服务器上被交付,那么它最终将在所有正确的服务器上被交付。
  2. 完全有序(Total order):如果一个消息a在消息b之前被一台服务器交付,那么所有服务器都交付了a和b,并且a先于b。
  3. 因果有序(Causal order):如果消息a在因果上先于消息b并且二者都被交付,那么a必须排在b之前。

 下面将详细介绍ZAB协议在ZooKeeper服务器在各种状态之下的作用。介绍之前首先要知道几个概念:

  1. ZXID:也就是事务id, 为了保证事务的顺序一致性,zookeeper 采用了递增的事务 id 号(zxid)来标识事务。为一个long型(64位)整数,分为两部分:轮次(epoch)部分和计数器(counter)部分。每个部分为32位。
  2. 服务状态:
    LOOKING:当服务器启动和与leader失联之后,服务器会进入LOOKING状态,目的是为了查找或者选举Leader。
    FOLLOWING:follower角色,处理客户端的读请求,并参与leader选举。
    LEADING:leader角色,处理客户端的读写请求。
    OBSERVING:observer角色,处理客户端的读请求,不参与leader选举。

选举

 服务启动处于LOOKING开始选举一个新的leader或查找已经存在的leader,如果leader已经存在,其他服务器就会通知这个新启动的服务器,告知哪个服务器是leader,与此同时,新的服务器会与leader建立连接,以确保自己的状态与leader一致。 如果集群中所有的服务器均处于LOOKING状态,这些服务器之间就会进行ZAB协议选举一个leader,通过信息交换对leader选举达成共识的选择。在本次选举过程中胜出的服务器将进入LEADING状态,而集群中其他服务器将会进入FOLLOWING状态。
 Leader选举流程如下:

  1. 服务器向集群中发送一个投票信息vote,包含自己的server id和最后一条已经提交事务的id,第一票先投自己。
  2. 接受到其他服务器的投票,比较是否修改自己的投票信息。用myZxid和mySid表示自身的投票信息,voteZxid和voteServerId为接受到的投票信息如果(voteZxid>myZxid)或(voteZxid=myZxid且voteServerId>myServerId),将自己的投票信息修改为接受到的投票信息。选取最大的zxid是因为选取出的leader需要有相比其他服务器有最全的数据。
  3. 有服务器超过过半投票,选举它为Leader,选举结束。


    leader选举流程

 以下为选举投票部分的源码分析
服务启动

            //主线程
            while (running) {
                switch (getPeerState()) {
                //当前服务处于looking状态
                case LOOKING:
                    LOG.info("LOOKING");
                    ServerMetrics.getMetrics().LOOKING_COUNT.add(1);
                    //省略无关代码
                    ......
                        try {
                            roZkMgr.start();
                            reconfigFlagClear();
                            if (shuttingDownLE) {
                                shuttingDownLE = false;
                                //开始leader选举算法、默认FastLeaderElection
                                startLeaderElection();
                            }
                            //选举leader
                            setCurrentVote(makeLEStrategy().lookForLeader());
                        } catch (Exception e) {
                            LOG.warn("Unexpected exception", e);
                            setPeerState(ServerState.LOOKING);
                        } finally {
                            // If the thread is in the the grace period, interrupt
                            // to come out of waiting.
                            roZkMgr.interrupt();
                            roZk.shutdown();
                        }
                    }
                    break; 
                    case OBSERVING:
                    ......
                    break;
                    case FOLLOWING:
                    ......
                    break;
                    case LEADING:
                    ......
                    break;

leader选举流程


    public Vote lookForLeader() throws InterruptedException {
       ......
        try {
            
            //存放当前leader的票数
            Map<Long, Vote> recvset = new HashMap<Long, Vote>();

            //存放之前几届的leader选举结果、以及本次领导人选举的投票结果
            Map<Long, Vote> outofelection = new HashMap<Long, Vote>();

            int notTimeout = minNotificationInterval;

            synchronized (this) {
                logicalclock.incrementAndGet();
                //先投自己为leader
                updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
            }

            LOG.info(
                "New election. My id = {}, proposed zxid=0x{}",
                self.getId(),
                Long.toHexString(proposedZxid));
            //发送投票、包括自己
            sendNotifications();

            SyncedLearnerTracker voteSet;

            //交换通知,直到选出leader
            while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {

                //从接受队列中获取投票
                Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);

                /*
                 * Sends more notifications if haven't received enough.
                 * Otherwise processes new notification.
                 */
                if (n == null) {
                    //所有消息都已传递、继续投递直到选出leader
                    if (manager.haveDelivered()) {
                        sendNotifications();
                    } else {
                    //消息还在,可能其他server还没启动,尝试连接  
                        manager.connectAll();
                    }

                    /*
                     * Exponential backoff
                     */
                    //延迟超时时间
                    int tmpTimeOut = notTimeout * 2;
                    notTimeout = Math.min(tmpTimeOut, maxNotificationInterval);
                    LOG.info("Notification time out: {}", notTimeout);
                } else if (validVoter(n.sid) && validVoter(n.leader)) {
                    /*
                     * Only proceed if the vote comes from a replica in the current or next
                     * voting view for a replica in the current or next voting view.
                     */
                    switch (n.state) {
                    case LOOKING:

                        // If notification > current, replace and send messages out
                        // 接受的投票大于当前的逻辑时钟、表示开启新的一轮的选举,需要清空当前投票结果、重新投票
                        if (n.electionEpoch > logicalclock.get()) {
                            logicalclock.set(n.electionEpoch);
                            recvset.clear();
                            //比较是否需要更改自己投票信息 依次比较epoch、zxid、server id
                            if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                                updateProposal(n.leader, n.zxid, n.peerEpoch);
                            } else {
                                updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
                            }
                            //发送投票
                            sendNotifications();
                        } else if (n.electionEpoch < logicalclock.get()) {
                        // 接受的投票小于当前的逻辑时钟、不处理
                               
                            break;
                            
                        } 
                        //判断是否需要更改投票信息
                        else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                            sendNotifications();
                        }


                        // don't care about the version if it's in LOOKING state
                        //存放此次投票结果
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

                        voteSet = getVoteTracker(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));
                        //选举是否结束、默认超过一半的sever同意
                        if (voteSet.hasAllQuorums()) {
                            ......

                            /*
                             * This predicate is true once we don't read any new
                             * relevant message from the reception queue
                             */
                            if (n == null) {
                                //更改服务状态 LEADING 或者 learningState (FOLLOWING , OBSERVING)
                                setPeerState(proposedLeader, voteSet);
                                Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
                                leaveInstance(endVote);
                                //返回投票结果
                                return endVote;
                            }
                        }
                        break;
                    case OBSERVING:
                        LOG.debug("Notification from observer: {}", n.sid);
                        break;
                    case FOLLOWING:
                    case LEADING:
          
            return null;
    }

同步

当leader选举完之后,Learner(follower、observer)需要同步leader的数据到本地。同步的类型有四种形式,详细介绍同步规则前,先了解几个变量,首先ZKDatabase维护了一个committedLog(已经提交事务的队列缓存,用于Leader快速同步数据给Learner)
peerLastZxid : Learner最后处理的zxid;
maxCommittedLog :committedLog中已经提交的最大zxid;
minCommittedLog :committedLog中最小zxid日志;

当集群中种超过一半的服务器数据同步完成,集群就恢复了对外提供服务的能力。

广播

所有服务器都可以在本地执行读请求,而写请求则需要转交给Leader处理。当Leader收到一个写请求之后,处理过程如下图类似一个经典的二阶段提交,Leader提出一个请求,收集投票,最后提交。


请求处理
  1. Leader将请求转换成一个Propose,为其分配一个单调递增的Id(zxid),保证请求的有序性。
  2. Leader和每个Follower之前维护了一个FIFO的队列进行消息的通讯,以此来保证通讯的有序性。Leader将Propose投递给每个Follower的FIFO队列,Follower接受到
    Propose之后将其写入磁盘,然后回应Leader一个ACK消息,通知Leader已经接受到提案。
  3. 当Leader接受到大部分服务器的回应(ACK)之后,它会广播一条提交(COMMIT)指令然后在本地提交消息。当Follower从Leader处收到提交(COMMIT)指令时也提交消息。

 这里介绍了ZAB协议在zookeeper服务从选举到同步到广播整个流程的作用,下面用一个流程图详细理一下zookeeper服务的状态zab协议的状态流转。


zookeeper状态扭转

请求处理

 Leader、Follower和Observer根本上都是服务器。我们在实现服务器时使
用的主要抽象概念是请求处理器。请求处理器是对处理流水线上不同阶段的抽象。每一个服务器实现了一个请求处理器的序列。我们可以把一个处理器想象成添加到请求处理的一个元素。一条请求经过服务器流水线上所有处理器的处理后被称为得到完全处理。

请求处理器
 ZooKeeper代码里有一个叫RequestProcessor的接口。这个接口的主要方法是processRequest,它接受一个Request参数。在一条请求处理器的流水线上,对相邻处理器的请求的处理通常通过队列现实解耦合。当一个处理器有一条请求需要下一个处理器进行处理时,它将这条请求加入队列。然后,它将处于等待状态直到下一个处理器处理完此消息。

Leader

Leade请求处理流水线
  1. PrepRequestProcessor:接受客户端的请求并执行这个请求,处理结果则是生成一个事务。
  2. ProposalRequestProcessor:准备一个提议,并将该提议发送给跟随者ProposalRequestProcessor将会把所有请求都转发CommitRequestProcessor,而且,对于写操作请求,还会将请求转发给SyncRequestProcessor处理器。
  3. SyncRequestProcessor:负责将事务持久化到磁盘上。实际上就是将事务数据按顺序追加到事务日志中,并生成快照数据。
  4. AckRequestProcessor:一个简单请求处理器,它仅仅生成确认消息并返回给自己。
  5. CommitRequestProcessor:会将收到足够多的确认消息的提议进行提交。
  6. ToBeAppliedRequestProcessor:这个处理器会从提议列表中删除那些待接受的提议在FinalRequestProcessor处理器执行后删除这个列表中的元素。
  7. FinalRequestProcessor:处理更新类型的请求,并执行读取请求。

Follower

Follower请求处理流水线
  1. FollowerRequestProcessor :转发请求给CommitRequestProcessor,同时也会转发写请求到群首服务器。
  2. CommitRequestProcessor:直接转发读取请求到FinalRequestProcessor处理器,而且对于写请求,为了保证执行的顺序,CommitRequestProcessor处理器会在收到一个写请求处理器时暂停后续的请求处理,等待leader提交事务的消息。
  3. SyncRequestProcessor:当leader接收到写请求时会将提案发送给每个follower,当收到一个提案,follower会发送这个提议到SyncRequestProcessor处理器。
  4. SendRequestProcessor:会向群首发送确认消息。

小结

 平时工作中基本都是面向数据库的CRUD,拥有很丰富的搬砖经验,其实代码的质量和水平并不高。如果要实现一个虽然功能很简单,但是要求可用性和拓展性很高组件,这时候感觉自己的水平就不够用了。所以我们为什么要去看一些优秀的项目,也不是说每一段代码都要掌握的很详细,而是学习其中的思想。比如zookeeper其中服务状态的转换、服务之间的通信、FIFO队列、ZAB协议的实现、对于请求处理器一个个的抽象、数据结构的设计等等,这些都是可以从中学习到的思路。将来自己遇到类似的场景,想一想别人是怎么实现的,自己脑子里才有思路,实现功能很简单,最要的是如何实现的可维护、可拓展。当然zookeeper还有很多需要学习地方,本文只是对它做一个简单的介绍和一些核心思想的实现进行说明,大家想深入了ZooKeeper还是需要参考其他资料。

参考

http://zookeeper.apache.org/
《从PAXOS到ZOOKEEPER分布式一致性原理与实践》
《ZooKeeper:分布式过程协同技术详解》
《Zab: A simple totally ordered broadcast protocol》

上一篇下一篇

猜你喜欢

热点阅读