MVCC在etcd中的实现
主流数据库都支持 mvcc 概念,etcd 也支持。以前经常使用 etcd, 但仅限当配置或是状态通知,根本不了解细节,先看下 mvcc 如何实现的吧。
版本概念
对同一个 key 每次修改都对应 revision (翻译成版本?修订?), key 在生命周期内可能被频繁删除,从创建到删除的所有 revision 集合组成一个 generation (翻译成代?), 每个 key 周时有多个 generation 组成多版本。组织这个多版本 key 的结构体叫做 keyIndex
┌───────────────────────────────────────────────────────────────────────────┐
│ ┌───────────┬───────────┬─────────────┐ │
│ │ []byte │ revision │[]generation │ keyIndex │
│ │ key │ modified │ generations │ │
│ └───────────┴──────┬────┴──────┬──────┘ │
│ ┌────────────────────┘ │ │
│ │ │ │
│ revis▼on ▼ │
│ ┌──────┬───────┐ ┌──────────────┬──────────────┐ │
│ │ main │ sub │ │ generation0 │ generation1 │ **** │
│ └──────┴───────┘ └───────┬──────┴──────────────┘ │
│ │ │
│ ▼ │
│ ┌──────┬────────────┬─────────┐ │
│ │ ver │ created │ revs │ generation │
│ └──────┴───────┬────┴────┬────┘ │
│ ┌────────────────┘ │ │
│ ▼ ▼ │
│ ┌──────────────┐ ┌──────────────┬──────────────┐ │
│ │ revision │ │ revision │ revision │ **** │
│ └──────────────┘ └──────────────┴──────────────┘ │
└───────────────────────────────────────────────────────────────────────────┘
type revision struct {
// main is the main revision of a set of changes that happen atomically.
main int64
// sub is the the sub revision of a change in a set of changes that happen
// atomically. Each change has different increasing sub revision in that
// set.
sub int64
}
先看下最重要的 revision 结构体,main 表示当前操作的事务 id,全局自增的逻辑时间戳,sub 表示当前操作在事务内部的子 id,事务内自增,从 0 开始。比如一个事务内: put key value1, delete key, 那么分别对应 {txid, 0}, {txid, 1}
// generation contains multiple revisions of a key.
type generation struct {
ver int64 // generation 期间自增
created revision // when the generation is created (put in first revision).
revs []revision
}
mvcc 里不可能让所有版本都追加到一个 revs 数组中,会不限彭胀,所以产生了 generation 代的概念,ver 表示代内操作的顺序,从 0 自增,created 代表创建这个 generation 的第一个版本,最后 revs 表示所有版本。
type keyIndex struct {
key []byte // 用户 key 名称
modified revision // the main rev of the last modification 最新修改的版本
generations []generation // 不同代的数组
}
key 代表当前操作的用户 key, modified 代表最新修改的版本,generations 是代的数组,generations[n-1] 代表最新操作的代。
版本存储
对于数据存储分两点:
- 版本信息: 由 key 和 revision 组成的版本信息,存储在内存的 btree 中,用于快速查找
- kv数据: 真实的 kv 数据存放在 boltdb 中,key 是 revision, value 是序列化后的 pb
btree
版本存储在 treeIndex 中,实际上就是一个 btree,和 mysql 的比较像,点查和范围查都非常快。
type treeIndex struct {
sync.RWMutex
tree *btree.BTree
lg *zap.Logger
}
func (ti *treeIndex) Put(key []byte, rev revision) {
keyi := &keyIndex{key: key}
ti.Lock()
defer ti.Unlock()
item := ti.tree.Get(keyi)
if item == nil {
keyi.put(ti.lg, rev.main, rev.sub)
ti.tree.ReplaceOrInsert(keyi)
return
}
okeyi := item.(*keyIndex)
okeyi.put(ti.lg, rev.main, rev.sub)
func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created revision, ver int64, err error) {
keyi := &keyIndex{key: key}
ti.RLock()
defer ti.RUnlock()
if keyi = ti.keyIndex(keyi); keyi == nil {
return revision{}, revision{}, 0, ErrRevisionNotFound
}
return keyi.get(ti.lg, atRev)
}
func (ti *treeIndex) KeyIndex(keyi *keyIndex) *keyIndex {
ti.RLock()
defer ti.RUnlock()
return ti.keyIndex(keyi)
}
func (ti *treeIndex) keyIndex(keyi *keyIndex) *keyIndex {
if item := ti.tree.Get(keyi); item != nil {
return item.(*keyIndex)
}
return nil
}
可以看到操作比较简单,没什么难度
boltdb
Btree 的版本信息,只包括用户真实 key 和 revision,底层 backend 才是真正保存 kv 的地方,但这个 kv 并不是用户的,而分别是 revision 和 kv pb,参考 storeTxnWrite 的实现
func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
rev := tw.beginRev + 1
c := rev
oldLease := lease.NoLease
// if the key exists before, use its previous created and
// get its previous leaseID
_, created, ver, err := tw.s.kvindex.Get(key, rev)
if err == nil {
c = created.main
oldLease = tw.s.le.GetLease(lease.LeaseItem{Key: string(key)})
}
ibytes := newRevBytes()
idxRev := revision{main: rev, sub: int64(len(tw.changes))}
revToBytes(idxRev, ibytes)
ver = ver + 1
kv := mvccpb.KeyValue{
Key: key,
Value: value,
CreateRevision: c,
ModRevision: rev,
Version: ver,
Lease: int64(leaseID),
}
d, err := kv.Marshal()
if err != nil {
if tw.storeTxnRead.s.lg != nil {
tw.storeTxnRead.s.lg.Fatal(
"failed to marshal mvccpb.KeyValue",
zap.Error(err),
)
} else {
plog.Fatalf("cannot marshal event: %v", err)
}
}
tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
tw.s.kvindex.Put(key, idxRev)
tw.changes = append(tw.changes, kv)
......
}
可以看到 key 是 ibytes 也就是 revision, value 是 mvccpb.KeyValue,里面存放当前操作的用户 kv,revision, lease 信息。到此,对 etcd mvcc 信息有一定的了解了,内存 btree 保存 key 的版本信息,而 boltdb 存打平的 mvccpb.KeyValue
完整流程
etcd 代码有点绕,接口太多。整体来讲 mvcc 要分几方面:增删改查,一致性,过期数据清理。先看完整读写,整体入口接口是 applierV3
func (a *applierV3backend) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, err error) {
resp = &pb.PutResponse{}
resp.Header = &pb.ResponseHeader{}
val, leaseID := p.Value, lease.LeaseID(p.Lease)
if txn == nil {
if leaseID != lease.NoLease {
if l := a.s.lessor.Lookup(leaseID); l == nil {
return nil, lease.ErrLeaseNotFound
}
}
txn = a.s.KV().Write()
defer txn.End()
}
var rr *mvcc.RangeResult
if p.IgnoreValue || p.IgnoreLease || p.PrevKv {
rr, err = txn.Range(p.Key, nil, mvcc.RangeOptions{})
if err != nil {
return nil, err
}
}
if p.IgnoreValue || p.IgnoreLease {
if rr == nil || len(rr.KVs) == 0 {
// ignore_{lease,value} flag expects previous key-value pair
return nil, ErrKeyNotFound
}
}
if p.IgnoreValue {
val = rr.KVs[0].Value
}
if p.IgnoreLease {
leaseID = lease.LeaseID(rr.KVs[0].Lease)
}
if p.PrevKv {
if rr != nil && len(rr.KVs) != 0 {
resp.PrevKv = &rr.KVs[0]
}
}
resp.Header.Revision = txn.Put(p.Key, val, leaseID)
return resp, nil
}
先不管 lease, 实际上就是调用 txn.Put(p.Key, val, leaseID), 而 txn 是由 txn = a.s.KV().Write(),这里面有点绕,实际上是 kvstore_txn
func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
rev := tw.beginRev + 1
c := rev
oldLease := lease.NoLease
// if the key exists before, use its previous created and
// get its previous leaseID
_, created, ver, err := tw.s.kvindex.Get(key, rev)
if err == nil {
c = created.main
oldLease = tw.s.le.GetLease(lease.LeaseItem{Key: string(key)})
}
ibytes := newRevBytes()
idxRev := revision{main: rev, sub: int64(len(tw.changes))}
revToBytes(idxRev, ibytes)
ver = ver + 1
kv := mvccpb.KeyValue{
Key: key,
Value: value,
CreateRevision: c,
ModRevision: rev,
Version: ver,
Lease: int64(leaseID),
}
d, err := kv.Marshal()
if err != nil {
if tw.storeTxnRead.s.lg != nil {
tw.storeTxnRead.s.lg.Fatal(
"failed to marshal mvccpb.KeyValue",
zap.Error(err),
)
} else {
plog.Fatalf("cannot marshal event: %v", err)
}
}
tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
tw.s.kvindex.Put(key, idxRev)
tw.changes = append(tw.changes, kv)
if oldLease != lease.NoLease {
if tw.s.le == nil {
panic("no lessor to detach lease")
}
err = tw.s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}})
if err != nil {
if tw.storeTxnRead.s.lg != nil {
tw.storeTxnRead.s.lg.Fatal(
"failed to detach old lease from a key",
zap.Error(err),
)
} else {
plog.Errorf("unexpected error from lease detach: %v", err)
}
}
}
if leaseID != lease.NoLease {
if tw.s.le == nil {
panic("no lessor to attach lease")
}
err = tw.s.le.Attach(leaseID, []lease.LeaseItem{{Key: string(key)}})
if err != nil {
panic("unexpected error from lease Attach")
}
}
}
其实这就是上面讲过的代码
-
rev := tw.beginRev + 1版本 id 就是当前 +1,开启新的事务 -
kvindex.Get查看是否有当前 key 的 index - 生成
ibytes数据,这里看到 main 就是当前 rev, sub 就是本次事务的更改条目 - 生成
mvccpb.KeyValue信息,并调用UnsafeSeqPut写入boltdb,调用kvindex.Put写内存 btree - 最后关于
lease的先不管。
这里面一定要注意 UnsafeSeqPut 只是开启了这个务,但是并没有提交,底层利用了 boltdb.batch 原理。 所以什么时候 commit 呢?实际上这是异步的,要追遡到 boltdb 初始化。
func newBackend(bcfg BackendConfig) *backend {
bopts := &bolt.Options{}
if boltOpenOptions != nil {
*bopts = *boltOpenOptions
}
bopts.InitialMmapSize = bcfg.mmapSize()
bopts.FreelistType = bcfg.BackendFreelistType
db, err := bolt.Open(bcfg.Path, 0600, bopts)
if err != nil {
if bcfg.Logger != nil {
bcfg.Logger.Panic("failed to open database", zap.String("path", bcfg.Path), zap.Error(err))
} else {
plog.Panicf("cannot open database at %s (%v)", bcfg.Path, err)
}
}
// In future, may want to make buffering optional for low-concurrency systems
// or dynamically swap between buffered/non-buffered depending on workload.
b := &backend{
db: db,
batchInterval: bcfg.BatchInterval,
batchLimit: bcfg.BatchLimit,
readTx: &readTx{
buf: txReadBuffer{
txBuffer: txBuffer{make(map[string]*bucketBuffer)},
},
buckets: make(map[string]*bolt.Bucket),
},
stopc: make(chan struct{}),
donec: make(chan struct{}),
lg: bcfg.Logger,
}
b.batchTx = newBatchTxBuffered(b)
go b.run()
return b
}
func (b *backend) run() {
defer close(b.donec)
t := time.NewTimer(b.batchInterval)
defer t.Stop()
for {
select {
case <-t.C:
case <-b.stopc:
b.batchTx.CommitAndStop()
return
}
if b.batchTx.safePending() != 0 {
b.batchTx.Commit()
}
t.Reset(b.batchInterval)
}
}
创建 batchTx 后,马上调用 go b.run() 开启异步 goroutine,用于定期 commit 数据,时间间隔 batchInterval 默认 100ms, 也就是说 mvcc 写入不是那么严格,有一定时间窗口的。
过期数据清理
首先是正常的数据删除,applierV3 主动调用 DeleteRange,最终还是操作的 storeTxnWrite
func (tw *storeTxnWrite) deleteRange(key, end []byte) int64 {
rrev := tw.beginRev
if len(tw.changes) > 0 {
rrev++
}
keys, _ := tw.s.kvindex.Range(key, end, rrev)
if len(keys) == 0 {
return 0
}
for _, key := range keys {
tw.delete(key)
}
return int64(len(keys))
}
首先从 kvindex 也就是 btree 中范围查找到所有的 keys,然后再调用 tw.delete 依次删除
func (tw *storeTxnWrite) delete(key []byte) {
ibytes := newRevBytes()
idxRev := revision{main: tw.beginRev + 1, sub: int64(len(tw.changes))}
revToBytes(idxRev, ibytes)
if tw.storeTxnRead.s != nil && tw.storeTxnRead.s.lg != nil {
ibytes = appendMarkTombstone(tw.storeTxnRead.s.lg, ibytes)
} else {
// TODO: remove this in v3.5
ibytes = appendMarkTombstone(nil, ibytes)
}
kv := mvccpb.KeyValue{Key: key}
d, err := kv.Marshal()
if err != nil {
if tw.storeTxnRead.s.lg != nil {
tw.storeTxnRead.s.lg.Fatal(
"failed to marshal mvccpb.KeyValue",
zap.Error(err),
)
} else {
plog.Fatalf("cannot marshal event: %v", err)
}
}
tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
err = tw.s.kvindex.Tombstone(key, idxRev)
if err != nil {
if tw.storeTxnRead.s.lg != nil {
tw.storeTxnRead.s.lg.Fatal(
"failed to tombstone an existing key",
zap.String("key", string(key)),
zap.Error(err),
)
} else {
plog.Fatalf("cannot tombstone an existing key (%s): %v", string(key), err)
}
}
tw.changes = append(tw.changes, kv)
item := lease.LeaseItem{Key: string(key)}
leaseID := tw.s.le.GetLease(item)
if leaseID != lease.NoLease {
err = tw.s.le.Detach(leaseID, []lease.LeaseItem{item})
if err != nil {
if tw.storeTxnRead.s.lg != nil {
tw.storeTxnRead.s.lg.Fatal(
"failed to detach old lease from a key",
zap.Error(err),
)
} else {
plog.Errorf("cannot detach %v", err)
}
}
}
}
- 生成 ibytes, 然后追加一个
appendMarkTombstone标记,表示这个 revision 是 delete,并且生成一个只含有 key 的mvccpb.KeyValue -
UnsafeSeqPut将本次删除 revision 写到boltdb - btree
调用Tombstone`,增加一个 tombstone revision -
lease操作,暂时忽略不看。
到此看到的删除都是惰性删除,只是标记,那物理数据何时 purge 呢?实际上 etcd 会开启后台 goroutine 自动 compaction, 或是手工用 cli 方式触发。查看代码,其实也只是删 btree,如果 keyi.compact 压缩后发现数据为空,调用 ti.tree.Delete(keyi)
func (ti *treeIndex) Compact(rev int64) map[revision]struct{} {
available := make(map[revision]struct{})
if ti.lg != nil {
ti.lg.Info("compact tree index", zap.Int64("revision", rev))
} else {
plog.Printf("store.index: compact %d", rev)
}
ti.Lock()
clone := ti.tree.Clone()
ti.Unlock()
clone.Ascend(func(item btree.Item) bool {
keyi := item.(*keyIndex)
//Lock is needed here to prevent modification to the keyIndex while
//compaction is going on or revision added to empty before deletion
ti.Lock()
keyi.compact(ti.lg, rev, available)
if keyi.isEmpty() {
item := ti.tree.Delete(keyi)
if item == nil {
if ti.lg != nil {
ti.lg.Panic("failed to delete during compaction")
} else {
plog.Panic("store.index: unexpected delete failure during compaction")
}
}
}
ti.Unlock()
return true
})
return available
}
那么磁盘的 boltdb 数据什么时候删呢?其实是不删的,每次 Compact 时 scheduleCompaction 调用 boltdb 的删除函数,但是我们知道 boltdb 只是标记而己不释放磁盘空间。当然最后还是有碎片整理的,也就是 Defrag, 看代码也好理解,就是新建 boltdb 底层文件,然后全量导数据,最后 rename 替换。