Fabric如何从Ledger里面读取block的内容
2018-08-23 本文已影响525人
CodingCode
Fabric从ledger读取block的例子
下面例子使用fabric的功能,读取ledger的block,包含两个方法:
- GetBlockByNumber:根据block number读取确切的一个block
- GetBlocksIterator:遍历的方式读取所有的block
package main
//
// There is web resource describing the detailed block structure:
// https://blockchain-fabric.blogspot.com/2017/04/hyperledger-fabric-v10-block-structure.html
//
import (
"os"
"fmt"
"strings"
"encoding/base64"
"github.com/hyperledger/fabric/peer/common"
"github.com/hyperledger/fabric/core/ledger/kvledger"
cb "github.com/hyperledger/fabric/protos/common"
"github.com/spf13/viper"
)
// cmdRoot is the prefix passed to viper.SetEnvPrefix in main.
const cmdRoot = "core"
// channel is the name of the channel ledger that main opens via provider.Open.
const channel = "mychannel"
// printBlock writes a one-line summary of block to stdout, tagged with prefix.
//
// CurrentBlockHash is the hash of the block header (header.Hash()), not the
// header's DataHash field; DataHash is printed separately as the last field.
//
// TODO: print more block data fields
func printBlock(prefix string, block *cb.Block) {
	header := block.GetHeader()
	fmt.Printf("%s Block: Number=[%d], CurrentBlockHash=[%s], PreviousBlockHash=[%s], DataHash=[%s]\n",
		prefix,
		header.Number,
		base64.StdEncoding.EncodeToString(header.Hash()),
		base64.StdEncoding.EncodeToString(header.PreviousHash),
		base64.StdEncoding.EncodeToString(header.DataHash))
}
// main runs dumpLedger and exits with status 1 if it fails.
//
// All work happens in dumpLedger so that its deferred Close calls run before
// the process exits; os.Exit does not run deferred functions.
func main() {
	if err := dumpLedger(); err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
}

// dumpLedger opens the ledger of the configured channel and prints every block
// twice: once fetched by block number and once through a blocks iterator.
// It returns an error describing the first step that failed.
func dumpLedger() error {
	viper.SetEnvPrefix(cmdRoot)
	viper.AutomaticEnv()
	viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))

	if err := common.InitConfig(cmdRoot); err != nil { // Handle errors reading the config file
		return fmt.Errorf("Cannot init configure, error=[%v]", err)
	}

	provider, err := kvledger.NewProvider() // core/ledger/kvledger/kv_ledger_provider.go
	if err != nil {
		return fmt.Errorf("Cannot new provider, error=[%v]", err)
	}
	defer provider.Close()

	// Print channel list
	channels, err := provider.List() // core/ledger/kvledger/kv_ledger_provider.go
	if err != nil {
		return fmt.Errorf("Cannot get channel list, error=[%v]", err)
	}
	fmt.Printf("channels=[%v]\n", channels)

	// Open a channel
	ledger, err := provider.Open(channel) // core/ledger/kvledger/kv_ledger_provider.go
	if err != nil {
		return fmt.Errorf("Cannot open channel ledger, error=[%v]", err)
	}
	defer ledger.Close()

	// Return ledger as kvLedger is defined in core/ledger/kvledger/kv_ledger.go, following API:
	// func (l *kvLedger) GetBlockchainInfo() (*common.BlockchainInfo, error)
	// func (l *kvLedger) GetTransactionByID(txID string) (*peer.ProcessedTransaction, error)
	// func (l *kvLedger) GetBlockByNumber(blockNumber uint64) (*common.Block, error)
	// func (l *kvLedger) GetBlockByHash(blockHash []byte) (*common.Block, error)
	// func (l *kvLedger) GetBlockByTxID(txID string) (*common.Block, error)
	// func (l *kvLedger) GetBlocksIterator(startBlockNumber uint64) (commonledger.ResultsIterator, error)
	// func (l *kvLedger) GetTxValidationCodeByTxID(txID string) (peer.TxValidationCode, error)
	// func (l *kvLedger) Close()

	// Get basic channel information
	chainInfo, err := ledger.GetBlockchainInfo() // (*common.BlockchainInfo, error)
	if err != nil {
		return fmt.Errorf("Cannot get block chain info, error=[%v]", err)
	}
	height := chainInfo.GetHeight()
	fmt.Printf("chainInfo: Height=[%d], CurrentBlockHash=[%s], PreviousBlockHash=[%s]\n",
		height,
		base64.StdEncoding.EncodeToString(chainInfo.CurrentBlockHash),
		base64.StdEncoding.EncodeToString(chainInfo.PreviousBlockHash))

	// Retrieve blocks based on block number
	for i := uint64(0); i < height; i++ {
		block, err := ledger.GetBlockByNumber(i) // (blockNumber uint64) (*common.Block, error)
		if err != nil {
			return fmt.Errorf("Cannot get block for %d, error=[%v]", i, err)
		}
		printBlock("Get", block)
	}

	// Retrieve blocks based on iterator
	itr, err := ledger.GetBlocksIterator(0) // (ResultsIterator, error)
	if err != nil {
		return fmt.Errorf("Cannot get iterator, error=[%v]", err)
	}
	defer itr.Close()

	// Call Next exactly height times: bounding the loop by the height avoids
	// asking the iterator for a block that does not exist yet, and avoids the
	// uint64 underflow of "height - 1" on an empty ledger.
	for i := uint64(0); i < height; i++ {
		queryResult, err := itr.Next() // commonledger.QueryResult
		if err != nil {
			return fmt.Errorf("Cannot get block %d from iterator, error=[%v]", i, err)
		}
		block, ok := queryResult.(*cb.Block)
		if !ok {
			return fmt.Errorf("Unexpected iterator result type [%T] for block %d", queryResult, i)
		}
		printBlock("Iterator", block)
	}
	return nil
}
使用方法:
- 拷贝core.yaml到当前目录,并修改fileSystemPath配置值为正确的ledger存储目录,例如~/.../hyperledger/production
- go build && ./main
上述例子代码依赖于peer组织的ledger结构,因为它需要读取ledger的index信息,也就是说输入必须是完整的hyperledger/production目录,这样才能包含完整的ledger信息;其好处就是可以按照block number或者transaction id来搜索block。
另外一种情况是,如果只有单个ledger文件,能不能读取block信息呢?可以,只是这种情况下只能按顺序遍历读取所有的block,而不能随机读取。
代码如下:
package main
import (
"os"
"fmt"
"io"
"io/ioutil"
"bufio"
"errors"
"encoding/base64"
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/protos/common"
lutil "github.com/hyperledger/fabric/common/ledger/util"
putil "github.com/hyperledger/fabric/protos/utils"
)
// ErrUnexpectedEndOfBlockfile is returned by nextBlockBytes when a block's
// length prefix announces more bytes than remain in the file.
var ErrUnexpectedEndOfBlockfile = errors.New("unexpected end of blockfile")
// Package-level reader state, initialised in main and consumed by nextBlockBytes.
var (
// file is the block file handle opened in main.
file *os.File
// fileName is the path of the block file being read.
fileName string
// fileSize is the file's total size in bytes, taken from file.Stat().
fileSize int64
// fileOffset is the number of bytes nextBlockBytes has consumed so far.
fileOffset int64
// fileReader is the buffered reader wrapped around file.
fileReader *bufio.Reader
)
// handleBlock prints a summary of block, lists the transaction IDs it carries
// (or a CONFIGBLOCK marker for a config block), and writes the
// protobuf-marshalled block to block<Number>.block in the current directory.
//
// CurrentBlockHash is the hash of the block header (header.Hash()), not the
// header's DataHash field; DataHash is printed separately as the last field.
//
// A transaction whose ID cannot be extracted is reported and skipped, so the
// block file is still written. Marshal and write failures are reported on
// stdout; nothing is returned to the caller.
//
// The written file can be decoded with a utility such as:
//   $ configtxlator proto_decode --input block0.block --type common.Block
func handleBlock(block *common.Block) {
	header := block.GetHeader()
	fmt.Printf("Block: Number=[%d], CurrentBlockHash=[%s], PreviousBlockHash=[%s], DataHash=[%s]\n",
		header.Number,
		base64.StdEncoding.EncodeToString(header.Hash()),
		base64.StdEncoding.EncodeToString(header.PreviousHash),
		base64.StdEncoding.EncodeToString(header.DataHash))

	if putil.IsConfigBlock(block) {
		fmt.Printf(" txid=CONFIGBLOCK\n")
	} else {
		for _, txEnvBytes := range block.GetData().GetData() {
			txid, err := extractTxID(txEnvBytes)
			if err != nil {
				// The txid listing is informational only; keep going so the
				// remaining transactions are listed and the block is still saved.
				fmt.Printf("ERROR: Cannot extract txid, error=[%v]\n", err)
				continue
			}
			fmt.Printf(" txid=%s\n", txid)
		}
	}

	// write block to file
	b, err := proto.Marshal(block)
	if err != nil {
		fmt.Printf("ERROR: Cannot marshal block, error=[%v]\n", err)
		return
	}
	filename := fmt.Sprintf("block%d.block", header.Number)
	if err := ioutil.WriteFile(filename, b, 0644); err != nil {
		fmt.Printf("ERROR: Cannot write block to file:[%s], error=[%v]\n", filename, err)
	}
}
// nextBlockBytes returns the raw bytes of the next block in the file and
// advances fileOffset past it. Each block is preceded by a varint holding its
// length. It returns (nil, nil) once fileOffset has reached fileSize, and
// ErrUnexpectedEndOfBlockfile when the announced length exceeds what is left.
func nextBlockBytes() ([]byte, error) {
	// At the end of file
	if fileOffset == fileSize {
		return nil, nil
	}

	left := fileSize - fileOffset

	// Peek at up to 8 bytes for the length prefix, fewer when the file is
	// nearly exhausted.
	prefixLen := 8
	if left < int64(prefixLen) {
		prefixLen = int(left)
	}
	prefix, err := fileReader.Peek(prefixLen)
	if err != nil {
		return nil, err
	}

	size, consumed := proto.DecodeVarint(prefix)
	if consumed == 0 {
		return nil, fmt.Errorf("Error in decoding varint bytes [%#v]", prefix)
	}

	total := int64(consumed) + int64(size)
	if total > left {
		return nil, ErrUnexpectedEndOfBlockfile
	}

	// skip the bytes representing the block size
	if _, err = fileReader.Discard(consumed); err != nil {
		return nil, err
	}

	payload := make([]byte, size)
	if _, err = io.ReadFull(fileReader, payload); err != nil {
		return nil, err
	}

	fileOffset += total
	return payload, nil
}
// deserializeBlock rebuilds a common.Block from serializedBlockBytes by
// decoding, in order, the header, the data section and the metadata section
// from one shared buffer. The first decoding error is returned with a nil block.
func deserializeBlock(serializedBlockBytes []byte) (*common.Block, error) {
	buf := lutil.NewBuffer(serializedBlockBytes)

	header, err := extractHeader(buf)
	if err != nil {
		return nil, err
	}

	data, err := extractData(buf)
	if err != nil {
		return nil, err
	}

	metadata, err := extractMetadata(buf)
	if err != nil {
		return nil, err
	}

	return &common.Block{
		Header:   header,
		Data:     data,
		Metadata: metadata,
	}, nil
}
// extractHeader decodes a block header from buf: the block number
// (DecodeVarint), then the data hash and the previous hash (DecodeRawBytes).
// An empty previous hash is stored as nil.
func extractHeader(buf *lutil.Buffer) (*common.BlockHeader, error) {
	number, err := buf.DecodeVarint()
	if err != nil {
		return nil, err
	}

	dataHash, err := buf.DecodeRawBytes(false)
	if err != nil {
		return nil, err
	}

	prevHash, err := buf.DecodeRawBytes(false)
	if err != nil {
		return nil, err
	}
	// Normalise a zero-length previous hash to nil.
	if len(prevHash) == 0 {
		prevHash = nil
	}

	return &common.BlockHeader{
		Number:       number,
		DataHash:     dataHash,
		PreviousHash: prevHash,
	}, nil
}
// extractData decodes the data section of a block from buf: an item count
// (DecodeVarint) followed by that many byte slices (DecodeRawBytes), each
// appended to BlockData.Data. Data stays nil when the count is zero.
func extractData(buf *lutil.Buffer) (*common.BlockData, error) {
	count, err := buf.DecodeVarint()
	if err != nil {
		return nil, err
	}

	result := &common.BlockData{}
	for remaining := count; remaining > 0; remaining-- {
		envelope, err := buf.DecodeRawBytes(false)
		if err != nil {
			return nil, err
		}
		result.Data = append(result.Data, envelope)
	}
	return result, nil
}
// extractMetadata decodes the metadata section of a block from buf: an entry
// count (DecodeVarint) followed by that many byte slices (DecodeRawBytes), each
// appended to BlockMetadata.Metadata. Metadata stays nil when the count is zero.
func extractMetadata(buf *lutil.Buffer) (*common.BlockMetadata, error) {
	count, err := buf.DecodeVarint()
	if err != nil {
		return nil, err
	}

	result := &common.BlockMetadata{}
	for remaining := count; remaining > 0; remaining-- {
		entry, err := buf.DecodeRawBytes(false)
		if err != nil {
			return nil, err
		}
		result.Metadata = append(result.Metadata, entry)
	}
	return result, nil
}
// extractTxID returns the transaction ID stored in the channel header of the
// serialized transaction envelope txEnvelopBytes.
//
// It returns an empty string and a non-nil error if the envelope, its payload
// or its channel header cannot be decoded, or if the payload has no header.
func extractTxID(txEnvelopBytes []byte) (string, error) {
	txEnvelope, err := putil.GetEnvelopeFromBlock(txEnvelopBytes)
	if err != nil {
		return "", err
	}
	txPayload, err := putil.GetPayload(txEnvelope)
	if err != nil {
		// Propagate the failure instead of reporting an empty txid as success.
		return "", err
	}
	// Guard the dereference below: a payload may decode without a header.
	if txPayload.Header == nil {
		return "", errors.New("transaction payload has no header")
	}
	chdr, err := putil.UnmarshalChannelHeader(txPayload.Header.ChannelHeader)
	if err != nil {
		return "", err
	}
	return chdr.TxId, nil
}
// main reads one ledger block file from start to end and passes every block to
// handleBlock. The file path defaults to the mychannel block file under
// hyperledger/production and may be overridden by the first command-line
// argument. On failure the error is printed and the process exits with status 1.
func main() {
	fileName = "hyperledger/production/ledgersData/chains/chains/mychannel/blockfile_000000"
	if len(os.Args) > 1 {
		fileName = os.Args[1]
	}
	// The work is done in dumpBlockfile so that its deferred file.Close runs
	// before os.Exit, which does not run deferred functions.
	if err := dumpBlockfile(); err != nil {
		fmt.Printf("ERROR: %v\n", err)
		os.Exit(1)
	}
}

// dumpBlockfile opens fileName, initialises the package-level reader state and
// loops over the blocks in the file until the end is reached or a read or
// deserialization error occurs.
func dumpBlockfile() error {
	var err error
	if file, err = os.OpenFile(fileName, os.O_RDONLY, 0600); err != nil {
		return fmt.Errorf("Cannot Open file: [%s], error=[%v]", fileName, err)
	}
	defer file.Close()

	fileInfo, err := file.Stat()
	if err != nil {
		return fmt.Errorf("Cannot Stat file: [%s], error=[%v]", fileName, err)
	}
	fileOffset = 0
	fileSize = fileInfo.Size()
	fileReader = bufio.NewReader(file)

	// Loop each block
	for {
		blockBytes, err := nextBlockBytes()
		if err != nil {
			return fmt.Errorf("Cannot read block file: [%s], error=[%v]", fileName, err)
		}
		if blockBytes == nil {
			// End of file
			return nil
		}
		block, err := deserializeBlock(blockBytes)
		if err != nil {
			return fmt.Errorf("Cannot deserialize block from file: [%s], error=[%v]", fileName, err)
		}
		handleBlock(block)
	}
}
这个例子非常简单,读取单个ledger文件hyperledger/production/ledgersData/chains/chains/mychannel/blockfile_000000的所有block,并把每一个block拆分成独立的文件block{BLOCKNUMBER}.block;这个代码片段偷了点懒,没有去解析block的内容,但是我们可以通过工具configtxlator把生成的block文件转换成可读的json格式。
注意,这个方式只能按顺序从文件头到文件尾读取block,不能随机读取block,因为没有ledger的index库了。