EtcdRaft Source Analysis (Log Replication)

2019-03-14  Pillar_Zhong

Leader

Assume a leader has already been elected and is about to start synchronizing its log with the other members.

Before it can act as a real leader, some preparation is required:

  • Converting from Candidate to Leader is more than a name change
  • Once the transition is complete, the leader starts replicating log entries to the other members

becomeLeader

func (r *raft) becomeLeader() {
   // TODO(xiangli) remove the panic when the raft implementation is stable
   if r.state == StateFollower {
      panic("invalid transition [follower -> leader]")
   }
   r.step = stepLeader
   r.reset(r.Term)
   r.tick = r.tickHeartbeat
   r.lead = r.id
   r.state = StateLeader
   // Followers enter replicate mode when they've been successfully probed
   // (perhaps after having received a snapshot as a result). The leader is
   // trivially in this state. Note that r.reset() has initialized this
   // progress with the last index already.
   r.prs[r.id].becomeReplicate()

   // Conservatively set the pendingConfIndex to the last index in the
   // log. There may or may not be a pending config change, but it's
   // safe to delay any future proposals until we commit all our
   // pending log entries, and scanning the entire tail of the log
   // could be expensive.
   r.pendingConfIndex = r.raftLog.lastIndex()

   emptyEnt := pb.Entry{Data: nil}
   if !r.appendEntry(emptyEnt) {
      // This won't happen because we just called reset() above.
      r.logger.Panic("empty entry was dropped")
   }
   // As a special case, don't count the initial empty entry towards the
   // uncommitted log quota. This is because we want to preserve the
   // behavior of allowing one entry larger than quota if the current
   // usage is zero.
   r.reduceUncommittedSize([]pb.Entry{emptyEnt})
   r.logger.Infof("%x became leader at term %d", r.id, r.Term)
}
  • First, the previous state must not be follower — a follower can never transition directly to leader
  • From now on, message handling is delegated to stepLeader
  • The leader sets its own Progress to ProgressStateReplicate, with Next = Match + 1 (see the sketch just below)
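For reference, becomeReplicate from progress.go in the same package shows where the Next = Match + 1 rule comes from (resetState included for completeness):

func (pr *Progress) becomeReplicate() {
   pr.resetState(ProgressStateReplicate)
   pr.Next = pr.Match + 1
}

func (pr *Progress) resetState(state ProgressStateType) {
   pr.Paused = false
   pr.PendingSnapshot = 0
   pr.State = state
   pr.ins.reset()
}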

reset(r.Term)

func (r *raft) reset(term uint64) {
   if r.Term != term {
      r.Term = term
      r.Vote = None
   }
   r.lead = None

   r.electionElapsed = 0
   r.heartbeatElapsed = 0
   r.resetRandomizedElectionTimeout()

   r.abortLeaderTransfer()

   r.votes = make(map[uint64]bool)
   r.forEachProgress(func(id uint64, pr *Progress) {
      *pr = Progress{Next: r.raftLog.lastIndex() + 1, ins: newInflights(r.maxInflight), IsLearner: pr.IsLearner}
      if id == r.id {
         pr.Match = r.raftLog.lastIndex()
      }
   })

   r.pendingConfIndex = 0
   r.uncommittedSize = 0
   r.readOnly = newReadOnly(r.readOnly.option)
}
  • Set the term to the given term, clearing the vote if the term changed
  • Clear lead, the election timer, the heartbeat timer, the randomized election timeout, any pending leader transfer, the votes map, pendingConfIndex, uncommittedSize, and readOnly
  • Reset the locally stored Progress of every node (the struct is sketched below)
    • Note that every peer's Next is set to r.raftLog.lastIndex() + 1 — the leader optimistically assumes everyone's log already matches its own
    • Every node's Progress starts out in the Probe state
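For orientation, here is the per-follower bookkeeping being reset, condensed from the Progress struct in progress.go (field list abbreviated, comments mine):

type Progress struct {
   Match, Next     uint64            // highest known replicated index / next index to send
   State           ProgressStateType // Probe, Replicate, or Snapshot
   Paused          bool              // in Probe state: wait for one reply at a time
   PendingSnapshot uint64            // in Snapshot state: index of the in-flight snapshot
   RecentActive    bool              // true if the follower responded recently
   ins             *inflights        // sliding window of in-flight MsgApp messages
   IsLearner       bool
}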

tickHeartbeat

func (r *raft) tickHeartbeat() {
   r.heartbeatElapsed++
   r.electionElapsed++

   if r.electionElapsed >= r.electionTimeout {
      r.electionElapsed = 0
      if r.checkQuorum {
         r.Step(pb.Message{From: r.id, Type: pb.MsgCheckQuorum})
      }
      // If current leader cannot transfer leadership in electionTimeout, it becomes leader again.
      if r.state == StateLeader && r.leadTransferee != None {
         r.abortLeaderTransfer()
      }
   }

   if r.state != StateLeader {
      return
   }

   if r.heartbeatElapsed >= r.heartbeatTimeout {
      r.heartbeatElapsed = 0
      r.Step(pb.Message{From: r.id, Type: pb.MsgBeat})
   }
}
  • Both the heartbeat counter and the election counter advance on every tick

  • When electionElapsed reaches electionTimeout, the leader steps a local MsgCheckQuorum message (if checkQuorum is enabled) to verify it can still reach a quorum, and aborts any leadership transfer that failed to complete within one election timeout

  • When heartbeatElapsed reaches heartbeatTimeout, the leader steps a local MsgBeat message, which causes heartbeats to be broadcast to every follower, as shown below
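What MsgBeat turns into: stepLeader calls bcastHeartbeat, which ends up in sendHeartbeat. A condensed sketch from the same raft.go (the read-only request context plumbing is omitted):

func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
   // Attach the commit index, capped at the follower's Match: a follower
   // must never be told to commit entries it has not appended yet.
   commit := min(r.getProgress(to).Match, r.raftLog.committed)
   m := pb.Message{
      To:      to,
      Type:    pb.MsgHeartbeat,
      Commit:  commit,
      Context: ctx,
   }
   r.send(m)
}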

bcastAppend
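
bcastAppend itself is not shown in the original snippet; it simply fans out to every peer and delegates the real work to maybeSendAppend below. From the same raft.go:

func (r *raft) bcastAppend() {
   r.forEachProgress(func(id uint64, _ *Progress) {
      if id == r.id {
         return
      }
      r.sendAppend(id)
   })
}

func (r *raft) sendAppend(to uint64) {
   r.maybeSendAppend(to, true)
}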

func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
   pr := r.getProgress(to)
   if pr.IsPaused() {
      return false
   }
   m := pb.Message{}
   m.To = to

   term, errt := r.raftLog.term(pr.Next - 1)
   ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)
   if len(ents) == 0 && !sendIfEmpty {
      return false
   }

   if errt != nil || erre != nil { // send snapshot if we failed to get term or entries
      if !pr.RecentActive {
         r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to)
         return false
      }

      m.Type = pb.MsgSnap
      snapshot, err := r.raftLog.snapshot()
      if err != nil {
         if err == ErrSnapshotTemporarilyUnavailable {
            r.logger.Debugf("%x failed to send snapshot to %x because snapshot is temporarily unavailable", r.id, to)
            return false
         }
         panic(err) // TODO(bdarnell)
      }
      if IsEmptySnap(snapshot) {
         panic("need non-empty snapshot")
      }
      m.Snapshot = snapshot
      sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
      r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
         r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr)
      pr.becomeSnapshot(sindex)
      r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr)
   } else {
      m.Type = pb.MsgApp
      m.Index = pr.Next - 1
      m.LogTerm = term
      m.Entries = ents
      m.Commit = r.raftLog.committed
      if n := len(m.Entries); n != 0 {
         switch pr.State {
         // optimistically increase the next when in ProgressStateReplicate
         case ProgressStateReplicate:
            last := m.Entries[n-1].Index
            pr.optimisticUpdate(last)
            pr.ins.add(last)
         case ProgressStateProbe:
            pr.pause()
         default:
            r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State)
         }
      }
   }
   r.send(m)
   return true
}
  • Fetch the peer's Progress, i.e. how far replication to it has gotten
  • Package the entries starting from the peer's Next
  • Package the (term, index) of entry Next-1 so the receiver can validate consistency
  • Attach the leader's committed index
  • Prepare to send all of this to the peer as a MsgApp message
  • If the message carries entries:
    • If the peer's state is ProgressStateReplicate:
      • Optimistically advance the peer's Next past the last entry being sent
      • Add last to ins — note that ins is a ring-buffer-like sliding window (see the sketch after this list)
    • If the peer's state is ProgressStateProbe, pause it so only one probe message is in flight at a time
  • The snapshot case:
    • If the term at Next-1 or the entries after it cannot be found, they must have been compacted into a snapshot
    • Fetch the snapshot held by this node, which may live in unstable or in storage
    • Set the peer's Progress to ProgressStateSnapshot, with PendingSnapshot set to the snapshot's index
    • Prepare to send the peer a MsgSnap message instead
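A minimal sketch of the ring-buffer idea behind ins, condensed from raft/inflights.go (the real implementation also grows the buffer lazily; assume it is pre-allocated here):

// A fixed-size sliding window of the last entry index carried by each
// in-flight MsgApp.
type inflights struct {
   start  int      // position of the oldest in-flight message
   count  int      // number of in-flight messages
   size   int      // capacity, i.e. maxInflight
   buffer []uint64 // ring buffer of last entry indexes
}

// add records that a MsgApp ending at index `inflight` is now in flight.
func (in *inflights) add(inflight uint64) {
   if in.count == in.size {
      panic("cannot add into a full inflights")
   }
   in.buffer[(in.start+in.count)%in.size] = inflight
   in.count++
}

// freeTo releases every slot whose last index is <= to. This is what the
// leader calls when a follower acknowledges everything up to `to`.
func (in *inflights) freeTo(to uint64) {
   if in.count == 0 || to < in.buffer[in.start] {
      return // out of the left side of the window
   }
   idx := in.start
   var i int
   for i = 0; i < in.count; i++ {
      if to < in.buffer[idx] { // found the first unacknowledged message
         break
      }
      idx = (idx + 1) % in.size
   }
   in.count -= i
   in.start = idx
}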

Follower

case pb.MsgApp:
   r.electionElapsed = 0
   r.lead = m.From
   r.handleAppendEntries(m)
  • A follower assumes that only the leader can send this kind of message, so whoever sent it is accepted as leader
  • The election timer is reset at the same time
  • The real processing happens in handleAppendEntries

handleAppendEntries

func (r *raft) handleAppendEntries(m pb.Message) {
   if m.Index < r.raftLog.committed {
      r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
      return
   }

   if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
      r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
   } else {
      r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()})
   }
}

func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) {
    if l.matchTerm(index, logTerm) {
        lastnewi = index + uint64(len(ents))
        ci := l.findConflict(ents)
        switch {
        case ci == 0:
        case ci <= l.committed:
            l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed)
        default:
            offset := index + 1
            l.append(ents[ci-offset:]...)
        }
        l.commitTo(min(committed, lastnewi))
        return lastnewi, true
    }
    return 0, false
}
  • If m.Index is below what the follower has already committed, the follower just replies with its committed index so the leader can update its view

  • maybeAppend compares the (term, index) pair sent by the leader against the follower's own log. If they don't match, the follower reports back: "the entry term at the index you gave doesn't match mine" — perhaps the follower has nothing at that index at all, perhaps it holds something entirely different. The append is rejected, with the follower's current last index attached: RejectHint: r.raftLog.lastIndex(). The conflict point is the index that was sent.

    • How the leader uses that last index is analyzed below, under maybeDecrTo

    • In Raft, if the (term, index) pair matches at some position, everything before that position is guaranteed to match as well

  • If the pair does match, both sides agree on the entry just before the insertion point, so it is safe to append from there:

    • First compute lastnewi, the last index after the append
    • findConflict (source shown right after this list):
      • In the best case the new entries attach cleanly, no conflict is possible, and they are simply appended
      • Alternatively, the follower may already hold more (and different) entries than the leader expects. Then scan forward for the first conflicting entry, discard everything from that point on, and take the leader's entries instead
    • Then advance committed to what the leader requested, capped at lastnewi
    • Finally report back to the leader: "done as requested, and here is my latest last index"

  • One more case is the leader's probe message: an empty MsgApp goes through exactly the same path — the follower checks the (term, index) pair and replies with either an acknowledgment or a reject plus hint
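findConflict, from log.go in the same package, implements the forward scan described above:

func (l *raftLog) findConflict(ents []pb.Entry) uint64 {
    for _, ne := range ents {
        if !l.matchTerm(ne.Index, ne.Term) {
            if ne.Index <= l.lastIndex() {
                l.logger.Infof("found conflict at index %d [existing term: %d, conflicting term: %d]",
                    ne.Index, l.zeroTermOnErrCompacted(l.term(ne.Index)), ne.Term)
            }
            return ne.Index
        }
    }
    return 0
}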

Leader

Now let's look at how the leader handles a member's replication response.

case pb.MsgAppResp:
   pr.RecentActive = true

   if m.Reject {
      r.logger.Debugf("%x received msgApp rejection(lastindex: %d) from %x for index %d",
         r.id, m.RejectHint, m.From, m.Index)
      if pr.maybeDecrTo(m.Index, m.RejectHint) {
         r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
         if pr.State == ProgressStateReplicate {
            pr.becomeProbe()
         }
         r.sendAppend(m.From)
      }
   } else {
      oldPaused := pr.IsPaused()
      if pr.maybeUpdate(m.Index) {
         switch {
         case pr.State == ProgressStateProbe:
            pr.becomeReplicate()
         case pr.State == ProgressStateSnapshot && pr.needSnapshotAbort():
            r.logger.Debugf("%x snapshot aborted, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
            // Transition back to replicating state via probing state
            // (which takes the snapshot into account). If we didn't
            // move to replicating state, that would only happen with
            // the next round of appends (but there may not be a next
            // round for a while, exposing an inconsistent RaftStatus).
            pr.becomeProbe()
            pr.becomeReplicate()
         case pr.State == ProgressStateReplicate:
            pr.ins.freeTo(m.Index)
         }

         if r.maybeCommit() {
            r.bcastAppend()
         } else if oldPaused {
            // If we were paused before, this node may be missing the
            // latest commit index, so send it.
            r.sendAppend(m.From)
         }
         // We've updated flow control information above, which may
         // allow us to send multiple (size-limited) in-flight messages
         // at once (such as when transitioning from probe to
         // replicate, or when freeTo() covers multiple messages). If
         // we have more entries to send, send as many messages as we
         // can (without sending empty messages for the commit index)
         for r.maybeSendAppend(m.From, false) {
         }
         // Transfer leadership is in progress.
         if m.From == r.leadTransferee && pr.Match == r.raftLog.lastIndex() {
            r.logger.Infof("%x sent MsgTimeoutNow to %x after received MsgAppResp", r.id, m.From)
            r.sendTimeoutNow(m.From)
         }
      }
   }

agree

  • The case where the member accepted the leader's append request

  • Remember: when the leader took office, every peer was in the Probe state. On the first successful response the peer is promoted to ProgressStateReplicate, with its Next naturally set to Match + 1

    • ProgressStateSnapshot — see EtcdRaft Source Analysis (Snapshot Replication)
    • ProgressStateReplicate
      • Reaching this branch means the peer has accepted the replication, so every slot in ins holding an index at or below the acknowledged index is freed (see the inflights sketch above)

maybeUpdate

func (pr *Progress) maybeUpdate(n uint64) bool {
   var updated bool
   if pr.Match < n {
      pr.Match = n
      updated = true
      pr.resume()
   }
   if pr.Next < n+1 {
      pr.Next = n + 1
   }
   return updated
}
  • maybeUpdate: as the analysis above shows, the absence of a rejection means the two logs agree up to the acknowledged index, so the index the peer sent back records how far the leader's data has reached it. The first order of business is to record that progress in Match (and resume the peer if it was paused); Next is bumped past it if necessary.

maybeCommit

func (r *raft) maybeCommit() bool {
   // Preserving matchBuf across calls is an optimization
   // used to avoid allocating a new slice on each call.
   if cap(r.matchBuf) < len(r.prs) {
      r.matchBuf = make(uint64Slice, len(r.prs))
   }
   mis := r.matchBuf[:len(r.prs)]
   idx := 0
   for _, p := range r.prs {
      mis[idx] = p.Match
      idx++
   }
   sort.Sort(mis)
   mci := mis[len(mis)-r.quorum()]
   return r.raftLog.maybeCommit(mci, r.Term)
}
  • maybeCommit gathers every member's Match, sorts them, and takes the value at position len(mis)-quorum(): the highest index already replicated on a quorum. raftLog.maybeCommit then advances the leader's committed to that index — but only if the entry there belongs to the current term, since a Raft leader never directly commits an entry from an earlier term (see the sketch below)
  • If committed advanced, the change is broadcast to the members via bcastAppend so they can commit too
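raftLog.maybeCommit, from log.go in the same package, is where the current-term restriction lives:

func (l *raftLog) maybeCommit(maxIndex, term uint64) bool {
   // Only advance committed if the quorum-replicated entry is from the
   // current term; directly committing entries from earlier terms is
   // unsafe (Raft paper, section 5.4.2).
   if maxIndex > l.committed && l.zeroTermOnErrCompacted(l.term(maxIndex)) == term {
      l.commitTo(maxIndex)
      return true
   }
   return false
}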

reject

If the peer rejects the append:

if m.Reject {
   r.logger.Debugf("%x received msgApp rejection(lastindex: %d) from %x for index %d",
      r.id, m.RejectHint, m.From, m.Index)
   if pr.maybeDecrTo(m.Index, m.RejectHint) {
      r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
      if pr.State == ProgressStateReplicate {
         pr.becomeProbe()
      }
      r.sendAppend(m.From)
   }
}

maybeDecrTo

func (pr *Progress) maybeDecrTo(rejected, last uint64) bool {
   if pr.State == ProgressStateReplicate {
      // the rejection must be stale if the progress has matched and "rejected"
      // is smaller than "match".
      if rejected <= pr.Match {
         return false
      }
      // directly decrease next to match + 1
      pr.Next = pr.Match + 1
      return true
   }

   // the rejection must be stale if "rejected" does not match next - 1
   if pr.Next-1 != rejected {
      return false
   }

   if pr.Next = min(rejected, last+1); pr.Next < 1 {
      pr.Next = 1
   }
   pr.resume()
   return true
}
  • If the peer is in ProgressStateReplicate and the rejected index is at or below Match, something is inconsistent — everything up to Match is already known to be replicated — so the rejection must be stale and is ignored
    • Otherwise, Next jumps straight back to Match + 1, effectively discarding everything after Match and restarting replication from there. Simple, direct, blunt.
  • In the probe case, pr.Next - 1 should equal rejected: rejected is the slot right before the insertion point, carried purely for validation, and pr.Next - 1 is that very same slot. If they differ, the rejection is stale and is ignored.
  • Otherwise Next retreats to min(rejected, last+1). When the follower's log reaches at least as far as rejected, that is just rejected itself — Next moves back one slot per round trip: probe, get rejected, step back, probe again, until the logs finally agree. When the follower's log is shorter (last + 1 < rejected), Next jumps straight to last + 1, skipping indexes the follower cannot possibly hold.
  • If maybeDecrTo did manage to retreat, we still don't know whether the peer will accept the new position, so a peer in ProgressStateReplicate is first demoted to ProgressStateProbe.
  • With the retreat done, the entries from the new position are sent to the peer again.
  • In short: ProgressStateReplicate falls back to Match + 1 in one shot; if even that is rejected, the peer becomes ProgressStateProbe, which retreats step by step (or jumps using the hint). A hypothetical trace follows below.
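To make this concrete, a hypothetical trace (indices invented for illustration; this uses the raft package's internal Progress type and is not meant to run standalone):

// The leader believes pr.Next = 11, so it probed index 10 (Next-1).
pr := &Progress{State: ProgressStateProbe, Next: 11, ins: newInflights(256)}

// Case 1: the follower's log ends at 7, so it rejects index 10 and
// hints lastIndex = 7. Next = min(10, 7+1) = 8: jump past the gap.
pr.maybeDecrTo(10, 7) // pr.Next is now 8

// Case 2: the follower holds index 7 with a different term and rejects
// the probe of index 7 (Next-1), hinting lastIndex = 9.
// Next = min(7, 9+1) = 7: a retreat of exactly one slot per round trip.
pr.maybeDecrTo(7, 9) // pr.Next is now 7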

Summary

From here the flow keeps bouncing between Leader and Follower, but the overall logic is as described above — a remarkably precise piece of algorithm design. Hopefully this walkthrough has made it clear.
