Leveldb之并发写入
2017-08-24 本文已影响61人
ieasy_tm
这篇文章想分析一下Leveldb的写入流程,以及Leveldb是如何处理并发写入问题的。阅读 https://github.com/syndtr/goleveldb 的代码,从代码逻辑上看,每一次put操作都会new一个Batch对象,Batch的数据结构如下:
// Put stores value under key. It allocates a fresh Batch, records the pair
// in it via Batch.Put, and submits that batch through Write using the given
// write options. The returned error is whatever Write reports.
func (db *DB) Put(key, value []byte, wo *opt.WriteOptions) error {
	batch := &Batch{}
	batch.Put(key, value)
	return db.Write(batch, wo)
}
// Batch is the unit handed to DB.Write; DB.Put creates one per call.
type Batch struct {
	// data is the raw buffer holding the batched keys and values.
	data []byte
	// rLen is a length counter kept next to bLen.
	// NOTE(review): its exact meaning is not visible here (possibly the
	// record count) — confirm against the Batch methods.
	rLen int
	// bLen is a length counter for the batch.
	// NOTE(review): presumably the length of the batched data — confirm.
	bLen int
	// seq is the sequence number of the batch; DB.Write sets it to the
	// database's last sequence number plus one.
	seq uint64
	// sync is a flag carried with the batch.
	// NOTE(review): presumably requests a synced journal write — confirm.
	sync bool
}
Batch中的data存放的是key和value;bLen是数据长度,seq是这个k-v对的序列号。
Put进来的key-value对先写入Batch,随后被写到.log文件,之后立即写入内存中的memdb。这个过程还涉及将memdb中的数据写入sstable;sstable是memdb落盘后的磁盘存储结构。每次write操作都有可能触发immemdb的落盘操作:
// Write applies batch b to the database: it assigns the batch its first
// sequence number, records the batch in the journal, replays it into the
// memdb, and finally advances the database sequence counter by b.Len().
//
// Batches whose size() is at least 128<<10 (131072) are pushed to the
// journal writer over db.journalC and acknowledged over db.journalAckC;
// smaller batches are journaled inline with db.writeJournal.
//
// The named result err carries an error received from db.compPerErrC, a
// non-nil journal acknowledgement, an error from db.writeJournal, or
// ErrClosed when db.closeC becomes ready while waiting. A failure while
// replaying into (or reverting) the memdb panics instead of returning.
//
// NOTE(review): this is an excerpt. The "..." line stands for code that is
// not shown; mdb and mdbFree are used below without a visible declaration,
// so they are presumably set up in the elided part (flush returns values
// with these names) — confirm against the full source. wo is not referenced
// in the visible part either.
func (db *DB) Write(b *Batch, wo *opt.WriteOptions) (err error) {
...
// Set batch first seq number relative from last seq.
b.seq = db.seq + 1
// Write journal concurrently if it is large enough.
if b.size() >= (128 << 10) {
// Push the write batch to the journal writer
select {
case db.journalC <- b:
// The batch was accepted on journalC; replay it into the memdb now and
// collect the journal result in the second select below.
// Write into memdb
if berr := b.memReplay(mdb.DB); berr != nil {
panic(berr)
}
case err = <-db.compPerErrC:
// An error arrived on compPerErrC before the batch was accepted;
// return it to the caller.
return
case _, _ = <-db.closeC:
// closeC became ready before the batch was accepted.
err = ErrClosed
return
}
// Wait for journal writer
select {
case err = <-db.journalAckC:
// A nil acknowledgement falls through to the sequence update below.
if err != nil {
// Revert memdb if error detected
if berr := b.revertMemReplay(mdb.DB); berr != nil {
panic(berr)
}
return
}
case _, _ = <-db.closeC:
// closeC became ready while waiting for the acknowledgement; the memdb
// replay done above is not reverted on this path.
err = ErrClosed
return
}
} else {
// Small batch: write the journal inline first and replay into the memdb
// only after the journal write succeeded.
err = db.writeJournal(b)
if err != nil {
return
}
if berr := b.memReplay(mdb.DB); berr != nil {
panic(berr)
}
}
// Set last seq number.
db.addSeq(uint64(b.Len()))
// The batch was at least as large as the free space recorded in mdbFree,
// so rotate the memdb now; rotateMem's results are discarded here.
if b.size() >= mdbFree {
db.rotateMem(0)
}
return
}
// flush picks the memdb that an upcoming write of size n should go into and
// returns it together with the free space to assume for it.
//
// It runs rounds of the following checks, in this order, until one of them
// settles the outcome:
//
//  1. If v.tLen(0) has reached the WriteL0SlowdownTrigger option and no
//     delay has been applied yet during this call, sleep one millisecond
//     and start another round.
//  2. If the memdb reports at least n free, return it.
//  3. If v.tLen(0) has reached the WriteL0PauseTrigger option, call
//     db.compSendIdle(db.tcompCmdC); on error return that error, otherwise
//     start another round.
//  4. Otherwise, an empty memdb (Len() == 0) is returned with mdbFree set
//     to n so that it may grow; a non-empty one is released and replaced
//     through db.rotateMem(n). If rotateMem fails, its error is returned
//     and mdbFree is 0.
//
// Whenever a call was delayed, the elapsed time is added to db.writeDelay
// and db.writeDelayN is incremented; the first undelayed call afterwards
// logs the accumulated figures and resets them.
//
// NOTE(review): v.tLen(0) is presumably the number of level-0 tables, and
// getEffectiveMem presumably returns the memdb with a reference that the
// caller must drop with decref — confirm against their definitions.
func (db *DB) flush(n int) (mdb *memDB, mdbFree int, err error) {
	throttled := false

	// attempt runs one round of the checks above and reports whether
	// another round is required.
	attempt := func() (retry bool) {
		ver := db.s.version()
		defer ver.release()

		mdb = db.getEffectiveMem()
		defer func() {
			// Only a round that will be repeated gives its memdb back; the
			// next round fetches the effective memdb again.
			if !retry {
				return
			}
			mdb.decref()
			mdb = nil
		}()

		mdbFree = mdb.Free()

		if ver.tLen(0) >= db.s.o.GetWriteL0SlowdownTrigger() && !throttled {
			throttled = true
			time.Sleep(time.Millisecond)
			return true
		}

		if mdbFree >= n {
			return false
		}

		if ver.tLen(0) >= db.s.o.GetWriteL0PauseTrigger() {
			throttled = true
			err = db.compSendIdle(db.tcompCmdC)
			// Retry only when the request went through; otherwise stop and
			// let the caller see err.
			return err == nil
		}

		// Allow memdb to grow if it has no entry.
		if mdb.Len() == 0 {
			mdbFree = n
			return false
		}

		mdb.decref()
		mdb, err = db.rotateMem(n)
		if err != nil {
			mdbFree = 0
		} else {
			mdbFree = mdb.Free()
		}
		return false
	}

	began := time.Now()
	for attempt() {
	}

	switch {
	case throttled:
		db.writeDelay += time.Since(began)
		db.writeDelayN++
	case db.writeDelayN > 0:
		db.logf("db@write was delayed N·%d T·%v", db.writeDelayN, db.writeDelay)
		db.writeDelay = 0
		db.writeDelayN = 0
	}
	return
}