EtcdRaft源码分析(线性一致读)
背景
我们知道Raft是Leader+Follower的模型,所有的更新由Leader处理,然后再同步给Follower。
想象一下,如果要所有的节点都参与进来支持读取的请求,会带来什么样的问题?
Leader跟Follower并不总是一致的,换句话说Follower会落后Leader的进度。如果没有特别的处理,那么不同的节点读取的结果很可能不一致。
如果Leader被集群孤立,而且其他人已经推举出了新的Leader。而老的Leader还没有察觉到这个变化,他任然觉得还是Leader,但是他的数据已经不可信。如果他还在对外提供服务,那么读取的结果很可能不一致。
EtcdRaft的线性一致读是通过ReadIndex的机制来实现,大致的实现其实很简单,也就是在处理请求之前,会去集群中确认自己权力是否稳固,这样对外提供的服务才够权威。下面我们一起来剖析下Raft是怎么处理的。
接口
type Node interface {
...
// ReadIndex request a read state. The read state will be set in the ready.
// Read state has a read index. Once the application advances further than the read
// index, any linearizable read requests issued before the read request can be
// processed safely. The read state will have the same rctx attached.
ReadIndex(ctx context.Context, rctx []byte) error
...
}
这个接口很怪,返回error,不是我们通常意义理解的查询接口。Golang的世界就是这么奇妙。注定这个接口没有那么简单。下面我们先搞清楚,这个接口是怎么用的。
EtcdServer
EtcdServer是再好不过的例子了。
func (s *EtcdServer) linearizableReadLoop() {
var rs raft.ReadState
for {
ctxToSend := make([]byte, 8)
id1 := s.reqIDGen.Next()
binary.BigEndian.PutUint64(ctxToSend, id1)
...
cctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
if err := s.r.ReadIndex(cctx, ctxToSend); err != nil {
...
}
cancel()
var (
timeout bool
done bool
)
for !timeout && !done {
select {
case rs = <-s.r.readStateC:
done = bytes.Equal(rs.RequestCtx, ctxToSend)
...
}
...
if ai := s.getAppliedIndex(); ai < rs.Index {
select {
case <-s.applyWait.Wait(rs.Index):
case <-s.stopping:
return
}
}
...
}
}
当然这是精简过后的代码,只保留了跟ReadIndex相关的逻辑。从这里我们也能看出一些端倪。
- 首先,传入的参数ctxToSend,是一个单调自增的id,没有任何意义,只是用来客户端区分请求只用。
- 其次,会从readStateC里面监听发出的ReadIndex请求Raft是否有了回应。
- 第三,会看下本地已经写入状态机的日志有没有到ReadIndex请求回来的位置,如果没有继续等待,如果有这个方法立即结束。
- 注意没有这个方法整个是个for循环,而且请求id还单调自增,那么执行下来的结果就是会一直保持对Raft状态的监控,一有风吹草动,这边就会提前收到通知。
ReadIndex
func (n *node) ReadIndex(ctx context.Context, rctx []byte) error {
return n.step(ctx, pb.Message{Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: rctx}}})
}
可以看到,最终是通过MsgReadIndex的方式对内进行广播。
Follower
case pb.MsgReadIndex:
if r.lead == None {
r.logger.Infof("%x no leader at term %d; dropping index reading msg", r.id, r.Term)
return nil
}
m.To = r.lead
r.send(m)
首先破除第一个问题,不管谁接到ReadIndex的请求,将转发给Leader。
Leader
case pb.MsgReadIndex:
if r.quorum() > 1 {
if r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) != r.Term {
// Reject read only request when this leader has not committed any log entry at its term.
return nil
}
// thinking: use an interally defined context instead of the user given context.
// We can express this in terms of the term and index instead of a user-supplied value.
// This would allow multiple reads to piggyback on the same message.
switch r.readOnly.option {
case ReadOnlySafe:
r.readOnly.addRequest(r.raftLog.committed, m)
r.bcastHeartbeatWithCtx(m.Entries[0].Data)
case ReadOnlyLeaseBased:
ri := r.raftLog.committed
if m.From == None || m.From == r.id { // from local member
r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})
} else {
r.send(pb.Message{To: m.From, Type: pb.MsgReadIndexResp, Index: ri, Entries: m.Entries})
}
}
} else {
r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})
}
return nil
}
- 如果当前Leader截至到现在还没有提交任何entry,那么直接返回。
- 下面我们看下不同的ReadOnly选项都是怎么实现的。
ReadOnlySafe
func (ro *readOnly) addRequest(index uint64, m pb.Message) {
ctx := string(m.Entries[0].Data)
if _, ok := ro.pendingReadIndex[ctx]; ok {
return
}
ro.pendingReadIndex[ctx] = &readIndexStatus{index: index, req: m, acks: make(map[uint64]struct{})}
ro.readIndexQueue = append(ro.readIndexQueue, ctx)
}
- 首先我们将请求的requestid放入pendingReadIndex中暂存,当然了,如果之前已经请求过了,那么返回,同时会初始化一个readIndexStatus,最后append到readIndexQueue
- 注意这里的readIndexQueue是FIFO的,是严格有序的。
- 其次,request里面保存的只是一个请求的id,用来客户端来区分请求用的。
- 另外一个需要注意的是,这里保存的index是当前Leader的committedindex,也就是最终一致的地方。
- 想象一下,客户端那边在不停的往Raft里面灌数据,那么其他客户端在读取Raft数据的时候,怎么知道哪些数据是形成一致的,可以安全拿出来用的。当然committedindex是重要的指标,代表,包括这个index及之前的数据都是安全的,有保障的。
- 那么怎么拿到这个committedindex,这就是ReadIndex的目的。
bcastHeartbeatWithCtx
r.bcastHeartbeatWithCtx(m.Entries[0].Data)
func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
// Attach the commit as min(to.matched, r.committed).
// When the leader sends out heartbeat message,
// the receiver(follower) might not be matched with the leader
// or it might not have all the committed entries.
// The leader MUST NOT forward the follower's commit to
// an unmatched index.
commit := min(r.getProgress(to).Match, r.raftLog.committed)
m := pb.Message{
To: to,
Type: pb.MsgHeartbeat,
Commit: commit,
Context: ctx,
}
r.send(m)
}
将请求的requestid作为心跳的context,发给其他成员。下面我们看下Follower或Candidate接到请求是怎么处理的。
Follower&Candidate
case pb.MsgHeartbeat:
r.electionElapsed = 0
r.lead = m.From
r.handleHeartbeat(m)
case pb.MsgHeartbeat:
r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
r.handleHeartbeat(m)
func (r *raft) handleHeartbeat(m pb.Message) {
r.raftLog.commitTo(m.Commit)
r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context})
}
- 可以看到二位接到心跳后,并没有针对ReadOnly做特殊处理。做了他们自己平时该做的。
- 回忆下心跳篇,Leader怎么维持自己的权威,不就是发心跳么?其他成员接受到心跳就认为Leader还存在,还不用重新选举。
- 既然ReadOnly的实现是需要大多数人确认我还是不是真正的Leader,那么Leader在接受心跳响应的时候,看收到的有没有过半数不就可以了么?所以心跳是完美的实现这个的载体。
- 下面我们看下Leader在处理心跳响应的时候,在做什么
Leader
case pb.MsgHeartbeatResp:
pr.RecentActive = true
pr.resume()
// free one slot for the full inflights window to allow progress.
if pr.State == ProgressStateReplicate && pr.ins.full() {
pr.ins.freeFirstOne()
}
if pr.Match < r.raftLog.lastIndex() {
r.sendAppend(m.From)
}
if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 {
return nil
}
ackCount := r.readOnly.recvAck(m)
if ackCount < r.quorum() {
return nil
}
rss := r.readOnly.advance(m)
for _, rs := range rss {
req := rs.req
if req.From == None || req.From == r.id { // from local member
r.readStates = append(r.readStates, ReadState{Index: rs.index, RequestCtx: req.Entries[0].Data})
} else {
r.send(pb.Message{To: req.From, Type: pb.MsgReadIndexResp, Index: rs.index, Entries: req.Entries})
}
}
跟心跳篇重复的部分,在这里我们不再赘述
首先到了这里要不你开启了ReadOnlySafe,要不就是消息上下文中没有发现ReadIndex
recvAck
ackCount := r.readOnly.recvAck(m)
if ackCount < r.quorum() {
return nil
}
func (ro *readOnly) recvAck(m pb.Message) int {
rs, ok := ro.pendingReadIndex[string(m.Context)]
if !ok {
return 0
}
rs.acks[m.From] = struct{}{}
// add one to include an ack from local node
return len(rs.acks) + 1
}
- 努力回忆下在Leader发心跳前做的准备工作,其中之一就是保存一个客户端的请求id到pendingReadIndex。这里再次提取这个request,将这个心跳响应累加到acks里面。
- 那Leader怎么知道这是普通的心跳响应还是ReadIndex的心跳响应呢?关键就是Message的context是不是有请求id
- 换句话说,这里就是收集因为这次ReadIndex请求发起的心跳,最终有多少人给了回应。
- 如果超过一半的人答复了Leader,说明这个Leader是被人承认的,有公信力的。那么继续往下
advance
func (ro *readOnly) advance(m pb.Message) []*readIndexStatus {
var (
i int
found bool
)
ctx := string(m.Context)
rss := []*readIndexStatus{}
for _, okctx := range ro.readIndexQueue {
i++
rs, ok := ro.pendingReadIndex[okctx]
if !ok {
panic("cannot find corresponding read state from pending map")
}
rss = append(rss, rs)
if okctx == ctx {
found = true
break
}
}
if found {
ro.readIndexQueue = ro.readIndexQueue[i:]
for _, rs := range rss {
delete(ro.pendingReadIndex, string(rs.req.Entries[0].Data))
}
return rss
}
return nil
}
- 之前在addRequest的时候往ReadOnly里面写了这次请求的信息,那么这里从里面找出来。
- 找到后,将之前的还没来得及处理得请求,一起摘出来,往下处理
for _, rs := range rss {
req := rs.req
if req.From == None || req.From == r.id { // from local member
r.readStates = append(r.readStates, ReadState{Index: rs.index, RequestCtx: req.Entries[0].Data})
} else {
r.send(pb.Message{To: req.From, Type: pb.MsgReadIndexResp, Index: rs.index, Entries: req.Entries})
}
}
- 遍历上面返回的请求列表
- 如果是发给当前节点的请求,那么将这个ReadState累加在本地
- 如果不是,给对方发MsgReadIndexResp
Follower
case pb.MsgReadIndexResp:
if len(m.Entries) != 1 {
r.logger.Errorf("%x invalid format of MsgReadIndexResp from %x, entries count: %d", r.id, m.From, len(m.Entries))
return nil
}
r.readStates = append(r.readStates, ReadState{Index: m.Index, RequestCtx: m.Entries[0].Data})
}
可以看到,Follower是通过Leader拿到最新的commit,尽管他自己都还没有跟上对方的进度。但是对于外部请求方来说,他并不区分当前是Follower或Leader,他只想知道当前Raft的最新状态。所以将最新得commit累加到本地得ReadState,等待发送出去。
总结
事已至此,最新的readstate都已经保存在本地了,那么这些状态怎么发送出去,让应用层处理,那就是Ready在做的事情了。