
Building a Public Chain - Reinventing the Wheel (8): BTCD Block Storage, the ffldb Implementation

2018-06-12  建怀


The database directory is BTCD's storage framework; it allows btcd to support multiple database backends.
ffldb is the default database provided by the database package.

The database directory mainly contains the following:

cmd/dbtool

This directory contains 7 files.

interface.go

interface.go defines the DB interface, the Tx interface, the BlockRegion struct, the Cursor interface, and the Bucket interface:

type Cursor interface {
    Bucket()    Bucket
    Delete()    error
    First() bool
    Last()  bool
    Next()  bool
    Prev()  bool
    Seek(seek []byte)   bool
    Key()   []byte
    Value() []byte
}

type Bucket interface {
    Bucket(key []byte) Bucket
    CreateBucket(key []byte)(Bucket,error)
    CreateBucketIfNotExists(key []byte)(Bucket,error)
    DeleteBucket(key []byte) error
    ForEach(func(k,v []byte) error) error
    ForEachBucket(func(k []byte) error) error
    Cursor() Cursor
    Writable()  bool
    Put(key,value []byte) error
    Get(key []byte) []byte
    Delete(key []byte) error
}

type BlockRegion struct {
    Hash *chainhash.Hash
    Offset  uint32
    Len uint32
}

type Tx interface {
    Metadata()  Bucket
    StoreBlock(block *btcutil.Block) error
    HasBlock(hash *chainhash.Hash) (bool,error)
    HasBlocks(hashes []chainhash.Hash) ([]bool,error)
    FetchBlockHeader(hash *chainhash.Hash) ([]byte,error)
    FetchBlockHeaders(hashes []chainhash.Hash) ([][]byte,error)
    FetchBlock(hash *chainhash.Hash) ([]byte,error)
    FetchBlocks(hashes []chainhash.Hash) ([][]byte,error)
    FetchBlockRegion(region *BlockRegion) ([]byte,error)
    FetchBlockRegions(regions []BlockRegion) ([][]byte,error)
    Commit() error
    Rollback() error
}

type DB interface {
    Type() string
    Begin(writable bool) (Tx,error)
    View(fn func(tx Tx) error) error
    Update(fn func(tx Tx) error) error
    Close() error
}
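
Before diving into the implementation, here is a minimal usage sketch of these interfaces through the public btcd database package; the "ffldb" driver name and wire.MainNet are real, but the database path and bucket name are placeholders chosen for illustration.

package main

import (
    "fmt"

    "github.com/btcsuite/btcd/database"
    _ "github.com/btcsuite/btcd/database/ffldb" // blank import registers the "ffldb" driver
    "github.com/btcsuite/btcd/wire"
)

func main() {
    // Create an ffldb-backed database (the path is a placeholder; Create fails
    // if the database already exists, in which case database.Open would be used).
    db, err := database.Create("ffldb", "/tmp/exampledb", wire.MainNet)
    if err != nil {
        panic(err)
    }
    defer db.Close()

    // Update runs the callback inside a managed, writable transaction.
    err = db.Update(func(tx database.Tx) error {
        // Metadata() is the root bucket; nested buckets hang off it.
        _, err := tx.Metadata().CreateBucketIfNotExists([]byte("mybucket"))
        return err
    })
    if err != nil {
        panic(err)
    }

    // View runs the callback inside a managed, read-only transaction.
    _ = db.View(func(tx database.Tx) error {
        fmt.Println("bucket exists:", tx.Metadata().Bucket([]byte("mybucket")) != nil)
        return nil
    })
}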

ffldb implements the interfaces above. Start with its db struct definition:

type db struct {
    writeLock sync.Mutex   // Limit to one write transaction at a time.
    closeLock sync.RWMutex // Make database close block while txns active.
    closed    bool         // Is the database closed?
    store     *blockStore  // Handles read/writing blocks to flat files.
    cache     *dbCache     // Cache layer which wraps underlying leveldb DB.
}

The purpose of each field is given by the comment beside it.

db.go implements the database.DB interface, and its methods work much like BoltDB's: View() or Update() hands a Tx (or a reference to one) to a callback, and all database operations then go through the Tx interface. Since this layer is not the core of the blockchain itself and the project takes it largely as-is, we will not walk through every db method.
Instead, let's look at how ffldb defines its transaction type and how it implements database.Tx:

type transaction struct {
    managed        bool             // Is the transaction managed?
    closed         bool             // Is the transaction closed?
    writable       bool             // Is the transaction writable?
    db             *db              // DB instance the tx was created from.
    snapshot       *dbCacheSnapshot // Underlying snapshot for txns.
    metaBucket     *bucket          // The root metadata bucket.
    blockIdxBucket *bucket          // The block index bucket.

    // Blocks that need to be stored on commit.  The pendingBlocks map is
    // kept to allow quick lookups of pending data by block hash.
    pendingBlocks    map[chainhash.Hash]int
    pendingBlockData []pendingBlock

    // Keys that need to be stored or deleted on commit.
    pendingKeys   *treap.Mutable
    pendingRemove *treap.Mutable

    // Active iterators that need to be notified when the pending keys have
    // been updated so the cursors can properly handle updates to the
    // transaction state.
    activeIterLock sync.RWMutex
    activeIters    []*treap.Iterator
}

Again, the purpose of each field is explained by its comment.

transaction exposes three main groups of methods. Start with the Metadata() method in db.go:

// Metadata returns the top-most bucket for all metadata storage.
// This function is part of the database.Tx interface implementation.
func (tx *transaction) Metadata() database.Bucket {
    return tx.metaBucket
}

Metadata() returns the root Bucket; every other operation goes through it. Next, the bucket definition:

// bucket is an internal type used to represent a collection of key/value pairs
// and implements the database.Bucket interface.
type bucket struct {
    tx *transaction
    id [4]byte
}

ffldb stores its K/V pairs in leveldb, whose underlying data structure is an LSM tree (log-structured merge-tree), whereas BoltDB uses a B+ tree.
ffldb reads and writes K/V pairs through leveldb's API, but leveldb has no notion of a Bucket and no built-in way to organize keys hierarchically, so ffldb has to implement Buckets itself. Let's look at the source:

// CreateBucket creates and returns a new nested bucket with the given key.
func (b *bucket) CreateBucket(key []byte) (database.Bucket, error) {
    // Ensure transaction state is valid.
    ...
    // Ensure the transaction is writable.
    ...
    // Ensure a key was provided.
    ...

    // Ensure bucket does not already exist.
    bidxKey := bucketIndexKey(b.id, key)
    if b.tx.hasKey(bidxKey) {
        str := "bucket already exists"
        return nil, makeDbErr(database.ErrBucketExists, str, nil)
    }

    // Find the appropriate next bucket ID to use for the new bucket.  In the case of the special internal block index, keep the fixed ID.
    var childID [4]byte
    if b.id == metadataBucketID && bytes.Equal(key, blockIdxBucketName) {
        childID = blockIdxBucketID
    } else {
        var err error
        childID, err = b.tx.nextBucketID()
        if err != nil {
            return nil, err
        }
    }

    // Add the new bucket to the bucket index.
    if err := b.tx.putKey(bidxKey, childID[:]); err != nil {
        str := fmt.Sprintf("failed to create bucket with key %q", key)
        return nil, convertErr(str, err)
    }
    return &bucket{tx: b.tx, id: childID}, nil
}

The method above creates a nested (child) Bucket under the given key.

Looking at the bucketIndexKey() function makes the Bucket scheme clear:

func bucketIndexKey(parentID [4]byte, key []byte) []byte {
    // The serialized bucket index key format is:
    //   <bucketindexprefix><parentbucketid><bucketname>
    indexKey := make([]byte, len(bucketIndexPrefix)+4+len(key))
    copy(indexKey, bucketIndexPrefix)
    copy(indexKey[len(bucketIndexPrefix):], parentID[:])
    copy(indexKey[len(bucketIndexPrefix)+4:], key)
    return indexKey
}

A child Bucket's key always has the form <bucketindexprefix><parentbucketid><bucketname>. If a key has this form, it denotes a child Bucket, and its value holds that Bucket's id. In other words, ffldb encodes the parent/child relationship directly in the layered form of the key.
BoltDB, by contrast, is B+-tree based: each child Bucket is its own B+ tree, and adding K/V pairs to a child Bucket means inserting records into that tree.

ffldb also needs to add K/V pairs to child Buckets; the source shows how:

func (b *bucket) Put(key, value []byte) error {
    // Ensure transaction state is valid.
    ...
    // Ensure the transaction is writable.
    ...
    // Ensure a key was provided.
    ...

    return b.tx.putKey(bucketizedKey(b.id, key), value)
}

Clearly, after the three checks on the transaction and the key, Put() goes straight to putKey(); the key point is that the key is first transformed by bucketizedKey(b.id, key):

func bucketizedKey(bucketID [4]byte, key []byte) []byte {
    // The serialized block index key format is:
    //   <bucketid><key>
    bKey := make([]byte, 4+len(key))
    copy(bKey, bucketID[:])
    copy(bKey[4:], key)
    return bKey
}

When a K/V pair is added to a bucket, the key is rewritten as <bucketid><key>, marking the record as belonging to the bucket whose id is bucketid.

ffldb therefore uses two layered key formats. The first, <bucketindexprefix><parentbucketid><bucketname>, records the parent/child relationship between Buckets; the second, <bucketid><key>, records which Bucket an ordinary K/V pair belongs to.

Either way, everything ultimately goes into leveldb, which has no Bucket concept at all: every key lives in a single flat namespace.
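
A small self-contained sketch of how both layouts come out as flat leveldb keys; the bucketIndexPrefix value and the bucket IDs below are hypothetical placeholders chosen only to show the byte layout.

package main

import "fmt"

// Hypothetical prefix; ffldb defines its own internal constant for this.
var bucketIndexPrefix = []byte("bidx")

// Layout 1: <bucketindexprefix><parentbucketid><bucketname> -> child bucket ID.
func bucketIndexKey(parentID [4]byte, name []byte) []byte {
    k := make([]byte, len(bucketIndexPrefix)+4+len(name))
    copy(k, bucketIndexPrefix)
    copy(k[len(bucketIndexPrefix):], parentID[:])
    copy(k[len(bucketIndexPrefix)+4:], name)
    return k
}

// Layout 2: <bucketid><key> -> value.
func bucketizedKey(bucketID [4]byte, key []byte) []byte {
    k := make([]byte, 4+len(key))
    copy(k, bucketID[:])
    copy(k[4:], key)
    return k
}

func main() {
    parent := [4]byte{0, 0, 0, 0} // e.g. the root metadata bucket
    child := [4]byte{0, 0, 0, 2}  // hypothetical ID assigned to the child bucket

    // Creating a child bucket named "utxo" under the parent stores:
    //   bucketIndexKey(parent, "utxo") -> child ID
    fmt.Printf("%q\n", bucketIndexKey(parent, []byte("utxo")))

    // Putting ("height", ...) into the child bucket stores it under:
    //   bucketizedKey(child, "height")
    fmt.Printf("%q\n", bucketizedKey(child, []byte("height")))
}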

The bucket abstraction still matters above that flat layer, though: for example, the cursor used to scan data is bound to a specific bucket, and it is implemented on top of leveldb's Iterator.

As the analysis of bucket's Put() showed, an added K/V pair first goes into pendingKeys via the transaction's putKey() method:

func (tx *transaction) putKey(key, value []byte) error {
    // Prevent the key from being deleted if it was previously scheduled
    // to be deleted on transaction commit.
    tx.pendingRemove.Delete(key)

    // Add the key/value pair to the list to be written on transaction
    // commit.
    tx.pendingKeys.Put(key, value)
    tx.notifyActiveIters()
    return nil
}

By the same logic, the bucket's Delete() method calls the transaction's deleteKey(). deleteKey() adds the key to pendingRemove; when the transaction commits, the keys in pendingKeys are written to leveldb and the keys in pendingRemove are deleted from it.

func (c *cursor) Delete() error {
    // Ensure transaction state is valid.
    ...
    // Error if the cursor is exhausted.
    ...
    // Do not allow buckets to be deleted via the cursor.
    ...

    c.bucket.tx.deleteKey(copySlice(key), true)
    return nil
}

func (tx *transaction) deleteKey(key []byte, notifyIterators bool) {
    // Remove the key from the list of pendings keys to be written on transaction commit if needed.
    tx.pendingKeys.Delete(key)

    // Add the key to the list to be deleted on transaction commit.
    tx.pendingRemove.Put(key, nil)

    // Notify the active iterators about the change if the flag is set.
    if notifyIterators {
        tx.notifyActiveIters()
    }
}

Get() ultimately calls the transaction's fetchKey(). fetchKey() first consults pendingRemove and pendingKeys; if the key is not found there, it falls back to a snapshot of the dbCache.

func (b *bucket) Get(key []byte) []byte {
    // Ensure transaction state is valid.
    if err := b.tx.checkClosed(); err != nil {
        return nil
    }

    // Nothing to return if there is no key.
    if len(key) == 0 {
        return nil
    }

    return b.tx.fetchKey(bucketizedKey(b.id, key))
}

func (tx *transaction) fetchKey(key []byte) []byte {
    // When the transaction is writable, check the pending transaction
    // state first.
    if tx.writable {
        if tx.pendingRemove.Has(key) {
            return nil
        }
        if value := tx.pendingKeys.Get(key); value != nil {
            return value
        }
    }

    // Consult the database cache and underlying database.
    return tx.snapshot.Get(key)
}

The second group of transaction methods deals with blocks. Let's focus on StoreBlock() and FetchBlock():

func (tx *transaction) StoreBlock(block *btcutil.Block) error {
    // Ensure transaction state is valid.
    ...
    // Ensure the transaction is writable.
    ...
    // Reject the block if it already exists.
    ...
    blockBytes, err := block.Bytes()
    ...

    if tx.pendingBlocks == nil {
        tx.pendingBlocks = make(map[chainhash.Hash]int)
    }
    tx.pendingBlocks[*blockHash] = len(tx.pendingBlockData)
    tx.pendingBlockData = append(tx.pendingBlockData, pendingBlock{
        hash:  blockHash,
        bytes: blockBytes,
    })
    log.Tracef("Added block %s to pending blocks", blockHash)

    return nil
}

StoreBlock() simply appends the block to the pendingBlockData slice (recording its index in the pendingBlocks map); the data is only written to a file at Commit time.
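
As a hedged sketch of how a caller exercises StoreBlock() and FetchBlock() through the public interface (the genesis block is used purely as a convenient example, and the import paths follow the btcsuite layout of the time):

package example

import (
    "github.com/btcsuite/btcd/chaincfg"
    "github.com/btcsuite/btcd/database"
    "github.com/btcsuite/btcutil"
)

// storeGenesis queues the mainnet genesis block inside a write transaction;
// the block is only written to a flat file when the transaction commits.
func storeGenesis(db database.DB) error {
    block := btcutil.NewBlock(chaincfg.MainNetParams.GenesisBlock)
    return db.Update(func(tx database.Tx) error {
        return tx.StoreBlock(block)
    })
}

// readGenesis fetches the raw genesis block bytes back out by hash.
func readGenesis(db database.DB) ([]byte, error) {
    var raw []byte
    err := db.View(func(tx database.Tx) error {
        b, err := tx.FetchBlock(chaincfg.MainNetParams.GenesisHash)
        raw = b
        return err
    })
    return raw, err
}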

func (tx *transaction) FetchBlock(hash *chainhash.Hash) ([]byte, error) {
    // Ensure transaction state is valid.
    ...
    // When the block is pending to be written on commit return the bytes from there.
    if idx, exists := tx.pendingBlocks[*hash]; exists {
        return tx.pendingBlockData[idx].bytes, nil
    }

    // Lookup the location of the block in the files from the block index.
    blockRow, err := tx.fetchBlockRow(hash)
    if err != nil {
        return nil, err
    }
    location := deserializeBlockLoc(blockRow)

    // Read the block from the appropriate location.  The function also
    // performs a checksum over the data to detect data corruption.
    blockBytes, err := tx.db.store.readBlock(hash, location)
    if err != nil {
        return nil, err
    }

    return blockBytes, nil
}

When reading a block, FetchBlock() first checks pendingBlocks and, if the block is there, returns the bytes straight from pendingBlockData. Otherwise it has to locate the block by hash: tx.fetchBlockRow(hash) returns the blockRow, which is deserialized into a location, and the block is then read from disk using that location together with the hash.
This "location" is the block's metadata, i.e. its entry in the block index.
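
The blockRow stored in the block index is just a serialized blockLocation. A hedged reconstruction of what deserializeBlockLoc plausibly does, assuming the row is three little-endian uint32 values (file number, file offset, block length) matching the fields readBlock() needs; the exact encoding is an assumption, not taken from the source.

package sketch

import "encoding/binary"

// blockLocation identifies where a block lives in the flat files.
type blockLocation struct {
    blockFileNum uint32
    fileOffset   uint32
    blockLen     uint32
}

// deserializeBlockLocSketch decodes a 12-byte block index row (assumed layout).
func deserializeBlockLocSketch(row []byte) blockLocation {
    return blockLocation{
        blockFileNum: binary.LittleEndian.Uint32(row[0:4]),
        fileOffset:   binary.LittleEndian.Uint32(row[4:8]),
        blockLen:     binary.LittleEndian.Uint32(row[8:12]),
    }
}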

To summarize: reading and writing data or blocks through a transaction always goes through pendingBlocks, or pendingKeys and pendingRemove, first. These act as the transaction's buffers and are synced to the flat files and to leveldb at Commit time. Commit() ultimately calls writePendingAndCommit() to do the actual work:

func (tx *transaction) writePendingAndCommit() error {
    // Save the current block store write position for potential rollback.
    // These variables are only updated here in this function and there can only be one write transaction active at a time, so it's safe to store them for potential rollback.
    wc := tx.db.store.writeCursor
    wc.RLock()
    oldBlkFileNum := wc.curFileNum
    oldBlkOffset := wc.curOffset
    wc.RUnlock()

    // rollback is a closure that is used to rollback all writes to the block files.
    rollback := func() {
        // Rollback any modifications made to the block files if needed.
        tx.db.store.handleRollback(oldBlkFileNum, oldBlkOffset)
    }

    // Loop through all of the pending blocks to store and write them.
    for _, blockData := range tx.pendingBlockData {
        log.Tracef("Storing block %s", blockData.hash)
        location, err := tx.db.store.writeBlock(blockData.bytes)
        if err != nil {
            rollback()
            return err
        }

        // Add a record in the block index for the block.
        // The record includes the location information needed to locate the block on the filesystem as well as the block header since they are so commonly needed.
        blockRow := serializeBlockLoc(location)
        err = tx.blockIdxBucket.Put(blockData.hash[:], blockRow)
        if err != nil {
            rollback()
            return err
        }
    }

    // Update the metadata for the current write file and offset.
    writeRow := serializeWriteRow(wc.curFileNum, wc.curOffset)
    if err := tx.metaBucket.Put(writeLocKeyName, writeRow); err != nil {
        rollback()
        return convertErr("failed to store write cursor", err)
    }

    // Atomically update the database cache.  The cache automatically
    // handles flushing to the underlying persistent storage database.
    return tx.db.cache.commitTx(tx)
}

writePendingAndCommit() first saves the block store's current write position so it can roll back if needed. These variables are only updated in this function, and only one write transaction can be active at a time, so saving them for a potential rollback is safe.

rollback is a closure that undoes any writes made to the block files during this function.

The rest of the commit path does three main things:

1. Write each pending block to a flat file via blockStore.writeBlock() and add its serialized location to the block index bucket.
2. Update the metadata bucket with the current write file number and offset (the write cursor).
3. Atomically commit the metadata changes to the dbCache, which handles flushing to leveldb.

blockStore

When a transaction reads or writes metadata or blocks, the work ultimately lands either in blockStore (reading/writing the flat files) or in dbCache (reading/writing the treaps and leveldb).
Next, let's analyze blockStore and dbCache.

The blockStore struct is defined in blockio.go:

type blockStore struct {
    // network is the bitcoin network the blocks belong to (MainNet, TestNet,
    // SimNet, ...); it is written alongside every block stored in the files.
    network wire.BitcoinNet

    // basePath is the directory on disk where the block files are stored.
    basePath string

    // maxBlockFileSize is the maximum size of a single block file.
    maxBlockFileSize uint32

    // obfMutex is a read/write lock protecting openBlockFiles.
    obfMutex         sync.RWMutex
    // lruMutex protects openBlocksLRU and fileNumToLRUElem.
    lruMutex         sync.Mutex
    // LRU list of the numbers of currently open files; the default maximum
    // number of open files is 25.
    openBlocksLRU    *list.List // Contains uint32 block file numbers.
    // Maps a file number to its element in openBlocksLRU.
    fileNumToLRUElem map[uint32]*list.Element
    // Maps the numbers of all open read-only files to their file handles.
    openBlockFiles   map[uint32]*lockableFile

    // writeCursor points to the file currently being written to, recording
    // its file number and the write offset within it.
    writeCursor *writeCursor

    // Function hooks for the openFile, openWriteFile and deleteFile methods.
    openFileFunc      func(fileNum uint32) (*lockableFile, error)
    openWriteFileFunc func(fileNum uint32) (filer, error)
    deleteFileFunc    func(fileNum uint32) error
}
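
The writeCursor type is not reproduced in this excerpt; based on how readBlock() and writeBlock() use it (wc.RLock(), wc.curFileNum, wc.curOffset, wc.curFile), its shape is roughly the following sketch. The field names mirror that usage, but this is an inferred approximation, not the original definition.

package sketch

import (
    "os"
    "sync"
)

// lockableFile pairs an open file with a read/write lock (simplified stand-in).
type lockableFile struct {
    sync.RWMutex
    file *os.File
}

// writeCursor tracks the block file currently being appended to.
type writeCursor struct {
    sync.RWMutex

    // curFile is the handle of the file currently open for writing.
    curFile *lockableFile

    // curFileNum and curOffset record where the next block will be written.
    curFileNum uint32
    curOffset  uint32
}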

The blockStore struct is fairly involved; its readBlock() and writeBlock() methods show how it works in practice.

func (s *blockStore) readBlock(hash *chainhash.Hash, loc blockLocation) ([]byte, error) {
    // Get the referenced block file handle, opening the file as needed. The
    // function also handles closing files as needed to stay within the
    // maximum allowed number of open files.
    blockFile, err := s.blockFile(loc.blockFileNum)
    if err != nil {
        return nil, err
    }

    serializedData := make([]byte, loc.blockLen)
    n, err := blockFile.file.ReadAt(serializedData, int64(loc.fileOffset))
    blockFile.RUnlock()
    if err != nil {
        str := fmt.Sprintf("failed to read block %s from file %d, "+
            "offset %d: %v", hash, loc.blockFileNum, loc.fileOffset,
            err)
        return nil, makeDbErr(database.ErrDriverSpecific, str, err)
    }

    // Calculate the checksum of the read data and ensure it matches the
    // serialized checksum. This catches any corruption in the flat files
    // without needing the more expensive merkle root calculation on the
    // loaded block.
    serializedChecksum := binary.BigEndian.Uint32(serializedData[n-4:])
    calculatedChecksum := crc32.Checksum(serializedData[:n-4], castagnoli)
    if serializedChecksum != calculatedChecksum {
        str := fmt.Sprintf("block data for block %s checksum "+
            "does not match - got %x, want %x", hash,
            calculatedChecksum, serializedChecksum)
        return nil, makeDbErr(database.ErrCorruption, str, nil)
    }

    // The network associated with the block must match the currently active
    // network, otherwise somebody probably put the block files for the wrong
    // network into this directory.
    serializedNet := byteOrder.Uint32(serializedData[:4])
    if serializedNet != uint32(s.network) {
        str := fmt.Sprintf("block data for block %s is for the "+
            "wrong network - got %d, want %d", hash, serializedNet,
            uint32(s.network))
        return nil, makeDbErr(database.ErrDriverSpecific, str, nil)
    }

    // The raw block excludes the network, the block length, and the checksum.
    return serializedData[8 : n-4], nil
}
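
From readBlock() above (and writeBlock() further below), each record in a flat block file is framed as <network 4B><block length 4B><raw block><CRC32-Castagnoli 4B>. The following is a minimal sketch that frames and unframes a record this way; the little-endian byte order for the network and length fields is an assumption standing in for the package's byteOrder, while the big-endian checksum mirrors how readBlock() reads it.

package sketch

import (
    "encoding/binary"
    "errors"
    "hash/crc32"
)

var castagnoli = crc32.MakeTable(crc32.Castagnoli)

// frameBlock serializes a raw block the way the flat files store it:
// <network><length><raw block><checksum over the first three parts>.
func frameBlock(network uint32, raw []byte) []byte {
    buf := make([]byte, 8+len(raw)+4)
    binary.LittleEndian.PutUint32(buf[0:4], network)
    binary.LittleEndian.PutUint32(buf[4:8], uint32(len(raw)))
    copy(buf[8:], raw)
    sum := crc32.Checksum(buf[:8+len(raw)], castagnoli)
    binary.BigEndian.PutUint32(buf[8+len(raw):], sum)
    return buf
}

// unframeBlock mirrors what readBlock() does after reading a record from disk.
func unframeBlock(record []byte, wantNet uint32) ([]byte, error) {
    n := len(record)
    if n < 12 {
        return nil, errors.New("record too short")
    }
    if crc32.Checksum(record[:n-4], castagnoli) != binary.BigEndian.Uint32(record[n-4:]) {
        return nil, errors.New("checksum mismatch")
    }
    if binary.LittleEndian.Uint32(record[:4]) != wantNet {
        return nil, errors.New("wrong network")
    }
    return record[8 : n-4], nil
}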

readBlock() works in these steps:

1. Obtain a handle to the block file referenced by the location via blockFile().
2. Read blockLen bytes at the file offset.
3. Verify the CRC32 (Castagnoli) checksum stored at the end of the record.
4. Verify that the network stored with the block matches the store's network.
5. Return the raw block, stripping the 8-byte header (network and length) and the 4-byte checksum.

The most important step is getting a file handle through blockFile():

func (s *blockStore) blockFile(fileNum uint32) (*lockableFile, error) {
    // If the requested block file is the one currently open for writing, return it.
    wc := s.writeCursor
    wc.RLock()
    if fileNum == wc.curFileNum && wc.curFile.file != nil {
        obf := wc.curFile
        obf.RLock()
        wc.RUnlock()
        return obf, nil
    }
    wc.RUnlock()

    // Try to return an already open file under the overall files read lock.
    s.obfMutex.RLock()
    if obf, ok := s.openBlockFiles[fileNum]; ok {
        s.lruMutex.Lock()
        s.openBlocksLRU.MoveToFront(s.fileNumToLRUElem[fileNum])
        s.lruMutex.Unlock()

        obf.RLock()
        s.obfMutex.RUnlock()
        return obf, nil
    }
    s.obfMutex.RUnlock()

    // Since the file isn't open already, re-check the open block files map
    // under the write lock, in case multiple readers got here at once and
    // another one has already opened the file.
    s.obfMutex.Lock()
    if obf, ok := s.openBlockFiles[fileNum]; ok {
        obf.RLock()
        s.obfMutex.Unlock()
        return obf, nil
    }

    // The file isn't open, so open it, possibly closing the least recently
    // used file if needed.
    obf, err := s.openFileFunc(fileNum)
    if err != nil {
        s.obfMutex.Unlock()
        return nil, err
    }
    obf.RLock()
    s.obfMutex.Unlock()
    return obf, nil
}

Its steps are:

1. If the requested file is the one the write cursor is currently writing to, return it.
2. Otherwise, look it up in openBlockFiles under the read lock; if found, move it to the front of the LRU list and return it.
3. If it is still not open, re-check under the write lock (another reader may have opened it in the meantime), and finally open it via openFileFunc.

openFile() (not reproduced here) opens the file read-only, inserts it into openBlockFiles, and pushes its number onto the LRU list, evicting and closing the least recently used file once the limit is reached. In other words, blockStore maintains an LRU cache of open read-only files via openBlockFiles, openBlocksLRU, and fileNumToLRUElem, which speeds up reading blocks from the files.
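
The LRU bookkeeping itself is simple; the following illustrative sketch (not taken from the source) shows the eviction idea with container/list, using the default limit of 25 open files mentioned above.

package sketch

import "container/list"

const maxOpenFiles = 25 // default maximum number of open block files

type fileLRU struct {
    order    *list.List               // front = most recently used file number
    elements map[uint32]*list.Element // file number -> element in order
}

func newFileLRU() *fileLRU {
    return &fileLRU{order: list.New(), elements: make(map[uint32]*list.Element)}
}

// touch marks fileNum as most recently used and, if the limit is exceeded,
// returns the least recently used file number that should now be closed.
func (l *fileLRU) touch(fileNum uint32) (evict uint32, ok bool) {
    if elem, exists := l.elements[fileNum]; exists {
        l.order.MoveToFront(elem)
        return 0, false
    }
    l.elements[fileNum] = l.order.PushFront(fileNum)
    if l.order.Len() <= maxOpenFiles {
        return 0, false
    }
    oldest := l.order.Back()
    l.order.Remove(oldest)
    evict = oldest.Value.(uint32)
    delete(l.elements, evict)
    return evict, true
}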

writeBlock():

func (s *blockStore) writeBlock(rawBlock []byte) (blockLocation, error) {
    // Compute how many bytes will be written.
    // 4 bytes each for block network + 4 bytes for block length +
    // length of raw block + 4 bytes for checksum.
    blockLen := uint32(len(rawBlock))
    fullLen := blockLen + 12

    // Move to the next block file if adding the new block would exceed the
    // max allowed size for the current block file.  Also detect overflow
    // to be paranoid, even though it isn't possible currently, numbers
    // might change in the future to make it possible.
    //
    // NOTE: The writeCursor.offset field isn't protected by the mutex
    // since it's only read/changed during this function which can only be
    // called during a write transaction, of which there can be only one at
    // a time.
    wc := s.writeCursor
    finalOffset := wc.curOffset + fullLen
    if finalOffset < wc.curOffset || finalOffset > s.maxBlockFileSize {
        // This is done under the write cursor lock since the curFileNum
        // field is accessed elsewhere by readers.
        //
        // Close the current write file to force a read-only reopen
        // with LRU tracking.  The close is done under the write lock
        // for the file to prevent it from being closed out from under
        // any readers currently reading from it.
        wc.Lock()
        wc.curFile.Lock()
        if wc.curFile.file != nil {
            _ = wc.curFile.file.Close()
            wc.curFile.file = nil
        }
        wc.curFile.Unlock()

        // Start writes into next file.
        wc.curFileNum++
        wc.curOffset = 0
        wc.Unlock()
    }

    // All writes are done under the write lock for the file to ensure any
    // readers are finished and blocked first.
    wc.curFile.Lock()
    defer wc.curFile.Unlock()

    // Open the current file if needed.  This will typically only be the
    // case when moving to the next file to write to or on initial database
    // load.  However, it might also be the case if rollbacks happened after
    // file writes started during a transaction commit.
    if wc.curFile.file == nil {
        file, err := s.openWriteFileFunc(wc.curFileNum)
        if err != nil {
            return blockLocation{}, err
        }
        wc.curFile.file = file
    }

    // Bitcoin network.
    origOffset := wc.curOffset
    hasher := crc32.New(castagnoli)
    var scratch [4]byte
    byteOrder.PutUint32(scratch[:], uint32(s.network))
    if err := s.writeData(scratch[:], "network"); err != nil {
        return blockLocation{}, err
    }
    _, _ = hasher.Write(scratch[:])

    // Block length.
    byteOrder.PutUint32(scratch[:], blockLen)
    if err := s.writeData(scratch[:], "block length"); err != nil {
        return blockLocation{}, err
    }
    _, _ = hasher.Write(scratch[:])

    // Serialized block.
    if err := s.writeData(rawBlock[:], "block"); err != nil {
        return blockLocation{}, err
    }
    _, _ = hasher.Write(rawBlock)

    // Castagnoli CRC-32 as a checksum of all the previous.
    if err := s.writeData(hasher.Sum(nil), "checksum"); err != nil {
        return blockLocation{}, err
    }

    loc := blockLocation{
        blockFileNum: wc.curFileNum,
        fileOffset:   origOffset,
        blockLen:     fullLen,
    }
    return loc, nil
}

Its main steps are:

1. Compute the full record length: 4 bytes for the network, 4 for the block length, the raw block itself, and 4 for the checksum.
2. If appending the record would exceed maxBlockFileSize (or overflow), close the current write file and advance the write cursor to the next file with offset 0.
3. Open the current write file if it is not open yet.
4. Write the network, the block length, the raw block, and finally the Castagnoli CRC-32 of everything written so far.
5. Return a blockLocation recording the file number, the original offset, and the full record length.

To summarize: readBlock() and writeBlock() cover most of how blockStore works. It manages the already open read-only files through an LRU list, and uses writeCursor to track the current write file and the offset within it; when writing a block would push the file past the configured maximum size, it rolls over to a new file.

dbCache

type dbCache struct {
    // ldb is the handle to the underlying leveldb DB, used to store and load K/V pairs.
    ldb *leveldb.DB

    // store points to the blockStore of the owning db; before metadata is
    // written to leveldb, the block cache is forced to disk through it.
    store *blockStore

    // maxSize is, roughly, the limit on the total size of the cached keys
    // pending addition and removal; it defaults to 100 MB.
    maxSize       uint64
    // flushInterval is how often data is flushed to leveldb.
    flushInterval time.Duration
    // lastFlush is the timestamp of the last flush to leveldb.
    lastFlush     time.Time

    // cacheLock protects cachedKeys and cachedRemove; they are updated when
    // the dbCache writes to leveldb and read when taking a dbCache snapshot.
    cacheLock    sync.RWMutex
    // cachedKeys holds the keys pending addition; it points to a treap.
    cachedKeys   *treap.Immutable
    // cachedRemove holds the keys pending removal; it also points to a treap.
    // Note that cachedKeys/cachedRemove are distinct from the transaction's
    // pendingKeys/pendingRemove.
    cachedRemove *treap.Immutable
}

In writePendingAndCommit() in db.go, the final step of a transaction commit is return tx.db.cache.commitTx(tx),
which hands the metadata updates to the dbCache. Here is commitTx():

func (c *dbCache) commitTx(tx *transaction) error {
    // Flush the cache and write the current transaction directly to the
    // database if a flush is needed.
    if c.needsFlush(tx) {
        if err := c.flush(); err != nil {
            return err
        }

        // Perform all leveldb updates using an atomic transaction.
        err := c.commitTreaps(tx.pendingKeys, tx.pendingRemove)
        if err != nil {
            return err
        }

        // Clear the transaction entries since they have been committed.
        tx.pendingKeys = nil
        tx.pendingRemove = nil
        return nil
    }

    // At this point a database flush is not needed, so atomically commit
    // the transaction to the cache.

    // Since the cached keys to be added and removed use an immutable treap,
    // a snapshot is simply obtaining the root of the tree under the lock
    // which is used to atomically swap the root.
    c.cacheLock.RLock()
    newCachedKeys := c.cachedKeys
    newCachedRemove := c.cachedRemove
    c.cacheLock.RUnlock()

    // Apply every key to add in the database transaction to the cache.
    tx.pendingKeys.ForEach(func(k, v []byte) bool {
        newCachedRemove = newCachedRemove.Delete(k)
        newCachedKeys = newCachedKeys.Put(k, v)
        return true
    })
    tx.pendingKeys = nil

    // Apply every key to remove in the database transaction to the cache.
    tx.pendingRemove.ForEach(func(k, v []byte) bool {
        newCachedKeys = newCachedKeys.Delete(k)
        newCachedRemove = newCachedRemove.Put(k, nil)
        return true
    })
    tx.pendingRemove = nil

    // Atomically replace the immutable treaps which hold the cached keys to
    // add and delete.
    c.cacheLock.Lock()
    c.cachedKeys = newCachedKeys
    c.cachedRemove = newCachedRemove
    c.cacheLock.Unlock()
    return nil
}

Its main steps are:

1. If a flush is needed (needsFlush(): roughly, the flush interval has elapsed or the cache has grown too large), flush the cache to leveldb and then commit the transaction's pendingKeys/pendingRemove directly to leveldb in one atomic leveldb transaction.
2. Otherwise, take the current cachedKeys/cachedRemove treap roots under the read lock, apply every pending addition and removal to them (producing new immutable treaps), and atomically swap the new roots in under the write lock.
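
needsFlush() itself is not shown in this excerpt. As a hedged, stand-alone sketch of the decision it plausibly makes given the maxSize and flushInterval fields above (the real accounting in ffldb may differ):

package sketch

import "time"

// needsFlushSketch: flush when the flush interval has elapsed since the last
// flush, or when the cached data plus the transaction's pending data would
// exceed the cache budget. Sizes are passed in as plain numbers here.
func needsFlushSketch(lastFlush time.Time, flushInterval time.Duration,
    cachedSize, pendingSize, maxSize uint64) bool {

    if time.Since(lastFlush) > flushInterval {
        return true
    }
    return cachedSize+pendingSize > maxSize
}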

treap

Throughout dbCache's caching, flushing, and reading of keys, the data structure doing the actual caching is treap.Immutable; it is the core of dbCache.

btcd's treap package provides both an Immutable and a Mutable implementation.

Readers who want to learn more about treaps may first skim 《随机平衡二叉查找树Treap的分析与应用》.

In short, a treap is a combination of a tree and a heap: a binary search tree in which every node also carries a random priority used to maintain heap order (btcd's implementation keeps a min-heap on priority). The random priorities prevent an ordered insert sequence from degenerating the tree into a linked list, so the tree stays balanced in expectation and lookups take O(log N).

We will focus on the Put() methods of Mutable and Immutable to understand how the treap is built, how it rotates after a node is inserted, and how Immutable implements copy-on-write.

type Mutable struct {
    root  *treapNode
    count int

    // totalSize is the best estimate of the total size of of all data in
    // the treap including the keys, values, and node sizes.
    totalSize uint64
}

type Immutable struct {
    root  *treapNode
    count int

    // totalSize is the best estimate of the total size of of all data in
    // the treap including the keys, values, and node sizes.
    totalSize uint64
}

Immutable and Mutable have identical definitions; the difference is that Immutable provides copy-on-write, which shows up in its Put() method. In both, the root field points to the treap's root node.

The node structure is:

type treapNode struct {
    key      []byte
    value    []byte
    priority int
    left     *treapNode
    right    *treapNode
}

In treapNode, key and value hold the node's data, priority is the random value used to maintain the heap (the node's priority), and left and right point to the roots of the left and right subtrees.

Here is Mutable's Put() method:

func (t *Mutable) Put(key, value []byte) {
    // Use an empty byte slice for the value when none was provided.  This
    // ultimately allows key existence to be determined from the value since
    // an empty byte slice is distinguishable from nil.
    if value == nil {
        value = emptySlice
    }

    // The node is the root of the tree if there isn't already one.
    if t.root == nil {
        node := newTreapNode(key, value, rand.Int())
        t.count = 1
        t.totalSize = nodeSize(node)
        t.root = node
        return
    }

    // Find the binary tree insertion point and construct a list of parents
    // while doing so.  When the key matches an entry already in the treap,
    // just update its value and return.
    var parents parentStack
    var compareResult int
    for node := t.root; node != nil; {
        parents.Push(node)
        compareResult = bytes.Compare(key, node.key)
        if compareResult < 0 {
            node = node.left
            continue
        }
        if compareResult > 0 {
            node = node.right
            continue
        }

        // The key already exists, so update its value.
        t.totalSize -= uint64(len(node.value))
        t.totalSize += uint64(len(value))
        node.value = value
        return
    }

    // Link the new node into the binary tree in the correct position.
    node := newTreapNode(key, value, rand.Int())
    t.count++
    t.totalSize += nodeSize(node)
    parent := parents.At(0)
    if compareResult < 0 {
        parent.left = node
    } else {
        parent.right = node
    }

    // Perform any rotations needed to maintain the min-heap.
    for parents.Len() > 0 {
        // There is nothing left to do when the node's priority is
        // greater than or equal to its parent's priority.
        parent = parents.Pop()
        if node.priority >= parent.priority {
            break
        }

        // Perform a right rotation if the node is on the left side or
        // a left rotation if the node is on the right side.
        if parent.left == node {
            node.right, parent.left = parent, node.right
        } else {
            node.left, parent.right = parent, node.left
        }
        t.relinkGrandparent(node, parent, parents.At(0))
    }
}

Its main steps are:

1. If the value is nil, substitute an empty byte slice so key existence can still be determined from the value.
2. If the tree is empty, the new node simply becomes the root.
3. Otherwise, walk down from the root comparing keys, pushing each visited node onto a parent stack; if the key already exists, just update its value.
4. Link the new node (with a random priority) as a child of the last parent on the stack.
5. Walk back up the parent stack, rotating right or left whenever the new node's priority is smaller than its parent's, until the min-heap property holds again.

From Mutable's Put() we get the full picture of how the treap is built, how nodes are inserted, and how subtrees rotate. Immutable's Put() follows roughly the same steps; the difference is that it never modifies or rotates the original tree in place. Instead, every node on the search path is copied, the copies together with the untouched nodes of the old tree form a new tree, and the update or rotation is performed on the new tree, which is then returned.
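
A tiny self-contained illustration of the copy-on-write idea (simplified, with heap rotations omitted; this is not the actual Immutable.Put): only the nodes on the search path are cloned, everything else is shared with the old tree.

package sketch

import "bytes"

// node mirrors the treapNode fields relevant to path copying.
type node struct {
    key, value  []byte
    priority    int
    left, right *node
}

// pathCopyPut returns the root of a new tree containing (key, value). Nodes on
// the search path are copied; untouched subtrees are shared with the old tree.
func pathCopyPut(root *node, key, value []byte, priority int) *node {
    if root == nil {
        return &node{key: key, value: value, priority: priority}
    }
    clone := *root // shallow copy; children still point into the old tree
    switch cmp := bytes.Compare(key, root.key); {
    case cmp < 0:
        clone.left = pathCopyPut(root.left, key, value, priority)
    case cmp > 0:
        clone.right = pathCopyPut(root.right, key, value, priority)
    default:
        clone.value = value
    }
    return &clone
}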

driver.go

type Driver struct {
    DbType  string
    Create func(args ...interface{}) (DB,error)
    Open func(args ...interface{}) (DB,error)
    UseLogger func(logger btclog.Logger)
}

A Driver describes a backend capable of providing the DB interface so it can be registered with the database package: it carries the DbType string, Create and Open functions for constructing a DB, and a logger hook (UseLogger).
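
ffldb registers itself through this Driver struct in its package init(). A hedged sketch of what that registration looks like; the constructor names here are illustrative stubs, while database.RegisterDriver and the Driver fields match the interface shown above.

package ffldbsketch

import (
    "fmt"

    "github.com/btcsuite/btcd/database"
)

const dbType = "ffldb"

// createDBDriver and openDBDriver stand in for ffldb's real constructors, which
// parse the variadic args (database path and network) and return a database.DB.
func createDBDriver(args ...interface{}) (database.DB, error) {
    // ... create the flat-file store and leveldb instance, then return the DB ...
    return nil, nil
}

func openDBDriver(args ...interface{}) (database.DB, error) {
    // ... open an existing database ...
    return nil, nil
}

func init() {
    // Register this backend under its DbType so database.Create/Open can find it.
    driver := database.Driver{
        DbType: dbType,
        Create: createDBDriver,
        Open:   openDBDriver,
    }
    if err := database.RegisterDriver(driver); err != nil {
        panic(fmt.Sprintf("failed to register %s driver: %v", dbType, err))
    }
}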

Project code: https://github.com/jianhuaixie/blockchain-buildwheels/tree/master/content/wheels-8

This article draws heavily on oceanken's work; thanks again! His post: https://www.jianshu.com/p/a043c23a5059
