zookeeper的原理-leader选举
Zookeeper 的一致性
Zookeeper 的来源
通过算法来解决分布式环境下多个节点中选举出一个leader。
Zookeeper 的一致性是什么情况?
根据zab协议的同步流程,数据同步是采用过半提交策略,意味着它是最终一致性,而不是强一致性。
zookeeper 是一个顺序一致性模型。
什么是顺序一致性呢
下图是弱一致性
image.png
下图是更强的一直性保证,若如果 B1 得到的 x 的值为 1,那么 C1 看到的值也一定是 1
image.png
顺序一致性是针对单个操作,单个数据对象。属于 CAP 中 C这个范畴。一个数据被更新后,能够立马被后续的读操作读到。
zookeeper 的顺序一致性实现是缩水版,zookeeper可以在执行读取操作前调用sync来解决弱一致性的问题(客户端Az执行了更新操作,客户端B可能读取是旧值,也可以是新值);zookeeper 基于 zxid 以及阻塞队列的方式来实现请求的顺序一致性,若客户端刚开始从最新的follower读取,从而获取了zxid,若又向旧的follower连接,则连接失败,原因是客户端的zxid大于服务端的zxid。因此zookeeper保证的是时间轴的一致性。
Single System Image
client 只要连接过一次 zookeeper,就不会有历史的倒退。
leader 选举的原理
leader选举的两个阶段,一是服务器启动时的leader选举,二是运行过程中leader宕机导致的leader选举。
重要的参数
服务器 ID(myId)
编号越大在选举算法的权重越大。
事务id(zxid)
值越大说明数据越新,在选举算法的权重越大。
逻辑时钟(epoch – logicalclock)
投票的次数,每投完一次票,则加1
选举状态
LOOKING,竞选状态。
FOLLOWING,随从状态,同步 leader 状态,参与投票。
OBSERVING,观察状态,同步 leader 状态,不参与投票。
LEADING,领导者状态。
服务器启动时的 leader 选举
1 每个服务器处于LOOKING,发起一个投票(epoch,ZXID,myId),并把投票发送到其他机器。
2 接受其他机器的投票,并检验有效性如检查是否是本轮投票等
3 处理投票:将别人的投票和本机器的投票进行比较,比较顺序是epoch,ZXID和myid,大的是leader。
4 统计投票:判断是否已有过半的相同的投票信息
5 改变服务器状态:一旦确定leader则更新自己的服务器状态,leader更新为LEADING,follow更新为FOLLOWING。u若没有则重新发起投票。
运行过程中的 leader 选举
1 变更状态:将剩余非Observer服务器的状态变为LOOKING.
2 剩余的其他流程和启动时一样。
leader 选举的源码分析
入口
通过server.sh的来启动QuorumPeerMain的main方法
选举逻辑
正常启动的流程图
image.png核心源码
1 org.apache.zookeeper.server.quorum.FastLeaderElection#lookForLeader
public Vote lookForLeader() throws InterruptedException {
try {
//接收到的票据的集合
HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
//
HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
int notTimeout = finalizeWait;
synchronized(this){
//逻辑时钟->epoch
logicalclock.incrementAndGet();
//proposal
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
LOG.info("New election. My id = " + self.getId() +
", proposed zxid=0x" + Long.toHexString(proposedZxid));
sendNotifications();//我要广播自己的票据
/*
* Loop in which we exchange notifications until we find a leader
*/
//接收到了票据
while ((self.getPeerState() == ServerState.LOOKING) &&
(!stop)){
/*
* Remove next notification from queue, times out after 2 times
* the termination time
*/
//recvqueue是从网络上接收到的其他机器的Notification
Notification n = recvqueue.poll(notTimeout,
TimeUnit.MILLISECONDS);
/*
* Sends more notifications if haven't received enough.
* Otherwise processes new notification.
*/
if(n == null){
if(manager.haveDelivered()){
sendNotifications();
} else {
manager.connectAll();//重新连接集群中的所有节点
}
/*
* Exponential backoff
*/
int tmpTimeOut = notTimeout*2;
notTimeout = (tmpTimeOut < maxNotificationInterval?
tmpTimeOut : maxNotificationInterval);
LOG.info("Notification time out: " + notTimeout);
}
else if(validVoter(n.sid) && validVoter(n.leader)) {//判断是否是一个有效的票据
/*
* Only proceed if the vote comes from a replica in the
* voting view for a replica in the voting view.
*/
switch (n.state) {
case LOOKING: //第一次进入到这个case
// If notification > current, replace and send messages out
if (n.electionEpoch > logicalclock.get()) { //
logicalclock.set(n.electionEpoch);
recvset.clear();//清空
//收到票据之后,当前的server要听谁的。
//可能是听server1的、也可能是听server2,也可能是听server3
//zab leader选举算法
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
//把自己的票据更新成对方的票据,那么下一次,发送的票据就是新的票据
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
//收到的票据小于当前的节点的票据,下一次发送票据,仍然发送自己的
updateProposal(getInitId(),
getInitLastLoggedZxid(),
getPeerEpoch());
}
//继续发送通知
sendNotifications();
} else if (n.electionEpoch < logicalclock.get()) { //说明当前的数据已经过期了
if(LOG.isDebugEnabled()){
LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
+ Long.toHexString(n.electionEpoch)
+ ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
}
break;
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
sendNotifications();
}
if(LOG.isDebugEnabled()){
LOG.debug("Adding vote: from=" + n.sid +
", proposed leader=" + n.leader +
", proposed zxid=0x" + Long.toHexString(n.zxid) +
", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
}
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
//决断时刻(当前节点的更新后的vote信息,和recvset集合中的票据进行归纳,)
if (termPredicate(recvset,
new Vote(proposedLeader, proposedZxid,
logicalclock.get(), proposedEpoch))) {
// Verify if there is any change in the proposed leader
while((n = recvqueue.poll(finalizeWait,
TimeUnit.MILLISECONDS)) != null){
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)){
recvqueue.put(n);
break;
}
}
/*
* This predicate is true once we don't read any new
* relevant message from the reception queue
*/
if (n == null) {
self.setPeerState((proposedLeader == self.getId()) ?
ServerState.LEADING: learningState());
Vote endVote = new Vote(proposedLeader,
proposedZxid,
logicalclock.get(),
proposedEpoch);
leaveInstance(endVote);
return endVote;
}
}
break;
}
return null;
}
}
2 org.apache.zookeeper.server.quorum.FastLeaderElection#totalOrderPredicate
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" +
Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid));
if(self.getQuorumVerifier().getWeight(newId) == 0){
return false;
}
/*
*
* We return true if one of the following three cases hold:
* 1- New epoch is higher
* 2- New epoch is the same as current epoch, but new zxid is higher
* 3- New epoch is the same as current epoch, new zxid is the same
* as current zxid, but server id is higher.
*/
return ((newEpoch > curEpoch) ||
((newEpoch == curEpoch) &&
((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
}
3 org.apache.zookeeper.server.quorum.FastLeaderElection#sendNotifications
private void sendNotifications() {
for (QuorumServer server : self.getVotingView().values()) {
long sid = server.id;
ToSend notmsg = new ToSend(ToSend.mType.notification,
proposedLeader, //myid
proposedZxid, //zxid
logicalclock.get(),//epoch
QuorumPeer.ServerState.LOOKING,//
sid, //myid
proposedEpoch); //发起票据epoch
if(LOG.isDebugEnabled()){
LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x" +
Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock.get()) +
" (n.round), " + sid + " (recipient), " + self.getId() +
" (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");
}
sendqueue.offer(notmsg); //阻塞队列, 线程->生产者消费者模式
}
}
4 org.apache.zookeeper.server.quorum.FastLeaderElection#termPredicate
protected boolean termPredicate(
HashMap<Long, Vote> votes,
Vote vote) {
HashSet<Long> set = new HashSet<Long>();
/*
* First make the views consistent. Sometimes peers will have
* different zxids for a server depending on timing.
*
*/
for (Map.Entry<Long,Vote> entry : votes.entrySet()) {
if (vote.equals(entry.getValue())){ //对票据进行归纳
set.add(entry.getKey()); //如果存在2票,set里面是不是有2个?
}
}
return self.getQuorumVerifier().containsQuorum(set); //验证
}
5 org.apache.zookeeper.server.quorum.flexible.QuorumMaj#containsQuorum
public boolean containsQuorum(Set<Long> set){
return (set.size() > half); //已经归纳的票据是否大于half .2>1 -> leader选举、 数据同步
}
消息传输
通信流程图
image.pngimage.png
选举完成的处理逻辑
FOLLOWING
1 流程图
image.png
2 核心代码方法
org.apache.zookeeper.server.quorum.Follower#followLeader
LEADING
1 核心代码方法
org.apache.zookeeper.server.quorum.Leader#lead