2.2 etcd源码笔记 - raft library - 流程

2019-08-07  本文已影响0人  兰CC

一、基础元素

1. 提案记录

raft 状态机 最核心的功能就是协商出一致的提案(或者说是将提案同步到整个集群),

每一条提案由 etcd/raft/raftpb/raft.pb.go:Entry 表示

type Entry struct {
      //当前的选举轮次
    Term             uint64    `protobuf:"varint,2,opt,name=Term" json:"Term"`
    //每一条提案都有一个Index,递增,可以理解为ID
    Index            uint64    `protobuf:"varint,3,opt,name=Index" json:"Index"`
    //普通提案 或 配置变更提案
    Type             EntryType `protobuf:"varint,1,opt,name=Type,enum=raftpb.EntryType" json:"Type"`
    //具体的数据
    Data             []byte    `protobuf:"bytes,4,opt,name=Data" json:"Data,omitempty"`
    //这个不用关心,用于存储PB后的二进制数据
    XXX_unrecognized []byte    `json:"-"`
}

2. 快照

同步数据过程中, raft library 采用 “快照 + 提案” 的机制。

服务会不定期地将 已协商一致的提案 打包压缩成一个 数据快照

比如 put(a,1) put(a,2) put(b,3) 会打包成 数据快照 a:2;b:3

正常情况下,节点启动会从本地读取 数据快照,然后在此基础上继续同步提案

同步提案的过程中,如果有异常,则会将先同步 Leader 的最新 数据快照Follower,然后再此 数据快照 的基础上,再继续同步提案

type Snapshot struct {
    //被打包压缩的数据,数据格式由具体的应用层提供
    Data             []byte           `protobuf:"bytes,1,opt,name=data" json:"data,omitempty"`
    //该份数据对应的 raft 状态机的状态,包括 Nodes、Index、Item
    Metadata         SnapshotMetadata `protobuf:"bytes,2,opt,name=metadata" json:"metadata"`
    //不用关心,表示序列成PB的二进制数据
    XXX_unrecognized []byte           `json:"-"`
}

3. offset

哪些提案已协商好、应用层已处理好哪些协商好的提案,并不是记录具体的提案,

而是按顺序处理每个提案,然后标记一个 offset,表示协商、处理到哪个提案了。

比如 committed_indexapplied_index

4. raftLog

在整个过程中,需要记录以下元素

以上内容都记录在 etcd/raft/log.go:raftLog

type raftLog struct {
    //之前以为是 storage 表示“协商一致的提案”,unstable表示“协商中的提案”
    //然后后者协商一致后,挪到前者
    //然而我错了,
    //storage 如字面意思,就是存储的,已经落地了,但是目前只有一个实现 MemoryStorage,也是搞笑
    //unstable表示不稳定的,在内存里的数据
    storage Storage

    unstable unstable

    //提案Index,表示已协商到哪个提案
    committed uint64
    
    //提案Index,表示应用层已处理到哪个提案
    applied uint64

    logger Logger
}

type MemoryStorage struct {
    sync.Mutex
    
    //当前raft状态机的状态
    //TODO: 介绍每个字段的含义
    hardState pb.HardState
    
    //协商好的数据快照
    snapshot  pb.Snapshot
    
    //协商好的提案
    ents []pb.Entry
}

type unstable struct {
    //同步数据有异常时,需要从 Leader 同步到 Follower 的 数据快照
    //这个字段有值,就会被塞到 Ready.Snapshot,进而被传输层丢给 Follower
    snapshot *pb.Snapshot
    
    //正在协商中的提案
    entries []pb.Entry
    
    //这个跟上面的offset不是同一个意思,取值为 snapshot.lastIndex
    //这是为了方便从 entries 中获取数据,entries[entry.index - offset]=entry 就等于 
    offset  uint64

    logger Logger
}

5. progress

raft library 在 Leader 与 Follower 同步数据的过程中,定义了 Progress 来表示 Folllower 的状态信息。

包括 当前是否存活、已发送成功Index、待发送Index、状态等。

其中状态有三种

源码附带的文档 etcd/raft/design.mdProgress 详细的设计说明介绍可以参考之。

type Progress struct {
    //已发送成功的提案Index
    Match uint64
    //下一条要发送的提案Index
    Next uint64
    
    //当前状态
    State ProgressStateType

    //probe状态使用,探测出节点有问题,会标记为true,就会停止同步数据
    Paused bool
    
    //表示要同步的 snapshot 的 index
    PendingSnapshot uint64

    //是否存活,收到任何来自 Follower 的消息,都认为其状态是存活的
    //Leader 会在每次发心跳的时候,顺便检查一下该状态
    RecentActive bool

    //发送数据的一个滑动窗口,机制类似TCP那样子的滑动窗口
    ins *inflights

    //是否Learner 是的话,只是复本,不能参与选举
    IsLearner bool
}

二、流程 - 启动

应用层会调用该 StartNode 启动一个 raft-node

func StartNode(c *Config, peers []Peer) Node {
    //raft相关的操作封装到 etcd/raft/raft.go:raft 
    r := newRaft(c)
    
    //初始启动时,状态是 Follower,并且 term:1,Leader:None
    //要嘛就是收到 Leader 的心跳作为 Follower 加入整个集群,
    // 要嘛就是到点触发 tickElection,进行选举。
    r.becomeFollower(1, None)
    
    //把 提案-“增加集群中的节点” 加到 “已协商提案” 中
    //这样是为了把 提案-“增加集群中的节点” 通过 ready chan 吐出去给外系统
    //比如“传输层”需要拿到这些节点,然后创建对应的连接
    //比如 node本身 拿到这些节点,创建节点对应的progress
    //(这边其实是为了复用创建progress的流程,不然是可以直接调用代码创建progress)
    for _, peer := range peers {
        cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context}
        d, err := cc.Marshal()
        if err != nil {
            panic("unexpected marshal error")
        }
        e := pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: r.raftLog.lastIndex() + 1, Data: d}
        r.raftLog.append(e)
    }
    
    //从本地加载出来的都是已经协商一致的
    r.raftLog.committed = r.raftLog.lastIndex()
    
    //创建集群中各个节点对应的 progress
    //上面把节点新增消息塞到 ready chan,也是为了创建 progress
    //这边直接代码调用,是为了test case 可以立即在 StartNode 执行完之后,马上进行选举
    //不然走 ready chan 会有一定时间的延迟
    for _, peer := range peers {
        r.addNode(peer.ID)
    }

    n := newNode()
    n.logger = c.Logger
    
    //最终启动的姿态就是,开启一个 for 循环不断读取 raft 状态机 丢出来的数据并进行处理
    go n.run(r)
    return &n
}

源代码 etcd/raft/raft.go 比较简单,这边就介绍一下 raft

type raft struct {
    //外界会传入节点ID
    //生成方式  sha1(cluster name, peer URLs, time)
    id uint64
    //当前选举轮次
    Term uint64 
    //这个好像是投票给哪个 node?
    Vote uint64 
    //跟读取数据相关,后续介绍
    readStates []ReadState
    //这个就是基础数据里所描述的raftLog
    raftLog *raftLog
    //各种progress
    prs         map[uint64]*Progress
    learnerPrs  map[uint64]*Progress
    //如果是Leaner 就只是作为副本,不能选举
    isLearner bool
    //投票结果
    votes map[uint64]bool
    //这个就是要吐给 ready chan 的数据,处理完又会被清空
    msgs []pb.Message
    // the leader id
    lead uint64
    //即将要更换的lead
    leadTransferee uint64
    //周期的任务
    tick func()
    //接收到消息的处理 stepFollower、stepCandidate、stepLeader
    step stepFunc
    //每次心跳是否要校验节点是否足够
    checkQuorum bool
    //选举前,是否要判断一下存活的节点是否足够
    preVote     bool
    //选举和心跳相关
    electionElapsed int
    heartbeatElapsed int
    heartbeatTimeout int
    electionTimeout  int
    randomizedElectionTimeout int
    disableProposalForwarding bool
    ...
}

状态切为 Follower后,

要嘛就是收到 Leader 的心跳作为 Follower 加入整个集群,

要嘛就是到点触发 tickElection,进行选举。

func (r *raft) becomeFollower(term uint64, lead uint64) {
    //当接收到消息时,如果共用的Step方法处理不了,就会调用  stepFollower
    r.step = stepFollower
    r.reset(term)
    //到点会触发选举
    r.tick = r.tickElection
    //设置 Leader,如果初启时,Leader 为 None
    r.lead = lead
    r.state = StateFollower
    r.logger.Infof("%x became follower at term %d", r.id, r.Term)
}

for 循环监听 raft 状态机吐出的数据,并进行对应的处理

func (n *node) run(r *raft) {
    //准备各种变量
    var propc chan pb.Message
    var readyc chan Ready
    var advancec chan struct{}
    var prevLastUnstablei, prevLastUnstablet uint64
    var havePrevLastUnstablei bool
    var prevSnapi uint64
    var rd Ready

    lead := None
    //TODO: softState到底是个啥?
    prevSoftSt := r.softState()
    prevHardSt := emptyState
    
    //开始无限循环,无限监听
    for {
        if advancec != nil {
            readyc = nil
        } else {
            //构建 raft 状态机 已经协商好的数据
            rd = newReady(r, prevSoftSt, prevHardSt)
            if rd.containsUpdates() {
                //如果有更新,就将readyc 赋值为 node.readyc,后面写数据要用
                readyc = n.readyc
            } else {
                //如果有更新,就将readyc置为nil,这样后面就不会写数据到 node.readyc,就不会有数据输出
                //不过这样通过把 readyc 置为nil 来控制不输出数据的写法,有点怪怪的
                readyc = nil
            }
        }

        //r.lead 比 lead 更新,如果不相等,说明 raft 状态机的 lead 有变动
        if lead != r.lead {
            if r.hasLeader() {
                if lead == None {
                    r.logger.Infof("raft.node: %x elected leader %x at term %d", r.id, r.lead, r.Term)
                } else {
                    r.logger.Infof("raft.node: %x changed leader from %x to %x at term %d", r.id, lead, r.lead, r.Term)
                }
                //把 propc 赋值为 node.proc,读取出需要处理的消息
                propc = n.propc
            } else {
                r.logger.Infof("raft.node: %x lost leader %x at term %d", r.id, lead, r.Term)
                //丢失lead,则把proc置为nil,不再处理消息
                propc = nil
            }
            //更新lead
            lead = r.lead
        }

        select {
        // TODO: maybe buffer the config propose if there exists one (the way
        // described in raft dissertation)
        // Currently it is dropped in Step silently.
        case m := <-propc:
            //有状态机需要处理的消息,直接调用 raft.Step(m pb.Message)
            m.From = r.id
            r.Step(m)
        case m := <-n.recvc:
            // filter out response message from unknown From.
            if pr := r.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) {
                r.Step(m) // raft never returns an error
            }
        case cc := <-n.confc:
            //节点配置发生变化,需要变更对应的progress
            if cc.NodeID == None {
                r.resetPendingConf()
                select {
                case n.confstatec <- pb.ConfState{Nodes: r.nodes()}:
                case <-n.done:
                }
                break
            }
            switch cc.Type {
            case pb.ConfChangeAddNode:
                r.addNode(cc.NodeID)
            case pb.ConfChangeAddLearnerNode:
                r.addLearner(cc.NodeID)
            case pb.ConfChangeRemoveNode:
                // block incoming proposal when local node is
                // removed
                if cc.NodeID == r.id {
                    propc = nil
                }
                r.removeNode(cc.NodeID)
            case pb.ConfChangeUpdateNode:
                r.resetPendingConf()
            default:
                panic("unexpected conf type")
            }
            select {
            //把最新的状态吐出去
            case n.confstatec <- pb.ConfState{Nodes: r.nodes()}:
            case <-n.done:
            }
        case <-n.tickc:
            //到点执行周期任务 Leader-tickHeartbeat; Candidate && follower-tickElecion
            r.tick()
        case readyc <- rd:
            //这就是上面提到的 有更新时 readyc不为nil, 更新数据rd会写入 readyc
            //这样系统外就能收到这个更新信息
            if rd.SoftState != nil {
                prevSoftSt = rd.SoftState
            }
            if len(rd.Entries) > 0 {
                prevLastUnstablei = rd.Entries[len(rd.Entries)-1].Index
                prevLastUnstablet = rd.Entries[len(rd.Entries)-1].Term
                havePrevLastUnstablei = true
            }
            if !IsEmptyHardState(rd.HardState) {
                prevHardSt = rd.HardState
            }
            if !IsEmptySnap(rd.Snapshot) {
                prevSnapi = rd.Snapshot.Metadata.Index
            }

            //r.msgs表示同步给其他节点的信息,这边传到系统外之后就要置为nil
            r.msgs = nil
            //同上,读取数据的时候使用,置为nil
            r.readStates = nil
            //advancec不为nil,下一个for循环就会执行 <-advancec
            advancec = n.advancec
        case <-advancec:
            //更新raftlog,将“协商中”的数据,更新为 “协商一致”
            if prevHardSt.Commit != 0 {
                r.raftLog.appliedTo(prevHardSt.Commit)
            }
            if havePrevLastUnstablei {
                //压缩空间,移除已协商一致的数据
                r.raftLog.stableTo(prevLastUnstablei, prevLastUnstablet)
                havePrevLastUnstablei = false
            }
            r.raftLog.stableSnapTo(prevSnapi)
            //advancec为nil,下一个for循环就不会执行 <-advancec
            advancec = nil
        case c := <-n.status:
            //丢状态信息到系统外
            c <- getStatus(r)
        case <-n.stop:
            //下掉节点
            close(n.done)
            return
        }
    }
}

三、流程 - 选举

当 Follower 和 Candidates 到点之后,会开始触发选举,具体步骤如下

// tickElection is run by followers and candidates after r.electionTimeout.
func (r *raft) tickElection() {
    r.electionElapsed++

    if r.promotable() && r.pastElectionTimeout() {
        r.electionElapsed = 0
        //发一条 pb.MsgHub 的消息到状态机
        //Hub 译义是 “行进时为使步伐一致的吆喝声”
        //在整个同步集群中,这个词就显得很有意思,因为很贴切,
        //发出这么一个类型的消息,就是为了让集群内的节点在某个关系上达到一致,或数据一致(步伐一致)
        r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
    }
}
func (r *raft) Step(m pb.Message) error {
    ...
    switch m.Type {
    case pb.MsgHup:
        if r.state != StateLeader {
            ents, err := r.raftLog.slice(r.raftLog.applied+1, r.raftLog.committed+1, noLimit)
            if err != nil {
                r.logger.Panicf("unexpected error getting unapplied entries (%v)", err)
            }
            if n := numOfPendingConf(ents); n != 0 && r.raftLog.committed > r.raftLog.applied {
                r.logger.Warningf("%x cannot campaign at term %d since there are still %d pending configuration changes to apply", r.id, r.Term, n)
                return nil
            }

            r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term)
            if r.preVote {
                //如果开启preVote,会先进行选举前的校验,即判断节点个数是否足够,具体就是 
                //1.将状态置为 StatePreCandidate,
                //2.然后给其他节点发 pb.MsgPreVote,其他节点如果有反馈,则计数+1
                //3.一直到 发起选举的节点,收到了 节点数/2 + 1  的反馈,该节点会进入  StateCandidate, 就可以进行选举了
                //这样做是为了防止脑裂时,节点数不够还进行无效选举, 导致 Item 不断增大
                r.campaign(campaignPreElection)
            } else {
                //直接发起选举
                r.campaign(campaignElection)
            }
        } else {
            r.logger.Debugf("%x ignoring MsgHup because already leader", r.id)
        }
    ...
    }
    ...
    return nil
}

这边直接选举,不再介绍 campaignPreElection 的流程

func (r *raft) campaign(t CampaignType) {
    var term uint64
    var voteMsg pb.MessageType
    if t == campaignPreElection {
        //预选举的流程
        r.becomePreCandidate()
        voteMsg = pb.MsgPreVote
        term = r.Term + 1
    } else {
        //选举的流程
        r.becomeCandidate()
        voteMsg = pb.MsgVote
        term = r.Term
    }
    
    //如果投票结果超过法定节点数(总节点数/2 + 1),那就直接进入下步,预选举 --> 选举 or 选举 --> Leader
    if r.quorum() == r.poll(r.id, voteRespMsgType(voteMsg), true) {
        if t == campaignPreElection {
            r.campaign(campaignElection)
        } else {
            r.becomeLeader()
        }
        return
    }
    
    //挨个节点发 MsgPreVote 或 MsgVote
    for id := range r.prs {
        if id == r.id {
            continue
        }
        r.logger.Infof("%x [logterm: %d, index: %d] sent %s request to %x at term %d",
            r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), voteMsg, id, r.Term)

        var ctx []byte
        if t == campaignTransfer {
            ctx = []byte(t)
        }
        r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
    }
}
func (r *raft) send(m pb.Message) {
    ...
    //把消息写到 raft.msgs,再写到 ready chan,最后由 传输层 丢给其他节点
    //参考 “一、4. node.run(r *raft)”
    r.msgs = append(r.msgs, m)
}
func (n *node) step(ctx context.Context, m pb.Message) error {
    //把消息写到 node.recvc,然后node的 run 会再次消费到该消息,然后调用 raft.step
    //参考 “一、4. node.run(r *raft)”
    ch := n.recvc
    if m.Type == pb.MsgProp {
        ch = n.propc
    }

    select {
    case ch <- m:
        return nil
    case <-ctx.Done():
        return ctx.Err()
    case <-n.done:
        return ErrStopped
    }
}
func (r *raft) Step(m pb.Message) error {
    ...
    switch m.Type {
    ...
    case pb.MsgVote, pb.MsgPreVote:
        //如果是Learner,不参与投票
        if r.isLearner {
            r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] ignored %s from %x [logterm: %d, index: %d] at term %d: learner can not vote",
                r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
            return nil
        }

        if (r.Vote == None || m.Term > r.Term || r.Vote == m.From) && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
            r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] cast %s for %x [logterm: %d, index: %d] at term %d",
                r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
            //回给发送方一条 MsgPreVoteResp 或 MsgVoteResp
            //发送操作逻辑同上面的 “4. Follower && Candidate 调用 raft.send(preVoteMsg / voteMsg)”
            r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)})
            if m.Type == pb.MsgVote {
                r.electionElapsed = 0
                r.Vote = m.From
            }
        } else {
            r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d",
                r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
            r.send(pb.Message{To: m.From, Term: r.Term, Type: voteRespMsgType(m.Type), Reject: true})
        }
    ...
    }
    ...
    return nil
}
func stepCandidate(r *raft, m pb.Message) {
    var myVoteRespType pb.MessageType
    if r.state == StatePreCandidate {
        myVoteRespType = pb.MsgPreVoteResp
    } else {
        myVoteRespType = pb.MsgVoteResp
    }
    switch m.Type {
    ... 
    case myVoteRespType:
        gr := r.poll(m.From, m.Type, !m.Reject)
        r.logger.Infof("%x [quorum:%d] has received %d %s votes and %d vote rejections", r.id, r.quorum(), gr, m.Type, len(r.votes)-gr)
        //如果投票结果超过法定节点数(总节点数/2 + 1),那就直接进入下步,预选举 --> 选举 or 选举 --> Leader
        switch r.quorum() {
        case gr:
            if r.state == StatePreCandidate {
                r.campaign(campaignElection)
            } else {
                //节点本身切为 Leader
                r.becomeLeader()
                //开始向其他节点分发提案
                r.bcastAppend()
            }
        case len(r.votes) - gr:
            r.becomeFollower(r.Term, None)
        }
    ...
    }
}
func (r *raft) Step(m pb.Message) error {
    // Handle the message term, which may result in our stepping down to a follower.
    switch {
    case m.Term == 0:
        // local message
    case m.Term > r.Term:
        ...
        switch {
        ...
        default:
            r.logger.Infof("%x [term: %d] received a %s message with higher term from %x [term: %d]",
                r.id, r.Term, m.Type, m.From, m.Term)
            if m.Type == pb.MsgApp || m.Type == pb.MsgHeartbeat || m.Type == pb.MsgSnap {
                //更新Leader,节点本身变成 Follower
                r.becomeFollower(m.Term, m.From)
            } else {
                r.becomeFollower(m.Term, None)
            }
        }
    ...
    }
}

四、流程 - put(k, v)

代码路径如下

func (n *node) Propose(ctx context.Context, data []byte) error {
    //一样是写 MspProp 消息到 raft 状态机,然后写到 node.propc,再由node.run 的 for循环处理
    //最终调用 raft.Step
    return n.step(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
}
func stepFollower(r *raft, m pb.Message) {
    switch m.Type {
    case pb.MsgProp:
        if r.lead == None {
            r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
            return
        } else if r.disableProposalForwarding {
            r.logger.Infof("%x not forwarding to leader %x at term %d; dropping proposal", r.id, r.lead, r.Term)
            return
        }
        //会转发给 Leader
        m.To = r.lead
        r.send(m)
    ...
    }
}
func stepLeader(r *raft, m pb.Message) {
    switch m.Type {
    ...
    case pb.MsgProp:
        ...
        //把提案追加到“协商中的提案”中
        r.appendEntry(m.Entries...)
        //把上述的操作广播出去
        //因为需要Follower表示OK,Leader才能commit
        r.bcastAppend()
        return
    ...
    }
}

func (r *raft) appendEntry(es ...pb.Entry) {
    //取最新的Index
    //先取 “协商中的提案” 的最新Index,raftLog.unstable.entries[最后一个] --> raftlog.unstable.snapshot.LastIndex()
    //如果没有,则取 “已协商一致的提案”的最新Index,raftLog.storage.LastIndex()
    li := r.raftLog.lastIndex()
    //挨个递增+1赋值Index
    for i := range es {
        es[i].Term = r.Term
        es[i].Index = li + 1 + uint64(i)
    }
    //追加到“协商中的提案”中
    r.raftLog.append(es...)
    //1.“代表Leader的Progress”中的match 更新为 最新的Index,
    //match表示已发送,由于Leader本身不用发送,所以就直接更新了
    //2.方法名为嘛多了maybe前缀呢,因为有可能更新不成功,
    //当 lastIndex <= match 时,就更新不成功,
    //因为raft library 的机制只允许offset往前走
    r.getProgress(r.id).maybeUpdate(r.raftLog.lastIndex())
    //提交,即 raftLog.committed 住前进
    //1.如果投票结果少于 (节点数/2 +1 ),就提交不了,所以叫 maybeCommit
    //2.理论上只有单节点的时候才会提交成功,
    //多节点,会在响应Follower反馈的地方,再次调用该方法进行提交
    //所以这边不用关心返回值
    r.maybeCommit()
}

func (r *raft) maybeCommit() bool {
    //把所有节点已收到的提案Index放在一个数组里
    mis := make(uint64Slice, 0, len(r.prs))
    for _, p := range r.prs {
        mis = append(mis, p.Match)
    }
    //降序
    sort.Sort(sort.Reverse(mis))
    //取中间法定数-1的match
    //比如节点是5,法定数就是3
    //mci - matchIndex = 降序Match数组[2],即取法定数中match最小的那一个
    //如果 mci > raftLog.committed,说明降序Match数组至少有 3个已经满足 mci > raftLog.committed
    //这样就可以提交了
    mci := mis[r.quorum()-1]
    //尝试提交
    return r.raftLog.maybeCommit(mci, r.Term)
}

func (r *raft) bcastAppend() {
    //挨个progress进行 sendAppend
    r.forEachProgress(func(id uint64, _ *Progress) {
        if id == r.id {
            return
        }

        r.sendAppend(id)
    })
}

func (r *raft) sendAppend(to uint64) {
    pr := r.getProgress(to)
    if pr.IsPaused() {
        return
    }
    m := pb.Message{}
    m.To = to
    
    term, errt := r.raftLog.term(pr.Next - 1)
    ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)
    
    if errt != nil || erre != nil { // send snapshot if we failed to get term or entries
        //就是上面所说的,同步过程中出现异常,就会重新同步  数据快照
        if !pr.RecentActive {
            r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to)
            return
        }
        m.Type = pb.MsgSnap
        snapshot, err := r.raftLog.snapshot()
        if err != nil {
            if err == ErrSnapshotTemporarilyUnavailable {
                r.logger.Debugf("%x failed to send snapshot to %x because snapshot is temporarily unavailable", r.id, to)
                return
            }
            panic(err) // TODO(bdarnell)
        }
        if IsEmptySnap(snapshot) {
            panic("need non-empty snapshot")
        }
        m.Snapshot = snapshot
        sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
        r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
            r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr)
        //progress 切到 snapshot 状态
        pr.becomeSnapshot(sindex)
        r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr)
    } else {
        //正常情况是会给 Follower 发 MsgApp 消息
        m.Type = pb.MsgApp
        m.Index = pr.Next - 1
        m.LogTerm = term
        m.Entries = ents
        m.Commit = r.raftLog.committed
        if n := len(m.Entries); n != 0 {
            switch pr.State {
            // optimistically increase the next when in ProgressStateReplicate
            case ProgressStateReplicate:
                last := m.Entries[n-1].Index
                pr.optimisticUpdate(last)
                pr.ins.add(last)
            case ProgressStateProbe:
                //如果是探测状态,发完这一条,就不会再发
                //然后收到反馈时,会调用  pr.resume()
                pr.pause()
            default:
                r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State)
            }
        }
    }
    // 这边就是要发提案消息了
    r.send(m)
}

func (r *raft) send(m pb.Message) {
    m.From = r.id
    ...
    //追加到 raft.msgs,写到 ready chan,再由 node.run() 消费到,丢给传输层,再丢给其他节点
    r.msgs = append(r.msgs, m)
}
func stepFollower(r *raft, m pb.Message) {
    switch m.Type {
    ...
    case pb.MsgApp:
        r.electionElapsed = 0
        r.lead = m.From
        r.handleAppendEntries(m)
    ...
    }
}

func (r *raft) handleAppendEntries(m pb.Message) {
    if m.Index < r.raftLog.committed {
        r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
        return
    }

    if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
        //如果 term 是对的,并且能够提交(提交细节很装箱单 的,不再介绍,自己看一下maybeAppend)
        //就给 Leader 反馈一条 pb.MsgAppResp,并且 reject : false
        r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
    } else {
        //不能提交
        //就给 Leader 反馈一条 pb.MsgAppResp,并且 reject : true,带上节点最新的 Index
        //Leader 会尝试同步更旧的数据
        r.logger.Debugf("%x [logterm: %d, index: %d] rejected msgApp [logterm: %d, index: %d] from %x",
            r.id, r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From)
        r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()})
    }
}
func stepLeader(r *raft, m pb.Message) {
    ...
    pr := r.getProgress(m.From)
    if pr == nil {
        r.logger.Debugf("%x no progress available for %x", r.id, m.From)
        return
    }
    switch m.Type {
    case pb.MsgAppResp:
        //收到反馈信息,说明该节点是存活的
        pr.RecentActive = true

        if m.Reject {
            r.logger.Debugf("%x received msgApp rejection(lastindex: %d) from %x for index %d",
                r.id, m.RejectHint, m.From, m.Index)
            //尝试从更旧的数据开始同步
            if pr.maybeDecrTo(m.Index, m.RejectHint) {
                r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
                if pr.State == ProgressStateReplicate {
                    pr.becomeProbe()
                }
                r.sendAppend(m.From)
            }
        } else {
            oldc := pr.IsPaused()
            //更新节点对应progress的Match及Next
            if pr.maybeUpdate(m.Index) {
                switch {
                case pr.State == ProgressStateProbe:
                    //节点能正常接收消息,则从探测状态切为副本状态
                    pr.becomeReplicate()
                case pr.State == ProgressStateSnapshot && pr.needSnapshotAbort():
                    r.logger.Debugf("%x snapshot aborted, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
                    pr.becomeProbe()
                case pr.State == ProgressStateReplicate:
                    //副本状态下,需要调整发送的滑动窗口,以备下次发送
                    pr.ins.freeTo(m.Index)
                }
                
                //尝试提交,即将 “协商中的提案” 变更为 “协商一致的提案”
                if r.maybeCommit() {
                    //如果该提案超过法定人数反馈,则表示协商一致,
                    //所有节点可以进入下一轮数据同步
                    r.bcastAppend()
                } else if oldPaused {
                    //如果不一致,则没有必要全部进入下一轮数据同步
                    //但是节点之前如果是Paused, 那之前是可能有很多消息滞留,未发送,那就继续同步,
                    r.sendAppend(m.From)
                }
                // Transfer leadership is in progress.
                if m.From == r.leadTransferee && pr.Match == r.raftLog.lastIndex() {
                    r.logger.Infof("%x sent MsgTimeoutNow to %x after received MsgAppResp", r.id, m.From)
                    r.sendTimeoutNow(m.From)
                }
            }
        }
    ...
    }
}

这边有一个问题,就是 raft.maybeCommit 只是会更新 offset,具体的 []entriy 的落地, 是不会更新的

raftLog.Storage 不在这边操作,而是在应用层

这个操作为何是在应用层做,理论上应该也封装在 raft librar 里才对。

这边抓了 raftexample 的示例代码 etcd/contrib/raftexample/raft.go

func (rc *raftNode) serveChannels() {
    ...
    for {
        select {
        ...
        //这边是读取ready chan,消费数据 
        case rd := <-rc.node.Ready():
            //WAL操作,先不用管,后续再介绍 WAL
            rc.wal.Save(rd.HardState, rd.Entries)

            //如果 ready 里有 snapshopt,就要重新设置 snapshot
            if !raft.IsEmptySnap(rd.Snapshot) {
                rc.saveSnap(rd.Snapshot)
                rc.raftStorage.ApplySnapshot(rd.Snapshot)
                rc.publishSnapshot(rd.Snapshot)
            }

            //rc.raftStorage 与 raftLog.Storage 是同一个对象
            //就是在这位置Entries给落地了
            rc.raftStorage.Append(rd.Entries)
            rc.transport.Send(rd.Messages)
            if ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries)); !ok {
                rc.stop()
                return
            }
            rc.maybeTriggerSnapshot()
            rc.node.Advance()
        ...
        }
    }
}

参考 *“一、基础数据 - 4. node.run(r raft)”,通过 ready chan 把数据吐出去

func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
    rd := Ready{
        Entries:          r.raftLog.unstableEntries(),
        //就是这个位置,r.maybeCommit之后,r.raftLog.nextEnts() 就会有数据
        CommittedEntries: r.raftLog.nextEnts(),
        Messages:         r.msgs,
    }
    ...
    return rd
}

代码路径是

func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
    commit := min(r.getProgress(to).Match, r.raftLog.committed)
    m := pb.Message{
        To:      to,
        Type:    pb.MsgHeartbeat,
        //会带上Leader已经 commit 的 Index
        Commit:  commit,
        Context: ctx,
    }

    r.send(m)
}
func stepFollower(r *raft, m pb.Message) {
    switch m.Type {
    ...
    case pb.MsgHeartbeat:
        r.electionElapsed = 0
        r.lead = m.From
        //Follower 在这边处理 Leader 发过来的心跳包
        r.handleHeartbeat(m)
    ...
    }
}

func (r *raft) handleHeartbeat(m pb.Message) {
    //这边更新 commitIndex
    r.raftLog.commitTo(m.Commit)
    //给 Leader 发心跳Feeback
    r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context})
}
上一篇 下一篇

猜你喜欢

热点阅读