分布式

Raft在etcd中的实现(一)存储及重要的组件

2018-03-30  本文已影响139人  yuan1028

raftexample简介

本文中raft的实现以etcd中raftexample实现为例。
raftexample的例子中,启动了一个kv数据库作为raft的state machine,另外启动了一个http数据库来响应客户端的请求。其中proposeC以及commitC,errorC主要用来交互,proposeC为由客户端发送过来的条目,通过httpserver流向共识部分raftNode;commitC为经共识的条目,由raftNode流向kv数据库。

/*
github.com/coreos/etcd/contrib/raftexample/main.go
main()
*/
    var kvs *kvstore
    getSnapshot := func() ([]byte, error) { return kvs.getSnapshot() }
        //启动raft节点
    commitC, errorC, snapshotterReady := newRaftNode(*id, strings.Split(*cluster, ","), *join, getSnapshot, proposeC, confChangeC)
       //启动kv数据库作为state machine
    kvs = newKVStore(<-snapshotterReady, proposeC, commitC, errorC)

    // the key-value http handler will propose updates to raft
    serveHttpKVAPI(kvs, *kvport, confChangeC, errorC)

raftexample中的存储

raftexample中的存储分为以下几部分

几个重要的组件

etcd中raft的实现了很好的模块化,其中raft共识模块主要是实现了算法的逻辑,而系统需要用到的存储、通讯等模块都从共识模块中很好的剥离了出来是单独模块的实现。这样做可以很方便对raft的共识模块进行移植,也可以很方便地支持多种底层的存储或者通讯方式。raftexample中的组件大概可以分为以下几部分。

raftNode组件:主要处理应用层服务

raftNode结构体,实际上是应用层的包装,真正的raft共识部分在其中的node(raft.Node)中。RaftNode结构体中可以放应用层需要的一些东西。大致有以下一些东西

/*
github.com/coreos/etcd/contrib/raftexample/raft.go
*/
// A key-value stream backed by raft
type raftNode struct {
    proposeC    <-chan string            // proposed messages (k,v)
    confChangeC <-chan raftpb.ConfChange // proposed cluster config changes
    commitC     chan<- *string           // entries committed to log (k,v)
    errorC      chan<- error             // errors from raft session

    id          int      // client ID for raft session
    peers       []string // raft peer URLs
    join        bool     // node is joining an existing cluster
    waldir      string   // path to WAL directory
    snapdir     string   // path to snapshot directory
    getSnapshot func() ([]byte, error)
    lastIndex   uint64 // index of log at start

    confState     raftpb.ConfState
    snapshotIndex uint64                   //snapshotIndex
    appliedIndex  uint64                    //同论文中的appliedIndex,用于记录最新的已提交state machine执行的日志的索引

    // raft backing for the commit/error channel
    node        raft.Node                    //真正的共识部分的node
    raftStorage *raft.MemoryStorage          //raft中内存存储日志的部分
    wal         *wal.WAL                     //wal文件部分

    snapshotter      *raftsnap.Snapshotter
    snapshotterReady chan *raftsnap.Snapshotter // signals when snapshotter is ready

    snapCount uint64
    transport *rafthttp.Transport
    stopc     chan struct{} // signals proposal channel closed
    httpstopc chan struct{} // signals http server to shutdown
    httpdonec chan struct{} // signals http server shutdown complete
}

大致有以下几种函数

node组件,应用层与共识模块的沟通者

node结构体的主要作用是应用层和共识模块的衔接。将应用层的消息传递给底层共识模块,并将底层共识模块共识后的结果反馈给应用层。结构体的主要成员都是用来作消息传递用处的通道。

type node struct {
    propc      chan pb.Message
    recvc      chan pb.Message
    confc      chan pb.ConfChange
    confstatec chan pb.ConfState
    readyc     chan Ready
    advancec   chan struct{}
    tickc      chan struct{}
    done       chan struct{}
    stop       chan struct{}
    status     chan chan Status

    logger Logger
}

大致函数有几种:

  1. 启动、新建、停止:StartNode,RestartNode,newNode,Stop。
  2. 发送命令给共识模块的函数:Tick,Campaign,Propose,Step,ProposeConfChange,ApplyConfChange,TransferLeadership。
  3. 反馈给应用层的函数Ready,Advance,,Status,ReportUnreachable,ReadIndex。

raft组件:共识组件

raft结构体,共识组件结构体。
首先放一下论文中需要维护的state来对比

currentTerm                   //当前任期   
votedFor                      //当前任期的候选者编号,无则为null
log[]                         //日志条目

//Volatile state on all servers,所有服务器上维护
commitIndex             //已知的最高的可被提交的日志条目的索引,初始为0
lastApplied             //当前已提交给state machine执行的条目的索引,初始为0

//Volatile state on leaders:(Reinitialized after election),只在leader节点上维护
nextIndex[]          //对于每一台服务器,下一条将要发给该服务器的条目的索引,初始为leader最后一条条目索引+1
matchIndex[]         //每一个服务器已知的最高的已复制的条目的索引,初始为0

etcd中raft实现中raft结构体内容,可以看出其主要包含一下几部分

type raft struct {
    //同论文中的state的一些基本项
    id         uint64               //节点id
    Term       uint64               //节点当前所处任期,即currentTerm
    Vote       uint64               // 即votedFor
    raftLog    *raftLog             //节点的日志,即log[]
    prs        map[uint64]*Progress //progress中存储了对应节点的matchIndex和nextIndex
    learnerPrs map[uint64]*Progress

    //一些其他状态
    readStates []ReadState
    state      StateType
    // isLearner is true if the local raft node is a learner.
    isLearner bool
    votes     map[uint64]bool
    msgs      []pb.Message
    readOnly  *readOnly
    lead      uint64 // the leader id
    // leadTransferee is id of the leader transfer target when its value is not zero.
    // Follow the procedure defined in raft thesis 3.10.
    leadTransferee   uint64
    pendingConfIndex uint64
    checkQuorum      bool
    preVote          bool

    //一些其他配置项
    maxInflight               int
    maxMsgSize                uint64
    electionElapsed           int
    heartbeatElapsed          int
    heartbeatTimeout          int
    electionTimeout           int
    randomizedElectionTimeout int
    disableProposalForwarding bool

    tick   func()
    step   stepFunc //按照leader,follower,candidate角色不同,有三个不同的函数
    logger Logger
}

raft结构体及其方法是算法中的主体部分。大致有以下几种函数:

Progress:维护集群中的节点状态
Progress结构体,该结构体主要是在每个节点维护集群中其他节点的状态用。主要包括

type Progress struct {
    Match, Next     uint64
    State           ProgressStateType
    Paused          bool
    PendingSnapshot uint64
    RecentActive    bool
    ins             *inflights
    IsLearner       bool
}

大致有以下几种函数:

raftLog:raft中的日志管理者
raftLog结构体,raft中的日志条目管理的结构体,前文中介绍raft中的日志存储包括文件存储和内存存储两部分。该结构体中storage即为内存存储的日志条目部分。unstable是在日志尚未存储到日志文件时的状态。commited和applied是论文中commitIndex和lastApplied的概念,及最新的已共识可提交的日志条目索引,和已提交给state machine执行的最新条目的索引。

type raftLog struct {
    // storage contains all stable entries since the last snapshot.
    storage Storage

    // unstable contains all unstable entries and snapshot.
    // they will be saved into storage.
    unstable unstable

    // committed is the highest log position that is known to be in
    // stable storage on a quorum of nodes.
    committed uint64
    // applied is the highest log position that the application has
    // been instructed to apply to its state machine.
    // Invariant: applied <= committed
    applied uint64

    logger Logger
}

大致有以下几种函数

Snapshotter组件:快照管理者

type Snapshotter struct {
    dir string
}

大致包含保存快照SaveSnap,读取快照Read,加载快照Load等。

WAL组件:日志文件管理者

WAL结构体,用来管理日志文件。

type WAL struct {
    dir string // the living directory of the underlay files
    // dirFile is a fd for the wal directory for syncing on Rename
    dirFile *os.File
    metadata []byte           // metadata recorded at the head of each WAL
    state    raftpb.HardState // hardstate recorded at the head of WAL
    start     walpb.Snapshot // snapshot to start reading
    decoder   *decoder       // decoder to decode records
    readClose func() error   // closer for decode reader

    mu      sync.Mutex
    enti    uint64   // index of the last entry saved to the wal
    encoder *encoder // encoder to encode records

    locks []*fileutil.LockedFile // the locked files the WAL holds (the name is increasing)
    fp    *filePipeline
}

大致有以下几种函数:

Transport组件:消息传输者

Transport组件是负责消息通信的,目前的实现方式是http的实现。

type Transport struct {
    DialTimeout time.Duration // maximum duration before timing out dial of the request
    // DialRetryFrequency defines the frequency of streamReader dial retrial attempts;
    // a distinct rate limiter is created per every peer (default value: 10 events/sec)
    DialRetryFrequency rate.Limit

    TLSInfo transport.TLSInfo // TLS information used when creating connection

    ID          types.ID   // local member ID
    URLs        types.URLs // local peer URLs
    ClusterID   types.ID   // raft cluster ID for request validation
    Raft        Raft       // raft state machine, to which the Transport forwards received messages and reports status
    Snapshotter *raftsnap.Snapshotter
    ServerStats *stats.ServerStats // used to record general transportation statistics
    // used to record transportation statistics with followers when
    // performing as leader in raft protocol
    LeaderStats *stats.LeaderStats
    // ErrorC is used to report detected critical errors, e.g.,
    // the member has been permanently removed from the cluster
    // When an error is received from ErrorC, user should stop raft state
    // machine and thus stop the Transport.
    ErrorC chan error

    streamRt   http.RoundTripper // roundTripper used by streams
    pipelineRt http.RoundTripper // roundTripper used by pipelines

    mu      sync.RWMutex         // protect the remote and peer map
    remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up
    peers   map[types.ID]Peer    // peers map

    prober probing.Prober
}

大致包含以下几种函数:

上一篇 下一篇

猜你喜欢

热点阅读