2020-04-07 本文已影响0人
type Node interface {
// Ready returns a channel that returns the current point-in-time state.
// Users of the Node must call Advance after retrieving the state returned by Ready.
// NOTE: No committed entries from the next Ready may be applied until all committed entries
// and snapshots from the previous one have finished.
Ready() <-chan Ready
func (n *node) Ready() <-chan Ready { return n.readyc }
- 外部都是调用Ready拿到通道,取感知内部的变化。
- 那么谁会往readyc去发送状态变更呢?继续往下
type Ready struct {
// The current volatile state of a Node.
// SoftState will be nil if there is no update.
// It is not required to consume or store SoftState.
// The current state of a Node to be saved to stable storage BEFORE
// Messages are sent.
// HardState will be equal to empty state if there is no update.
// ReadStates can be used for node to serve linearizable read requests locally
// when its applied index is greater than the index in ReadState.
// Note that the readState will be returned when raft receives msgReadIndex.
// The returned is only valid for the request that requested to read.
ReadStates []ReadState
// Entries specifies entries to be saved to stable storage BEFORE
// Messages are sent.
Entries []pb.Entry
// Snapshot specifies the snapshot to be saved to stable storage.
Snapshot pb.Snapshot
// CommittedEntries specifies entries to be committed to a
// store/state-machine. These have previously been committed to stable
// store.
CommittedEntries []pb.Entry
// Messages specifies outbound messages to be sent AFTER Entries are
// committed to stable storage.
// If it contains a MsgSnap message, the application MUST report back to raft
// when the snapshot has been received or has failed by calling ReportSnapshot.
Messages []pb.Message
// MustSync indicates whether the HardState and Entries must be synchronously
// written to disk or if an asynchronous write is permissible.
MustSync bool
func (n *node) run(r *raft) {
var readyc chan Ready
var rd Ready
lead := None
prevSoftSt := r.softState()
prevHardSt := emptyState
for {
if advancec != nil {
readyc = nil
} else {
rd = newReady(r, prevSoftSt, prevHardSt)
if rd.containsUpdates() {
readyc = n.readyc
} else {
readyc = nil
- node启动的时候就会开始监控Ready,这里有个技巧是advancec,感兴趣的可以自己理解。
- 不管如何,每次都会去newReady,也就是收集当前node的状态
func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
rd := Ready{
Entries: r.raftLog.unstableEntries(),
CommittedEntries: r.raftLog.nextEnts(),
Messages: r.msgs,
if softSt := r.softState(); !softSt.equal(prevSoftSt) {
rd.SoftState = softSt
if hardSt := r.hardState(); !isHardStateEqual(hardSt, prevHardSt) {
rd.HardState = hardSt
if r.raftLog.unstable.snapshot != nil {
rd.Snapshot = *r.raftLog.unstable.snapshot
if len(r.readStates) != 0 {
rd.ReadStates = r.readStates
rd.MustSync = MustSync(r.hardState(), prevHardSt, len(rd.Entries))
return rd
- 首先是Ready,一是unstable里面的日志,二是还没有apply的commit的日志,三是累计的Raft间的消息
- SoftState,当前的leader以及当前节点的身份
- HardState,当前任期,当前投票给谁,当前的committedindex
- unstable里面的snapshot快照
- ReadState,这部分请参看EtcdRaft源码分析(线性一致读)
- MustSync,待分析
if rd.containsUpdates() {
readyc = n.readyc
} else {
readyc = nil
func (rd Ready) containsUpdates() bool {
return rd.SoftState != nil || !IsEmptyHardState(rd.HardState) ||
!IsEmptySnap(rd.Snapshot) || len(rd.Entries) > 0 ||
len(rd.CommittedEntries) > 0 || len(rd.Messages) > 0 || len(rd.ReadStates) != 0
- 状态收集完后,那么很显然要判断这次收集的东西真的有变化么?
- 这里就是解决这个问题,用来决定要不要通知外部世界
case readyc <- rd:
if rd.SoftState != nil {
prevSoftSt = rd.SoftState
if len(rd.Entries) > 0 {
prevLastUnstablei = rd.Entries[len(rd.Entries)-1].Index
prevLastUnstablet = rd.Entries[len(rd.Entries)-1].Term
havePrevLastUnstablei = true
if !IsEmptyHardState(rd.HardState) {
prevHardSt = rd.HardState
if !IsEmptySnap(rd.Snapshot) {
prevSnapi = rd.Snapshot.Metadata.Index
if index := rd.appliedCursor(); index != 0 {
applyingToI = index
r.msgs = nil
r.readStates = nil
advancec = n.advancec
- 这里就不啰嗦了,全写在里面,无非就是将这次更新的关键节点保存下来
- 这里有个相当重要的需要注意的地方。applyingToI,很是值得单独拿出来讲。他的作用是记录应用层这次按照我推送的日志预计会apply到哪里。
- 既然commit的部分推送给应用层了,那么当然要reduceUncommittedSize
func (rd Ready) appliedCursor() uint64 {
if n := len(rd.CommittedEntries); n > 0 {
return rd.CommittedEntries[n-1].Index
if index := rd.Snapshot.Metadata.Index; index > 0 {
return index
return 0
- CommittedEntries好理解,既然都已经达成一致了,当然是想要应用层拿去用。为什么这里还要加上Snapshot,这里的逻辑将限制应用层再advance前,必须将Snapshot写入状态机。下面我们会印证这里的猜测。
- 什么叫应用层apply?怎样才算apply to its state machine?从fabric的角度来说,就是将block写入本地block文件中。
- 这个概念很重要,因为这个指标代表应用层状态机写入到什么位置了。
- 下面我们看下应用层是怎么处理Raft推送的Ready的。
case rd := <-n.Ready():
if err :=, rd.HardState, rd.Snapshot); err != nil {
n.logger.Panicf("Failed to persist etcd/raft data: %s", err)
if !raft.IsEmptySnap(rd.Snapshot) {
n.chain.snapC <- &rd.Snapshot
// skip empty apply
if len(rd.CommittedEntries) != 0 || rd.SoftState != nil {
n.chain.applyC <- apply{rd.CommittedEntries, rd.SoftState}
// TODO(jay_guo) leader can write to disk in parallel with replicating
// to the followers and them writing to their disks. Check 10.2.1 in thesis
- 下面是标准的用法
- 如果有快照,通知snapC
- 如果有CommittedEntries,通知applyC
- Advance,这里后面会讲
- send(rd.Messages),看过前面篇章的就知道,Raft的通讯层需要应用层代劳,所以集群的节点间消息来来回回都需要借助应用层的力量。
- 其他的,就不再赘述了,有兴趣的可以去看fabric的etcd部分。
- 这里我们重点看下快照的部分是不是印证了我们之前的猜测。
case sn := <-c.snapC:
if sn.Metadata.Index <= c.appliedIndex {
c.logger.Debugf("Skip snapshot taken at index %d, because it is behind current applied index %d", sn.Metadata.Index, c.appliedIndex)
b := utils.UnmarshalBlockOrPanic(sn.Data)
c.lastSnapBlockNum = b.Header.Number
c.confState = sn.Metadata.ConfState
c.appliedIndex = sn.Metadata.Index
if err := c.catchUp(sn); err != nil {
c.logger.Errorf("Failed to recover from snapshot taken at Term %d and Index %d: %s",
sn.Metadata.Term, sn.Metadata.Index, err)
- 初看基本可以确定,因为他在拆解block,而且将appliedIndex设置为快照的index
- 进去看下catchUp,确认下
func (c *Chain) catchUp(snap *raftpb.Snapshot) error {
b, err := utils.UnmarshalBlock(snap.Data)
if err != nil {
return errors.Errorf("failed to unmarshal snapshot data to block: %s", err)
if c.lastBlock.Header.Number >= b.Header.Number {
c.logger.Warnf("Snapshot is at block %d, local block number is %d, no sync needed", b.Header.Number, c.lastBlock.Header.Number)
return nil
puller, err := c.createPuller()
if err != nil {
return errors.Errorf("failed to create block puller: %s", err)
defer puller.Close()
var block *common.Block
next := c.lastBlock.Header.Number + 1
c.logger.Infof("Catching up with snapshot taken at block %d, starting from block %d", b.Header.Number, next)
for next <= b.Header.Number {
block = puller.PullBlock(next)
if block == nil {
return errors.Errorf("failed to fetch block %d from cluster", next)
if utils.IsConfigBlock(block) {, nil)
} else {, nil)
c.lastBlock = block
c.logger.Infof("Finished syncing with cluster up to block %d (incl.)", b.Header.Number)
return nil
看不懂没关系,你只要看到里面在执行, nil)就够了。说明快照进来,不是简单的写入本地snap文件就收工了,是要同时入状态机的。
case rd := <-n.Ready():
if err :=, rd.HardState, rd.Snapshot); err != nil {
n.logger.Panicf("Failed to persist etcd/raft data: %s", err)
if !raft.IsEmptySnap(rd.Snapshot) {
n.chain.snapC <- &rd.Snapshot
// skip empty apply
if len(rd.CommittedEntries) != 0 || rd.SoftState != nil {
n.chain.applyC <- apply{rd.CommittedEntries, rd.SoftState}
// TODO(jay_guo) leader can write to disk in parallel with replicating
// to the followers and them writing to their disks. Check 10.2.1 in thesis
- 考虑一个问题,Raft在推送Ready给应用层的时候,会记录预计应用层会写入状态机到什么位置,还记得么?
- 那么Raft怎么保证,应用层真的会按预期来行事,如果没有写到预计的位置,不是天下大乱。所以Raft提供了一个回调方法,提醒Raft说,应用层已经处理完毕你推送的日志。当然,这是君子协定。
- 下面我再回到Raft的世界,来看看,收到这个提醒后,是怎么处理的?
case <-advancec:
if applyingToI != 0 {
applyingToI = 0
if havePrevLastUnstablei {
r.raftLog.stableTo(prevLastUnstablei, prevLastUnstablet)
havePrevLastUnstablei = false
advancec = nil
- 收到就代表应用层已经处理完毕,当然你的raftlog的applied的位置要变为applyingToI
- 如果之前unstable有东西,因为应用层已经写入存储了,当然这部分就可以删掉了。不然为什么叫unstable
- 快照的部分也是如此,应用层都已经写入状态机了,当然这里继续存在也没有意义了。