LevelDbLevelDB

Leveldb之并发写入

2017-08-24  本文已影响61人  ieasy_tm

这篇文章想分析下 Leveldb 的写入流程,以及 Leveldb 是如何处理并发写入问题的。阅读 https://github.com/syndtr/goleveldb 的代码,从代码的逻辑看,每一次 Put 操作都会 new 一个 Batch 对象。Put 的实现和 Batch 的数据结构如下:

// Put stores a single key/value pair. It wraps the pair in a freshly
// allocated Batch and hands that batch to db.Write together with the caller's
// write options, returning whatever error Write reports.
func (db *DB) Put(key, value []byte, wo *opt.WriteOptions) error {
    batch := &Batch{}
    batch.Put(key, value)
    return db.Write(batch, wo)
}
// Batch is the unit handed to DB.Write; DB.Put creates one per call.
type Batch struct {
    // data is the encoded buffer holding the batch's keys and values.
    data []byte
    // rLen is presumably the number of records in the batch — TODO confirm.
    rLen int
    // bLen is the length of the batch's data, as described in the article text.
    bLen int
    // seq is the sequence number of the batch's first record; DB.Write sets
    // it to db.seq + 1.
    seq uint64
    // sync presumably requests a synchronous journal write — TODO confirm.
    sync bool
}

Batch 中的 data 存放的是 key 和 value,bLen 是数据长度,seq 是这个 k-v 对的序列号。

put.png

Put 进来的 key-value 对写入 Batch 之后,会先写到 .log 文件,随后立马写入内存中的 memdb。这个过程中涉及到将 memdb 中的数据写入 sstable;sstable 是 memdb 落盘后的磁盘存储结构。每次 write 操作都有可能触发 immemdb 的落盘操作:

// Write applies batch b to the database: the batch is recorded in the journal
// and replayed into the memdb, after which the DB's sequence number is
// advanced by b.Len().
//
// Batches whose b.size() is at least 128<<10 are pushed to the journal writer
// through db.journalC and replayed into the memdb before the journal writer's
// acknowledgement is awaited; if that acknowledgement carries an error the
// memdb replay is reverted. Smaller batches are journaled synchronously with
// db.writeJournal and only then replayed.
//
// It returns ErrClosed if db.closeC fires while waiting, an error received
// from db.compPerErrC, or the journal write error. A failure to replay (or to
// revert) the batch in the memdb panics.
func (db *DB) Write(b *Batch, wo *opt.WriteOptions) (err error) {
    // NOTE(review): the excerpt elides code here. mdb and mdbFree used below
    // are defined in the elided part — presumably from db.flush; confirm.
    ...
    // Set batch first seq number relative from last seq.
    b.seq = db.seq + 1

    // Write journal concurrently if it is large enough.
    // 128 << 10 == 131072 (presumably bytes, i.e. 128 KiB — confirm).
    if b.size() >= (128 << 10) {
        // Push the write batch to the journal writer
        select {
        case db.journalC <- b:
            // Write into memdb
            if berr := b.memReplay(mdb.DB); berr != nil {
                panic(berr)
            }
        case err = <-db.compPerErrC:
            // An error arrived on compPerErrC before the batch could be
            // handed over; return it without writing anything.
            return
        case _, _ = <-db.closeC:
            err = ErrClosed
            return
        }
        // Wait for journal writer
        select {
        case err = <-db.journalAckC:
            if err != nil {
                // Revert memdb if error detected
                if berr := b.revertMemReplay(mdb.DB); berr != nil {
                    panic(berr)
                }
                return
            }
        case _, _ = <-db.closeC:
            err = ErrClosed
            return
        }
    } else {
        // Small batch: journal first, and touch the memdb only once the
        // journal write has succeeded.
        err = db.writeJournal(b)
        if err != nil {
            return
        }
        if berr := b.memReplay(mdb.DB); berr != nil {
            panic(berr)
        }
    }

    // Set last seq number.
    db.addSeq(uint64(b.Len()))

    // Rotate the memdb when the batch is at least as large as mdbFree.
    // NOTE(review): the values returned by rotateMem are discarded here.
    if b.size() >= mdbFree {
        db.rotateMem(0)
    }
    return
}
// flush loops until the effective memdb can accept n more units of data
// (presumably bytes — confirm), throttling the writer or rotating the memdb
// as needed.
//
// It returns the memdb to write into, the free space to assume for it, and
// any error from db.compSendIdle or db.rotateMem. Time spent in a delayed
// call is accumulated in db.writeDelay / db.writeDelayN; the first call that
// is not delayed logs the accumulated totals and resets them.
func (db *DB) flush(n int) (mdb *memDB, mdbFree int, err error) {
    throttled := false

    // attempt performs one pass and reports whether another pass is needed.
    attempt := func() (again bool) {
        ver := db.s.version()
        defer ver.release()

        mdb = db.getEffectiveMem()
        // When the pass has to be repeated, release the memdb obtained above
        // so the next pass starts from a fresh getEffectiveMem call.
        defer func() {
            if !again {
                return
            }
            mdb.decref()
            mdb = nil
        }()

        mdbFree = mdb.Free()

        // Level-0 table count reached the slowdown trigger: sleep 1ms, but
        // only once per flush call.
        if ver.tLen(0) >= db.s.o.GetWriteL0SlowdownTrigger() && !throttled {
            throttled = true
            time.Sleep(time.Millisecond)
            return true
        }

        // Enough room already; nothing more to do.
        if mdbFree >= n {
            return false
        }

        // Level-0 table count reached the pause trigger: send an idle
        // command on the table-compaction channel and retry unless that
        // failed.
        if ver.tLen(0) >= db.s.o.GetWriteL0PauseTrigger() {
            throttled = true
            err = db.compSendIdle(db.tcompCmdC)
            return err == nil
        }

        // Allow memdb to grow if it has no entry.
        if mdb.Len() == 0 {
            mdbFree = n
            return false
        }

        // Otherwise swap in a new memdb via rotateMem.
        mdb.decref()
        mdb, err = db.rotateMem(n)
        if err != nil {
            mdbFree = 0
            return false
        }
        mdbFree = mdb.Free()
        return false
    }

    began := time.Now()
    for attempt() {
    }

    switch {
    case throttled:
        db.writeDelay += time.Since(began)
        db.writeDelayN++
    case db.writeDelayN > 0:
        db.logf("db@write was delayed N·%d T·%v", db.writeDelayN, db.writeDelay)
        db.writeDelay = 0
        db.writeDelayN = 0
    }
    return mdb, mdbFree, err
}
上一篇 下一篇

猜你喜欢

热点阅读