ChainMaker 数据存储

2021-08-19  本文已影响0人  冰冰大象

带着问题看源码
一:区块最终会存在在Block DB、State DB、History DB、Result DB 四个数据库中,怎么保证数据一致性
二:群里有人问怎么查历史数据

ChainMaker 采用类似数据库的预写式日志(Write-Ahead Logging (WAL))

引用官方文档:如果区块正在提交过程中,节点因异常退出,节点在下次启动时存储模块会进入恢复流程:
分别从Block binary log、Block DB、State DB、History DB、Result DB中获取最新的区块高度,以Block binary log中的区块高度作为基准高度,判断其他DB是否落后基准高度。
如果有某个DB落后基准高度,则从Block bianry log中获取缺失的区块及读写集,依次提交到落后DB中。
所有DB同步到基准高度后,存储模块启动完成,节点进入正常流程。

先看下当共识完成后提交区块的代码

#######位于module/core/common/block_helper.go
func (chain *BlockCommitterImpl) AddBlock(block *commonpb.Block) (err error) {
    defer func() {
      ...
//出错后,回滚
        if sqlErr := chain.storeHelper.RollBack(block, chain.blockchainStore); sqlErr != nil {
            chain.log.Errorf("block [%d] rollback sql failed: %s", block.Header.BlockHeight, sqlErr)
        }
    }()
    height := block.Header.BlockHeight
    lastProposed, rwSetMap, conEventMap := chain.proposalCache.GetProposedBlock(block)
    ...
// 节点提交
    dbLasts, snapshotLasts, confLasts, otherLasts, pubEvent, blockInfo, err := chain.commonCommit.CommitBlock(
        lastProposed, rwSetMap, conEventMap)
    if err != nil {
        chain.log.Errorf("block common commit failed: %s, blockHeight: (%d)",
            err.Error(), lastProposed.Header.BlockHeight)
    }
    txRetry := chain.syncWithTxPool(lastProposed, height)
     // 清空交易队列
    chain.txPool.RetryAndRemoveTxs(txRetry, lastProposed.Txs)
     // 清空新高度的提案区块缓存
    chain.proposalCache.ClearProposedBlockAt(height)
    // synchronize new block height to consensus and sync module
    chain.msgBus.PublishSafe(msgbus.BlockInfo, blockInfo)
...
    return nil
}
#### module/core/common/committer.go
func (cb *CommitBlock) CommitBlock(
    ...
      // 落块开始
    if err = cb.store.PutBlock(block, rwSet); err != nil {
        // if put db error, then panic
        cb.log.Error(err)
        panic(err)
    }
   ...
}

开始落块

#### module/store/blockstore_impl.go
// PutBlock commits the block and the corresponding rwsets in an atomic operation
func (bs *BlockStoreImpl) PutBlock(block *commonPb.Block, txRWSets []*commonPb.TxRWSet) error {
    startPutBlock := utils.CurrentTimeMillisSeconds()
    //1. commit log
    blockWithRWSet := &storePb.BlockWithRWSet{
        Block:    block,
        TxRWSets: txRWSets,
    }
// 序列化 读写集
    blockBytes, blockWithSerializedInfo, err := serialization.SerializeBlock(blockWithRWSet)
    if err != nil {
    ...
    }
    elapsedMarshalBlockAndRWSet := utils.CurrentTimeMillisSeconds() - startPutBlock
    startCommitLogDB := utils.CurrentTimeMillisSeconds()
    //这里开始写binlog了
    err = bs.writeLog(uint64(block.Header.BlockHeight), blockBytes)
    elapsedCommitlogDB := utils.CurrentTimeMillisSeconds() - startCommitLogDB
    if err != nil {
    ...
    }

    //commit db concurrently
    startCommitBlock := utils.CurrentTimeMillisSeconds()
    //the amount of commit db work
    numBatches := 5
    var batchWG sync.WaitGroup
    batchWG.Add(numBatches)
    errsChan := make(chan error, numBatches)
    // 开始存区块 
    // 2.commit blockDB
    go func() {
        defer batchWG.Done()
        bs.putBlock2DB(blockWithSerializedInfo, errsChan, bs.blockDB.CommitBlock)
    }()

    // 3.commit stateDB
    go func() {
        defer batchWG.Done()
        bs.putBlock2DB(blockWithSerializedInfo, errsChan, bs.stateDB.CommitBlock)
    }()
// 这个历史数据库 一会去看下怎么存储的
    // 4.commit historyDB
    if !bs.storeConfig.DisableHistoryDB {
        go func() {
            defer batchWG.Done()
            bs.putBlock2DB(blockWithSerializedInfo, errsChan, bs.historyDB.CommitBlock)
        }()
    } else {
        batchWG.Done()
    }
    //5. result db
    if !bs.storeConfig.DisableResultDB {
        go func() {
            defer batchWG.Done()
            bs.putBlock2DB(blockWithSerializedInfo, errsChan, bs.resultDB.CommitBlock)
        }()
    } else {
        batchWG.Done()
    }
    //6.commit contractEventDB
    if !bs.storeConfig.DisableContractEventDB {
        go func() {
            defer batchWG.Done()
            bs.putBlock2DB(blockWithSerializedInfo, errsChan, bs.contractEventDB.CommitBlock)
        }()
    } else {
        batchWG.Done()
    }

    batchWG.Wait()
    if len(errsChan) > 0 {
        return <-errsChan
    }
    elapsedCommitBlock := utils.CurrentTimeMillisSeconds() - startCommitBlock

    //7. clean wal, delete block and rwset after commit
    go func() {
        err := bs.deleteBlockFromLog(uint64(block.Header.BlockHeight))
        if err != nil {
            bs.logger.Warnf("chain[%s]: failed to clean log, block[%d], err:%s",
                block.Header.ChainId, block.Header.BlockHeight, err)
        }
    }()
    ...
    return nil
}

bs.writeLog(uint64(block.Header.BlockHeight), blockBytes) 写日志文件 看看都有些啥

#### module/store/blockstore_impl.go
func (bs *BlockStoreImpl) writeLog(blockHeight uint64, bytes []byte) error {
    // wal log, index increase from 1, while blockHeight increase form 0
    return bs.wal.Write(blockHeight+1, bytes)
}

看一下wal 是什么

// BlockStoreImpl provides an implementation of `protocol.BlockchainStore`.
type BlockStoreImpl struct {
    ...
    wal             binlog.BinLoger
    ...
}
##### 看一下BinLoger 一个接口
type BinLoger interface {
    // 关闭 
    Close() error
   // 清空index之前的数据 
    TruncateFront(index uint64) error
   // 读数据
    Read(index uint64) (data []byte, err error)
   // 获取最后一个标识
    LastIndex() (index uint64, err error)
  // 写数据
    Write(index uint64, data []byte) error
}
##### 继续看谁实现了这个接口
// NewBlockStoreImpl constructs new `BlockStoreImpl`
func NewBlockStoreImpl(...) (*BlockStoreImpl, error) {
    walPath := filepath.Join(storeConfig.StorePath, chainId, logPath)
    writeAsync := storeConfig.LogDBWriteAsync
    walOpt := &wal.Options{
        NoSync: writeAsync,
    }
    if binLog == nil {
        writeLog, err := wal.Open(walPath, walOpt)
        if err != nil {
            panic(fmt.Sprintf("open wal failed, path:%s, error:%s", walPath, err))
        }
        binLog = writeLog
    }
      ...
    blockStore := &BlockStoreImpl{
        ...
        wal:              binLog,
        ...
    }
    //binlog 有SavePoint,不是空数据库,进行数据恢复
    if i, errbs := blockStore.getLastSavepoint(); errbs == nil && i > 0 {
        //check savepoint and recover 开始同步数据
        errbs = blockStore.recover()
        if errbs != nil {
            return nil, errbs
        }
    } else {
        logger.Info("binlog is empty, don't need recover")
    }
    return blockStore, nil
}

这里看到 是 wal.Open产生的binLog对象 实现BinLoger 接口,然后发现wal功能其实是使用了https://github.com/tidwall/wal 在看一下 同步方法blockStore.recover()

// recover checks savepoint and recommit lost block
func (bs *BlockStoreImpl) recover() error {
   ...
   // 读出日志最后高度
    if logSavepoint, err = bs.getLastSavepoint(); err != nil {
        return err
    }
  // 以下是分别读出各数据库的高度
    if blockSavepoint, err = bs.blockDB.GetLastSavepoint(); err != nil {
        return err
    }
    ...
    //recommit blockdb 将数据库中的高度与日志高度进行比较,并将缺失的进行补齐 以下都是一样的,
    if err := bs.recoverBlockDB(blockSavepoint, logSavepoint); err != nil {
        return err
    }
    ...
}

我们以一个同步数据方法recoverBlockDB为例,其他数据库逻辑是一样的

func (bs *BlockStoreImpl) recoverBlockDB(currentHeight uint64, savePoint uint64) error {
// 计算出缺失的高度
    height := bs.calculateRecoverHeight(currentHeight, savePoint)
    for ; height <= savePoint; height++ {
        bs.logger.Infof("[BlockDB] recommitting lost blocks, blockNum=%d, lastBlockNum=%d", height, savePoint)
    //  从日志里捞数据
        blockWithSerializedInfo, err := bs.getBlockFromLog(height)
        if err != nil {
            return err
        }
 //  补齐到缺失的数据库中
        err = bs.blockDB.CommitBlock(blockWithSerializedInfo)
        if err != nil {
            return err
        }
    }
    return nil
}

此时第一个问题 可以回答
一:区块最终会存在在Block DB、State DB、History DB、Result DB 四个数据库中,怎么保证数据一致性
答:采用wal 机制 现将数据落到日志,在通过日志同一落到各数据库中,当出现异常退出时,重启后 各数据库会向日志库同步数据 保证各数据库数据一致

如何查看历史数据,在historyDB里发现一个接口

// HistoryDB provides handle to rwSets instances
type HistoryDB interface {
    InitGenesis(genesisBlock *serialization.BlockWithSerializedInfo) error
    // CommitBlock commits the block rwsets in an atomic operation
    CommitBlock(blockInfo *serialization.BlockWithSerializedInfo) error

    //GetHistoryForKey 获得Key的交易历史
    GetHistoryForKey(contractName string, key []byte) (HistoryIterator, error)
    GetAccountTxHistory(account []byte) (HistoryIterator, error)
    GetContractTxHistory(contractName string) (HistoryIterator, error)
    // GetLastSavepoint returns the last block height
    GetLastSavepoint() (uint64, error)

    // Close is used to close database
    Close()
}

但是在go sdk 中并没有发现相对应的方法,所以目前查询历史交易的方法并没有暴露出来。
于是自己写一个访问历史数据库代码 引入chainmaker-go

package main

import (
    "chainmaker.org/chainmaker-go/localconf"
    "chainmaker.org/chainmaker-go/protocol/test"
    "chainmaker.org/chainmaker-go/store"
    "fmt"
    "os"
    "path/filepath"
    "time"
)
func getlvldbConfig(path string) *localconf.StorageConfig {
    conf := &localconf.StorageConfig{}
    if path == "" {
        path = filepath.Join(os.TempDir(), fmt.Sprintf("%d", time.Now().Nanosecond()))
    }
    conf.StorePath = path

    lvlConfig := &localconf.LevelDbConfig{
        StorePath: path,
    }
    dbConfig := &localconf.DbConfig{
        Provider:      "leveldb",
        LevelDbConfig: lvlConfig,
    }
    conf.BlockDbConfig = dbConfig
    conf.StateDbConfig = dbConfig
    conf.HistoryDbConfig = dbConfig
    conf.ResultDbConfig = dbConfig
    conf.DisableContractEventDB = true
    return conf
}
var log = &test.GoLogger{}
func main()  {
    var factory store.Factory
    s, err := factory.NewStore("c1628498288425", getlvldbConfig("/Users/sunbo/Desktop/ChainMaker/src/baas-api/nodeconfigs/c1628498288425-wx-org1-chainmaker-org-node1/data/history"),  log)
    if err != nil {
        panic(err)
    }
    //"fact_json","name" 注意这里的Key是Key和前缀 采用#号连接  
    datas,err:= s.GetHistoryForKey("test",[]byte("factjson#name"))
    if err!=nil{
        panic(err)
    }
    for datas.Next(){
       keys,err:=datas.Value()
        if err!=nil{
            panic(err)
        }
        fmt.Println(keys)
    }
    fmt.Println(s)
}

二:群里有人问怎么查历史数据
答:ChainMaker 本身支持一个HistoryDB 数据库来存储历史数据,但是目前版本不支持,后续版本官方会支持,不过 目前自己先做个版本支持一下

添加历史查询功能 https://www.jianshu.com/p/1a37fa9cbcd2

上一篇下一篇

猜你喜欢

热点阅读