2.2 etcd源码笔记 - raft library - 流程
一、基础元素
1. 提案记录
raft 状态机 最核心的功能就是协商出一致的提案(或者说是将提案同步到整个集群),
每一条提案由 etcd/raft/raftpb/raft.pb.go:Entry 表示
type Entry struct {
//当前的选举轮次
Term uint64 `protobuf:"varint,2,opt,name=Term" json:"Term"`
//每一条提案都有一个Index,递增,可以理解为ID
Index uint64 `protobuf:"varint,3,opt,name=Index" json:"Index"`
//普通提案 或 配置变更提案
Type EntryType `protobuf:"varint,1,opt,name=Type,enum=raftpb.EntryType" json:"Type"`
//具体的数据
Data []byte `protobuf:"bytes,4,opt,name=Data" json:"Data,omitempty"`
//这个不用关心,用于存储PB后的二进制数据
XXX_unrecognized []byte `json:"-"`
}
2. 快照
同步数据过程中, raft library 采用 “快照 + 提案” 的机制。
服务会不定期地将 已协商一致的提案 打包压缩成一个 数据快照。
比如 put(a,1) put(a,2) put(b,3) 会打包成 数据快照 a:2;b:3
正常情况下,节点启动会从本地读取 数据快照,然后在此基础上继续同步提案
同步提案的过程中,如果有异常,则会将先同步 Leader 的最新 数据快照 给 Follower,然后再此 数据快照 的基础上,再继续同步提案
type Snapshot struct {
//被打包压缩的数据,数据格式由具体的应用层提供
Data []byte `protobuf:"bytes,1,opt,name=data" json:"data,omitempty"`
//该份数据对应的 raft 状态机的状态,包括 Nodes、Index、Item
Metadata SnapshotMetadata `protobuf:"bytes,2,opt,name=metadata" json:"metadata"`
//不用关心,表示序列成PB的二进制数据
XXX_unrecognized []byte `json:"-"`
}
3. offset
哪些提案已协商好、应用层已处理好哪些协商好的提案,并不是记录具体的提案,
而是按顺序处理每个提案,然后标记一个 offset,表示协商、处理到哪个提案了。
比如 committed_index、 applied_index
4. raftLog
在整个过程中,需要记录以下元素
- 已协商一致的提案
- 正在协商中的提案
- 应用层已处理完的已协商一致提案
以上内容都记录在 etcd/raft/log.go:raftLog
type raftLog struct {
//之前以为是 storage 表示“协商一致的提案”,unstable表示“协商中的提案”
//然后后者协商一致后,挪到前者
//然而我错了,
//storage 如字面意思,就是存储的,已经落地了,但是目前只有一个实现 MemoryStorage,也是搞笑
//unstable表示不稳定的,在内存里的数据
storage Storage
unstable unstable
//提案Index,表示已协商到哪个提案
committed uint64
//提案Index,表示应用层已处理到哪个提案
applied uint64
logger Logger
}
type MemoryStorage struct {
sync.Mutex
//当前raft状态机的状态
//TODO: 介绍每个字段的含义
hardState pb.HardState
//协商好的数据快照
snapshot pb.Snapshot
//协商好的提案
ents []pb.Entry
}
type unstable struct {
//同步数据有异常时,需要从 Leader 同步到 Follower 的 数据快照
//这个字段有值,就会被塞到 Ready.Snapshot,进而被传输层丢给 Follower
snapshot *pb.Snapshot
//正在协商中的提案
entries []pb.Entry
//这个跟上面的offset不是同一个意思,取值为 snapshot.lastIndex
//这是为了方便从 entries 中获取数据,entries[entry.index - offset]=entry 就等于
offset uint64
logger Logger
}
5. progress
raft library 在 Leader 与 Follower 同步数据的过程中,定义了 Progress 来表示 Folllower 的状态信息。
包括 当前是否存活、已发送成功Index、待发送Index、状态等。
其中状态有三种
- probe 探测状态,每次只同步一条提案;当接收成功,就表示该节点是正常状态,会切换至 replicate
- replicate 副本状态,每次都同步尽量多的提案;当发生异常,就会切换至 snapshot
- snapshot 快照状态,需要同步 数据快照,同步完切换至 probe
源码附带的文档 etcd/raft/design.md 有 Progress 详细的设计说明介绍可以参考之。
type Progress struct {
//已发送成功的提案Index
Match uint64
//下一条要发送的提案Index
Next uint64
//当前状态
State ProgressStateType
//probe状态使用,探测出节点有问题,会标记为true,就会停止同步数据
Paused bool
//表示要同步的 snapshot 的 index
PendingSnapshot uint64
//是否存活,收到任何来自 Follower 的消息,都认为其状态是存活的
//Leader 会在每次发心跳的时候,顺便检查一下该状态
RecentActive bool
//发送数据的一个滑动窗口,机制类似TCP那样子的滑动窗口
ins *inflights
//是否Learner 是的话,只是复本,不能参与选举
IsLearner bool
}
二、流程 - 启动
- 入口 etcd/raft/node.go
应用层会调用该 StartNode 启动一个 raft-node
func StartNode(c *Config, peers []Peer) Node {
//raft相关的操作封装到 etcd/raft/raft.go:raft
r := newRaft(c)
//初始启动时,状态是 Follower,并且 term:1,Leader:None
//要嘛就是收到 Leader 的心跳作为 Follower 加入整个集群,
// 要嘛就是到点触发 tickElection,进行选举。
r.becomeFollower(1, None)
//把 提案-“增加集群中的节点” 加到 “已协商提案” 中
//这样是为了把 提案-“增加集群中的节点” 通过 ready chan 吐出去给外系统
//比如“传输层”需要拿到这些节点,然后创建对应的连接
//比如 node本身 拿到这些节点,创建节点对应的progress
//(这边其实是为了复用创建progress的流程,不然是可以直接调用代码创建progress)
for _, peer := range peers {
cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context}
d, err := cc.Marshal()
if err != nil {
panic("unexpected marshal error")
}
e := pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: r.raftLog.lastIndex() + 1, Data: d}
r.raftLog.append(e)
}
//从本地加载出来的都是已经协商一致的
r.raftLog.committed = r.raftLog.lastIndex()
//创建集群中各个节点对应的 progress
//上面把节点新增消息塞到 ready chan,也是为了创建 progress
//这边直接代码调用,是为了test case 可以立即在 StartNode 执行完之后,马上进行选举
//不然走 ready chan 会有一定时间的延迟
for _, peer := range peers {
r.addNode(peer.ID)
}
n := newNode()
n.logger = c.Logger
//最终启动的姿态就是,开启一个 for 循环不断读取 raft 状态机 丢出来的数据并进行处理
go n.run(r)
return &n
}
- newRaft(c *Config)
源代码 etcd/raft/raft.go 比较简单,这边就介绍一下 raft
type raft struct {
//外界会传入节点ID
//生成方式 sha1(cluster name, peer URLs, time)
id uint64
//当前选举轮次
Term uint64
//这个好像是投票给哪个 node?
Vote uint64
//跟读取数据相关,后续介绍
readStates []ReadState
//这个就是基础数据里所描述的raftLog
raftLog *raftLog
//各种progress
prs map[uint64]*Progress
learnerPrs map[uint64]*Progress
//如果是Leaner 就只是作为副本,不能选举
isLearner bool
//投票结果
votes map[uint64]bool
//这个就是要吐给 ready chan 的数据,处理完又会被清空
msgs []pb.Message
// the leader id
lead uint64
//即将要更换的lead
leadTransferee uint64
//周期的任务
tick func()
//接收到消息的处理 stepFollower、stepCandidate、stepLeader
step stepFunc
//每次心跳是否要校验节点是否足够
checkQuorum bool
//选举前,是否要判断一下存活的节点是否足够
preVote bool
//选举和心跳相关
electionElapsed int
heartbeatElapsed int
heartbeatTimeout int
electionTimeout int
randomizedElectionTimeout int
disableProposalForwarding bool
...
}
- raft.becomeFollower(term uint64, lead uint64)
状态切为 Follower后,
要嘛就是收到 Leader 的心跳作为 Follower 加入整个集群,
要嘛就是到点触发 tickElection,进行选举。
func (r *raft) becomeFollower(term uint64, lead uint64) {
//当接收到消息时,如果共用的Step方法处理不了,就会调用 stepFollower
r.step = stepFollower
r.reset(term)
//到点会触发选举
r.tick = r.tickElection
//设置 Leader,如果初启时,Leader 为 None
r.lead = lead
r.state = StateFollower
r.logger.Infof("%x became follower at term %d", r.id, r.Term)
}
- node.run(r *raft)
for 循环监听 raft 状态机吐出的数据,并进行对应的处理
func (n *node) run(r *raft) {
//准备各种变量
var propc chan pb.Message
var readyc chan Ready
var advancec chan struct{}
var prevLastUnstablei, prevLastUnstablet uint64
var havePrevLastUnstablei bool
var prevSnapi uint64
var rd Ready
lead := None
//TODO: softState到底是个啥?
prevSoftSt := r.softState()
prevHardSt := emptyState
//开始无限循环,无限监听
for {
if advancec != nil {
readyc = nil
} else {
//构建 raft 状态机 已经协商好的数据
rd = newReady(r, prevSoftSt, prevHardSt)
if rd.containsUpdates() {
//如果有更新,就将readyc 赋值为 node.readyc,后面写数据要用
readyc = n.readyc
} else {
//如果有更新,就将readyc置为nil,这样后面就不会写数据到 node.readyc,就不会有数据输出
//不过这样通过把 readyc 置为nil 来控制不输出数据的写法,有点怪怪的
readyc = nil
}
}
//r.lead 比 lead 更新,如果不相等,说明 raft 状态机的 lead 有变动
if lead != r.lead {
if r.hasLeader() {
if lead == None {
r.logger.Infof("raft.node: %x elected leader %x at term %d", r.id, r.lead, r.Term)
} else {
r.logger.Infof("raft.node: %x changed leader from %x to %x at term %d", r.id, lead, r.lead, r.Term)
}
//把 propc 赋值为 node.proc,读取出需要处理的消息
propc = n.propc
} else {
r.logger.Infof("raft.node: %x lost leader %x at term %d", r.id, lead, r.Term)
//丢失lead,则把proc置为nil,不再处理消息
propc = nil
}
//更新lead
lead = r.lead
}
select {
// TODO: maybe buffer the config propose if there exists one (the way
// described in raft dissertation)
// Currently it is dropped in Step silently.
case m := <-propc:
//有状态机需要处理的消息,直接调用 raft.Step(m pb.Message)
m.From = r.id
r.Step(m)
case m := <-n.recvc:
// filter out response message from unknown From.
if pr := r.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) {
r.Step(m) // raft never returns an error
}
case cc := <-n.confc:
//节点配置发生变化,需要变更对应的progress
if cc.NodeID == None {
r.resetPendingConf()
select {
case n.confstatec <- pb.ConfState{Nodes: r.nodes()}:
case <-n.done:
}
break
}
switch cc.Type {
case pb.ConfChangeAddNode:
r.addNode(cc.NodeID)
case pb.ConfChangeAddLearnerNode:
r.addLearner(cc.NodeID)
case pb.ConfChangeRemoveNode:
// block incoming proposal when local node is
// removed
if cc.NodeID == r.id {
propc = nil
}
r.removeNode(cc.NodeID)
case pb.ConfChangeUpdateNode:
r.resetPendingConf()
default:
panic("unexpected conf type")
}
select {
//把最新的状态吐出去
case n.confstatec <- pb.ConfState{Nodes: r.nodes()}:
case <-n.done:
}
case <-n.tickc:
//到点执行周期任务 Leader-tickHeartbeat; Candidate && follower-tickElecion
r.tick()
case readyc <- rd:
//这就是上面提到的 有更新时 readyc不为nil, 更新数据rd会写入 readyc
//这样系统外就能收到这个更新信息
if rd.SoftState != nil {
prevSoftSt = rd.SoftState
}
if len(rd.Entries) > 0 {
prevLastUnstablei = rd.Entries[len(rd.Entries)-1].Index
prevLastUnstablet = rd.Entries[len(rd.Entries)-1].Term
havePrevLastUnstablei = true
}
if !IsEmptyHardState(rd.HardState) {
prevHardSt = rd.HardState
}
if !IsEmptySnap(rd.Snapshot) {
prevSnapi = rd.Snapshot.Metadata.Index
}
//r.msgs表示同步给其他节点的信息,这边传到系统外之后就要置为nil
r.msgs = nil
//同上,读取数据的时候使用,置为nil
r.readStates = nil
//advancec不为nil,下一个for循环就会执行 <-advancec
advancec = n.advancec
case <-advancec:
//更新raftlog,将“协商中”的数据,更新为 “协商一致”
if prevHardSt.Commit != 0 {
r.raftLog.appliedTo(prevHardSt.Commit)
}
if havePrevLastUnstablei {
//压缩空间,移除已协商一致的数据
r.raftLog.stableTo(prevLastUnstablei, prevLastUnstablet)
havePrevLastUnstablei = false
}
r.raftLog.stableSnapTo(prevSnapi)
//advancec为nil,下一个for循环就不会执行 <-advancec
advancec = nil
case c := <-n.status:
//丢状态信息到系统外
c <- getStatus(r)
case <-n.stop:
//下掉节点
close(n.done)
return
}
}
}
三、流程 - 选举
当 Follower 和 Candidates 到点之后,会开始触发选举,具体步骤如下
- Follower && Candidate 调用 raft.tickElection()
// tickElection is run by followers and candidates after r.electionTimeout.
func (r *raft) tickElection() {
r.electionElapsed++
if r.promotable() && r.pastElectionTimeout() {
r.electionElapsed = 0
//发一条 pb.MsgHub 的消息到状态机
//Hub 译义是 “行进时为使步伐一致的吆喝声”
//在整个同步集群中,这个词就显得很有意思,因为很贴切,
//发出这么一个类型的消息,就是为了让集群内的节点在某个关系上达到一致,或数据一致(步伐一致)
r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
}
}
- Follower && Candidate 调用 raft.Step(pb.MsgHup)
func (r *raft) Step(m pb.Message) error {
...
switch m.Type {
case pb.MsgHup:
if r.state != StateLeader {
ents, err := r.raftLog.slice(r.raftLog.applied+1, r.raftLog.committed+1, noLimit)
if err != nil {
r.logger.Panicf("unexpected error getting unapplied entries (%v)", err)
}
if n := numOfPendingConf(ents); n != 0 && r.raftLog.committed > r.raftLog.applied {
r.logger.Warningf("%x cannot campaign at term %d since there are still %d pending configuration changes to apply", r.id, r.Term, n)
return nil
}
r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term)
if r.preVote {
//如果开启preVote,会先进行选举前的校验,即判断节点个数是否足够,具体就是
//1.将状态置为 StatePreCandidate,
//2.然后给其他节点发 pb.MsgPreVote,其他节点如果有反馈,则计数+1
//3.一直到 发起选举的节点,收到了 节点数/2 + 1 的反馈,该节点会进入 StateCandidate, 就可以进行选举了
//这样做是为了防止脑裂时,节点数不够还进行无效选举, 导致 Item 不断增大
r.campaign(campaignPreElection)
} else {
//直接发起选举
r.campaign(campaignElection)
}
} else {
r.logger.Debugf("%x ignoring MsgHup because already leader", r.id)
}
...
}
...
return nil
}
- Follower && Candidate 调用 raft.campaign(campaignElection)
这边直接选举,不再介绍 campaignPreElection 的流程
func (r *raft) campaign(t CampaignType) {
var term uint64
var voteMsg pb.MessageType
if t == campaignPreElection {
//预选举的流程
r.becomePreCandidate()
voteMsg = pb.MsgPreVote
term = r.Term + 1
} else {
//选举的流程
r.becomeCandidate()
voteMsg = pb.MsgVote
term = r.Term
}
//如果投票结果超过法定节点数(总节点数/2 + 1),那就直接进入下步,预选举 --> 选举 or 选举 --> Leader
if r.quorum() == r.poll(r.id, voteRespMsgType(voteMsg), true) {
if t == campaignPreElection {
r.campaign(campaignElection)
} else {
r.becomeLeader()
}
return
}
//挨个节点发 MsgPreVote 或 MsgVote
for id := range r.prs {
if id == r.id {
continue
}
r.logger.Infof("%x [logterm: %d, index: %d] sent %s request to %x at term %d",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), voteMsg, id, r.Term)
var ctx []byte
if t == campaignTransfer {
ctx = []byte(t)
}
r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
}
}
- Candidate 调用 raft.send(voteMsg)
func (r *raft) send(m pb.Message) {
...
//把消息写到 raft.msgs,再写到 ready chan,最后由 传输层 丢给其他节点
//参考 “一、4. node.run(r *raft)”
r.msgs = append(r.msgs, m)
}
- 其他节点 调用 node.step(context.Context, voteMsg)
func (n *node) step(ctx context.Context, m pb.Message) error {
//把消息写到 node.recvc,然后node的 run 会再次消费到该消息,然后调用 raft.step
//参考 “一、4. node.run(r *raft)”
ch := n.recvc
if m.Type == pb.MsgProp {
ch = n.propc
}
select {
case ch <- m:
return nil
case <-ctx.Done():
return ctx.Err()
case <-n.done:
return ErrStopped
}
}
- 其他节点 调用 raft.step(voteMsg)
func (r *raft) Step(m pb.Message) error {
...
switch m.Type {
...
case pb.MsgVote, pb.MsgPreVote:
//如果是Learner,不参与投票
if r.isLearner {
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] ignored %s from %x [logterm: %d, index: %d] at term %d: learner can not vote",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
return nil
}
if (r.Vote == None || m.Term > r.Term || r.Vote == m.From) && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] cast %s for %x [logterm: %d, index: %d] at term %d",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
//回给发送方一条 MsgPreVoteResp 或 MsgVoteResp
//发送操作逻辑同上面的 “4. Follower && Candidate 调用 raft.send(preVoteMsg / voteMsg)”
r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)})
if m.Type == pb.MsgVote {
r.electionElapsed = 0
r.Vote = m.From
}
} else {
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
r.send(pb.Message{To: m.From, Term: r.Term, Type: voteRespMsgType(m.Type), Reject: true})
}
...
}
...
return nil
}
- Candidate 调用 stepCandidate(*raft, voteMsgResp)
func stepCandidate(r *raft, m pb.Message) {
var myVoteRespType pb.MessageType
if r.state == StatePreCandidate {
myVoteRespType = pb.MsgPreVoteResp
} else {
myVoteRespType = pb.MsgVoteResp
}
switch m.Type {
...
case myVoteRespType:
gr := r.poll(m.From, m.Type, !m.Reject)
r.logger.Infof("%x [quorum:%d] has received %d %s votes and %d vote rejections", r.id, r.quorum(), gr, m.Type, len(r.votes)-gr)
//如果投票结果超过法定节点数(总节点数/2 + 1),那就直接进入下步,预选举 --> 选举 or 选举 --> Leader
switch r.quorum() {
case gr:
if r.state == StatePreCandidate {
r.campaign(campaignElection)
} else {
//节点本身切为 Leader
r.becomeLeader()
//开始向其他节点分发提案
r.bcastAppend()
}
case len(r.votes) - gr:
r.becomeFollower(r.Term, None)
}
...
}
}
- 其他节点 调用 Step(任意pb.Message)
func (r *raft) Step(m pb.Message) error {
// Handle the message term, which may result in our stepping down to a follower.
switch {
case m.Term == 0:
// local message
case m.Term > r.Term:
...
switch {
...
default:
r.logger.Infof("%x [term: %d] received a %s message with higher term from %x [term: %d]",
r.id, r.Term, m.Type, m.From, m.Term)
if m.Type == pb.MsgApp || m.Type == pb.MsgHeartbeat || m.Type == pb.MsgSnap {
//更新Leader,节点本身变成 Follower
r.becomeFollower(m.Term, m.From)
} else {
r.becomeFollower(m.Term, None)
}
}
...
}
}
四、流程 - put(k, v)
代码路径如下
- Leader/Follower 调用 Propose(ctx context.Context, data []byte)
func (n *node) Propose(ctx context.Context, data []byte) error {
//一样是写 MspProp 消息到 raft 状态机,然后写到 node.propc,再由node.run 的 for循环处理
//最终调用 raft.Step
return n.step(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
}
- 2.1 Follower 调用 stepFollower(r *raft, pb.MsgProp)
func stepFollower(r *raft, m pb.Message) {
switch m.Type {
case pb.MsgProp:
if r.lead == None {
r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
return
} else if r.disableProposalForwarding {
r.logger.Infof("%x not forwarding to leader %x at term %d; dropping proposal", r.id, r.lead, r.Term)
return
}
//会转发给 Leader
m.To = r.lead
r.send(m)
...
}
}
- 2.2 Leader 调用 stepLeader(r *raft, pb.MsgProp)
func stepLeader(r *raft, m pb.Message) {
switch m.Type {
...
case pb.MsgProp:
...
//把提案追加到“协商中的提案”中
r.appendEntry(m.Entries...)
//把上述的操作广播出去
//因为需要Follower表示OK,Leader才能commit
r.bcastAppend()
return
...
}
}
func (r *raft) appendEntry(es ...pb.Entry) {
//取最新的Index
//先取 “协商中的提案” 的最新Index,raftLog.unstable.entries[最后一个] --> raftlog.unstable.snapshot.LastIndex()
//如果没有,则取 “已协商一致的提案”的最新Index,raftLog.storage.LastIndex()
li := r.raftLog.lastIndex()
//挨个递增+1赋值Index
for i := range es {
es[i].Term = r.Term
es[i].Index = li + 1 + uint64(i)
}
//追加到“协商中的提案”中
r.raftLog.append(es...)
//1.“代表Leader的Progress”中的match 更新为 最新的Index,
//match表示已发送,由于Leader本身不用发送,所以就直接更新了
//2.方法名为嘛多了maybe前缀呢,因为有可能更新不成功,
//当 lastIndex <= match 时,就更新不成功,
//因为raft library 的机制只允许offset往前走
r.getProgress(r.id).maybeUpdate(r.raftLog.lastIndex())
//提交,即 raftLog.committed 住前进
//1.如果投票结果少于 (节点数/2 +1 ),就提交不了,所以叫 maybeCommit
//2.理论上只有单节点的时候才会提交成功,
//多节点,会在响应Follower反馈的地方,再次调用该方法进行提交
//所以这边不用关心返回值
r.maybeCommit()
}
func (r *raft) maybeCommit() bool {
//把所有节点已收到的提案Index放在一个数组里
mis := make(uint64Slice, 0, len(r.prs))
for _, p := range r.prs {
mis = append(mis, p.Match)
}
//降序
sort.Sort(sort.Reverse(mis))
//取中间法定数-1的match
//比如节点是5,法定数就是3
//mci - matchIndex = 降序Match数组[2],即取法定数中match最小的那一个
//如果 mci > raftLog.committed,说明降序Match数组至少有 3个已经满足 mci > raftLog.committed
//这样就可以提交了
mci := mis[r.quorum()-1]
//尝试提交
return r.raftLog.maybeCommit(mci, r.Term)
}
func (r *raft) bcastAppend() {
//挨个progress进行 sendAppend
r.forEachProgress(func(id uint64, _ *Progress) {
if id == r.id {
return
}
r.sendAppend(id)
})
}
func (r *raft) sendAppend(to uint64) {
pr := r.getProgress(to)
if pr.IsPaused() {
return
}
m := pb.Message{}
m.To = to
term, errt := r.raftLog.term(pr.Next - 1)
ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)
if errt != nil || erre != nil { // send snapshot if we failed to get term or entries
//就是上面所说的,同步过程中出现异常,就会重新同步 数据快照
if !pr.RecentActive {
r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to)
return
}
m.Type = pb.MsgSnap
snapshot, err := r.raftLog.snapshot()
if err != nil {
if err == ErrSnapshotTemporarilyUnavailable {
r.logger.Debugf("%x failed to send snapshot to %x because snapshot is temporarily unavailable", r.id, to)
return
}
panic(err) // TODO(bdarnell)
}
if IsEmptySnap(snapshot) {
panic("need non-empty snapshot")
}
m.Snapshot = snapshot
sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr)
//progress 切到 snapshot 状态
pr.becomeSnapshot(sindex)
r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr)
} else {
//正常情况是会给 Follower 发 MsgApp 消息
m.Type = pb.MsgApp
m.Index = pr.Next - 1
m.LogTerm = term
m.Entries = ents
m.Commit = r.raftLog.committed
if n := len(m.Entries); n != 0 {
switch pr.State {
// optimistically increase the next when in ProgressStateReplicate
case ProgressStateReplicate:
last := m.Entries[n-1].Index
pr.optimisticUpdate(last)
pr.ins.add(last)
case ProgressStateProbe:
//如果是探测状态,发完这一条,就不会再发
//然后收到反馈时,会调用 pr.resume()
pr.pause()
default:
r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State)
}
}
}
// 这边就是要发提案消息了
r.send(m)
}
func (r *raft) send(m pb.Message) {
m.From = r.id
...
//追加到 raft.msgs,写到 ready chan,再由 node.run() 消费到,丢给传输层,再丢给其他节点
r.msgs = append(r.msgs, m)
}
- Follwer 调用 stepFollower(*raft, pb.MsgApp)
func stepFollower(r *raft, m pb.Message) {
switch m.Type {
...
case pb.MsgApp:
r.electionElapsed = 0
r.lead = m.From
r.handleAppendEntries(m)
...
}
}
func (r *raft) handleAppendEntries(m pb.Message) {
if m.Index < r.raftLog.committed {
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
return
}
if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
//如果 term 是对的,并且能够提交(提交细节很装箱单 的,不再介绍,自己看一下maybeAppend)
//就给 Leader 反馈一条 pb.MsgAppResp,并且 reject : false
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
} else {
//不能提交
//就给 Leader 反馈一条 pb.MsgAppResp,并且 reject : true,带上节点最新的 Index
//Leader 会尝试同步更旧的数据
r.logger.Debugf("%x [logterm: %d, index: %d] rejected msgApp [logterm: %d, index: %d] from %x",
r.id, r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From)
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()})
}
}
- Leader 调用 stepLeader(*raft, pb.MsgAppResp)
func stepLeader(r *raft, m pb.Message) {
...
pr := r.getProgress(m.From)
if pr == nil {
r.logger.Debugf("%x no progress available for %x", r.id, m.From)
return
}
switch m.Type {
case pb.MsgAppResp:
//收到反馈信息,说明该节点是存活的
pr.RecentActive = true
if m.Reject {
r.logger.Debugf("%x received msgApp rejection(lastindex: %d) from %x for index %d",
r.id, m.RejectHint, m.From, m.Index)
//尝试从更旧的数据开始同步
if pr.maybeDecrTo(m.Index, m.RejectHint) {
r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
if pr.State == ProgressStateReplicate {
pr.becomeProbe()
}
r.sendAppend(m.From)
}
} else {
oldc := pr.IsPaused()
//更新节点对应progress的Match及Next
if pr.maybeUpdate(m.Index) {
switch {
case pr.State == ProgressStateProbe:
//节点能正常接收消息,则从探测状态切为副本状态
pr.becomeReplicate()
case pr.State == ProgressStateSnapshot && pr.needSnapshotAbort():
r.logger.Debugf("%x snapshot aborted, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
pr.becomeProbe()
case pr.State == ProgressStateReplicate:
//副本状态下,需要调整发送的滑动窗口,以备下次发送
pr.ins.freeTo(m.Index)
}
//尝试提交,即将 “协商中的提案” 变更为 “协商一致的提案”
if r.maybeCommit() {
//如果该提案超过法定人数反馈,则表示协商一致,
//所有节点可以进入下一轮数据同步
r.bcastAppend()
} else if oldPaused {
//如果不一致,则没有必要全部进入下一轮数据同步
//但是节点之前如果是Paused, 那之前是可能有很多消息滞留,未发送,那就继续同步,
r.sendAppend(m.From)
}
// Transfer leadership is in progress.
if m.From == r.leadTransferee && pr.Match == r.raftLog.lastIndex() {
r.logger.Infof("%x sent MsgTimeoutNow to %x after received MsgAppResp", r.id, m.From)
r.sendTimeoutNow(m.From)
}
}
}
...
}
}
这边有一个问题,就是 raft.maybeCommit 只是会更新 offset,具体的 []entriy 的落地, 是不会更新的
raftLog.Storage 不在这边操作,而是在应用层
这个操作为何是在应用层做,理论上应该也封装在 raft librar 里才对。
这边抓了 raftexample 的示例代码 etcd/contrib/raftexample/raft.go
func (rc *raftNode) serveChannels() {
...
for {
select {
...
//这边是读取ready chan,消费数据
case rd := <-rc.node.Ready():
//WAL操作,先不用管,后续再介绍 WAL
rc.wal.Save(rd.HardState, rd.Entries)
//如果 ready 里有 snapshopt,就要重新设置 snapshot
if !raft.IsEmptySnap(rd.Snapshot) {
rc.saveSnap(rd.Snapshot)
rc.raftStorage.ApplySnapshot(rd.Snapshot)
rc.publishSnapshot(rd.Snapshot)
}
//rc.raftStorage 与 raftLog.Storage 是同一个对象
//就是在这位置Entries给落地了
rc.raftStorage.Append(rd.Entries)
rc.transport.Send(rd.Messages)
if ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries)); !ok {
rc.stop()
return
}
rc.maybeTriggerSnapshot()
rc.node.Advance()
...
}
}
}
- Leader 吐数据给应用层
参考 *“一、基础数据 - 4. node.run(r raft)”,通过 ready chan 把数据吐出去
func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
rd := Ready{
Entries: r.raftLog.unstableEntries(),
//就是这个位置,r.maybeCommit之后,r.raftLog.nextEnts() 就会有数据
CommittedEntries: r.raftLog.nextEnts(),
Messages: r.msgs,
}
...
return rd
}
- Leader 把 committedIndex 以心跳的方式通知 Follower
代码路径是
- (r *raft) tickHeartbeat()
- (r *raft) Step(pb.Message : pb.MsgBeat)
- stepLeader(*raft, pb.Message:pb.MsgBeat)
- (r *raft) bcastHeartbeat()
- (r *raft) sendHeartbeat(to uint64, ctx []byte)
func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
commit := min(r.getProgress(to).Match, r.raftLog.committed)
m := pb.Message{
To: to,
Type: pb.MsgHeartbeat,
//会带上Leader已经 commit 的 Index
Commit: commit,
Context: ctx,
}
r.send(m)
}
- Follower 接收心跳,更新 commitIndex
func stepFollower(r *raft, m pb.Message) {
switch m.Type {
...
case pb.MsgHeartbeat:
r.electionElapsed = 0
r.lead = m.From
//Follower 在这边处理 Leader 发过来的心跳包
r.handleHeartbeat(m)
...
}
}
func (r *raft) handleHeartbeat(m pb.Message) {
//这边更新 commitIndex
r.raftLog.commitTo(m.Commit)
//给 Leader 发心跳Feeback
r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context})
}