首页投稿(暂停使用,暂停投稿)

【Zookeeper】 Server的启动流程

2017-07-20  本文已影响0人  桥头桥尾

一:前言

当服务通过选举算法进行选举完之后,各个服务器就需要设置自己的角色,并启动相对应的服务(也就是服务的初始化),之后就等待客户端的请求,处理响应的请求。

二:流程分析

2.1、 LEADER

功能:接收客户端的请求, 事务请求的提议者。

首先我们查看启动代码:

    // makeLeader(logFactory) 新建一个Leader实体,打开Leader服务器的交换信息的接口,等待与Learner通信
    setLeader(makeLeader(logFactory));
    // Leader服务启动的主方法
    leader.lead();

Leader服务启动的主方法leader.lead();流程分析:

  void lead() throws IOException, InterruptedException {
       ...
            //1> 加载FileTxnSnapLog中的数据, 并把每一个事务数据封装成一个Proposal,放入committedLog
            //   中,并计算minCommittedLog, maxCommittedLog, 数据放入ZKDatabase
            //2> 处理事务数据时,若事务Type为Session的数据,响应的增删到sessionsWithTimeouts中
            //3> 设置Leader的highestZxid
            //4> 通过ZKDatabase中的sessions与sessionsWithTimeouts进行比较,Kill失效的session, 并将
            //     失效相关联的临时节点进行删除
            zk.loadData();</br>

            // 开启接收Leaner的连接线程, 并把每一个Leaner的连接封装成一个LearnerHandler实体, 添加到
            // Leader的learners列表中,每一个LearnerHandler开启一个线程处理响应的Leaner的信息
            cnxAcceptor = new LearnerCnxAcceptor();
            cnxAcceptor.start();</br>
            
            readyToStart = true;
            // 获取当前选举的轮次, 同步等待法定人数的Leaner注册身份(FOLLOWERINFO/OBSERVERINFO)
            // 到Leader 超时时间为  initLimit \* tickTime
            long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
            // 根据当前的轮次,初始化zxid
            zk.setZxid(ZxidUtils.makeZxid(epoch, 0));</br>
            ...</br>
            
            newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(),
                    null, null);</br>
            ...</br>
            //Leaner向Leaner发送LEADERINFO消息,并等待法定人数的Leaner的响应epoch的消息
            waitForEpochAck(self.getId(), leaderStateSummary);
            self.setCurrentEpoch(epoch);
            try {
                //等待法定人数的Leaner初始化同步数据响应的消息,超时时间为 initLimit \* tickTime
                waitForNewLeaderAck(self.getId(), zk.getZxid(), LearnerType.PARTICIPANT);
            } catch (InterruptedException e) {
                ...
            }
            
            //开启对客户端的服务的主方法
            startZkServer();
            
            //是否有zxid的初始化设置
            String initialZxid = System.getProperty("zookeeper.testingonly.initialZxid");
            if (initialZxid != null) {
                long zxid = Long.parseLong(initialZxid);
                zk.setZxid((zk.getZxid() & 0xffffffff00000000L) | zxid);
            }
            
            if (!System.getProperty("zookeeper.leaderServes", "yes").equals("no")) {
                self.cnxnFactory.setZooKeeperServer(zk);
            }
           
           //对每一个Leaner发起ping检测消息, 检测的时间间隔为tickTime / 2
           // 检测learnerType = LearnerType.PARTICIPANT的人数是否少于法定的人数,
           // 如果少于,则   shutDown服务,进行新一轮的选举
            boolean tickSkip = true;
            while (true) {
                Thread.sleep(self.tickTime / 2);
                if (!tickSkip) {
                    self.tick++;
                }
                HashSet<Long> syncedSet = new HashSet<Long>();
                syncedSet.add(self.getId());

                for (LearnerHandler f : getLearners()) {
                    if (f.synced() && f.getLearnerType() == LearnerType.PARTICIPANT) {
                        syncedSet.add(f.getSid());
                    }
                    f.ping();
                }

              if (!tickSkip && !self.getQuorumVerifier().containsQuorum(syncedSet)) {
                    shutdown("Not sufficient followers synced, only synced with sids: [ "
                            + getSidSetString(syncedSet) + " ]");
                    return;
              } 
              tickSkip = !tickSkip;
            }
           //...
    }

下面我们队上面代码的两个流程进行流程分析①: Leaner向Leader的注册同步流程;②:Leader与客户端服务的流程。

2.1.1、 Leaner向Leader的注册同步流程

Leader异步等待法定的客户端注册同步:Leader的Lead()方法中:

// 异步等待Leaner信息的注册
long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
// 异步等待Leaner接收到Leader的信息的EpochAck
waitForEpochAck(self.getId(), leaderStateSummary);
// 异步等待Leander对Leader的NEWLEADER消息的Ack
waitForNewLeaderAck(self.getId(), zk.getZxid(), LearnerType.PARTICIPANT);

2.1.2、 Leader与客户端服务的流程

zk处理客户端的请求都是通过Proccessor进行链路处理。对于LeaderZooKeeperServer对应的Proccessor的处理关系为: PrepRequestProcessor ——> ProposalRequestProcessor ——> CommitProcessor ——> Leader.ToBeAppliedRequestProcessor ——>FinalRequestProcessor, 同时ProposalRequestProcessor 将事务请求(request.hdr != null)交给同步刷盘处理器处理 ProposalRequestProcessor ——> SyncRequestProcessor ——> AckRequestProcessor。AckRequestProcessor处理器将刷盘成功的请求交给Leader作为一个提议,作为Leader判断提议成功的法定人数, 成立交给CommitProcessor

图片.png

2.2、 FOLLOWER

功能:接收客户端的请求, 事务请求提议的参与者,将事务请求转发给Leader。

首先查看启动代码

     // makeFollower(logFactory)新建一个Follwer实体,建立与Leader通信
    setFollower(makeFollower(logFactory));
    // follower启动的主方法
    follower.followLeader();

Follower服务启动的主方法follower.followLeader();流程分析:

void followLeader() throws InterruptedException {
       ...
        try {
            InetSocketAddress addr = findLeader();            
            try {
                //与Leader建立通信
                connectToLeader(addr);
                //将自己的信息注册到Leader
                long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);</br>

                //初始化同步Leader的数据
                long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
                if (newEpoch < self.getAcceptedEpoch()) {
                    LOG.error("Proposed leader epoch " + ZxidUtils.zxidToString(newEpochZxid)
                            + " is less than our accepted epoch " + ZxidUtils.zxidToString(self.getAcceptedEpoch()));
                    throw new IOException("Error: Epoch of leader is lower");
                }
                syncWithLeader(newEpochZxid);</br>

                //接收Leader通信数据,并做响应的业务处理
                QuorumPacket qp = new QuorumPacket();
                while (self.isRunning()) {
                    readPacket(qp);
                    processPacket(qp);
                }
            } catch (Exception e) {
               ...
            }
        } finally {
            zk.unregisterJMX((Learner)this);
        }
    }

2.2.1 Follwer注册与同步流程分析

2.2.1 Follwer与客户端的服务流程

对于角色Follwer的处理客户端的请求是通过下面的RequestProcessor进行处理: FollowerRequestProcessor ——> CommitProcessor ——> FinalRequestProcessor; 同时将其转发给Leader是并把请求,Leader会进行提议请求,将接收到的提议请求交给SyncRequestProcessor ——> SendAckRequestProcessor, SendAckRequestProcessor将ack消息发送给Leader,作为Leader判断提议成功的法定人数,成立交给CommitProcessor 。

图片.png

2.3、 OBSERVER

功能:接收客户端的请求, 将事务请求转发给Leader。

首先查看启动代码

// makeObserver(logFactory)新建一个Observer实体,建立与Leader通信
setObserver(makeObserver(logFactory));
// observer启动的主方法
observer.observeLeader();

Observer服务启动的主方法observer.observeLeader();流程分析:

void observeLeader() throws InterruptedException {
     ...
            try {
                //建立与Leader的连接
                connectToLeader(addr);
                //将自身的信息注册到Leader中, 返回Leader的lastLoggerZxid
                long newLeaderZxid = registerWithLeader(Leader.OBSERVERINFO);
                //同步Leader的数据信息
                syncWithLeader(newLeaderZxid);
                 //接收Leader通信数据,并做响应的业务处理
                QuorumPacket qp = new QuorumPacket();
                while (self.isRunning()) {
                    readPacket(qp);
                    processPacket(qp);                   
                }
            } catch (Exception e) {
               ...
            }
       ...
    }

3.3.1 Observer注册与同步流程分析

3.3.2 Observer与客户端的服务流程

对于Observer的处理客户端的请求是通过下面的RequestProcessor进行处理:ObserverRequestProcessor ——> CommitProcessor ——> FinalRequestProcessor ; 同时对于Leader的Leader.INFORM消息会同时交给SyncRequestProcessor(刷盘操作)跟CommitProcessor。

图片.png
上一篇下一篇

猜你喜欢

热点阅读