超级账本HyperLeder

Fabric 1.4 raft 共识源码浅析

2019-12-23  本文已影响0人  小蜗牛爬楼梯

�fabric 在 1.4.1 版本正式引入 Raft 共识算法,用于替代现有的 Kafka 共识。fabric 中的 Raft 实现基于 etcd/raft 库,其中 etcd/raft 作为底层的 raft 状态机,fabric raft 作为上层的应用端,并负责消息通讯和数据存储。本文将对 fabric raft 的实现源码进行简单和阅读分析,理解 raft 共识在 fabric 中是如何运作的。

Orderer 共识接口

orderer 的共识模块主要由两个接口定义:ConsenterChainConsenter 接口定义了根据传入资源创建 Chain 实例的方法,Chain 则定义了orderer 服务所需要的排序功能接口。

Consenter

type Consenter interface {  
    HandleChain(support ConsenterSupport, metadata *cb.Metadata) (Chain, error)
}

Consenter 接口只定义了一个方法 HandleChain,根据传入的两个参数构建通道管理实例 Chain

  1. support ConsenterSupport :是当前通道用于账本管理相关的支撑实例,提供给当前共识组件用于资源管理。

    ConsenterSupport 也是一个接口,其定义如下:

type ConsenterSupport interface {
  crypto.LocalSigner
  msgprocessor.Processor
  VerifyBlockSignature([]*cb.SignedData, *cb.ConfigEnvelope) error
  BlockCutter() blockcutter.Receiver
  SharedConfig() channelconfig.Orderer
  CreateNextBlock(messages []*cb.Envelope) *cb.Block
  Block(number uint64) *cb.Block
  WriteBlock(block *cb.Block, encodedMetadataValue []byte)
  WriteConfigBlock(block *cb.Block, encodedMetadataValue []byte)
  Sequence() uint64
  ChainID() string
  Height() uint64
  IsSystemChannel() bool
  Append(block *cb.Block) error
}
  1. metadata *cb.Metadata:是跟共识组件相关的一些配置信息,比如 raft 的节点列表、当前节点ID 等。初始创建通道时,参数为 nil

Chain

type Chain interface {
 Order(env *cb.Envelope, configSeq uint64) error
 Configure(config *cb.Envelope, configSeq uint64) error
 WaitReady() error
 Errored() <-chan struct{}
 Start()
 Halt()
 MigrationStatus() migration.Status
}

Chain 接口定义了 Orderer 需要对接收到的消息所做的处理。其中最主要的功能是 Order()Configure()Start()Order() 负责对普通交易消息进行处理排序,Configure() 负责对配置交易消息进行处理和排序。当 Orderer 服务在 BroadCast 接口收到消息进行校验和过滤之后,就交由对应 Chain 实例进行处理。Start() 则负责启动此 Chain 服务。

etcdraft 共识实现

etcdraft 模块就是以 raft 的作为共识算法对上述接口的实现,相关源码位于 fabric/orderer/consensus/etcdraft。其中consensus.go 中实现了 Consensus 接口,Chain.go 中实现了 Chain 接口。与 Raft 算法相关的核心逻辑则位于 chain.gonode.go 这两个源码文件中。

etcdraft.Chain 数据结构

etcdraft 中对 consensus.Chain 接口进行实现的 Chain 结构体属性较多,我们挑选其中最主要的一些进行介绍。

// Chain implements consensus.Chain interface.
type Chain struct {
  ...

    rpc RPC
    submitC  chan *submit
    applyC   chan apply
    snapC    chan *raftpb.Snapshot // Signal to catch up with snapshot
    support consensus.ConsenterSupport
    Node *node
    
    ...
}

Submit 方法

etcdraft.Chain 是对 consensus.Chain 接口的实现,所以对外提供的功能主要在 OrderConfigure 两个方法。从源码中看到,这两个方法都实际调用了 Submit 方法进行处理:

func (c *Chain) Submit(req *orderer.SubmitRequest, sender uint64) error {
...
 leadC := make(chan uint64, 1)
 select {
 case c.submitC <- &submit{req, leadC}:
 lead := <-leadC
 if lead == raft.None {
 c.Metrics.ProposalFailures.Add(1)
 return errors.Errorf("no Raft leader")
 }

 if lead != c.raftID {
 if err := c.rpc.SendSubmit(lead, req); err != nil {
 c.Metrics.ProposalFailures.Add(1)
 return err
 }
 }
...
 }
 return nil
}

Submit 首先将请求消息封装为 submit 结构通过当前 Chain实例的通道 c.submitC 传递给后端处理(下一节分析如何处理),同时获取当前时刻 raft 集群的 leader 信息。

这里对 leader 的不同状态进行了不同处理:

也就是说,所有的应用端发送给 orderer 的 broadcast 请求报文,都会被转发给 raft 集群中的 leader 节点进行处理,如果没有 leader 则返回错误信息。

serveRequest 方法

Chain 中对消息的具体处理逻辑主要在 serveRequest 方法,所有的消息都会发送到里,包括应用端的请求消息和 raft 节点间的消息。

serverRequest 的主体是一个 for select 的循环:

for {
 select {
 case s := <-submitC:
 ...
 case app := <-c.applyC:
 ...
 case <-timer.C():
 ...
 case sn := <-c.snapC:
 ...
 case <-c.doneC:
 ...
 }
}

case submitC

这里的 submitC 会在合适的时机被赋值为 c.submitC,即当前 Chain 实例的 submitC,也是上一节中 submit 方法传递请求消息的那个通道。

submitC 中传递的是 submit 结构的数据,主体是 SubmitRequest 的数据结构:

type SubmitRequest struct {
 Channel string 
 Payload *common.Envelope 
 ...
}

从 submitC 通道收到数据后,首先进行 leader 信息的检查,同时恢复给 submit 信息中的 s.leader,让请求端及时知晓 leader 状态。如果现在没有 leader 还处于竞选状态或者当前节点不是 leader 则跳过,通过上一节分析可知,此时消息被转发到 leader 节点进行处理,本节点忽略。

如果本节点是 leader,则开始处理,首先通过 c.orderd() 方法对消息进行排序,分为两种情况:

经过 c.ordered 处理之后,会得到由 BlockCutter 返回的数据包 bathches (可打包成块的数据)和缓存是否还有数据的信息。如果缓存还有余留数据未出块,则启动计时器,否则重置计时器,这里的计时器由case timer.C 处理。

拿到数据包后,调用 c.propose 处理。propose 会根据 batches 数据包打包出 block ,并将 block 通过调用 c.Node.Propose 将数据传递给底层 raft 状态机。如果是配置信息,还需要标记处当前正在进行配置更新的状态。

从这一段的逻辑可以看出,所有客户端发送给 orderer 的请求,都会被发送给 raft 的leader 节点,由 leader 节点排序并生成区块,生成好的区块发送给底层 raft 状态机进行应用同步。

case c.applyC

c.apply 通道负责接收由底层 raft 状态机处理好抛出给应用层的各种消息,数据来源于 c.Node 从底层 raft 状态机的 Ready 通道接收的数据。

apply 通道收到的 apply 数据包含两部分:

type apply struct {
 entries []raftpb.Entry
 soft    *raft.SoftState
}

1. 状态切换

首先从 soft 中拿到这批消息对应的 leader 信息,判断状态机是否发生了 leader 变化(newLeader != soft.Lead),如果发生了变化则上层需要进行状态切换,有两种情况:

**becomeLeader **- 切换 leader 做了以下几件事:

becomeFollower - 切换 follower 主要做了以下几件事:

2. 数据应用

在 leader 状态切换相关操作完成后,则开始调用 c.apply 对 raft 的 Entry 消息进行应用处理。针对普通消息和配置更新消息这两种类型,处理过程略有不同:

如果累积接收到的 block 数据达到了 SnapshotIntervalSize 的限制,则发送 gc 信号,让状态机开始准备快照。

3. 状态应用收尾

如果之前处于 leader 切换的状态中,那么此时切换已经完成,将相关状态恢复,submitC = c.submitC 重新开始接收 sumit 请求;

如果是还有配置更新过程没有完成,则submitC = nil ,暂停接收 sumit 请求;

如果目前待处理的数据已经消息最大待处理的限制,同样 submitC = c.submitC,恢复接收 submit 请求;

case timer.C

这个 timer 是用于对 orderer 中排序缓存中余留数据的计时,当排序缓存中存在余留数据超过 timer 时间还未被处理,则此计时器触发 BlockCutter.Cut() 动作,将缓存中余留数据出块。

这个 timer 的计时参数来自 orderer 出块配置参数 BatchTimeout

case c.snapC

当底层状态机准备好快照数据后,发送到此 channel。如果当前快照的 index 落后于当前节点应用的 index,说明快照数据过时了,忽略;否则需要逐个追赶上错过的 block (通过 deliver 接口从其他节点请求 block 数据),并逐个应用,写入 orderer 账本。

一个快照中,包含一个 block 数据。

上面就是 fabric中,raft 的主要的逻辑,这一层与底层 raft 状态机之间,还有一个封装的 node 作为桥梁,先关源码在 node.go 中。raft 底层状态机则使用的开源库 go.etcd.io/etcd/raft。如果要进一步了解 fabric 中 raft 共识的流程,则还需要对两部分进行深入分析,本文暂不涉及。

源码分析图(粗略)

image
上一篇 下一篇

猜你喜欢

热点阅读