EtcdRaft Source Code Analysis (Ready)

2020-04-07  小蜗牛爬楼梯

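We have spent a long time on Raft's internals, yet all of that internal churn is still invisible to the outside world. So how does the outside find out? Put differently: what mechanism lets the outside world observe changes to Raft's internal state? This article answers that question.

Interface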

type Node interface {
    ...
   // Ready returns a channel that returns the current point-in-time state.
   // Users of the Node must call Advance after retrieving the state returned by Ready.
   //
   // NOTE: No committed entries from the next Ready may be applied until all committed entries
   // and snapshots from the previous one have finished.
   Ready() <-chan Ready

   ...
}

func (n *node) Ready() <-chan Ready { return n.readyc }

  • External callers always call Ready to obtain the channel, and use it to observe internal changes.
  • So who sends state changes to readyc? Read on.
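The receive-only channel pattern described above can be sketched in isolation. This is a minimal toy, not etcd/raft code: the node and Ready types here are stripped-down stand-ins, consume is a hypothetical helper, and closing readyc is a simplification made only so the example terminates.

```go
package main

import "fmt"

// Ready is a stand-in for raft.Ready; only one field is modelled here.
type Ready struct {
	CommittedEntries []string
}

// node is a hypothetical, stripped-down counterpart of raft's node type.
type node struct {
	readyc chan Ready
}

// Ready exposes the internal channel as receive-only, so callers can
// observe state changes but cannot send on the channel themselves.
func (n *node) Ready() <-chan Ready { return n.readyc }

// consume drains the Ready channel until it is closed and returns every
// committed entry it saw, in order.
func consume(n *node) []string {
	var seen []string
	for rd := range n.Ready() {
		seen = append(seen, rd.CommittedEntries...)
	}
	return seen
}

func main() {
	n := &node{readyc: make(chan Ready)}
	// The producer side plays the role of the internal run loop.
	go func() {
		n.readyc <- Ready{CommittedEntries: []string{"a"}}
		n.readyc <- Ready{CommittedEntries: []string{"b", "c"}}
		close(n.readyc)
	}()
	fmt.Println(consume(n))
}
```

Returning `<-chan Ready` rather than `chan Ready` means the compiler rejects any attempt by the caller to send on or close the channel.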

Ready

type Ready struct {
   // The current volatile state of a Node.
   // SoftState will be nil if there is no update.
   // It is not required to consume or store SoftState.
   *SoftState

   // The current state of a Node to be saved to stable storage BEFORE
   // Messages are sent.
   // HardState will be equal to empty state if there is no update.
   pb.HardState

   // ReadStates can be used for node to serve linearizable read requests locally
   // when its applied index is greater than the index in ReadState.
   // Note that the readState will be returned when raft receives msgReadIndex.
   // The returned is only valid for the request that requested to read.
   ReadStates []ReadState

   // Entries specifies entries to be saved to stable storage BEFORE
   // Messages are sent.
   Entries []pb.Entry

   // Snapshot specifies the snapshot to be saved to stable storage.
   Snapshot pb.Snapshot

   // CommittedEntries specifies entries to be committed to a
   // store/state-machine. These have previously been committed to stable
   // store.
   CommittedEntries []pb.Entry

   // Messages specifies outbound messages to be sent AFTER Entries are
   // committed to stable storage.
   // If it contains a MsgSnap message, the application MUST report back to raft
   // when the snapshot has been received or has failed by calling ReportSnapshot.
   Messages []pb.Message

   // MustSync indicates whether the HardState and Entries must be synchronously
   // written to disk or if an asynchronous write is permissible.
   MustSync bool
}

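This struct reflects every change inside Raft; all of it surfaces through the fields of Ready. Readers of the earlier articles will already know what these fields are for, so they are not repeated here.

Startup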

func (n *node) run(r *raft) {
    ...
   var readyc chan Ready
   var rd Ready

   lead := None
   prevSoftSt := r.softState()
   prevHardSt := emptyState
    ...
   for {
      if advancec != nil {
         readyc = nil
      } else {
         rd = newReady(r, prevSoftSt, prevHardSt)
         if rd.containsUpdates() {
            readyc = n.readyc
         } else {
            readyc = nil
         }
      }
      ...
   }
}

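  • As soon as the node starts, it begins watching for a Ready to publish. The trick here is advancec: while it is non-nil, the previous Ready has not yet been acknowledged with Advance, so readyc is set to nil and no new Ready is offered.
  • Otherwise, every loop iteration calls newReady, which collects the node's current state.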
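The advancec gating relies on a Go property: a send or receive on a nil channel never becomes ready inside a select. The sketch below is a toy model of that idea only. runLoop, pending, and done are hypothetical names, integers stand in for Ready values, and the early return exists only so the example terminates (the real loop has other cases to wait on).

```go
package main

import "fmt"

// runLoop offers one pending value at a time on readyOut and refuses to
// offer the next one until a signal arrives on advanceIn.
func runLoop(pending []int, readyOut chan<- int, advanceIn <-chan struct{}, done chan<- struct{}) {
	var readyc chan<- int        // nil: do not offer a Ready right now
	var advancec <-chan struct{} // nil: no Advance is expected
	for {
		if advancec != nil || len(pending) == 0 {
			readyc = nil
		} else {
			readyc = readyOut
		}
		if readyc == nil && advancec == nil {
			close(done) // nothing left to do in this toy model
			return
		}
		var next int
		if len(pending) > 0 {
			next = pending[0]
		}
		select {
		case readyc <- next: // never chosen while readyc is nil
			pending = pending[1:]
			advancec = advanceIn
		case <-advancec: // never chosen while advancec is nil
			advancec = nil
		}
	}
}

func main() {
	readyOut := make(chan int)
	advanceIn := make(chan struct{})
	done := make(chan struct{})
	go runLoop([]int{1, 2, 3}, readyOut, advanceIn, done)
	for i := 0; i < 3; i++ {
		fmt.Println("got ready", <-readyOut)
		advanceIn <- struct{}{} // acknowledge before the next one is offered
	}
	<-done
}
```

Switching a channel variable between nil and a live channel enables or disables a select case without restructuring the loop, which is what the run function does with readyc and advancec.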

newReady

func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
   rd := Ready{
      Entries:          r.raftLog.unstableEntries(),
      CommittedEntries: r.raftLog.nextEnts(),
      Messages:         r.msgs,
   }
   if softSt := r.softState(); !softSt.equal(prevSoftSt) {
      rd.SoftState = softSt
   }
   if hardSt := r.hardState(); !isHardStateEqual(hardSt, prevHardSt) {
      rd.HardState = hardSt
   }
   if r.raftLog.unstable.snapshot != nil {
      rd.Snapshot = *r.raftLog.unstable.snapshot
   }
   if len(r.readStates) != 0 {
      rd.ReadStates = r.readStates
   }
   rd.MustSync = MustSync(r.hardState(), prevHardSt, len(rd.Entries))
   return rd
}

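  • First, Ready itself is filled with three things: the log entries in unstable, the committed entries that have not been applied yet, and the accumulated messages for other Raft nodes.
  • SoftState: the current leader and this node's current role. It is set only if it differs from the previous SoftState.
  • HardState: the current term, whom this node voted for, and the current commit index. It is set only if it differs from the previous HardState.
  • The snapshot held in unstable, if there is one.
  • ReadStates: see EtcdRaft Source Code Analysis (Linearizable Read).
  • MustSync: not analyzed here.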
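MustSync is left open above. As a hedged sketch only: to the best of my knowledge, etcd/raft's MustSync returns true when there are new entries to persist or when Term or Vote changed, while a change in Commit alone does not force a synchronous write. Verify this against the version you are reading. HardState below is a simplified stand-in and mustSync is a hypothetical re-implementation, not a copy of the library function.

```go
package main

import "fmt"

// HardState is a simplified stand-in for raftpb.HardState.
type HardState struct {
	Term, Vote, Commit uint64
}

// mustSync models the assumed rule: a synchronous write is needed when
// new entries exist, or when the vote or the term changed.
func mustSync(st, prev HardState, entsnum int) bool {
	return entsnum != 0 || st.Vote != prev.Vote || st.Term != prev.Term
}

func main() {
	prev := HardState{Term: 2, Vote: 1, Commit: 5}

	// Only Commit moved: no synchronous write required under this rule.
	fmt.Println(mustSync(HardState{Term: 2, Vote: 1, Commit: 6}, prev, 0))

	// Term changed: must sync.
	fmt.Println(mustSync(HardState{Term: 3, Vote: 1, Commit: 5}, prev, 0))

	// New entries to persist: must sync.
	fmt.Println(mustSync(prev, prev, 2))
}
```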

containsUpdates

if rd.containsUpdates() {
    readyc = n.readyc
} else {
    readyc = nil
}

func (rd Ready) containsUpdates() bool {
   return rd.SoftState != nil || !IsEmptyHardState(rd.HardState) ||
      !IsEmptySnap(rd.Snapshot) || len(rd.Entries) > 0 ||
      len(rd.CommittedEntries) > 0 || len(rd.Messages) > 0 || len(rd.ReadStates) != 0
}

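  • Once the state has been collected, the obvious next question is whether anything actually changed.
  • containsUpdates answers that question, and so decides whether the outside world needs to be notified.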

readyc

case readyc <- rd:
   if rd.SoftState != nil {
      prevSoftSt = rd.SoftState
   }
   if len(rd.Entries) > 0 {
      prevLastUnstablei = rd.Entries[len(rd.Entries)-1].Index
      prevLastUnstablet = rd.Entries[len(rd.Entries)-1].Term
      havePrevLastUnstablei = true
   }
   if !IsEmptyHardState(rd.HardState) {
      prevHardSt = rd.HardState
   }
   if !IsEmptySnap(rd.Snapshot) {
      prevSnapi = rd.Snapshot.Metadata.Index
   }
   if index := rd.appliedCursor(); index != 0 {
      applyingToI = index
   }

   r.msgs = nil
   r.readStates = nil
   r.reduceUncommittedSize(rd.CommittedEntries)
   advancec = n.advancec

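  • No need to go on at length; it is all in the code. It simply records the key positions of this update.
  • One point deserves special attention: applyingToI, which merits its own section. It records the index up to which the application layer is expected to apply, given the entries just pushed to it.
  • Since the committed part has been pushed to the application layer, reduceUncommittedSize is naturally called as well.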

applyingToI

func (rd Ready) appliedCursor() uint64 {
   if n := len(rd.CommittedEntries); n > 0 {
      return rd.CommittedEntries[n-1].Index
   }
   if index := rd.Snapshot.Metadata.Index; index > 0 {
      return index
   }
   return 0
}

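  • CommittedEntries is easy to understand: consensus has been reached, so the application layer should take them and use them. Why is Snapshot included as well? My guess is that this logic requires the application layer to write the Snapshot into its state machine before calling Advance. The sections below confirm this guess.
  • What does "apply" mean at the application layer? What counts as applying to its state machine? From Fabric's point of view, it means writing the block into the local block file.
  • This concept matters, because this index represents how far the application's state machine has been written.
  • Next, let's look at how the application layer handles the Ready pushed by Raft.

Application layer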
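To make the three cases of appliedCursor concrete, here is a small sketch with simplified stand-in types. Entry, Snapshot, and Ready below are not the real raftpb types, and Snapshot.Index stands in for Snapshot.Metadata.Index.

```go
package main

import "fmt"

// Simplified stand-ins for the raftpb and raft types.
type Entry struct{ Index uint64 }
type Snapshot struct{ Index uint64 }
type Ready struct {
	CommittedEntries []Entry
	Snapshot         Snapshot
}

// appliedCursor uses the same priority as the function shown above:
// the last committed entry wins, then the snapshot index, otherwise 0.
func appliedCursor(rd Ready) uint64 {
	if n := len(rd.CommittedEntries); n > 0 {
		return rd.CommittedEntries[n-1].Index
	}
	if rd.Snapshot.Index > 0 {
		return rd.Snapshot.Index
	}
	return 0
}

func main() {
	withEntries := Ready{CommittedEntries: []Entry{{Index: 4}, {Index: 5}}}
	onlySnapshot := Ready{Snapshot: Snapshot{Index: 10}}
	empty := Ready{}

	fmt.Println(appliedCursor(withEntries))  // 5
	fmt.Println(appliedCursor(onlySnapshot)) // 10
	fmt.Println(appliedCursor(empty))        // 0
}
```

A result of 0 means "nothing to apply in this Ready", which is why the run loop only updates applyingToI when the cursor is non-zero.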

case rd := <-n.Ready():
   if err := n.storage.Store(rd.Entries, rd.HardState, rd.Snapshot); err != nil {
      n.logger.Panicf("Failed to persist etcd/raft data: %s", err)
   }

   if !raft.IsEmptySnap(rd.Snapshot) {
      n.chain.snapC <- &rd.Snapshot
   }

   // skip empty apply
   if len(rd.CommittedEntries) != 0 || rd.SoftState != nil {
      n.chain.applyC <- apply{rd.CommittedEntries, rd.SoftState}
   }

   n.Advance()

   // TODO(jay_guo) leader can write to disk in parallel with replicating
   // to the followers and them writing to their disks. Check 10.2.1 in thesis
   n.send(rd.Messages)

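  • n.storage.Store is not a write to the state machine. It only writes to the local storage layer, persisting data locally so that the node's state can be recovered after a failure. It writes to a combination of memory, snap files, and WAL files.
  • What follows is the standard usage:
    • If there is a snapshot, notify snapC.
    • If there are CommittedEntries (or the SoftState changed), notify applyC.
    • Advance, which is covered later.
    • send(rd.Messages): as the earlier articles explained, Raft leaves the transport to the application layer, so every message between cluster nodes travels through the application.
  • The rest is not repeated here; if you are interested, see the etcdraft part of Fabric.
  • Here we focus on whether the snapshot handling confirms our earlier guess.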
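The ordering in the handler above (persist, hand off snapshot and committed entries, Advance, then send) can be sketched with labels in place of real side effects. Everything here is illustrative: Ready is a simplified stand-in, handleReady is a hypothetical helper, and the SoftState condition from the Fabric code is left out for brevity.

```go
package main

import "fmt"

// Ready is a simplified stand-in for raft.Ready.
type Ready struct {
	Entries          []string
	HasSnapshot      bool
	CommittedEntries []string
	Messages         []string
}

// handleReady returns the steps a consumer takes for one Ready, in order.
// Real code would call storage, channels, and the transport instead of
// appending labels.
func handleReady(rd Ready) []string {
	steps := []string{"store"} // persist Entries, HardState, Snapshot first
	if rd.HasSnapshot {
		steps = append(steps, "snapshot")
	}
	if len(rd.CommittedEntries) != 0 {
		steps = append(steps, "apply")
	}
	steps = append(steps, "advance") // tell raft this Ready has been handled
	steps = append(steps, "send")    // messages go out only after the store step
	return steps
}

func main() {
	rd := Ready{CommittedEntries: []string{"e1"}, Messages: []string{"m1"}}
	fmt.Println(handleReady(rd)) // [store apply advance send]
}
```

Persisting before sending follows the comment on the Messages field in the Ready struct: messages are to be sent after Entries have been written to stable storage.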

snapC

case sn := <-c.snapC:
   if sn.Metadata.Index <= c.appliedIndex {
      c.logger.Debugf("Skip snapshot taken at index %d, because it is behind current applied index %d", sn.Metadata.Index, c.appliedIndex)
      break
   }

   b := utils.UnmarshalBlockOrPanic(sn.Data)
   c.lastSnapBlockNum = b.Header.Number
   c.confState = sn.Metadata.ConfState
   c.appliedIndex = sn.Metadata.Index

   if err := c.catchUp(sn); err != nil {
      c.logger.Errorf("Failed to recover from snapshot taken at Term %d and Index %d: %s",
         sn.Metadata.Term, sn.Metadata.Index, err)
   }

  • At first glance this looks conclusive: it unmarshals the block and sets appliedIndex to the snapshot's index.
  • Let's step into catchUp to confirm.
func (c *Chain) catchUp(snap *raftpb.Snapshot) error {
   b, err := utils.UnmarshalBlock(snap.Data)
   if err != nil {
      return errors.Errorf("failed to unmarshal snapshot data to block: %s", err)
   }

   if c.lastBlock.Header.Number >= b.Header.Number {
      c.logger.Warnf("Snapshot is at block %d, local block number is %d, no sync needed", b.Header.Number, c.lastBlock.Header.Number)
      return nil
   }

   puller, err := c.createPuller()
   if err != nil {
      return errors.Errorf("failed to create block puller: %s", err)
   }
   defer puller.Close()

   var block *common.Block
   next := c.lastBlock.Header.Number + 1

   c.logger.Infof("Catching up with snapshot taken at block %d, starting from block %d", b.Header.Number, next)

   for next <= b.Header.Number {
      block = puller.PullBlock(next)
      if block == nil {
         return errors.Errorf("failed to fetch block %d from cluster", next)
      }
      if utils.IsConfigBlock(block) {
         c.support.WriteConfigBlock(block, nil)
      } else {
         c.support.WriteBlock(block, nil)
      }

      next++
   }

   c.lastBlock = block
   c.logger.Infof("Finished syncing with cluster up to block %d (incl.)", b.Header.Number)
   return nil
}

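It does not matter if you cannot follow all of it; it is enough to see that it calls c.support.WriteBlock(block, nil). This shows that when a snapshot arrives, it is not simply written to the local snap file and left there; it also has to go into the state machine.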
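Stripped of Fabric specifics, the core of catchUp is a loop that pulls every block between the local height and the snapshot's block number and writes each one. The sketch below assumes a non-empty in-memory ledger; Block, the pull callback, and the slice-based ledger are hypothetical stand-ins for Fabric's block puller and block writer.

```go
package main

import "fmt"

// Block is a minimal stand-in for a ledger block.
type Block struct{ Number uint64 }

// catchUp appends every block after the last local one, up to and
// including snapBlock. pull returns nil when a block cannot be fetched.
// The ledger is assumed to hold at least one block (e.g. genesis).
func catchUp(ledger []Block, snapBlock uint64, pull func(uint64) *Block) ([]Block, error) {
	last := ledger[len(ledger)-1].Number
	if last >= snapBlock {
		return ledger, nil // already at or past the snapshot, nothing to sync
	}
	for next := last + 1; next <= snapBlock; next++ {
		b := pull(next)
		if b == nil {
			return ledger, fmt.Errorf("failed to fetch block %d", next)
		}
		ledger = append(ledger, *b) // the "write into the state machine" step
	}
	return ledger, nil
}

func main() {
	ledger := []Block{{Number: 0}, {Number: 1}}
	pull := func(n uint64) *Block { return &Block{Number: n} }

	ledger, err := catchUp(ledger, 4, pull)
	fmt.Println(len(ledger), ledger[len(ledger)-1].Number, err)
}
```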

Advance

case rd := <-n.Ready():
   if err := n.storage.Store(rd.Entries, rd.HardState, rd.Snapshot); err != nil {
      n.logger.Panicf("Failed to persist etcd/raft data: %s", err)
   }

   if !raft.IsEmptySnap(rd.Snapshot) {
      n.chain.snapC <- &rd.Snapshot
   }

   // skip empty apply
   if len(rd.CommittedEntries) != 0 || rd.SoftState != nil {
      n.chain.applyC <- apply{rd.CommittedEntries, rd.SoftState}
   }

   n.Advance()

   // TODO(jay_guo) leader can write to disk in parallel with replicating
   // to the followers and them writing to their disks. Check 10.2.1 in thesis
   n.send(rd.Messages)

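Looking back: Advance is called after the entries have been handled and before the messages are sent. What is it signaling to Raft?

  • Recall that when Raft pushes a Ready to the application layer, it records the index up to which it expects the application to write into its state machine.
  • How can Raft be sure the application really behaves as expected? If it did not write up to the expected position, chaos would follow. So Raft provides a callback through which the application tells Raft that it has finished processing the pushed entries. Of course, this is a gentlemen's agreement.
  • Now let's return to the Raft side and see how it handles this notification.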

Raft

case <-advancec:
   if applyingToI != 0 {
      r.raftLog.appliedTo(applyingToI)
      applyingToI = 0
   }
   if havePrevLastUnstablei {
      r.raftLog.stableTo(prevLastUnstablei, prevLastUnstablet)
      havePrevLastUnstablei = false
   }
   r.raftLog.stableSnapTo(prevSnapi)
   advancec = nil

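You can basically think of this as cleaning up afterwards.

  • Receiving the signal means the application layer has finished processing, so the raftLog's applied position moves to applyingToI.
  • If unstable held entries, they can now be dropped, because the application layer has already written them to storage. That is why it is called unstable.
  • The same goes for the snapshot: the application layer has already written it to the state machine, so there is no point keeping it here.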
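The "drop what has been persisted" step can be illustrated with a simplified model of the unstable log. This is a sketch of the idea rather than the library's code: the unstable type below keeps only entries and an offset, and stableTo is written from the behaviour described above (trim up to index i, but only if the entry at i still has term t).

```go
package main

import "fmt"

// Entry is a simplified stand-in for raftpb.Entry.
type Entry struct{ Index, Term uint64 }

// unstable holds entries not yet persisted; offset is the index of entries[0].
type unstable struct {
	entries []Entry
	offset  uint64
}

// stableTo drops entries up to and including index i, provided the entry
// at i has term t. A term mismatch means the entry was replaced after the
// Ready was handed out, so nothing is dropped.
func (u *unstable) stableTo(i, t uint64) {
	if i < u.offset || i >= u.offset+uint64(len(u.entries)) {
		return // index is outside the unstable range
	}
	if u.entries[i-u.offset].Term != t {
		return
	}
	u.entries = u.entries[i+1-u.offset:]
	u.offset = i + 1
}

func main() {
	u := &unstable{
		entries: []Entry{{Index: 5, Term: 2}, {Index: 6, Term: 2}, {Index: 7, Term: 2}},
		offset:  5,
	}
	u.stableTo(6, 2)
	fmt.Println(len(u.entries), u.offset) // 1 7

	u.stableTo(7, 3) // term mismatch: no change
	fmt.Println(len(u.entries), u.offset) // 1 7
}
```

Passing the term along with the index is why the run loop saves both prevLastUnstablei and prevLastUnstablet when it hands out a Ready.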

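Author: Pillar_Zhong
Link: https://www.jianshu.com/p/0565e6c75125
Source: Jianshu (简书)
Copyright belongs to the author. For commercial reprints, contact the author for authorization; for non-commercial reprints, credit the source.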
