etcd学习笔记2(草稿)

2020-06-14  本文已影响0人  酱油王0901

etcd初始化流程

etcd启动时首先会调用startEtcdOrProxyV2, 这个方法里首先会进行config的初始化以及解析传入的配置项,然后检查config中的Dir是否为空,如果为空则根据config中指定的Name来生成data dir,默认如下所示,后面
再次启动时会检查data dir的类型,目前有三种:member, proxy, empty,分别代表成员,代理,空。然后进入不同的分支调用startEtcd,或者startProxy

                                                                             +---->startEtcd ---> configurePeerListeners ---> configureClientListeners ----> etcdserver.NewServer
                                                                             |
startEtcdOrProxyV2 ---> newConifg ---> cfg.parse ---> identify data dir ---> |
                                                                             |
                                                                             +---->startProxy

etcd data dir如下:

(ENV) [root@ceph-2 etcd]# ls
10.255.101.74.etcd  10.255.101.74.proxy.etcd  etcd.conf  etcd-proxy.conf
(ENV) [root@ceph-2 etcd]# tree -h
.
├── [  20]  10.255.101.74.etcd
│   └── [  29]  member
│       ├── [ 246]  snap
│       │   ├── [366K]  0000000000000002-0000000000d5a021.snap
│       │   ├── [366K]  0000000000000002-0000000000d726c2.snap
│       │   ├── [366K]  0000000000000002-0000000000d8ad63.snap
│       │   ├── [366K]  0000000000000002-0000000000da3404.snap
│       │   ├── [362K]  0000000000000002-0000000000dbbaa5.snap
│       │   └── [ 20K]  db
│       └── [ 244]  wal
│           ├── [ 61M]  000000000000001e-0000000000c0cf3d.wal
│           ├── [ 61M]  000000000000001f-0000000000c775f1.wal
│           ├── [ 61M]  0000000000000020-0000000000ce234b.wal
│           ├── [ 61M]  0000000000000021-0000000000d4ce84.wal
│           ├── [ 61M]  0000000000000022-0000000000db76f5.wal
│           └── [ 61M]  0.tmp
├── [  19]  10.255.101.74.proxy.etcd
│   └── [  21]  proxy
│       └── [  70]  cluster
├── [3.6K]  etcd.conf
└── [ 558]  etcd-proxy.conf

6 directories, 15 files

启动etcd server时会创建store,如果data dir, wal dir和snap dir不存在则创建, snap/db为backend path。如果db存在的话,则用db构建Backend。构建完成后会启动goroutine执行backend.run()

// file: mvcc/backend/backend.go
type Backend interface {
    // ReadTx returns a read transaction. It is replaced by ConcurrentReadTx in the main data path, see #10523.
    ReadTx() ReadTx
    BatchTx() BatchTx
    // ConcurrentReadTx returns a non-blocking read transaction.
    ConcurrentReadTx() ReadTx

    Snapshot() Snapshot
    Hash(ignores map[IgnoreKey]struct{}) (uint32, error)
    // Size returns the current size of the backend physically allocated.
    // The backend can hold DB space that is not utilized at the moment,
    // since it can conduct pre-allocation or spare unused space for recycling.
    // Use SizeInUse() instead for the actual DB size.
    Size() int64
    // SizeInUse returns the current size of the backend logically in use.
    // Since the backend can manage free space in a non-byte unit such as
    // number of pages, the returned value can be not exactly accurate in bytes.
    SizeInUse() int64
    // OpenReadTxN returns the number of currently open read transactions in the backend.
    OpenReadTxN() int64
    Defrag() error
    ForceCommit()
    Close() error
}

接着,新创建Transport

// etcdserver/server.go
prt, err := rafthttp.NewRoundTripper(cfg.PeerTLSInfo, cfg.peerDialTimeout())

WAL

如果WAL目录存在,则会打开所有的wal并检验snapshot entries,其通过decoder来对wal进行解码,decoder结构如下

// wal/decoder.go
type decoder struct {
     mu  sync.Mutex
     brs []*bufio.Reader

     // lastValidOff file offset following the last valid decoded record
     lastValidOff int64
     crc          hash.Hash32
 }

其中brs对应所有的wal文件Reader,分别遍历每个wal文件:

  1. little endian的形式读取wal开头8个字节,例如下面wal文件中开头8个字节为04 00 00 00 00 00 00 84,注意是小端优先序,低56bits代表record字节,值为4; 高8bits的低3位部分代表pad,84的二进制表述为10000100, 低三位的值为4。WAL entry size最大为10MB。每个WAL segment file的默认大小为64MB。
    0000000 04 00 00 00 00 00 00 84 08 04 10 00 00 00 00 00
    0000010 20 00 00 00 00 00 00 00 08 01 10 bf ae e5 db 08
    0000020 1a 16 08 e9 e4 bc b2 8f ba fc 88 82 01 10 c4 cf
    
    var (
        // SegmentSizeBytes is the preallocated size of each wal segment file.
        // The actual size might be larger than this. In general, the default
        // value should be used, but this is defined as an exported variable
        // so that tests can set a different segment size.
        SegmentSizeBytes int64 = 64 * 1000 * 1000 // 64MB
    )
    
  2. 读取record bytes + padding bytes
  3. 将其反序列化为Record,其结构如下,其中包括类型,CRC以及数据,校验时会根据data计算其CRC值,然后与Record中的CRC值进行比较,如果不相等,说明数据已经损坏。
  4. 获取所有Record类型为snapshot且其Index小于Committed hardState
// wal/walpb/record.pb.go
type Record struct {
    Type             int64  `protobuf:"varint,1,opt,name=type" json:"type"`
    Crc              uint32 `protobuf:"varint,2,opt,name=crc" json:"crc"`
    Data             []byte `protobuf:"bytes,3,opt,name=data" json:"data,omitempty"`
    XXX_unrecognized []byte `json:"-"`
}

如下所示,Record有五种类型

// wal/wal.go
const (
    metadataType int64 = iota + 1
    entryType
    stateType
    crcType
    snapshotType
)
// raft/raftpb/raft.pb.go
type HardState struct {
    Term             uint64 `protobuf:"varint,1,opt,name=term" json:"term"`
    Vote             uint64 `protobuf:"varint,2,opt,name=vote" json:"vote"`
    Commit           uint64 `protobuf:"varint,3,opt,name=commit" json:"commit"`
    XXX_unrecognized []byte `json:"-"`
}
// raft/raftpb/raft.pb.go
type Entry struct {
    Term             uint64    `protobuf:"varint,2,opt,name=Term" json:"Term"`
    Index            uint64    `protobuf:"varint,3,opt,name=Index" json:"Index"`
    Type             EntryType `protobuf:"varint,1,opt,name=Type,enum=raftpb.EntryType" json:"Type"`
    Data             []byte    `protobuf:"bytes,4,opt,name=Data" json:"Data,omitempty"`
    XXX_unrecognized []byte    `json:"-"`
}
// wal/walpb/record.pb.go
type Snapshot struct {
    Index            uint64 `protobuf:"varint,1,opt,name=index" json:"index"`
    Term             uint64 `protobuf:"varint,2,opt,name=term" json:"term"`
    XXX_unrecognized []byte `json:"-"`
}
// etcdserver/etcdserverpb/etcdserver.pb.go
type Metadata struct {
    NodeID           uint64 `protobuf:"varint,1,opt,name=NodeID" json:"NodeID"`
    ClusterID        uint64 `protobuf:"varint,2,opt,name=ClusterID" json:"ClusterID"`
    XXX_unrecognized []byte `json:"-"`
}`
+----------------------------------------------------------------------+
|  +-------------------------------+---------------------------------+ |
|  |     record bytes<56bits>      | padding <lower 3 bits of 8bits> | |
|  |-----------------------------------------------------------------+ |
|  |                              data                               | |
|  +-----------------------------------------------------------------+ |
|                                  ...                                 |
+----------------------------------------------------------------------+

Snapshot

snap目录下只包含.snap结尾的文件以及db文件。其中每个.snap文件命名格式为%016x-%016x.wal,即seq-index.wal。其对应snappb.Snapshot结构

// etcdserver/api/snap/snappb/snap.pb.go
type Snapshot struct {
    Crc              uint32 `protobuf:"varint,1,opt,name=crc" json:"crc"`
    Data             []byte `protobuf:"bytes,2,opt,name=data" json:"data,omitempty"`
    XXX_unrecognized []byte `json:"-"`
}

snappb.Snapshot中的Data又对应raftpb.Snapshot

// raft/raftpb/raft.pb.go
type Snapshot struct {
    Data             []byte           `protobuf:"bytes,1,opt,name=data" json:"data,omitempty"`
    Metadata         SnapshotMetadata `protobuf:"bytes,2,opt,name=metadata" json:"metadata"`
    XXX_unrecognized []byte           `json:"-"`
}
// raft/raftpb/raft.pb.go
type SnapshotMetadata struct {
    ConfState        ConfState `protobuf:"bytes,1,opt,name=conf_state,json=confState" json:"conf_state"`
    Index            uint64    `protobuf:"varint,2,opt,name=index" json:"index"`
    Term             uint64    `protobuf:"varint,3,opt,name=term" json:"term"`
    XXX_unrecognized []byte    `json:"-"`
}

启动/重启节点

根据是否存在WAL目录,以及是否是new cluster来判断执行启动节点还是重启节点,下面以重启节点为例进行介绍。

  1. 从WAL中读取 metadataraftpb.HardState以及所有的raftpb.Entry
// etcdserver/etcdserverpb/etcdserver.pb.go
type Metadata struct {
    NodeID           uint64 `protobuf:"varint,1,opt,name=NodeID" json:"NodeID"`
    ClusterID        uint64 `protobuf:"varint,2,opt,name=ClusterID" json:"ClusterID"`
    XXX_unrecognized []byte `json:"-"`
}
  1. 创建RaftCluster对象,其中metadata中的NodeIDClusterID分别对应RaftClusterlocalIDcid
// file: etcdserver/api/membership/cluster.go
// RaftCluster is a list of Members that belong to the same raft cluster
type RaftCluster struct {
   lg *zap.Logger

   localID types.ID
   cid     types.ID
   token   string

   v2store v2store.Store
   be      backend.Backend

   sync.Mutex // guards the fields below
   version    *semver.Version
   members    map[types.ID]*Member
   // removed contains the ids of removed members in the cluster.
   // removed id cannot be reused.
   removed map[types.ID]bool

   downgradeInfo *DowngradeInfo
}
  1. 创建MemoryStorage
    • Apply snapshot
    • 设置HardState,步骤一中获取的值
    • 将步骤一中获取的entries append到MemoryStorage
// file: raft/storage.go
// MemoryStorage implements the Storage interface backed by an
// in-memory array.
type MemoryStorage struct {
    // Protects access to all fields. Most methods of MemoryStorage are
    // run on the raft goroutine, but Append() is run on an application
    // goroutine.
    sync.Mutex

    hardState pb.HardState
    snapshot  pb.Snapshot
    // ents[i] has raft log position i+snapshot.Metadata.Index
    ents []pb.Entry
}
MemoryStorage

4 . 根据raft.Config配置重启Node
通常建议ElectionTick = 10 * HeartbeatTick,这样可以避免不必要的leader切换。

// file: raft/rawnode.go
// RawNode is a thread-unsafe Node.
// The methods of this struct correspond to the methods of Node and are described
// more fully there.
type RawNode struct {
    raft       *raft
    prevSoftSt *SoftState
    prevHardSt pb.HardState
}
// RawNode is a thread-unsafe Node.
// The methods of this struct correspond to the methods of Node and are described
// more fully there.
type RawNode struct {
    raft       *raft
    prevSoftSt *SoftState
    prevHardSt pb.HardState
}
  1. 新建raft node,其中node为Node接口的标准实现。
// file: raft/node.go
// Node represents a node in a raft cluster.   
type Node interface 
// node is the canonical implementation of the Node interface
type node struct {
    propc      chan msgWithResult
    recvc      chan pb.Message
    confc      chan pb.ConfChangeV2
    confstatec chan pb.ConfState
    readyc     chan Ready
    advancec   chan struct{}
    tickc      chan struct{}
    done       chan struct{}
    stop       chan struct{}
    status     chan chan Status

    rn *RawNode
}
  1. 启动goroutine运行node.run()方法。详见raft/node.go文件。
// file: raft/raft.go
// StateType represents the role of a node in a cluster.
type StateType uint64

var stmap = [...]string{
    "StateFollower",
    "StateCandidate",
    "StateLeader",
    "StatePreCandidate",
}

Transport


Peer

远端raft node通过peer来进行表述,本地raft node通过peer来向远端发送messages,每个peer有两种底层的机制来发送messages,分别为streampipeline

etcd主要采用Stream消息通道和pipeline消息通道,其中Stream消息通道维护HTTP长连接,主要负责数据传输量较小,发送比较频繁的消息,而pipeline消息通道在传输数据完成后会立即关闭连接,主要负责传输数据量较大,发送频率较低的消息,例如传输快照数据。


Handler

/raft  --> pipelineHandler
/raft/stream/ --> streamHandler
/raft/sanpshot --> snapshotHandler
/raft/probing --> httpHealth

Message encoder/decoder

Message的encoder/decoder通过封装io.Writer/Reader,分别对Message进行编码,解码。

+----------------------------------------------------------------------+
|  +-------------------------------+---------------------------------+ |
|  |                    message size (8 bytes)                       | |
|  |-----------------------------------------------------------------+ |
|  |                              data                               | |
|  +-----------------------------------------------------------------+ |
|                                  ...                                 |
+----------------------------------------------------------------------+

编码时先写入8字节的message大小,然后才是序列号过后的数据。
解码正好与之相反,首先读取8字节的message大小,然后判断其是否大于512MB,如果大于则直接返错。如果小于阈值则将其反序列化为Message。也可以通过指定读取的字节大小,例如snapshot信息最大可为1TB。详细见etcdserver/api/rafthttp/msg_codec.go

// messageEncoder is a encoder that can encode all kinds of messages.
// It MUST be used with a paired messageDecoder.
type messageEncoder struct {
    w io.Writer
}

func (enc *messageEncoder) encode(m *raftpb.Message) error {
    if err := binary.Write(enc.w, binary.BigEndian, uint64(m.Size())); err != nil {
        return err
    }
    _, err := enc.w.Write(pbutil.MustMarshal(m))
    return err
}

// messageDecoder is a decoder that can decode all kinds of messages.
type messageDecoder struct {
    r io.Reader
}

var (
    readBytesLimit     uint64 = 512 * 1024 * 1024 // 512 MB
    ErrExceedSizeLimit        = errors.New("rafthttp: error limit exceeded")
)

func (dec *messageDecoder) decode() (raftpb.Message, error) {
    return dec.decodeLimit(readBytesLimit)
}

func (dec *messageDecoder) decodeLimit(numBytes uint64) (raftpb.Message, error) {
    var m raftpb.Message
    var l uint64
    if err := binary.Read(dec.r, binary.BigEndian, &l); err != nil {
        return m, err
    }
    if l > numBytes {
        return m, ErrExceedSizeLimit
    }
    buf := make([]byte, int(l))
    if _, err := io.ReadFull(dec.r, buf); err != nil {
        return m, err
    }
    return m, m.Unmarshal(buf)
}

本文是基于etcd 3.5.0-pre版本。


References

上一篇 下一篇

猜你喜欢

热点阅读