Raft Implementation in etcd (Part 4): Log Replication and Execution

2018-04-11  yuan1028

Workflow

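  1. The client sends a request to the Raft cluster. If the receiving node is a follower, it forwards the request to the leader.
  2. The leader sends AppendEntries RPCs to the followers. The message contains:
term                               //the leader's current term
leaderId                           //lets a follower redirect client requests to the leader
prevLogIndex                       //index of the log entry immediately preceding the new ones
prevLogTerm                        //term of the prevLogIndex entry
entries[]                          //log entries to store (empty for heartbeats; may contain several entries)
leaderCommit                       //the leader's commitIndex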
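  3. On receiving the MsgApp from the leader, the follower compares it with its own log to decide whether to append the entries and whether to update its own state. The follower replies with:
Results:
term                          //the follower's current term, so the leader can update itself
success                       //decided as follows
Receiver implementation:
//the RPC carries an older term: it is stale, or the leader has changed
1. Reply false if term < currentTerm
//no entry at prevLogIndex whose term matches prevLogTerm; usually the node was down for a while and lags behind the leader.
//the leader will retry this node with an earlier prevLogIndex and prevLogTerm
2. Reply false if log doesn't contain an entry at prevLogIndex whose term matches prevLogTerm
// in all remaining cases, reply true after the following steps
// if an existing entry conflicts with one in entries, delete it and every entry after it
3. If an existing entry conflicts with a new one (same index but different terms), delete the existing entry and all that follow it
// append the new entries to the log
4. Append any new entries not already in the log
//if leaderCommit is greater than our commitIndex, set commitIndex = min(leaderCommit, index of the last new entry)
5. If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry)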
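  4. Once the leader receives successful replies from a majority of the cluster, the entry is considered committed. The leader hands it to the state machine and returns the result to the client. The leader carries its commitIndex in subsequent AppendEntries RPCs, heartbeats included, so a follower executes the entry only when a later heartbeat or AppendEntries tells it the updated commitIndex.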
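The majority rule in step 4 can be sketched as follows. This is a simplified model, not etcd's code: `match` is a hypothetical slice holding each node's highest replicated index (the leader counts itself), and `termAt` is a hypothetical term lookup. The leader sorts these values in descending order and reads the one at the quorum position; Raft also requires that the entry belong to the leader's current term before it is committed this way.

```go
package main

import "sort"

// quorumCommitIndex returns the highest index stored on a majority of nodes.
// match holds every node's Match value, the leader's own last index included.
func quorumCommitIndex(match []uint64) uint64 {
	s := append([]uint64(nil), match...) // copy so the caller's slice is untouched
	sort.Slice(s, func(i, j int) bool { return s[i] > s[j] }) // descending
	quorum := len(s)/2 + 1
	// at least `quorum` nodes have Match >= s[quorum-1]
	return s[quorum-1]
}

// maybeCommit returns the new commit index: it only advances when a majority
// holds the index AND the entry there was created in the leader's current term.
func maybeCommit(match []uint64, committed, currentTerm uint64, termAt func(uint64) uint64) uint64 {
	if idx := quorumCommitIndex(match); idx > committed && termAt(idx) == currentTerm {
		return idx
	}
	return committed
}
```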

Code walkthrough

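  1. The client sends a request to the Raft cluster. Basic handling of the request is up to the application: roughly, it performs some checks and type conversion and then sends the request to raftNode's proposeC or confChangeC. raftNode then calls node.Propose for ordinary requests and node.ProposeConfChange for cluster configuration changes. Here we focus on ordinary requests.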
// github.com/coreos/etcd/contrib/raftexample/raft.go
func (rc *raftNode) serveChannels() {
    //...
    go func() {
        var confChangeCount uint64 = 0
        for rc.proposeC != nil && rc.confChangeC != nil {
            select {
            // ordinary application requests
            case prop, ok := <-rc.proposeC:
                if !ok {
                    rc.proposeC = nil
                } else {
                    // blocks until accepted by raft state machine
                    rc.node.Propose(context.TODO(), []byte(prop))
                }
            // cluster configuration change requests
            case cc, ok := <-rc.confChangeC:
                if !ok {
                    rc.confChangeC = nil
                } else {
                    confChangeCount += 1
                    cc.ID = confChangeCount
                    rc.node.ProposeConfChange(context.TODO(), cc)
                }
            }
        }
        // client closed channel; shutdown raft if not already
        close(rc.stopc)
    }()
    //...
}
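The proposal loop above can be reproduced without etcd as a small sketch. The string proposals, the `confChange` struct and the two callbacks are hypothetical stand-ins for raftexample's types and for `node.Propose` and `node.ProposeConfChange`; the sketch simply returns where raftexample closes `stopc`. It shows two details of the loop: a closed channel is set to nil, which ends the loop through its condition, and each configuration change receives an increasing ID.

```go
package main

// confChange is a hypothetical stand-in for raftpb.ConfChange.
type confChange struct{ ID uint64 }

// serveProposals forwards proposals and configuration changes until either
// input channel is closed, like the goroutine in serveChannels.
func serveProposals(proposeC <-chan string, confChangeC <-chan confChange,
	propose func(string), proposeConfChange func(confChange)) {
	var confChangeCount uint64
	for proposeC != nil && confChangeC != nil {
		select {
		case prop, ok := <-proposeC:
			if !ok {
				proposeC = nil // closed by the client: the loop condition now ends the loop
			} else {
				propose(prop)
			}
		case cc, ok := <-confChangeC:
			if !ok {
				confChangeC = nil
			} else {
				confChangeCount++
				cc.ID = confChangeCount // each configuration change gets an increasing ID
				proposeConfChange(cc)
			}
		}
	}
}
```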

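The request then passes through a few steps in the intermediate node layer (which merely forwards the message) before raft's Step method is called.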

// github.com/coreos/etcd/raft/node.go
func (n *node) Propose(ctx context.Context, data []byte) error {
    return n.step(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
}
// Step advances the state machine using msgs. The ctx.Err() will be returned,if any.
func (n *node) step(ctx context.Context, m pb.Message) error {
    ch := n.recvc
    if m.Type == pb.MsgProp {
        ch = n.propc
    }
    select {
    case ch <- m:
        return nil
    //...
    }
}
func (n *node) run(r *raft) {
    //...
    for {
        //...
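        // If a leader is known, propc is enabled and the client proposal is processed;
        // with no leader, propc is nil, so the proposal is not received at all.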
        if lead != r.lead {
            if r.hasLeader() {
                if lead == None {
                    r.logger.Infof("raft.node: %x elected leader %x at term %d", r.id, r.lead, r.Term)
                } else {
                    r.logger.Infof("raft.node: %x changed leader from %x to %x at term %d", r.id, lead, r.lead, r.Term)
                }
                propc = n.propc
            } else {
                r.logger.Infof("raft.node: %x lost leader %x at term %d", r.id, lead, r.Term)
                propc = nil
            }
            lead = r.lead
        }

        select {
        // ...
        case m := <-propc:
            m.From = r.id
            r.Step(m)
    
        }
    }
}
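The leader gating in node.run relies on a Go property: a receive from a nil channel never proceeds, so a `select` case on a nil channel is never chosen. While propc is nil, node.step's send on it cannot complete, so the proposal waits instead of being consumed. A minimal sketch, assuming a string channel in place of etcd's message channel and adding a `default` branch (node.run has none) so the effect can be observed:

```go
package main

// tryTakeProposal models one pass of node.run's select. propc stays nil unless
// a leader is known, and a receive from a nil channel never proceeds, so the
// proposal is left in the channel. The default branch is an addition of this
// sketch so that a blocked receive is visible to the caller.
func tryTakeProposal(proposals chan string, hasLeader bool) (string, bool) {
	var propc chan string
	if hasLeader {
		propc = proposals
	}
	select {
	case m := <-propc:
		return m, true
	default:
		return "", false
	}
}
```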
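  2. Step calls the step function that matches the node's current role (stepLeader, stepCandidate or stepFollower). Ultimately the leader sends AppendEntries RPCs to the followers.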
// github.com/coreos/etcd/raft/raft.go
func (r *raft) Step(m pb.Message) error {
     //...
    switch m.Type {
    default:
        err := r.step(r, m)
        if err != nil {
            return err
        }
    }
    return nil
}

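If the current node is the leader, it appends the entries to its own log (appendEntry) and broadcasts AppendEntries RPCs to the other nodes (bcastAppend).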

// github.com/coreos/etcd/raft/raft.go
func stepLeader(r *raft, m pb.Message) error {
    // These message types do not require any progress for m.From.
    switch m.Type {
    //...
    case pb.MsgProp:
        //...
        r.appendEntry(m.Entries...)
        r.bcastAppend()
        return nil
    //...
    }
    return nil
}

If the current node is a candidate, it does not handle MsgProp: it drops the proposal and returns an error (ErrProposalDropped).

// github.com/coreos/etcd/raft/raft.go
func stepCandidate(r *raft, m pb.Message) error {
    //...
    switch m.Type {
    case pb.MsgProp:
        r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
        return ErrProposalDropped
    //...
    }
    return nil
}

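If the current node is a follower, it forwards the message to the leader (unless there is no leader or proposal forwarding is disabled, in which case the proposal is dropped). Steps 1 and 2 then run again on the leader.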

// github.com/coreos/etcd/raft/raft.go
func stepFollower(r *raft, m pb.Message) error {
    switch m.Type {
    case pb.MsgProp:
        if r.lead == None {
            r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
            return ErrProposalDropped
        } else if r.disableProposalForwarding {
            r.logger.Infof("%x not forwarding to leader %x at term %d; dropping proposal", r.id, r.lead, r.Term)
            return ErrProposalDropped
        }
        m.To = r.lead
        r.send(m)
    //...
    }
    return nil
}
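The role-specific handling of MsgProp can be condensed into a sketch. In etcd, `r.step` is a function value that is swapped whenever the node changes role (becomeLeader, becomeCandidate, becomeFollower). Here `node`, `propose` and the string payload are hypothetical simplifications, and the leader only appends locally instead of also broadcasting:

```go
package main

import "errors"

var errProposalDropped = errors.New("raft proposal dropped")

const none uint64 = 0 // no known leader

// node is a hypothetical, stripped-down raft node.
type node struct {
	id, lead uint64
	log      []string
	outbox   []uint64                         // destinations of forwarded proposals
	step     func(n *node, data string) error // swapped when the role changes
}

func stepLeader(n *node, data string) error {
	n.log = append(n.log, data) // etcd's leader would also broadcast MsgApp here
	return nil
}

func stepCandidate(n *node, data string) error {
	return errProposalDropped // no leader yet for this term
}

func stepFollower(n *node, data string) error {
	if n.lead == none {
		return errProposalDropped
	}
	n.outbox = append(n.outbox, n.lead) // forward the proposal to the leader
	return nil
}

func (n *node) propose(data string) error { return n.step(n, data) }
```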

The MsgApp the leader sends contains:

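Type:    pb.MsgApp              //message type MsgApp
Index:   pr.Next - 1            //prevLogIndex: the entry just before the ones being sent, i.e. the last entry the leader believes this follower already has
LogTerm: term                   //prevLogTerm: term of the entry at Index (read with raftLog.term(pr.Next - 1)), not the leader's current term
Entries: ents                   //log entries
Commit:  r.raftLog.committed    //leaderCommit: tells the follower that entries up to this index are committed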
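How these fields follow from the follower's progress can be sketched as below. It assumes a hypothetical log with no compaction, where the entry with index i sits at position i-1, and it ignores etcd's message size limit and snapshots. The point is that `Index` and `LogTerm` describe the entry just before `Next`, which is exactly what the follower checks in maybeAppend.

```go
package main

type entry struct{ Index, Term uint64 }

// msgApp is a hypothetical struct with the MsgApp fields listed above.
type msgApp struct {
	Index, LogTerm, Commit uint64
	Entries                []entry
}

// buildAppend derives the MsgApp for a follower whose progress has the given next.
// Assumes ents has no gaps and starts at index 1 (ents[i-1] has index i), next >= 1.
func buildAppend(ents []entry, next, committed uint64) msgApp {
	prev := next - 1
	var prevTerm uint64 // index 0 is the empty log prefix with term 0
	if prev > 0 {
		prevTerm = ents[prev-1].Term
	}
	return msgApp{
		Index:   prev,        // prevLogIndex
		LogTerm: prevTerm,    // prevLogTerm: term of the entry at Index, not the leader's term
		Commit:  committed,   // leaderCommit
		Entries: ents[prev:], // entries from Next onwards (empty for a caught-up follower)
	}
}
```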
  3. The follower processes the AppendEntries RPC it receives from the leader.
// github.com/coreos/etcd/raft/raft.go
// Handling MsgApp. A leader that receives a MsgApp with a higher term steps down to follower and then handles it as a follower.
// A candidate that receives a MsgApp becomes a follower and calls handleAppendEntries.
// A follower that receives a MsgApp calls handleAppendEntries directly.
func (r *raft) Step(m pb.Message) error {
    // Handle the message term, which may result in our stepping down to a follower.
    switch {
    //...
    // A MsgApp with a higher term means the cluster has a new leader, so become a follower.
    case m.Term > r.Term:
        //...
        switch {
        //...
        default:
            r.logger.Infof("%x [term: %d] received a %s message with higher term from %x [term: %d]",
                r.id, r.Term, m.Type, m.From, m.Term)
            if m.Type == pb.MsgApp || m.Type == pb.MsgHeartbeat || m.Type == pb.MsgSnap {
                r.becomeFollower(m.Term, m.From)
            } else {
                r.becomeFollower(m.Term, None)
            }
        }

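    case m.Term < r.Term:
        // A MsgApp or heartbeat from a lower term is stale. With checkQuorum or preVote
        // enabled, reply with a MsgAppResp (which carries our higher term) so that the
        // stale leader steps down.
        if (r.checkQuorum || r.preVote) && (m.Type == pb.MsgHeartbeat || m.Type == pb.MsgApp) {
            r.send(pb.Message{To: m.From, Type: pb.MsgAppResp})
        }
        //...
        return nil
    }
    //...
    return nil
}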

func stepCandidate(r *raft, m pb.Message) error {
    //...
    switch m.Type {
    //...
    case pb.MsgApp:
        // A MsgApp in the candidate state means a leader has already been elected
        // for this term, so step down to follower.
        r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
        r.handleAppendEntries(m)
    //...
    }
    return nil
}
func stepFollower(r *raft, m pb.Message) error {
    switch m.Type {
    //...
    case pb.MsgApp:
        // Reset the election timer and record the sender as the current leader.
        r.electionElapsed = 0
        r.lead = m.From
        r.handleAppendEntries(m)
    //...
    }
    return nil
}
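The term checks in Step and the candidate and follower handlers above boil down to a few rules. The following sketch models them with hypothetical `state` and `nodeRole` types; it omits the election timer and the actual reply message:

```go
package main

type nodeRole int

const (
	follower nodeRole = iota
	candidate
	leader
)

// state is a hypothetical, minimal view of a raft node.
type state struct {
	term, lead uint64
	role       nodeRole
}

// observeAppend applies the term rules to a MsgApp (or heartbeat) from `from`
// carrying msgTerm. It returns true if the message should be handled further
// (handleAppendEntries), false if it is stale or unexpected.
func (s *state) observeAppend(from, msgTerm uint64) bool {
	switch {
	case msgTerm > s.term:
		// a newer term: some node has become leader, so follow it
		s.term, s.role, s.lead = msgTerm, follower, from
	case msgTerm < s.term:
		// stale sender; etcd may reply with a MsgAppResp carrying s.term (not modelled)
		return false
	}
	switch s.role {
	case leader:
		// two leaders in one term cannot happen in Raft; this sketch ignores it
		return false
	case candidate:
		// same term: someone else already won the election
		s.role = follower
	}
	// a follower also resets its election timer here (not modelled)
	s.lead = from
	return true
}
```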

handleAppendEntries

// github.com/coreos/etcd/raft/raft.go
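// Decides whether the entries in m can be appended, and replies with MsgAppResp.
func (r *raft) handleAppendEntries(m pb.Message) {
    // prevLogIndex is below our commit index: those entries are already committed
    // here, so just report our committed index back to the leader.
    if m.Index < r.raftLog.committed {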
        r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
        return
    }

    if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
        r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
    } else {
        r.logger.Debugf("%x [logterm: %d, index: %d] rejected msgApp [logterm: %d, index: %d] from %x",
            r.id, r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From)
        r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()})
    }
}

maybeAppend checks the incoming entries and updates the node's own log:

// github.com/coreos/etcd/raft/log.go
func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) {
    if l.matchTerm(index, logTerm) {
        lastnewi = index + uint64(len(ents))
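        // findConflict compares the index and term of each entry in ents with the
        // local log and returns the index of the first entry that conflicts or is
        // missing locally (0 if every entry is already present).
        ci := l.findConflict(ents)
        switch {
        case ci == 0:
            // nothing new to append
        case ci <= l.committed:
            // a committed entry must never be overwritten
            l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed)
        default:
            // ents[0] has index index+1: skip the entries before ci and append the rest
            offset := index + 1
            l.append(ents[ci-offset:]...)
        }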
        l.commitTo(min(committed, lastnewi))
        return lastnewi, true
    }
    return 0, false
}
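maybeAppend and findConflict can be exercised on an in-memory log. This sketch assumes a hypothetical `memLog` with no compaction and no unstable/stable split (the entry with index i is stored at position i-1). It keeps the structure of the code above: reject on a prevLogIndex/prevLogTerm mismatch, find the first conflict, refuse to overwrite committed entries, truncate and append, then move the commit index forward to min(leaderCommit, lastnewi).

```go
package main

import "fmt"

type entry struct{ Index, Term uint64 }

// memLog is a hypothetical in-memory log: the entry with index i is at ents[i-1].
type memLog struct {
	ents      []entry
	committed uint64
}

func (l *memLog) lastIndex() uint64 { return uint64(len(l.ents)) }

// matchTerm reports whether the log has an entry at index i with term t.
// Index 0 is the empty prefix and matches term 0.
func (l *memLog) matchTerm(i, t uint64) bool {
	if i == 0 {
		return t == 0
	}
	return i <= l.lastIndex() && l.ents[i-1].Term == t
}

// findConflict returns the index of the first entry in ents that is missing
// locally or has a different term, or 0 if all of them are already present.
func (l *memLog) findConflict(ents []entry) uint64 {
	for _, e := range ents {
		if !l.matchTerm(e.Index, e.Term) {
			return e.Index
		}
	}
	return 0
}

// maybeAppend follows the structure of raftLog.maybeAppend.
func (l *memLog) maybeAppend(index, logTerm, committed uint64, ents ...entry) (uint64, bool) {
	if !l.matchTerm(index, logTerm) {
		return 0, false // prevLogIndex/prevLogTerm mismatch: the caller rejects
	}
	lastnewi := index + uint64(len(ents))
	ci := l.findConflict(ents)
	switch {
	case ci == 0:
		// everything is already in the log; do not truncate
	case ci <= l.committed:
		panic(fmt.Sprintf("entry %d conflicts with committed entry [committed(%d)]", ci, l.committed))
	default:
		// drop local entries from ci on, then append ents starting at index ci
		l.ents = append(l.ents[:ci-1], ents[ci-index-1:]...)
	}
	// the commit index only moves forward and never past the last new entry
	if c := min64(committed, lastnewi); c > l.committed {
		l.committed = c
	}
	return lastnewi, true
}

func min64(a, b uint64) uint64 {
	if a < b {
		return a
	}
	return b
}
```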
  4. When the leader receives a MsgAppResp, it updates that follower's Next and Match values.
// github.com/coreos/etcd/raft/raft.go
func stepLeader(r *raft, m pb.Message) error {
    //...
    switch m.Type {
      //...
    case pb.MsgAppResp:
        pr.RecentActive = true

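        if m.Reject {
            // A rejection means the follower's log does not match at m.Index (it may be
            // missing many entries), so lower the Next value kept for it and resend.
            r.logger.Debugf("%x received msgApp rejection(lastindex: %d) from %x for index %d",
                r.id, m.RejectHint, m.From, m.Index)
            // maybeDecrTo lowers Next: to Match+1 in the replicate state, or to
            // min(m.Index, RejectHint+1) in the probe state; stale rejections are ignored.
            if pr.maybeDecrTo(m.Index, m.RejectHint) {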
                r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
                if pr.State == ProgressStateReplicate {
                    pr.becomeProbe()
                }
                r.sendAppend(m.From)
            }
        } else {
            oldPaused := pr.IsPaused()
            // maybeUpdate raises Match (and Next) if m.Index is newer than what we had
            if pr.maybeUpdate(m.Index) {
                switch {
                // update the progress state
                case pr.State == ProgressStateProbe:
                    pr.becomeReplicate()
                case pr.State == ProgressStateSnapshot && pr.needSnapshotAbort():
                    r.logger.Debugf("%x snapshot aborted, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
                    pr.becomeProbe()
                case pr.State == ProgressStateReplicate:
                    pr.ins.freeTo(m.Index)
                }
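                // Check whether the commit index can advance: sort the Match values of all
                // nodes in descending order and take the one at the quorum position, which a
                // majority of nodes have reached (Match >= that value). The leader commits it
                // only if that entry belongs to its current term; if the commit index moved,
                // broadcast so followers learn the new value.
                if r.maybeCommit() {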
                    r.bcastAppend()
                } else if oldPaused {
                    // update() reset the wait state on this node. If we had delayed sending
                    // an update before, send it now.
                    r.sendAppend(m.From)
                }
                // Transfer leadership is in progress.
                if m.From == r.leadTransferee && pr.Match == r.raftLog.lastIndex() {
                    r.logger.Infof("%x sent MsgTimeoutNow to %x after received MsgAppResp", r.id, m.From)
                    r.sendTimeoutNow(m.From)
                }
            }
        }
    }
    return nil
}
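The Next/Match bookkeeping can be sketched with a reduced Progress type. This is a simplification modelled on etcd's `Progress.maybeUpdate` and `Progress.maybeDecrTo`, assuming only the probe and replicate states (no snapshot state, no inflight window, no pausing). Note how the probe state uses the follower's RejectHint to jump Next straight to its last index plus one instead of stepping back one entry per round trip.

```go
package main

type progressState int

const (
	probe progressState = iota
	replicate
)

// progress is a simplified, hypothetical stand-in for etcd's Progress.
type progress struct {
	Match, Next uint64
	State       progressState
}

// maybeUpdate handles an accepted MsgAppResp for index n.
func (p *progress) maybeUpdate(n uint64) bool {
	updated := false
	if p.Match < n {
		p.Match = n
		updated = true
	}
	if p.Next < n+1 {
		p.Next = n + 1
	}
	return updated
}

// maybeDecrTo handles a rejection of the append whose prevLogIndex was
// `rejected`; `last` is the follower's RejectHint (its last index).
func (p *progress) maybeDecrTo(rejected, last uint64) bool {
	if p.State == replicate {
		if rejected <= p.Match {
			return false // stale: Match already covers this index
		}
		p.Next = p.Match + 1
		return true
	}
	if p.Next-1 != rejected {
		return false // stale: not the reply to the latest probe
	}
	if p.Next = min64(rejected, last+1); p.Next < 1 {
		p.Next = 1
	}
	return true
}

func min64(a, b uint64) uint64 {
	if a < b {
		return a
	}
	return b
}
```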
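  5. When the committed index advances, there are new log entries that can be handed to the state machine for execution. (The hand-off happens through the Ready that node sends to the application: see Part 2 of this series, "Raft Implementation in etcd (Part 2): How Nodes Send and Receive Messages", where node checks, when preparing what to send, whether there are entries ready to apply.)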
// github.com/coreos/etcd/contrib/raftexample/raft.go
func (rc *raftNode) serveChannels() {
    /* other logic */
    for {
        select {

        // store raft entries to wal, then publish over commit channel
        // receive the Ready that node sends on readyc
        case rd := <-rc.node.Ready():
            // persist the HardState and the new entries to the WAL
            rc.wal.Save(rd.HardState, rd.Entries)
            // if there is a snapshot, save it to disk, apply it to storage and publish it
            if !raft.IsEmptySnap(rd.Snapshot) {
                rc.saveSnap(rd.Snapshot)
                rc.raftStorage.ApplySnapshot(rd.Snapshot)
                rc.publishSnapshot(rd.Snapshot)
            }
            // append the entries to raftStorage, which is a MemoryStorage
            rc.raftStorage.Append(rd.Entries)
            // send the outgoing messages to the other nodes via the transport
            rc.transport.Send(rd.Messages)
            // if there are newly committed entries, publish them to the state machine
            if ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries)); !ok {
                rc.stop()
                return
            }
            rc.maybeTriggerSnapshot()
            // tell node that this Ready has been processed
            rc.node.Advance()

        /* other cases */
        }
    }
}
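The post calls `entriesToApply` without showing it. Its job is to drop committed entries that were already applied, for example after a restart or a snapshot. A sketch of such a filter, assuming a hypothetical `entry` type and returning an error on a gap (the real function's error handling may differ):

```go
package main

import "fmt"

// entry is a hypothetical stand-in for raftpb.Entry.
type entry struct {
	Index uint64
	Data  string
}

// entriesToApply drops entries whose index is <= applied. A first index beyond
// applied+1 would mean entries were lost, so it is reported as an error.
func entriesToApply(ents []entry, applied uint64) ([]entry, error) {
	if len(ents) == 0 {
		return nil, nil
	}
	first := ents[0].Index
	if first > applied+1 {
		return nil, fmt.Errorf("first committed index %d leaves a gap after applied index %d", first, applied)
	}
	skip := applied - first + 1 // number of leading entries already applied
	if skip >= uint64(len(ents)) {
		return nil, nil
	}
	return ents[skip:], nil
}
```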

// github.com/coreos/etcd/contrib/raftexample/raft.go
// publishEntries writes committed log entries to commit channel and returns
// whether all entries could be published.
func (rc *raftNode) publishEntries(ents []raftpb.Entry) bool {
    for i := range ents {
        switch ents[i].Type {
        case raftpb.EntryNormal:
            if len(ents[i].Data) == 0 {
                // ignore empty messages
                break
            }
            s := string(ents[i].Data)
            select {
            // hand the committed data to commitC; the application must consume commitC
            case rc.commitC <- &s:
            case <-rc.stopc:
                return false
            }
        //...
        }

        // after commit, update appliedIndex
        // appliedIndex is the index of the last entry handed to the application (state machine)
        rc.appliedIndex = ents[i].Index

        // special nil commit to signal replay has finished
        if ents[i].Index == rc.lastIndex {
            select {
            case rc.commitC <- nil:
            case <-rc.stopc:
                return false
            }
        }
    }
    return true
}
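On the other side of commitC, the application has to keep draining the channel; otherwise publishEntries blocks on the send. A hypothetical consumer loop, with `apply` and `replayDone` as assumed callbacks, treating nil as the end-of-replay marker that publishEntries sends:

```go
package main

// applyCommits drains commitC until it is closed. Each non-nil value is a
// committed entry's data to apply; nil means the replay of the existing log
// has finished (the special nil commit sent by publishEntries).
func applyCommits(commitC <-chan *string, apply func(string), replayDone func()) {
	for data := range commitC {
		if data == nil {
			replayDone()
			continue
		}
		apply(*data)
	}
}
```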