etcd leader选举过程(草稿)

2020-08-05  本文已影响0人  酱油王0901

To be continued...


状态转换图

Leader竞选

Follower节点或者Candidate节点在选举超时时间(例如,leader离线)到了之后会发起新的选举。

// tickElection advances the election clock by one tick. It is the tick
// function run by followers and candidates: once the node is promotable and
// the election timeout has elapsed, the clock is reset and a local MsgHup is
// stepped into the state machine.
func (r *raft) tickElection() {
    r.electionElapsed++

    if !r.promotable() || !r.pastElectionTimeout() {
        return
    }

    r.electionElapsed = 0
    hup := pb.Message{From: r.id, Type: pb.MsgHup}
    r.Step(hup)
}

发送的消息为:

m := pb.Message{}
m.From = r.id
m.Type = pb.MsgHup

raft状态机的Step方法在处理Message时会根据是否设置preVote,来决定发起PreElection还是Election。

switch m.Type {
    case pb.MsgHup:
        if r.preVote {
            r.hup(campaignPreElection)
        } else {
            r.hup(campaignElection)
        }
        ....

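这时,raft状态机会判断当前是否为leader,是则直接忽略此message。如果不是leader,则判断是否能竞选leader。如果满足以下几个条件之一就不能竞选leader:

  1. 当前节点不可被提升为leader,即r.promotable()返回false。
  2. 已提交但尚未应用的entries中仍有待处理的配置变更(numOfPendingConf(ents)不为0)。

如果上述列举的都不满足则可以竞选leader。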

// hup starts a campaign of type t unless the node must not campaign: it is
// already the leader, it is not promotable, or the committed-but-unapplied
// part of the log still holds pending configuration changes.
func (r *raft) hup(t CampaignType) {
    switch {
    case r.state == StateLeader:
        r.logger.Debugf("%x ignoring MsgHup because already leader", r.id)
        return
    case !r.promotable():
        r.logger.Warningf("%x is unpromotable and can not campaign", r.id)
        return
    }

    // Inspect the entries in (applied, committed] for configuration changes.
    unapplied, err := r.raftLog.slice(r.raftLog.applied+1, r.raftLog.committed+1, noLimit)
    if err != nil {
        r.logger.Panicf("unexpected error getting unapplied entries (%v)", err)
    }

    pending := numOfPendingConf(unapplied)
    if pending != 0 && r.raftLog.committed > r.raftLog.applied {
        r.logger.Warningf("%x cannot campaign at term %d since there are still %d pending configuration changes to apply", r.id, r.Term, pending)
        return
    }

    r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term)
    r.campaign(t)
}

根据CampaignType是campaignPreElection还是campaignElection,当前节点状态机分别调用becomePreCandidate或者becomeCandidate。

  1. 如果是进入Candidate状态:发送的消息类型为MsgVote,消息的term为r.Term。
  2. 如果是进入PreCandidate状态:发送的消息类型为MsgPreVote,消息的term为r.Term + 1,节点自身的Term保持不变。
// campaign transitions the raft instance to candidate (or pre-candidate)
// state and requests votes from every other voter. This must only be called
// after verifying that this is a legitimate transition.
//
// With t == campaignPreElection the node becomes a pre-candidate and sends
// MsgPreVote carrying r.Term+1 without changing r.Term. For any other t the
// node becomes a candidate and sends MsgVote carrying r.Term as it stands
// after becomeCandidate. If the node's own vote already wins the poll, it
// advances immediately (pre-election -> real election, election -> leader)
// and sends nothing.
func (r *raft) campaign(t CampaignType) {
    if !r.promotable() {
        // This path should not be hit (callers are supposed to check), but
        // better safe than sorry.
        r.logger.Warningf("%x is unpromotable; campaign() should not have been called", r.id)
    }
    var term uint64
    var voteMsg pb.MessageType
    if t == campaignPreElection {
        r.becomePreCandidate()
        voteMsg = pb.MsgPreVote
        // PreVote RPCs are sent for the next term before we've incremented r.Term.
        // Only the term carried by the message is bumped; r.Term is left as is.
        term = r.Term + 1
    } else {
        r.becomeCandidate()
        voteMsg = pb.MsgVote
        term = r.Term
    }
    // Record our own vote first; this also detects a single-node cluster.
    if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == quorum.VoteWon {
        // We won the election after voting for ourselves (which must mean that
        // this is a single-node cluster). Advance to the next state.
        if t == campaignPreElection {
            r.campaign(campaignElection)
        } else {
            r.becomeLeader()
        }
        return
    }
    // Collect the voter IDs and sort them so requests go out in ascending
    // ID order.
    var ids []uint64
    {
        idMap := r.prs.Voters.IDs()
        ids = make([]uint64, 0, len(idMap))
        for id := range idMap {
            ids = append(ids, id)
        }
        sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
    }
    // Send the vote request to every voter except ourselves.
    for _, id := range ids {
        if id == r.id {
            continue
        }
        r.logger.Infof("%x [logterm: %d, index: %d] sent %s request to %x at term %d",
            r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), voteMsg, id, r.Term)

        // The campaign type is passed along as message context only for
        // campaignTransfer.
        var ctx []byte
        if t == campaignTransfer {
            ctx = []byte(t)
        }
        // Index and LogTerm are the index and term of the last entry in the
        // raft log.
        r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
    }
}

向集群中的节点发送指定类型的消息。

m := pb.Message{}
m.Term= r.Term + 1 // term+1还没有应用到状态机
m.To = id
m.Type = pb.MsgPreVote
m.Index  = r.raftLog.lastIndex()
m.LogTerm = r.raftLog.lastTerm()
m.Context = ctx
// ----------------------
m := pb.Message{}
m.Term= r.Term // 需要注意的是,term值其实已经加1了,因为已经应用在状态机上了。
m.To = id
m.Type = pb.MsgVote
m.Index  = r.raftLog.lastIndex()
m.LogTerm = r.raftLog.lastTerm()
m.Context = ctx

接着调用raft.send()将消息追加到raft.msgs队列中,等待上层模块将其发送出去。
对端节点的streamReader接收到消息之后进行反序列化处理,然后根据消息类型发送到streamReader的recvc通道或propc通道。实际上写入的是peer的recvc通道或propc通道,不过在这里是直接写入recvc通道。

// file: etcdserver/api/rafthttp/stream.go
for {
        m, err := dec.decode()
        if err != nil {
            cr.mu.Lock()
            cr.close()
            cr.mu.Unlock()
            return err
        }

        // gofail-go: var raftDropHeartbeat struct{}
        // continue labelRaftDropHeartbeat
        receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(m.Size()))

        cr.mu.Lock()
        paused := cr.paused
        cr.mu.Unlock()

        if paused {
            continue
        }

        if isLinkHeartbeatMessage(&m) {
            // raft is not interested in link layer
            // heartbeat message, so we should ignore
            // it.
            continue
        }

        recvc := cr.recvc
        if m.Type == raftpb.MsgProp {
            recvc = cr.propc
        }

        select {
        case recvc <- m:
        default:

peer接收到message之后,会调用raft.Process()方法进行处理。

// file: etcdserver/api/rafthttp/peer.go
go func() {
        for {
            select {
            case mm := <-p.recvc:
                if err := r.Process(ctx, mm); err != nil {
                    if t.Logger != nil {
                        t.Logger.Warn("failed to process Raft message", zap.Error(err))
                    }
                }
            case <-p.stopc:
                return
            }
        }
    }()

    // r.Process might block for processing proposal when there is no leader.
    // Thus propc must be put into a separate routine with recvc to avoid blocking
    // processing other raft messages.
    go func() {
        for {
            select {
            case mm := <-p.propc:
                if err := r.Process(ctx, mm); err != nil {
                    if t.Logger != nil {
                        t.Logger.Warn("failed to process Raft message", zap.Error(err))
                    }
                }
            case <-p.stopc:
                return
            }
        }
    }()

Process()方法(此处由EtcdServer实现)首先检查发送消息的节点是否被移除,如果没有被移除则调用s.r.Step()方法进行处理。

// file: etcdserver/server.go
// Process feeds a raft message received from a peer into the server's raft
// state machine, honoring any timeout carried by ctx. Messages whose sender
// has been removed from the cluster are rejected with an HTTP 403 error.
// Append requests are additionally recorded in the server statistics.
func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
    lg := s.getLogger()
    from := types.ID(m.From)

    if s.cluster.IsIDRemoved(from) {
        lg.Warn(
            "rejected Raft message from removed member",
            zap.String("local-member-id", s.ID().String()),
            zap.String("removed-member-id", from.String()),
        )
        return httptypes.NewHTTPError(http.StatusForbidden, "cannot process message from removed member")
    }

    if m.Type == raftpb.MsgApp {
        s.stats.RecvAppendReq(from.String(), m.Size())
    }

    return s.r.Step(ctx, m)
}

接着调用node.step()方法。其会将Message写入node的recvc通道。

// step hands m to stepWithWaitOption with the final argument set to false and
// returns its error unchanged.
// NOTE(review): the false flag presumably means "do not wait for the message
// to be processed" — confirm against stepWithWaitOption.
func (n *node) step(ctx context.Context, m pb.Message) error {
    return n.stepWithWaitOption(ctx, m, false)
}

recvc通道接收到消息之后会将其应用在状态机中。

case m := <-n.recvc:
            // filter out response message from unknown From.
            if pr := r.prs.Progress[m.From]; pr != nil || !IsResponseMsg(m.Type) {
                r.Step(m)
            }

当收到消息的Term大于当前Term,且消息类型为MsgPreVote或MsgVote时,会先根据message的context中是否带有campaignTransfer,来决定是否强制当前节点参与选举。接着会判断是否开启CheckQuorum模式,当前节点是否有已知的lead节点,以及选举计时器是否尚未超时,三者同时满足则认为节点仍在lease内。如果不是强制要求此节点参与选举过程,同时节点在lease内,则直接返回。
同时需要注意的是,由于当前节点的term值小于消息的term值,因此会根据消息的类型做相应的处理。

  1. 如果消息类型为MsgPreVote,则当前节点的状态机不做处理。
  2. 如果消息类型为MsgVote,则当前节点becomeFollower,并且用消息的term值重置状态机。但是当前节点的lead为空。
  3. 如果是leader发送过来的MsgApp, MsgHeartbeat以及MsgSnap消息,则用消息的term值重置状态机,并将lead置为发送消息的节点。
func (r *raft) Step(m pb.Message) error {
    // Handle the message term, which may result in our stepping down to a follower.
    switch {
    case m.Term == 0:
        // local message
    case m.Term > r.Term:
        if m.Type == pb.MsgVote || m.Type == pb.MsgPreVote {
            force := bytes.Equal(m.Context, []byte(campaignTransfer))
            inLease := r.checkQuorum && r.lead != None && r.electionElapsed < r.electionTimeout
            if !force && inLease {
                // If a server receives a RequestVote request within the minimum election timeout
                // of hearing from a current leader, it does not update its term or grant its vote
                r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] ignored %s from %x [logterm: %d, index: %d] at term %d: lease is not expired (remaining ticks: %d)",
                    r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term, r.electionTimeout-r.electionElapsed)
                return nil
            }
        }
        switch {
        case m.Type == pb.MsgPreVote:
            // Never change our term in response to a PreVote
        case m.Type == pb.MsgPreVoteResp && !m.Reject:
            // We send pre-vote requests with a term in our future. If the
            // pre-vote is granted, we will increment our term when we get a
            // quorum. If it is not, the term comes from the node that
            // rejected our vote so we should become a follower at the new
            // term.
        default:
            r.logger.Infof("%x [term: %d] received a %s message with higher term from %x [term: %d]",
                r.id, r.Term, m.Type, m.From, m.Term)
            if m.Type == pb.MsgApp || m.Type == pb.MsgHeartbeat || m.Type == pb.MsgSnap {
                r.becomeFollower(m.Term, m.From)
            } else {
                r.becomeFollower(m.Term, None)
            }
        }

当前节点在参与选举时,会考虑如下情况来决定是否参与投票。只有满足下面1,2,3中的一个,且同时满足条件4才能将票投给发送消息的节点。

  1. 当前节点是否已经投过票,而且投的正是发送消息的节点。
  2. 当前节点还没有投票,而且lead为空。
  3. 消息的type为PreVote,而且消息的Term值大于当前节点状态机的Term值。
  4. 发送消息的节点的raftLog是否包含当前节点的所有entries信息。
case pb.MsgVote, pb.MsgPreVote:
        // We can vote if this is a repeat of a vote we've already cast...
        canVote := r.Vote == m.From ||
            // ...we haven't voted and we don't think there's a leader yet in this term...
            (r.Vote == None && r.lead == None) ||
            // ...or this is a PreVote for a future term...
            (m.Type == pb.MsgPreVote && m.Term > r.Term)
        // ...and we believe the candidate is up to date.
        if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
            // Note: it turns out that learners must be allowed to cast votes.
            // This seems counter-intuitive but is necessary in the situation in which
            // a learner has been promoted (i.e. is now a voter) but has not learned
            // about this yet.
            // For example, consider a group in which id=1 is a learner and id=2 and
            // id=3 are voters. A configuration change promoting 1 can be committed on
            // the quorum `{2,3}` without the config change being appended to the
            // learner's log. If the leader (say 2) fails, there are de facto two
            // voters remaining. Only 3 can win an election (due to its log containing
            // all committed entries), but to do so it will need 1 to vote. But 1
            // considers itself a learner and will continue to do so until 3 has
            // stepped up as leader, replicates the conf change to 1, and 1 applies it.
            // Ultimately, by receiving a request to vote, the learner realizes that
            // the candidate believes it to be a voter, and that it should act
            // accordingly. The candidate's config may be stale, too; but in that case
            // it won't win the election, at least in the absence of the bug discussed
            // in:
            // https://github.com/etcd-io/etcd/issues/7625#issuecomment-488798263.
            r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] cast %s for %x [logterm: %d, index: %d] at term %d",
                r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
            // When responding to Msg{Pre,}Vote messages we include the term
            // from the message, not the local term. To see why, consider the
            // case where a single node was previously partitioned away and
            // its local term is now out of date. If we include the local term
            // (recall that for pre-votes we don't update the local term), the
            // (pre-)campaigning node on the other end will proceed to ignore
            // the message (it ignores all out of date messages).
            // The term in the original message and current local term are the
            // same in the case of regular votes, but different for pre-votes.
            r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)})
            if m.Type == pb.MsgVote {
                // Only record real votes.
                r.electionElapsed = 0
                r.Vote = m.From
            }

此时,会发送一条响应信息。如果消息类型为pb.MsgVote,则会将选举超时计时器重置,并将票投给消息发送节点。

m := pb.Message{}
m.Term= m.Term
m.To = m.From
m.Type = pb.MsgPreVoteResp/pb.MsgVoteResp

但是,如果是拒绝投票的话,响应消息为:

m := pb.Message{}
m.Term= m.Term
m.To = m.From
m.Type = pb.MsgPreVoteResp/pb.MsgVoteResp
m.Reject = true // 拒绝投票

发送消息的节点,即PreCandidate/Candidate节点收到集群中其他节点返回的MsgPreVoteResp或者MsgVoteResp消息,也会调用Step方法进行处理。对于MsgVoteResp,响应消息的term值与节点的Term相等(发起Vote时节点的term已经加1);对于未被拒绝的MsgPreVoteResp,响应消息的term值比节点的Term大1(PreVote只增加了消息的term,并未增加节点自身的term),Step方法遇到这种情况不会改变节点的term。因此两种情况下都会继续调用stepCandidate进行消息处理。节点根据响应消息进行投票统计,然后根据投票结果以及自身状态机的状态来进行相应的处理。

// stepCandidate is the step function for both StateCandidate and
// StatePreCandidate. The two states differ only in which vote response type
// they count: MsgVoteResp for candidates, MsgPreVoteResp for pre-candidates.
func stepCandidate(r *raft, m pb.Message) error {
    // Only count vote responses that belong to our current candidacy (while
    // in StateCandidate, stale MsgPreVoteResp messages from our
    // pre-candidate phase may still arrive in this term).
    myVoteRespType := pb.MsgVoteResp
    if r.state == StatePreCandidate {
        myVoteRespType = pb.MsgPreVoteResp
    }

    switch m.Type {
    case pb.MsgProp:
        r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
        return ErrProposalDropped
    case pb.MsgApp:
        r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
        r.handleAppendEntries(m)
    case pb.MsgHeartbeat:
        r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
        r.handleHeartbeat(m)
    case pb.MsgSnap:
        r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
        r.handleSnapshot(m)
    case myVoteRespType:
        // Tally this response together with the votes received so far.
        granted, rejected, outcome := r.poll(m.From, m.Type, !m.Reject)
        r.logger.Infof("%x has received %d %s votes and %d vote rejections", r.id, granted, m.Type, rejected)
        switch {
        case outcome == quorum.VoteWon && r.state == StatePreCandidate:
            // The pre-vote round was won: start the real election.
            r.campaign(campaignElection)
        case outcome == quorum.VoteWon:
            // The real election was won: become leader and broadcast
            // append messages to the rest of the cluster.
            r.becomeLeader()
            r.bcastAppend()
        case outcome == quorum.VoteLost:
            // pb.MsgPreVoteResp contains future term of pre-candidate
            // m.Term > r.Term; reuse r.Term
            r.becomeFollower(r.Term, None)
        }
    case pb.MsgTimeoutNow:
        r.logger.Debugf("%x [term %d state %v] ignored MsgTimeoutNow from %x", r.id, r.Term, r.state, m.From)
    }
    return nil
}

如果赢得选举投票,则根据当前状态决定下一步如何处理。
(1) 如果状态为PreCandidate,则开始竞选leader,调用campaign(campaignElection)
(2) 如果状态为其他,即Candidate,则竞选leader成功,并向集群中其他节点广播MsgApp消息。

// becomeLeader switches this node into the leader role for its current term.
//
// It panics if the node is still a follower. Otherwise it installs stepLeader
// and tickHeartbeat as the step and tick functions, resets the node via
// r.reset(r.Term), records itself as the lead, marks its own progress as
// replicating, sets pendingConfIndex to the last log index, and appends one
// empty entry to the log.
func (r *raft) becomeLeader() {
    // TODO(xiangli) remove the panic when the raft implementation is stable
    if r.state == StateFollower {
        panic("invalid transition [follower -> leader]")
    }
    r.step = stepLeader
    r.reset(r.Term)
    r.tick = r.tickHeartbeat
    r.lead = r.id
    r.state = StateLeader
    // Followers enter replicate mode when they've been successfully probed
    // (perhaps after having received a snapshot as a result). The leader is
    // trivially in this state. Note that r.reset() has initialized this
    // progress with the last index already.
    r.prs.Progress[r.id].BecomeReplicate()

    // Conservatively set the pendingConfIndex to the last index in the
    // log. There may or may not be a pending config change, but it's
    // safe to delay any future proposals until we commit all our
    // pending log entries, and scanning the entire tail of the log
    // could be expensive.
    r.pendingConfIndex = r.raftLog.lastIndex()

    // Append an empty entry as the first entry of the new leadership.
    emptyEnt := pb.Entry{Data: nil}
    if !r.appendEntry(emptyEnt) {
        // This won't happen because we just called reset() above.
        r.logger.Panic("empty entry was dropped")
    }
    // As a special case, don't count the initial empty entry towards the
    // uncommitted log quota. This is because we want to preserve the
    // behavior of allowing one entry larger than quota if the current
    // usage is zero.
    r.reduceUncommittedSize([]pb.Entry{emptyEnt})
    r.logger.Infof("%x became leader at term %d", r.id, r.Term)
}

同步Log

Leader节点会向其他节点发送同步log的消息。


// sendAppend sends an append RPC with new entries (if any) and the
// current commit index to the given peer. It calls maybeSendAppend with
// sendIfEmpty set to true, so a message is attempted even when there are no
// new entries; the boolean result of maybeSendAppend is discarded.
func (r *raft) sendAppend(to uint64) {
    r.maybeSendAppend(to, true)
}

在向peers发送Append信息时,首先会判断Progress是否暂停,leader节点有所有peers的Progress信息,并根据进度的信息以及状态来判断是否暂停。被跟踪的follower节点有三种状态类型:

// StateType is the state of a tracked follower. The values are assigned with
// iota, so StateProbe is 0, StateReplicate is 1 and StateSnapshot is 2.
type StateType uint64

const (
    // StateProbe indicates a follower whose last index isn't known. Such a
    // follower is "probed" (i.e. an append sent periodically) to narrow down
    // its last index. In the ideal (and common) case, only one round of probing
    // is necessary as the follower will react with a hint. Followers that are
    // probed over extended periods of time are often offline.
    StateProbe StateType = iota
    // StateReplicate is the steady state in which a follower eagerly receives
    // log entries to append to its log.
    StateReplicate
    // StateSnapshot indicates a follower that needs log entries not available
    // from the leader's Raft log. Such a follower needs a full snapshot to
    // return to StateReplicate.
    StateSnapshot
)

如果没有被暂停,则尝试去获取需要发送到peer的term和entries,如果获取失败,则说明需要发送快照信息pb.MsgSnap,因为有可能entries已经被压缩,因此会尝试去发送快照信息,同时Progress进入StateSnapshot状态。如果成功获取到term和entries,则发送pb.MsgApp信息。

m := pb.Message{}
m.To = to
m.Type = pb.MsgSnap
m.Snapshot = snapshot

或者

m := pb.Message{}
m.To = to
m.Type = pb.MsgApp
m.Index = pr.Next - 1
m.LogTerm = term
m.Entries = ents
m.Commit = r.raftLog.committed
// maybeSendAppend sends either an append message (MsgApp) or, when the
// required term or entries cannot be read from the log, a snapshot message
// (MsgSnap) to the given peer. It returns true if a message was sent.
//
// Nothing is sent when the peer's progress is paused, when there are no
// entries to send and sendIfEmpty is false, when a snapshot is needed but the
// peer was not recently active, or when the snapshot is temporarily
// unavailable. Any other snapshot error, or an empty snapshot, panics.
func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
    pr := r.prs.Progress[to]
    if pr.IsPaused() {
        return false
    }
    m := pb.Message{}
    m.To = to

    // Look up the term of the entry just before pr.Next and the entries
    // starting at pr.Next (presumably capped by r.maxMsgSize — confirm
    // against raftLog.entries).
    term, errt := r.raftLog.term(pr.Next - 1)
    ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)
    if len(ents) == 0 && !sendIfEmpty {
        return false
    }

    if errt != nil || erre != nil { // send snapshot if we failed to get term or entries
        if !pr.RecentActive {
            r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to)
            return false
        }

        m.Type = pb.MsgSnap
        snapshot, err := r.raftLog.snapshot()
        if err != nil {
            if err == ErrSnapshotTemporarilyUnavailable {
                r.logger.Debugf("%x failed to send snapshot to %x because snapshot is temporarily unavailable", r.id, to)
                return false
            }
            panic(err) // TODO(bdarnell)
        }
        if IsEmptySnap(snapshot) {
            panic("need non-empty snapshot")
        }
        m.Snapshot = snapshot
        sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
        r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
            r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr)
        // Move the peer's progress into the snapshot state at the snapshot's
        // index.
        pr.BecomeSnapshot(sindex)
        r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr)
    } else {
        m.Type = pb.MsgApp
        m.Index = pr.Next - 1
        m.LogTerm = term
        m.Entries = ents
        m.Commit = r.raftLog.committed
        // Progress bookkeeping is only needed when entries are actually sent.
        if n := len(m.Entries); n != 0 {
            switch pr.State {
            // optimistically increase the next when in StateReplicate
            case tracker.StateReplicate:
                last := m.Entries[n-1].Index
                pr.OptimisticUpdate(last)
                pr.Inflights.Add(last)
            case tracker.StateProbe:
                pr.ProbeSent = true
            default:
                r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State)
            }
        }
    }
    r.send(m)
    return true
}

问题点

  1. 在统计计票赢得leader之后会去调用r.becomeLeader和r.bcastAppend,如果超过quorum个节点回复响应消息,那是否会被调用多次?
  2. Leader节点append一条空的entry记录的作用是啥?

References

上一篇下一篇

猜你喜欢

热点阅读