Zookeeper-leader初始化

2019-08-29  本文已影响0人  maxam0128

Zookeeper-leader初始化

在选举完成后,集群每个节点的角色状态就会确定,回到QuorumPeer#start中,每个节点会根据自身的状态完成相应的处理,如下:

本节我们着重分析leader节点相关的逻辑。

Leader 相关的类

Leader节点初始化,主要是根据节点配置初始化上面两个类,在leader 初始化完成之后会新建用于leader-follower之间通信的连接(接受来自follower的连接)。

Leader 控制逻辑

当leader初始化完成之后通过leader#lead实现leader节点的功能,方法出现异常时,将重置当前节点的状态(LOOKING),进入下一轮选举,下面看下lead的执行流程。

1、载入数据

首先从zkDb中恢复数据(在启动阶段已经载入数据,理论上这里不会再次载入),解析出zxid

  1. zkDb中恢复数据(启动阶段已经读入数据,理论上这里不会再次读入)
  2. 解析zxid
  3. clean up dead session
  4. 做 snapshot

2、启动 LearnerCnxAcceptor

LearnerCnxAcceptor 主要作用是接受来自follower的请求,并为每一个follower 连接新建一个LearnerHandler 用于处理、同步follower,可参考6集群数据同步。

3、leader 获取选举周期

这一步稍微有点绕,通过Leader#getEpochToPropos实现,步骤如下:

  1. leader 会调用 Leader#getEpochToPropose 将自身加入到connectingFollowers 这个列表,在没有收到大多数的follower连接上之前,它进入等待状态
  2. 当一个follower连上leader之后,通过LearnerHandler 中调用Leader#getEpochToPropose将follower加入connectingFollowers 列表,并检查是否大多数follower已经连上leader,如大多数已经连上,则唤醒所有在这个节点等待的follower 和leader,否则它自己也在这里进入等待状态
  3. 等待的节点会定时去查看大多数follower都连接到leader

实现核心代码如下,可以看出这个通过wait、notify 实现的等待唤醒策略:

long start = Time.currentElapsedTime();
long cur = start;
long end = start + self.getInitLimit()*self.getTickTime();
while(waitingForNewEpoch && cur < end) {
    connectingFollowers.wait(end - cur);
    cur = Time.currentElapsedTime();
}

等待大部分follower都连接到leader之后,根据每个follower发送epoch 计算出一个新的epoch(选出最大的一个epoch并自增)。leader 会根据这个选举周期初始化这一轮zxid。

4、发送 LEADERINFO 数据包给Follower

当大部分的follower连接上leader后,阻塞等待的follower会被唤醒,接下来leader会发送 LEADERINFO(包括newZxid、version等信息)数据包给follower,然后等待大部分 follower 回复 ack 消息。等待策略和上面差不多,通过Leader#waitForEpochAck 实现。

注:如果follower 的版本小于 0x10000 那么就不会发送LEADERINFO 消息给follower,直接阻塞等待follower 回复的ack消息。在ack消息中也包含follower最新的zxid。现在follower除此回复的消息都是0x10000

这一步主要是将上一步选出来的epoch 广播给集群中的其他节点,其他节点后续会根据这个epoch判断收到的数据包是不是新的leader发送的,避免旧的leader复活之后广播之前的提议而造成状态不一致的问题。

5、Leader通过LearnerHandler异步同步数据给Follower

leader 在收到大多数follower回复ack消息之后,开始和follower 同步数据,在集群对外服务之前确保各个节点中的数据一致。通过LearnerHandler#syncFollower数据同步,同步方式有四种全量同步(snap)、差异化(diff)、回滚(trunc)、差异化+回滚(diff+trunc) ,具体采用哪种方式还要看follower 和 leader 之间的数据差异情况。

peerLastZxid:该Learner最后处理的ZXID
minCommittedLog:LeadercommittedLog中的最小ZXID
maxCommittedLog:LeadercommittedLog中的最大ZXID
lastProcessedZxid:leader中最后处理的ZXID

经过上面的步骤,leader 和follower的差异数据同步完成之后,再次检查leader 和follower 在同步数据期间是否有其他的差异数据,所以最后一步就是同步这部分差异数据。

// 对这个方法的理解是,follower在故障恢复时,如果和leader的数据同步完成,还要再次检查同步数据这段时间是不是leader有了新的提交,需要再次同步
leaderLastZxid = leader.startForwarding(this, currentZxid);

在leader 和follower数据同步完成之后,leader 通过LearnerHandler发送 Leader.NEWLEADER 数据包给所有follower,然后leader通过 leader#waitForNewLeaderAck 阻塞等待大多数follower的ack数据。

6、启动LeaderZooKeeperServer

在大多数的follower都回复了newLeaderAck 数据包之后,这是的leader就成为真正的leader了,它启动LeaderZooKeeperServer ,履行leader 的职责,如下:

  1. 创建session追踪器(SessionTrackerImpl)
  2. 启动SessionTracker(它的主要作用是定期清理过期的session)
  3. 设置请求处理器(构造Leader的请求处理链)
  4. 注册JMX,然后将当前的服务的状态设置为RUNNING(运行)

所有的LearnerHandler 会阻塞等待LeaderZooKeeperServer 启动完成,之后它开始给所有follower发送Leader.UPTODATE 数据包,之后便处理来自follower数据。它可以收到如下四类消息:

7、和Follower建立心跳

在Leader 启动完成之后,leader变周期性的发送心跳数据(LearnerHandler#ping)给follower。如果出现follower 掉线或宕机就会将此follower从 learners 中移出,接下来就不会发送propose或者心跳数据给此follower。

当leader上没有大多数的follower时,会将自己关闭,状态重置为LOOKING,进行下一轮的选举。

1、超时时间判断

public synchronized boolean check(long time) {
    if (currentTime == 0) {
        return true;
    } else {
        long msDelay = (time - currentTime) / 1000000;
        return (msDelay < (leader.self.tickTime * leader.self.syncLimit));
    }
}
leader.self.tickTime * leader.self.syncLimit:超时时间

至此,Leader的启动流程就已经完成了。

上一篇 下一篇

猜你喜欢

热点阅读