etcd Study Notes 4 (Draft)

2020-08-05  酱油王0901

When clientv3 sends a Put request, the key/value it carries is first wrapped in an Op, then converted into a pb.PutRequest and sent to EtcdServer as a gRPC call. EtcdServer hands the serialized data to the raft state machine for processing.

// file: clientv3/kv.go
case tPut:
        var resp *pb.PutResponse
        r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID), PrevKv: op.prevKV, 
                    IgnoreValue: op.ignoreValue, IgnoreLease: op.ignoreLease}
        resp, err = kv.remote.Put(ctx, r, kv.callOpts...)
        if err == nil {
            return OpResponse{put: (*PutResponse)(resp)}, nil
        }
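
For orientation, here is a minimal client-side sketch that drives exactly this path; the endpoint address and the key/value pair are made up for illustration:

package main

import (
    "context"
    "log"
    "time"

    "go.etcd.io/etcd/clientv3"
)

func main() {
    // Assumed: a local etcd listening on the default client port.
    cli, err := clientv3.New(clientv3.Config{
        Endpoints:   []string{"127.0.0.1:2379"},
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer cli.Close()

    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    // Put is wrapped into an Op and a pb.PutRequest internally, as shown above.
    resp, err := cli.Put(ctx, "foo", "bar")
    cancel()
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("revision: %d", resp.Header.Revision)
}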

The serialized data is wrapped in a pb.Message of type MsgProp, which is then written to the node's propc channel.

// file: raft/node.go
func (n *node) Propose(ctx context.Context, data []byte) error {
    return n.stepWait(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
}
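
Propose blocks until the run() loop has actually stepped the proposal through the state machine. A condensed sketch of what stepWait does underneath (non-proposal message handling and a few error paths are elided here):

// file: raft/node.go (condensed)
func (n *node) stepWait(ctx context.Context, m pb.Message) error {
    pm := msgWithResult{m: m, result: make(chan error, 1)}
    select {
    case n.propc <- pm: // hand the proposal to the run() loop
    case <-ctx.Done():
        return ctx.Err()
    case <-n.done:
        return ErrStopped
    }
    select {
    case err := <-pm.result: // wait for run() to report Step's outcome
        if err != nil {
            return err
        }
    case <-ctx.Done():
        return ctx.Err()
    case <-n.done:
        return ErrStopped
    }
    return nil
}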

When the raft node's run() loop receives the proposal from the propc channel, it steps it into the raft state machine.

// file: raft/node.go
select {
        // TODO: maybe buffer the config propose if there exists one (the way
        // described in raft dissertation)
        // Currently it is dropped in Step silently.
        case pm := <-propc:
            m := pm.m
            m.From = r.id
            err := r.Step(m)
            if pm.result != nil {
                pm.result <- err
                close(pm.result)
            }

Step then dispatches to stepLeader, stepCandidate, or stepFollower according to the node's current role. Taking the leader as an example:

// file: raft/raft.go (stepLeader, case pb.MsgProp)
if !r.appendEntry(m.Entries...) {
    return ErrProposalDropped
}
r.bcastAppend()
return nil
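
For reference, the dispatch itself goes through a function pointer: becomeLeader, becomeCandidate, and becomeFollower each assign r.step, and Step invokes it after the common term and vote handling. A condensed sketch:

// file: raft/raft.go (condensed)
type stepFunc func(r *raft, m pb.Message) error

func (r *raft) Step(m pb.Message) error {
    // ... term comparison, vote handling, and local messages elided ...
    // r.step is stepLeader, stepCandidate, or stepFollower, assigned by the
    // corresponding become*() method on every role transition.
    return r.step(r, m)
}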

raft first appends the entries to the raftLog, setting each entry's Term and Index, and then broadcasts them to the other peers. For example, if the log's lastIndex is 10 and three entries are proposed at term 5, they receive indexes 11, 12, and 13, each with Term 5.

// file: raft/raft.go
func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) {
    li := r.raftLog.lastIndex()
    for i := range es {
        es[i].Term = r.Term
        es[i].Index = li + 1 + uint64(i)
    }
    // Track the size of this uncommitted proposal.
    if !r.increaseUncommittedSize(es) {
        r.logger.Debugf(
            "%x appending new entries to log would exceed uncommitted entry size limit; dropping proposal",
            r.id,
        )
        // Drop the proposal.
        return false
    }
    // use latest "last" index after truncate/append
    li = r.raftLog.append(es...)
    r.prs.Progress[r.id].MaybeUpdate(li)
    // Regardless of maybeCommit's return, our caller will call bcastAppend.
    r.maybeCommit()
    return true
}

The leader sends append-entries messages to the peers. maybeSendAppend (quoted in full below the list) proceeds as follows:

  1. First check whether the peer's Progress is paused; if it is, return immediately.
  2. Fetch the term and the entries to send from the raftLog. A failure here is usually ErrCompacted (the log has already been compacted past the requested index) or ErrUnavailable.
  3. If fetching the term or the entries fails, send a snapshot message of type MsgSnap instead, reset the Progress state to StateSnapshot, and set PendingSnapshot to the snapshot's index.
    m := pb.Message{}
    m.To = to
    m.Type = pb.MsgSnap
    m.Snapshot = snapshot  // the snapshot is fetched from the raftLog
    
  4. If the term and the entries are fetched successfully, set the message type to MsgApp:
    m := pb.Message{}
    m.To = to
    m.Type = pb.MsgApp
    m.Index = pr.Next - 1  // TODO(zhengliang): pr.Next or pr.Next - 1 ?
    m.LogTerm = term
    m.Entries = ents
    m.Commit = r.raftLog.committed
    
  5. If the set of entries to send is non-empty, handle it according to the Progress state (an optimistic Next update in StateReplicate, marking ProbeSent in StateProbe).

// file: raft/raft.go
// bcastAppend sends RPC, with entries to all peers that are not up-to-date
// according to the progress recorded in r.prs.
func (r *raft) bcastAppend() {
    r.prs.Visit(func(id uint64, _ *tracker.Progress) {
        if id == r.id {
            return
        }
        r.sendAppend(id)
    })
}

// sendAppend sends an append RPC with new entries (if any) and the
// current commit index to the given peer.
func (r *raft) sendAppend(to uint64) {
    r.maybeSendAppend(to, true)
}

// maybeSendAppend sends an append RPC with new entries to the given peer,
// if necessary. Returns true if a message was sent. The sendIfEmpty
// argument controls whether messages with no entries will be sent
// ("empty" messages are useful to convey updated Commit indexes, but
// are undesirable when we're sending multiple messages in a batch).
func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
    pr := r.prs.Progress[to]
    if pr.IsPaused() {
        return false
    }
    m := pb.Message{}
    m.To = to

    term, errt := r.raftLog.term(pr.Next - 1)
    ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)
    if len(ents) == 0 && !sendIfEmpty {
        return false
    }

    if errt != nil || erre != nil { // send snapshot if we failed to get term or entries
        if !pr.RecentActive {
            r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to)
            return false
        }

        m.Type = pb.MsgSnap
        snapshot, err := r.raftLog.snapshot()
        if err != nil {
            if err == ErrSnapshotTemporarilyUnavailable {
                r.logger.Debugf("%x failed to send snapshot to %x because snapshot is temporarily unavailable", r.id, to)
                return false
            }
            panic(err) // TODO(bdarnell)
        }
        if IsEmptySnap(snapshot) {
            panic("need non-empty snapshot")
        }
        m.Snapshot = snapshot
        sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
        r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
            r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr)
        pr.BecomeSnapshot(sindex)
        r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr)
    } else {
        m.Type = pb.MsgApp
        m.Index = pr.Next - 1 // TODO(zhengliang): pr.Next or pr.Next - 1 ?
        m.LogTerm = term
        m.Entries = ents
        m.Commit = r.raftLog.committed
        if n := len(m.Entries); n != 0 {
            switch pr.State {
            // optimistically increase the next when in StateReplicate
            case tracker.StateReplicate:
                last := m.Entries[n-1].Index
                pr.OptimisticUpdate(last)
                pr.Inflights.Add(last)
            case tracker.StateProbe:
                pr.ProbeSent = true
            default:
                r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State)
            }
        }
    }
    r.send(m)
    return true
}

Once the Message is assembled, it is handed to raft's send() method. send() only performs a few sanity checks and stamps the message with the node's current Term, then appends it to raft's msgs queue:

r.msgs = append(r.msgs, m)
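
For reference, a condensed sketch of raft.send(); the real method also asserts that election-related messages carry their own term, and its panics are abridged here:

// file: raft/raft.go (condensed)
func (r *raft) send(m pb.Message) {
    m.From = r.id
    switch m.Type {
    case pb.MsgVote, pb.MsgVoteResp, pb.MsgPreVote, pb.MsgPreVoteResp:
        // Election messages must set their own term explicitly.
    case pb.MsgProp, pb.MsgReadIndex:
        // Proposals and read-index requests are forwarded without a term.
    default:
        m.Term = r.Term // attach the node's current term
    }
    r.msgs = append(r.msgs, m)
}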

When RawNode detects that a new Ready is pending, it populates a Ready object. A Ready is read-only: it packages the entries and messages that are ready to be saved to stable storage, committed, or sent to peers. The Ready is then written to the readyc channel.
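
For context, an abridged version of the Ready struct (the original in raft/node.go carries longer comments):

// file: raft/node.go (abridged)
type Ready struct {
    *SoftState   // volatile node state; nil if unchanged

    pb.HardState // state to persist BEFORE Messages are sent

    ReadStates []ReadState // for serving linearizable reads locally

    Entries []pb.Entry // to be saved to stable storage BEFORE Messages are sent

    Snapshot pb.Snapshot // snapshot to be saved to stable storage

    CommittedEntries []pb.Entry // already committed; to be applied to the state machine

    Messages []pb.Message // outbound messages, sent AFTER Entries are persisted

    MustSync bool // whether HardState/Entries must be fsynced to disk
}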

// HasReady called when RawNode user need to check if any Ready pending.
// Checking logic in this method should be consistent with Ready.containsUpdates().
func (rn *RawNode) HasReady() bool {
    r := rn.raft
    if !r.softState().equal(rn.prevSoftSt) {
        return true
    }
    if hardSt := r.hardState(); !IsEmptyHardState(hardSt) && !isHardStateEqual(hardSt, rn.prevHardSt) {
        return true
    }
    if r.raftLog.unstable.snapshot != nil && !IsEmptySnap(*r.raftLog.unstable.snapshot) {
        return true
    }
    if len(r.msgs) > 0 || len(r.raftLog.unstableEntries()) > 0 || r.raftLog.hasNextEnts() {
        return true
    }
    if len(r.readStates) != 0 {
        return true
    }
    return false
}

// file: raft/node.go
func (n *node) run() {
    var propc chan msgWithResult
    var readyc chan Ready
    var advancec chan struct{}
    var rd Ready

    r := n.rn.raft

    lead := None

    for {
        if advancec != nil {
            readyc = nil
        } else if n.rn.HasReady() {
            // Populate a Ready. Note that this Ready is not guaranteed to
            // actually be handled. We will arm readyc, but there's no guarantee
            // that we will actually send on it. It's possible that we will
            // service another channel instead, loop around, and then populate
            // the Ready again. We could instead force the previous Ready to be
            // handled first, but it's generally good to emit larger Readys plus
            // it simplifies testing (by emitting less frequently and more
            // predictably).
            rd = n.rn.readyWithoutAccept()
            readyc = n.readyc
        }
        // ... (other select cases elided) ...
        case readyc <- rd: // write the Ready to the readyc channel
            n.rn.acceptReady(rd)
            advancec = n.advancec

After etcdserver's raftNode receives a Ready from the channel, it wraps the committed entries and the snapshot into an apply struct and writes it to the applyc channel, updating the committed index just before the write. It then proceeds as follows:

  1. If this node is the leader, send the messages to the peers right away; messages of type raftpb.MsgSnap are diverted into the msgSnapC channel (see the processMessages sketch after the listing below). The leader can write to its disk in parallel with replicating to the peers.
  2. Persist the snapshot.
  3. Persist the hard state and the entries.
  4. Force the WAL to fsync its hard state.
  5. Wait for applyAll to finish.
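
The committed-index update mentioned above is small enough to quote; lightly abridged from etcdserver/raft.go:

// file: etcdserver/raft.go (abridged)
func updateCommittedIndex(ap *apply, rh *raftReadyHandler) {
    var ci uint64
    if len(ap.entries) != 0 {
        ci = ap.entries[len(ap.entries)-1].Index // index of the last committed entry
    }
    if ap.snapshot.Metadata.Index > ci {
        ci = ap.snapshot.Metadata.Index // a snapshot may be ahead of the entries
    }
    if ci != 0 {
        rh.updateCommittedIndex(ci)
    }
}
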
// file: etcdserver/raft.go
// apply contains entries, snapshot to be applied. Once
// an apply is consumed, the entries will be persisted to
// raft storage concurrently; the application must read
// raftDone before assuming the raft messages are stable.
type apply struct {
    entries  []raftpb.Entry
    snapshot raftpb.Snapshot
    // notifyc synchronizes etcd server applies with the raft node
    notifyc chan struct{}
}

// start prepares and starts raftNode in a new goroutine. It is no longer safe
// to modify the fields after it has been started.
func (r *raftNode) start(rh *raftReadyHandler) {
    internalTimeout := time.Second

    go func() {
        defer r.onStop()
        islead := false

        for {
            select {
            case <-r.ticker.C:
                r.tick()
            case rd := <-r.Ready():
                if rd.SoftState != nil {
                    newLeader := rd.SoftState.Lead != raft.None && rh.getLead() != rd.SoftState.Lead
                    if newLeader {
                        leaderChanges.Inc()
                    }

                    if rd.SoftState.Lead == raft.None {
                        hasLeader.Set(0)
                    } else {
                        hasLeader.Set(1)
                    }

                    rh.updateLead(rd.SoftState.Lead)
                    islead = rd.RaftState == raft.StateLeader
                    if islead {
                        isLeader.Set(1)
                    } else {
                        isLeader.Set(0)
                    }
                    rh.updateLeadership(newLeader)
                    r.td.Reset()
                }

                if len(rd.ReadStates) != 0 {
                    select {
                    case r.readStateC <- rd.ReadStates[len(rd.ReadStates)-1]:
                    case <-time.After(internalTimeout):
                        r.lg.Warn("timed out sending read state", zap.Duration("timeout", internalTimeout))
                    case <-r.stopped:
                        return
                    }
                }
                notifyc := make(chan struct{}, 1)
                ap := apply{
                    entries:  rd.CommittedEntries,
                    snapshot: rd.Snapshot,
                    notifyc:  notifyc,
                }

                updateCommittedIndex(&ap, rh)

                select {
                case r.applyc <- ap:
                case <-r.stopped:
                    return
                }

                // the leader can write to its disk in parallel with replicating to the followers and them
                // writing to their disks.
                // For more details, check raft thesis 10.2.1
                if islead {
                    // gofail: var raftBeforeLeaderSend struct{}
                    r.transport.Send(r.processMessages(rd.Messages))
                }

                // Must save the snapshot file and WAL snapshot entry before saving any other entries or hardstate to
                // ensure that recovery after a snapshot restore is possible.
                if !raft.IsEmptySnap(rd.Snapshot) {
                    // gofail: var raftBeforeSaveSnap struct{}
                    if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
                        r.lg.Fatal("failed to save Raft snapshot", zap.Error(err))
                    }
                    // gofail: var raftAfterSaveSnap struct{}
                }

                // gofail: var raftBeforeSave struct{}
                if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
                    r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err))
                }
                if !raft.IsEmptyHardState(rd.HardState) {
                    proposalsCommitted.Set(float64(rd.HardState.Commit))
                }
                // gofail: var raftAfterSave struct{}

                if !raft.IsEmptySnap(rd.Snapshot) {
                    // Force WAL to fsync its hard state before Release() releases
                    // old data from the WAL. Otherwise could get an error like:
                    // panic: tocommit(107) is out of range [lastIndex(84)]. Was the raft log corrupted, truncated, or lost?
                    // See https://github.com/etcd-io/etcd/issues/10219 for more details.
                    if err := r.storage.Sync(); err != nil {
                        r.lg.Fatal("failed to sync Raft snapshot", zap.Error(err))
                    }
                    // etcdserver now claims the snapshot has been persisted onto the disk
                    notifyc <- struct{}{}

                    // gofail: var raftBeforeApplySnap struct{}
                    r.raftStorage.ApplySnapshot(rd.Snapshot)
                    r.lg.Info("applied incoming Raft snapshot", zap.Uint64("snapshot-index", rd.Snapshot.Metadata.Index))
                    // gofail: var raftAfterApplySnap struct{}

                    if err := r.storage.Release(rd.Snapshot); err != nil {
                        r.lg.Fatal("failed to release Raft wal", zap.Error(err))
                    }
                    // gofail: var raftAfterWALRelease struct{}
                }

                r.raftStorage.Append(rd.Entries)

                if !islead {
                    // finish processing incoming messages before we signal raftdone chan
                    msgs := r.processMessages(rd.Messages)

                    // now unblocks 'applyAll' that waits on Raft log disk writes before triggering snapshots
                    notifyc <- struct{}{}

                    // Candidate or follower needs to wait for all pending configuration
                    // changes to be applied before sending messages.
                    // Otherwise we might incorrectly count votes (e.g. votes from removed members).
                    // Also slow machine's follower raft-layer could proceed to become the leader
                    // on its own single-node cluster, before apply-layer applies the config change.
                    // We simply wait for ALL pending entries to be applied for now.
                    // We might improve this later on if it causes unnecessary long blocking issues.
                    waitApply := false
                    for _, ent := range rd.CommittedEntries {
                        if ent.Type == raftpb.EntryConfChange {
                            waitApply = true
                            break
                        }
                    }
                    if waitApply {
                        // blocks until 'applyAll' calls 'applyWait.Trigger'
                        // to be in sync with scheduled config-change job
                        // (assume notifyc has cap of 1)
                        select {
                        case notifyc <- struct{}{}:
                        case <-r.stopped:
                            return
                        }
                    }

                    // gofail: var raftBeforeFollowerSend struct{}
                    r.transport.Send(msgs)
                } else {
                    // leader already processed 'MsgSnap' and signaled
                    notifyc <- struct{}{}
                }

                r.Advance()
            case <-r.stopped:
                return
            }
        }
    }()
}
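
The MsgSnap diversion mentioned in step 1 happens inside processMessages. A condensed sketch; the real method also blanks out messages addressed to removed members via isIDRemoved and suppresses duplicate MsgAppResp messages, both elided here:

// file: etcdserver/raft.go (condensed)
func (r *raftNode) processMessages(ms []raftpb.Message) []raftpb.Message {
    for i := len(ms) - 1; i >= 0; i-- {
        if ms[i].Type == raftpb.MsgSnap {
            // Divert the snapshot message; applyAll later merges it with a
            // store snapshot and sends it via sendMergedSnap.
            select {
            case r.msgSnapC <- ms[i]:
            default:
                // Drop the MsgSnap if the inflight channel is full.
            }
            ms[i].To = 0 // To == 0 tells the transport to drop this message
        }
    }
    return ms
}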

When EtcdServer receives an apply from the applyc channel, it applies it via applyAll:

func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {
    s.applySnapshot(ep, apply)
    s.applyEntries(ep, apply)

    proposalsApplied.Set(float64(ep.appliedi))
    s.applyWait.Trigger(ep.appliedi)

    // wait for the raft routine to finish the disk writes before triggering a
    // snapshot. or applied index might be greater than the last index in raft
    // storage, since the raft routine might be slower than apply routine.
    <-apply.notifyc

    s.triggerSnapshot(ep)
    select {
    // snapshot requested via send()
    case m := <-s.r.msgSnapC:
        merged := s.createMergedSnapshotMessage(m, ep.appliedt, ep.appliedi, ep.confState)
        s.sendMergedSnap(merged)
    default:
    }
}