etcd Learning Notes (3): Propose

2021-12-06  wangshanshi

We have already seen the simple handling of Propose in raftexample, but the real etcd's handling of a Propose is considerably more involved. The main differences come down to two points:

  1. The consistent index, used to keep applies idempotent between boltdb and the raft log.
  2. Synchronous return: raft log replication is asynchronous, so how does etcd return a result to the caller synchronously?

When boltdb is used as the state machine, the WAL and boltdb are two separate entities, so they can easily become inconsistent (for example, after a crash between persisting a WAL entry and applying it to boltdb). etcd therefore stores a record, consistent-index, inside boltdb, recording the highest log index that has been successfully applied to boltdb. When replaying the WAL to recover boltdb, etcd can compare each entry's index against consistent-index to tell whether it has already been applied.
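
As a rough illustration of the recovery-time check, here is a minimal sketch; the Backend interface and recoverBackend helper are hypothetical, the real logic lives in etcd's apply path and its consistent-index (cindex) package.

import "go.etcd.io/etcd/raft/v3/raftpb"

// Backend is a stand-in for the boltdb-backed state machine; the interface and
// the helper below are hypothetical, for illustration only.
type Backend interface {
	// ConsistentIndex returns the highest log index already applied to boltdb.
	ConsistentIndex() uint64
	// Apply applies the entry and persists the new consistent-index in the
	// same boltdb transaction, so the two can never diverge.
	Apply(e raftpb.Entry, newConsistentIndex uint64)
}

// recoverBackend replays WAL entries on top of the backend, skipping anything
// at or below the persisted consistent-index.
func recoverBackend(ents []raftpb.Entry, b Backend) {
	ci := b.ConsistentIndex()
	for _, e := range ents {
		if e.Index <= ci {
			continue // already applied before the restart; skipping keeps the replay idempotent
		}
		b.Apply(e, e.Index)
	}
}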

Processing flow

In etcd-server, a propose is handled as follows.
First, each request is assigned a unique request ID; the request is then registered with the wait registry, and the caller blocks until the request with that ID has been fully processed:

func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (*applyResult, error) {

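    // Backpressure: reject new proposals if apply has fallen too far behind commit.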
    ai := s.getAppliedIndex()
    ci := s.getCommittedIndex()
    if ci > ai+maxGapBetweenApplyAndCommitIndex {
        return nil, ErrTooManyRequests
    }

    r.Header = &pb.RequestHeader{
        ID: s.reqIDGen.Next(),
    }

    id := r.ID
    if id == 0 {
        id = r.Header.ID
    }
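    // Register the request ID with the wait registry; the apply path will later
    // Trigger(id, result) to hand the result back.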
    ch := s.w.Register(id)

    cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
    defer cancel()

    // Serialize the request; the raw bytes are what gets proposed to raft.
    data, err := r.Marshal()
    if err != nil {
        return nil, err
    }

    start := time.Now()
    err = s.r.Propose(cctx, data)
    if err != nil {
        proposalsFailed.Inc()
        s.w.Trigger(id, nil) // GC wait
        return nil, err
    }

    select {
    case x := <-ch:
        return x.(*applyResult), nil
    case <-cctx.Done():
        proposalsFailed.Inc()
        s.w.Trigger(id, nil) // GC wait
        return nil, s.parseProposeCtxErr(cctx.Err(), start)
    case <-s.done:
        return nil, ErrStopped
    }
}
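
The synchronous return hinges on the s.w wait registry: Register(id) hands back a channel, and whoever later calls Trigger(id, result) delivers the result on that channel. Below is a minimal sketch of the idea; it is not etcd's actual pkg/wait implementation and the names are illustrative only.

import "sync"

// waitRegistry maps request IDs to channels waiting for their results.
type waitRegistry struct {
	mu sync.Mutex
	m  map[uint64]chan interface{}
}

func newWaitRegistry() *waitRegistry {
	return &waitRegistry{m: make(map[uint64]chan interface{})}
}

// Register hands back a channel on which the result for id will be delivered.
func (w *waitRegistry) Register(id uint64) <-chan interface{} {
	w.mu.Lock()
	defer w.mu.Unlock()
	ch := make(chan interface{}, 1)
	w.m[id] = ch
	return ch
}

// Trigger delivers the result for id (or nil, to garbage-collect the waiter)
// and drops the registration.
func (w *waitRegistry) Trigger(id uint64, result interface{}) {
	w.mu.Lock()
	ch, ok := w.m[id]
	delete(w.m, id)
	w.mu.Unlock()
	if ok {
		ch <- result
		close(ch)
	}
}

processInternalRaftRequestOnce blocks on the channel returned by Register; the apply path (applyEntryNormal, sketched later) is the side that calls Trigger with the applyResult, which is what turns the asynchronous raft pipeline into a synchronous call for the client.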

The request is forwarded to the raft node

As covered earlier, once the raft node has done its work it hands the results to the upper layer through a Ready struct. The proposal message is first forwarded to the leader; the leader then calls processMessages, and the main operation in this round is copying the log entries to the followers. Note the ordering in the code below: the leader sends its messages before persisting the entries (it can replicate in parallel with its own disk write), while a follower must persist the entries before sending its responses back.


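                // Package the committed entries (and any snapshot) and hand them
                // off to the apply goroutine via applyc.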
                ap := apply{
                    entries:  rd.CommittedEntries,
                    snapshot: rd.Snapshot,
                    notifyc:  notifyc,
                }

                updateCommittedIndex(&ap, rh)

                select {
                case r.applyc <- ap:
                case <-r.stopped:
                    return
                }

                if islead {
                    r.transport.Send(r.processMessages(rd.Messages))
                }

                // Must save the snapshot file and WAL snapshot entry before saving any other entries or hardstate to
                // ensure that recovery after a snapshot restore is possible.
                if !raft.IsEmptySnap(rd.Snapshot) {
                    if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
                        r.lg.Fatal("failed to save Raft snapshot", zap.Error(err))
                    }
                }

                if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
                    r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err))
                }
                if !raft.IsEmptyHardState(rd.HardState) {
                    proposalsCommitted.Set(float64(rd.HardState.Commit))
                }

                if !raft.IsEmptySnap(rd.Snapshot) {
                    // Force WAL to fsync its hard state before Release() releases
                    // old data from the WAL. Otherwise could get an error like:
                    // panic: tocommit(107) is out of range [lastIndex(84)]. Was the raft log corrupted, truncated, or lost?
                    // See https://github.com/etcd-io/etcd/issues/10219 for more details.
                    if err := r.storage.Sync(); err != nil {
                        r.lg.Fatal("failed to sync Raft snapshot", zap.Error(err))
                    }

                    // etcdserver now claim the snapshot has been persisted onto the disk
                    notifyc <- struct{}{}

                    // gofail: var raftBeforeApplySnap struct{}
                    r.raftStorage.ApplySnapshot(rd.Snapshot)
                    r.lg.Info("applied incoming Raft snapshot", zap.Uint64("snapshot-index", rd.Snapshot.Metadata.Index))
                    // gofail: var raftAfterApplySnap struct{}

                    if err := r.storage.Release(rd.Snapshot); err != nil {
                        r.lg.Fatal("failed to release Raft wal", zap.Error(err))
                    }
                    // gofail: var raftAfterWALRelease struct{}
                }

                r.raftStorage.Append(rd.Entries)

                if !islead {
                    msgs := r.processMessages(rd.Messages)
                    notifyc <- struct{}{}
                    r.transport.Send(msgs)
                } else {
                    // leader already processed 'MsgSnap' and signaled
                    notifyc <- struct{}{}
                }

                r.Advance()
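
The updateCommittedIndex call near the top of this loop is what feeds the committed index used by the gap check in processInternalRaftRequestOnce. Its shape in v3.5 is roughly the following (a sketch, slightly simplified):

// Publish the highest committed index seen in this Ready batch to the server
// through the raftReadyHandler.
func updateCommittedIndex(ap *apply, rh *raftReadyHandler) {
	var ci uint64
	if len(ap.entries) != 0 {
		ci = ap.entries[len(ap.entries)-1].Index
	}
	if ap.snapshot.Metadata.Index > ci {
		ci = ap.snapshot.Metadata.Index
	}
	if ci != 0 {
		rh.updateCommittedIndex(ci)
	}
}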

On the apply side, the server's run loop picks the apply package off applyc and schedules applyAll, which ultimately calls apply:

        case ap := <-s.r.apply():
            f := func(context.Context) { s.applyAll(&ep, &ap) }
            sched.Schedule(f)
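
sched is a FIFO scheduler, so apply jobs run strictly one at a time and in submission order, while the Ready-handling loop above is free to keep going. A toy illustration of that decoupling (not etcd's actual pkg/schedule implementation):

import "context"

// Toy FIFO scheduler: jobs passed to Schedule run one at a time, in order.
type fifoScheduler struct {
	jobs chan func(context.Context)
}

func newFIFOScheduler(ctx context.Context) *fifoScheduler {
	s := &fifoScheduler{jobs: make(chan func(context.Context), 1024)}
	go func() {
		for j := range s.jobs {
			j(ctx) // apply jobs never overlap, so entries are applied in order
		}
	}()
	return s
}

func (s *fifoScheduler) Schedule(j func(context.Context)) {
	s.jobs <- j
}

applyAll walks the committed entries and eventually reaches apply, shown next: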


func (s *EtcdServer) apply(
    es []raftpb.Entry,
    confState *raftpb.ConfState,
) (appliedt uint64, appliedi uint64, shouldStop bool) {
    s.lg.Debug("Applying entries", zap.Int("num-entries", len(es)))
    for i := range es {
        e := es[i]

        switch e.Type {
        case raftpb.EntryNormal:
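            // applyEntryNormal decodes the InternalRaftRequest, applies it to the
            // KV backend (advancing the consistent index), and Triggers the
            // waiting proposer with the applyResult.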
            s.applyEntryNormal(&e)
            s.setAppliedIndex(e.Index)
            s.setTerm(e.Term)

        case raftpb.EntryConfChange:
            // We need to apply all WAL entries on top of v2store
            // and only 'unapplied' (e.Index>backend.ConsistentIndex) on the backend.
            shouldApplyV3 := membership.ApplyV2storeOnly

            // set the consistent index of current executing entry
            if e.Index > s.consistIndex.ConsistentIndex() {
                s.consistIndex.SetConsistentIndex(e.Index, e.Term)
                shouldApplyV3 = membership.ApplyBoth
            }

            var cc raftpb.ConfChange
            pbutil.MustUnmarshal(&cc, e.Data)
            removedSelf, err := s.applyConfChange(cc, confState, shouldApplyV3)
            s.setAppliedIndex(e.Index)
            s.setTerm(e.Term)
            shouldStop = shouldStop || removedSelf
            s.w.Trigger(cc.ID, &confChangeResponse{s.cluster.Members(), err})

        }
        appliedi, appliedt = e.Index, e.Term
    }
    return appliedt, appliedi, shouldStop
}
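
The EntryNormal branch is where the loop closes. Below is a condensed sketch of what applyEntryNormal does, simplified from the v3.5 code; the v2 request path, auth checks, and error handling are omitted.

// Condensed sketch of applyEntryNormal (simplified).
func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
	shouldApplyV3 := membership.ApplyV2storeOnly
	if e.Index > s.consistIndex.ConsistentIndex() {
		// Record the index being applied so boltdb never falls out of sync
		// with the raft log on restart.
		s.consistIndex.SetConsistentIndex(e.Index, e.Term)
		shouldApplyV3 = membership.ApplyBoth
	}

	var raftReq pb.InternalRaftRequest
	pbutil.MustUnmarshal(&raftReq, e.Data)

	id := raftReq.ID
	if id == 0 {
		id = raftReq.Header.ID
	}

	// Apply the request to the backend, then wake up the proposer blocked in
	// processInternalRaftRequestOnce with the result.
	ar := s.applyV3.Apply(&raftReq, shouldApplyV3)
	s.w.Trigger(id, ar)
}

s.w.Trigger(id, ar) here is the counterpart of the s.w.Register(id) we saw at the beginning: it is what finally unblocks the client's request with the apply result.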