Hyperledger Fabric Source Code Analysis (Transaction Simulator)
2020-04-09
小蜗牛爬楼梯
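The transaction simulator, as the name suggests, simulates transactions. When the endorser handles a proposal, it uses a TXSimulator to produce a read-write set. The read set contains a list of unique keys together with the committed version of each key that the transaction read from the local ledger during simulation. The write set contains a list of unique keys (which may overlap with keys in the read set) and the new values the transaction writes. In the validation phase, a transaction is considered valid if the version of every key in its read set matches the version of that key in the world state.
The example below shows what the read-write set means for transaction validation, which is the foundation of endorsement.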
World state: (k1,1,v1), (k2,1,v2), (k3,1,v3), (k4,1,v4), (k5,1,v5)
T1 -> Write(k1, v1'), Write(k2, v2')
T2 -> Read(k1), Write(k3, v3')
T3 -> Write(k2, v2'')
T4 -> Write(k2, v2'''), Read(k2)
T5 -> Write(k6, v6'), Read(k5)
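Consider a scenario with five transactions, all simulated against the same world state and then validated in the order T1 to T5. Each entry is a (k,ver,val) triple of key, version and value, and every write to a key increments its version by one.
- T1 passes validation: it has no read set and only writes k1 and k2, which become (k1,2,v1') and (k2,2,v2')
- T2 fails validation because it reads k1, which T1 has already modified
- T3 passes validation because it has no read set; k2 is updated to (k2,3,v2'')
- T4 fails validation because it reads k2, which was already updated by T1
- T5 passes validation because k5, the key it reads, has not been modified
Interfaces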
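The rules above can be replayed in a few lines of Go. This is only a sketch under simplifying assumptions: versions are plain integers that grow by one per write (as in the example, not as in Fabric's real block/tx heights), and the names `versioned`, `tx`, `validateAndCommit` and `runExample` are made up for illustration.

```go
package main

import "fmt"

// versioned is one world-state entry: a value plus a version counter.
type versioned struct {
	ver int
	val string
}

// tx is a simplified transaction: the keys it read (with the version seen
// at simulation time) and the key/value pairs it wants to write.
type tx struct {
	name   string
	reads  map[string]int
	writes map[string]string
}

// validateAndCommit checks every read version against the current state
// and applies the write set only if all of them still match.
func validateAndCommit(state map[string]versioned, t tx) bool {
	for k, seen := range t.reads {
		if state[k].ver != seen {
			return false
		}
	}
	for k, v := range t.writes {
		state[k] = versioned{ver: state[k].ver + 1, val: v}
	}
	return true
}

// runExample replays T1..T5 from the text in order and reports which pass.
func runExample() []bool {
	state := map[string]versioned{
		"k1": {1, "v1"}, "k2": {1, "v2"}, "k3": {1, "v3"}, "k4": {1, "v4"}, "k5": {1, "v5"},
	}
	txs := []tx{
		{"T1", nil, map[string]string{"k1": "v1'", "k2": "v2'"}},
		{"T2", map[string]int{"k1": 1}, map[string]string{"k3": "v3'"}},
		{"T3", nil, map[string]string{"k2": "v2''"}},
		{"T4", map[string]int{"k2": 1}, map[string]string{"k2": "v2'''"}},
		{"T5", map[string]int{"k5": 1}, map[string]string{"k6": "v6'"}},
	}
	results := make([]bool, 0, len(txs))
	for _, t := range txs {
		results = append(results, validateAndCommit(state, t))
	}
	return results
}

func main() {
	fmt.Println(runExample()) // prints [true false true false true]
}
```

All read versions are 1 because every transaction was simulated against the same initial world state; only the order of validation decides which ones survive.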
TxSimulator
type TxSimulator interface {
QueryExecutor
// SetState sets the given value for the given namespace and key. For a chaincode, the namespace corresponds to the chaincodeId
SetState(namespace string, key string, value []byte) error
// DeleteState deletes the given namespace and key
DeleteState(namespace string, key string) error
// SetMultipleKeys sets the values for multiple keys in a single call
SetStateMultipleKeys(namespace string, kvs map[string][]byte) error
// SetStateMetadata sets the metadata associated with an existing key-tuple <namespace, key>
SetStateMetadata(namespace, key string, metadata map[string][]byte) error
// DeleteStateMetadata deletes the metadata (if any) associated with an existing key-tuple <namespace, key>
DeleteStateMetadata(namespace, key string) error
// ExecuteUpdate for supporting rich data model (see comments on QueryExecutor above)
ExecuteUpdate(query string) error
// SetPrivateData sets the given value to a key in the private data state represented by the tuple <namespace, collection, key>
SetPrivateData(namespace, collection, key string, value []byte) error
// SetPrivateDataMultipleKeys sets the values for multiple keys in the private data space in a single call
SetPrivateDataMultipleKeys(namespace, collection string, kvs map[string][]byte) error
// DeletePrivateData deletes the given tuple <namespace, collection, key> from private data
DeletePrivateData(namespace, collection, key string) error
// SetPrivateDataMetadata sets the metadata associated with an existing key-tuple <namespace, collection, key>
SetPrivateDataMetadata(namespace, collection, key string, metadata map[string][]byte) error
// DeletePrivateDataMetadata deletes the metadata associated with an existing key-tuple <namespace, collection, key>
DeletePrivateDataMetadata(namespace, collection, key string) error
// GetTxSimulationResults encapsulates the results of the transaction simulation.
// This should contain enough detail for
// - The update in the state that would be caused if the transaction is to be committed
// - The environment in which the transaction is executed so as to be able to decide the validity of the environment
// (at a later time on a different peer) during committing the transactions
// Different ledger implementation (or configurations of a single implementation) may want to represent the above two pieces
// of information in different way in order to support different data-models or optimize the information representations.
// Returned type 'TxSimulationResults' contains the simulation results for both the public data and the private data.
// The public data simulation results are expected to be used as in V1 while the private data simulation results are expected
// to be used by the gossip to disseminate this to the other endorsers (in phase-2 of sidedb)
GetTxSimulationResults() (*TxSimulationResults, error)
}
QueryExecutor
type QueryExecutor interface {
SimpleQueryExecutor
// GetStateMetadata returns the metadata for given namespace and key
GetStateMetadata(namespace, key string) (map[string][]byte, error)
// GetStateMultipleKeys gets the values for multiple keys in a single call
GetStateMultipleKeys(namespace string, keys []string) ([][]byte, error)
// GetStateRangeScanIteratorWithMetadata returns an iterator that contains all the key-values between given key ranges.
// startKey is included in the results and endKey is excluded. An empty startKey refers to the first available key
// and an empty endKey refers to the last available key. For scanning all the keys, both the startKey and the endKey
// can be supplied as empty strings. However, a full scan should be used judiciously for performance reasons.
// metadata is a map of additional query parameters
// The returned ResultsIterator contains results of type *KV which is defined in protos/ledger/queryresult.
GetStateRangeScanIteratorWithMetadata(namespace string, startKey, endKey string, metadata map[string]interface{}) (QueryResultsIterator, error)
// ExecuteQuery executes the given query and returns an iterator that contains results of type specific to the underlying data store.
// Only used for state databases that support query
// For a chaincode, the namespace corresponds to the chaincodeId
// The returned ResultsIterator contains results of type *KV which is defined in protos/ledger/queryresult.
ExecuteQuery(namespace, query string) (commonledger.ResultsIterator, error)
// ExecuteQueryWithMetadata executes the given query and returns an iterator that contains results of type specific to the underlying data store.
// metadata is a map of additional query parameters
// Only used for state databases that support query
// For a chaincode, the namespace corresponds to the chaincodeId
// The returned ResultsIterator contains results of type *KV which is defined in protos/ledger/queryresult.
ExecuteQueryWithMetadata(namespace, query string, metadata map[string]interface{}) (QueryResultsIterator, error)
// GetPrivateData gets the value of a private data item identified by a tuple <namespace, collection, key>
GetPrivateData(namespace, collection, key string) ([]byte, error)
// GetPrivateDataMetadata gets the metadata of a private data item identified by a tuple <namespace, collection, key>
GetPrivateDataMetadata(namespace, collection, key string) (map[string][]byte, error)
// GetPrivateDataMetadataByHash gets the metadata of a private data item identified by a tuple <namespace, collection, keyhash>
GetPrivateDataMetadataByHash(namespace, collection string, keyhash []byte) (map[string][]byte, error)
// GetPrivateDataMultipleKeys gets the values for the multiple private data items in a single call
GetPrivateDataMultipleKeys(namespace, collection string, keys []string) ([][]byte, error)
// GetPrivateDataRangeScanIterator returns an iterator that contains all the key-values between given key ranges.
// startKey is included in the results and endKey is excluded. An empty startKey refers to the first available key
// and an empty endKey refers to the last available key. For scanning all the keys, both the startKey and the endKey
// can be supplied as empty strings. However, a full scan should be used judiciously for performance reasons.
// The returned ResultsIterator contains results of type *KV which is defined in protos/ledger/queryresult.
GetPrivateDataRangeScanIterator(namespace, collection, startKey, endKey string) (commonledger.ResultsIterator, error)
// ExecuteQuery executes the given query and returns an iterator that contains results of type specific to the underlying data store.
// Only used for state databases that support query
// For a chaincode, the namespace corresponds to the chaincodeId
// The returned ResultsIterator contains results of type *KV which is defined in protos/ledger/queryresult.
ExecuteQueryOnPrivateData(namespace, collection, query string) (commonledger.ResultsIterator, error)
// Done releases resources occupied by the QueryExecutor
Done()
}
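- As you can see, TxSimulator and QueryExecutor together cover every method a chaincode needs for talking to the state database, so that during simulated execution all state access is delegated to the transaction simulator.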
Endorser
if acquireTxSimulator(chainID, vr.hdrExt.ChaincodeId) {
if txsim, err = e.s.GetTxSimulator(chainID, txid); err != nil {
return &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}, nil
}
// txsim acquires a shared lock on the stateDB. As this would impact the block commits (i.e., commit
// of valid write-sets to the stateDB), we must release the lock as early as possible.
// Hence, this txsim object is closed in simulateProposal() as soon as the tx is simulated and
// rwset is collected before gossip dissemination if required for privateData. For safety, we
// add the following defer statement and is useful when an error occur. Note that calling
// txsim.Done() more than once does not cause any issue. If the txsim is already
// released, the following txsim.Done() simply returns.
defer txsim.Done()
if historyQueryExecutor, err = e.s.GetHistoryQueryExecutor(chainID); err != nil {
return &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}, nil
}
}
- When the endorser handles ProcessProposal, it decides whether a transaction simulator needs to be involved
Conditions for involving the simulator
func acquireTxSimulator(chainID string, ccid *pb.ChaincodeID) bool {
if chainID == "" {
return false
}
// ¯\_(ツ)_/¯ locking.
// Don't get a simulator for the query and config system chaincode.
// These don't need the simulator and its read lock results in deadlocks.
switch ccid.Name {
case "qscc", "cscc":
return false
default:
return true
}
}
- Only a handful of requests skip the simulator. The qscc and cscc cases are not covered here.
- If the simulator is needed, the next step is to initialize it.
Initialization
func newLockBasedTxSimulator(txmgr *LockBasedTxMgr, txid string) (*lockBasedTxSimulator, error) {
rwsetBuilder := rwsetutil.NewRWSetBuilder()
helper := newQueryHelper(txmgr, rwsetBuilder)
logger.Debugf("constructing new tx simulator txid = [%s]", txid)
return &lockBasedTxSimulator{lockBasedQueryExecutor{helper, txid}, rwsetBuilder, false, false, false, false}, nil
}
- The call eventually reaches newLockBasedTxSimulator
- It is quite simple: it creates the read-write set builder, the queryHelper used for reads, and the lockBasedTxSimulator itself
Passing it along
TransactionParams
txParams := &ccprovider.TransactionParams{
ChannelID: chainID,
TxID: txid,
SignedProp: signedProp,
Proposal: prop,
TXSimulator: txsim,
HistoryQueryExecutor: historyQueryExecutor,
}
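- Once created, the simulator is placed into TransactionParams and passed down layer by layer. The path is long, so it is not traced step by step here.
- We know the simulator exists to simulate the reads and writes a chaincode performs and to record the key-value pairs those operations touch. The question now is how the chaincode running on the shim side ends up using this simulator. Let's read on.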
TransactionContexts
func (c *TransactionContexts) Create(txParams *ccprovider.TransactionParams) (*TransactionContext, error) {
c.mutex.Lock()
defer c.mutex.Unlock()
ctxID := contextID(txParams.ChannelID, txParams.TxID)
if c.contexts[ctxID] != nil {
return nil, errors.Errorf("txid: %s(%s) exists", txParams.TxID, txParams.ChannelID)
}
txctx := &TransactionContext{
ChainID: txParams.ChannelID,
SignedProp: txParams.SignedProp,
Proposal: txParams.Proposal,
ResponseNotifier: make(chan *pb.ChaincodeMessage, 1),
TXSimulator: txParams.TXSimulator,
HistoryQueryExecutor: txParams.HistoryQueryExecutor,
CollectionStore: txParams.CollectionStore,
IsInitTransaction: txParams.IsInitTransaction,
queryIteratorMap: map[string]commonledger.ResultsIterator{},
pendingQueryResults: map[string]*PendingQueryResult{},
AllowedCollectionAccess: make(map[string]bool),
}
c.contexts[ctxID] = txctx
return txctx, nil
}
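- Note that we have not reached the shim side yet, but we are close: this happens just before the message is sent to the shim.
- The main job here is to rewrap the transaction parameters passed down from above as a TransactionContext and store it in the contexts map. Why this is done will become clear below.
- Also, the context for this chaincode execution is identified by chainid+txid; this ID is used later to look the context up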
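The bookkeeping described above boils down to a mutex-guarded map keyed by channel and transaction ID. The following is a minimal sketch, not Fabric's code: `registry`, `txContext` and `key` are hypothetical names, and the way the key is built is an assumption (the real contextID helper may differ).

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// txContext is a stand-in for the real TransactionContext; it only
// carries a label for the simulator it would hold.
type txContext struct {
	simulator string
}

// registry maps channelID+txID to a context, guarded by a mutex.
type registry struct {
	mu       sync.Mutex
	contexts map[string]*txContext
}

func newRegistry() *registry {
	return &registry{contexts: map[string]*txContext{}}
}

// key builds the lookup key (assumed format, for illustration only).
func key(channelID, txID string) string { return channelID + txID }

// create stores a new context and rejects a duplicate channel/txid pair.
func (r *registry) create(channelID, txID, sim string) (*txContext, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	k := key(channelID, txID)
	if r.contexts[k] != nil {
		return nil, errors.New("txid " + txID + " already exists on " + channelID)
	}
	ctx := &txContext{simulator: sim}
	r.contexts[k] = ctx
	return ctx, nil
}

// get returns the stored context, or nil if none was created.
func (r *registry) get(channelID, txID string) *txContext {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.contexts[key(channelID, txID)]
}

func main() {
	r := newRegistry()
	_, _ = r.create("mychannel", "tx1", "sim-A")
	_, err := r.create("mychannel", "tx1", "sim-B")
	fmt.Println(err != nil, r.get("mychannel", "tx1").simulator) // prints true sim-A
}
```

Because the chaincode's later messages carry only the channel ID and txid, a lookup table like this is what lets the peer find the right simulator again.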
-> shim
h.serialSendAsync(msg)
Next, the message is sent to the shim side
shim<-
func (handler *Handler) handleTransaction(msg *pb.ChaincodeMessage, errc chan error) {
// The defer followed by triggering a go routine dance is needed to ensure that the previous state transition
// is completed before the next one is triggered. The previous state transition is deemed complete only when
// the beforeInit function is exited. Interesting bug fix!!
go func() {
//better not be nil
var nextStateMsg *pb.ChaincodeMessage
defer func() {
handler.triggerNextState(nextStateMsg, errc)
}()
errFunc := func(err error, ce *pb.ChaincodeEvent, errStr string, args ...interface{}) *pb.ChaincodeMessage {
if err != nil {
payload := []byte(err.Error())
chaincodeLogger.Errorf(errStr, args...)
return &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid, ChaincodeEvent: ce, ChannelId: msg.ChannelId}
}
return nil
}
// Get the function and args from Payload
input := &pb.ChaincodeInput{}
unmarshalErr := proto.Unmarshal(msg.Payload, input)
if nextStateMsg = errFunc(unmarshalErr, nil, "[%s] Incorrect payload format. Sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_ERROR.String()); nextStateMsg != nil {
return
}
// Call chaincode's Run
// Create the ChaincodeStub which the chaincode can use to callback
stub := new(ChaincodeStub)
err := stub.init(handler, msg.ChannelId, msg.Txid, input, msg.Proposal)
if nextStateMsg = errFunc(err, stub.chaincodeEvent, "[%s] Transaction execution failed. Sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_ERROR.String()); nextStateMsg != nil {
return
}
res := handler.cc.Invoke(stub)
// Endorser will handle error contained in Response.
resBytes, err := proto.Marshal(&res)
if nextStateMsg = errFunc(err, stub.chaincodeEvent, "[%s] Transaction execution failed. Sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_ERROR.String()); nextStateMsg != nil {
return
}
// Send COMPLETED message to chaincode support and change state
chaincodeLogger.Debugf("[%s] Transaction completed. Sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_COMPLETED)
nextStateMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_COMPLETED, Payload: resBytes, Txid: msg.Txid, ChaincodeEvent: stub.chaincodeEvent, ChannelId: stub.ChannelId}
}()
}
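- When the shim receives a ChaincodeMessage_TRANSACTION request from the peer, it is handled here
- It does a few things:
- Extracts the function and args of the original invoke from the request payload
- Initializes the ChaincodeStub
- Calls Invoke on the chaincode instance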
ChaincodeStubInterface
[Image: ChaincodeStubInterface]
- ChaincodeStubInterface is the interface through which a chaincode talks to the ledger
- Let's pick a simple chaincode and take a look
Chaincode
func (t *SimpleChaincode) invoke(stub shim.ChaincodeStubInterface, args []string) pb.Response {
var A, B string // Entities
var Aval, Bval int // Asset holdings
var X int // Transaction value
var err error
if len(args) != 3 {
return shim.Error("Incorrect number of arguments. Expecting 3")
}
A = args[0]
B = args[1]
// Get the state from the ledger
// TODO: will be nice to have a GetAllState call to ledger
Avalbytes, err := stub.GetState(A)
if err != nil {
return shim.Error("Failed to get state")
}
if Avalbytes == nil {
return shim.Error("Entity not found")
}
Aval, _ = strconv.Atoi(string(Avalbytes))
Bvalbytes, err := stub.GetState(B)
if err != nil {
return shim.Error("Failed to get state")
}
if Bvalbytes == nil {
return shim.Error("Entity not found")
}
Bval, _ = strconv.Atoi(string(Bvalbytes))
// Perform the execution
X, err = strconv.Atoi(args[2])
if err != nil {
return shim.Error("Invalid transaction amount, expecting a integer value")
}
Aval = Aval - X
Bval = Bval + X
fmt.Printf("Aval = %d, Bval = %d\n", Aval, Bval)
// Write the state back to the ledger
err = stub.PutState(A, []byte(strconv.Itoa(Aval)))
if err != nil {
return shim.Error(err.Error())
}
err = stub.PutState(B, []byte(strconv.Itoa(Bval)))
if err != nil {
return shim.Error(err.Error())
}
return shim.Success(nil)
}
- Note the calls to stub.GetState and stub.PutState here.
- What do they actually do? Let's take PutState as the example
PutState
func (stub *ChaincodeStub) PutState(key string, value []byte) error {
if key == "" {
return errors.New("key must not be an empty string")
}
// Access public data by setting the collection to empty string
collection := ""
return stub.handler.handlePutState(collection, key, value, stub.ChannelId, stub.TxID)
}
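- PutState is handed over to the handler. If you have read the chaincode invoke flow, you know where this handler comes from: when the chaincode container starts and initializes, shim.Start connects to the peer using the local peer.address. This means every ledger operation here is ultimately sent back to the peer.
- Let's dig in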
func (handler *Handler) handlePutState(collection string, key string, value []byte, channelId string, txid string) error {
// Construct payload for PUT_STATE
payloadBytes, _ := proto.Marshal(&pb.PutState{Collection: collection, Key: key, Value: value})
msg := &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_PUT_STATE, Payload: payloadBytes, Txid: txid, ChannelId: channelId}
chaincodeLogger.Debugf("[%s] Sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_PUT_STATE)
// Execute the request and get response
responseMsg, err := handler.callPeerWithChaincodeMsg(msg, channelId, txid)
if err != nil {
return errors.WithMessage(err, fmt.Sprintf("[%s] error sending PUT_STATE", msg.Txid))
}
if responseMsg.Type.String() == pb.ChaincodeMessage_RESPONSE.String() {
// Success response
chaincodeLogger.Debugf("[%s] Received %s. Successfully updated state", shorttxid(responseMsg.Txid), pb.ChaincodeMessage_RESPONSE)
return nil
}
if responseMsg.Type.String() == pb.ChaincodeMessage_ERROR.String() {
// Error response
chaincodeLogger.Errorf("[%s] Received %s. Payload: %s", shorttxid(responseMsg.Txid), pb.ChaincodeMessage_ERROR, responseMsg.Payload)
return errors.New(string(responseMsg.Payload[:]))
}
// Incorrect chaincode message received
chaincodeLogger.Errorf("[%s] Incorrect chaincode message %s received. Expecting %s or %s", shorttxid(responseMsg.Txid), responseMsg.Type, pb.ChaincodeMessage_RESPONSE, pb.ChaincodeMessage_ERROR)
return errors.Errorf("[%s] incorrect chaincode message %s received. Expecting %s or %s", shorttxid(responseMsg.Txid), responseMsg.Type, pb.ChaincodeMessage_RESPONSE, pb.ChaincodeMessage_ERROR)
}
- Sure enough:
- It assembles a ChaincodeMessage of type ChaincodeMessage_PUT_STATE
- Then it sends the message to the peer for handling via handler.callPeerWithChaincodeMsg
Endorser
HandleTransaction
func (h *Handler) HandleTransaction(msg *pb.ChaincodeMessage, delegate handleFunc) {
chaincodeLogger.Debugf("[%s] handling %s from chaincode", shorttxid(msg.Txid), msg.Type.String())
if !h.registerTxid(msg) {
return
}
startTime := time.Now()
var txContext *TransactionContext
var err error
if msg.Type == pb.ChaincodeMessage_INVOKE_CHAINCODE {
txContext, err = h.getTxContextForInvoke(msg.ChannelId, msg.Txid, msg.Payload, "")
} else {
txContext, err = h.isValidTxSim(msg.ChannelId, msg.Txid, "no ledger context")
}
chaincodeName := h.chaincodeID.Name + ":" + h.chaincodeID.Version
meterLabels := []string{
"type", msg.Type.String(),
"channel", msg.ChannelId,
"chaincode", chaincodeName,
}
h.Metrics.ShimRequestsReceived.With(meterLabels...).Add(1)
var resp *pb.ChaincodeMessage
if err == nil {
resp, err = delegate(msg, txContext)
}
if err != nil {
err = errors.Wrapf(err, "%s failed: transaction ID: %s", msg.Type, msg.Txid)
chaincodeLogger.Errorf("[%s] Failed to handle %s. error: %+v", shorttxid(msg.Txid), msg.Type, err)
resp = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: []byte(err.Error()), Txid: msg.Txid, ChannelId: msg.ChannelId}
}
chaincodeLogger.Debugf("[%s] Completed %s. Sending %s", shorttxid(msg.Txid), msg.Type, resp.Type)
h.ActiveTransactions.Remove(msg.ChannelId, msg.Txid)
h.serialSendAsync(resp)
meterLabels = append(meterLabels, "success", strconv.FormatBool(resp.Type != pb.ChaincodeMessage_ERROR))
h.Metrics.ShimRequestDuration.With(meterLabels...).Observe(time.Since(startTime).Seconds())
h.Metrics.ShimRequestsCompleted.With(meterLabels...).Add(1)
}
- The TransactionContext saved earlier finally reappears here.
- First, the txcontext is retrieved
* It is looked up with the same channel ID and txid that were used when it was stored
func (h *Handler) isValidTxSim(channelID string, txid string, fmtStr string, args ...interface{}) (*TransactionContext, error) {
	txContext := h.TXContexts.Get(channelID, txid)
	if txContext == nil || txContext.TXSimulator == nil {
		err := errors.Errorf(fmtStr, args...)
		chaincodeLogger.Errorf("no ledger context: %s %s\n\n %+v", channelID, txid, err)
		return nil, err
	}
	return txContext, nil
}
- Next, the context is handed to the delegate, which for a PUT_STATE message is HandlePutState. You can probably guess what it does.
HandlePutState
func (h *Handler) HandlePutState(msg *pb.ChaincodeMessage, txContext *TransactionContext) (*pb.ChaincodeMessage, error) {
putState := &pb.PutState{}
err := proto.Unmarshal(msg.Payload, putState)
if err != nil {
return nil, errors.Wrap(err, "unmarshal failed")
}
chaincodeName := h.ChaincodeName()
collection := putState.Collection
if isCollectionSet(collection) {
if txContext.IsInitTransaction {
return nil, errors.New("private data APIs are not allowed in chaincode Init()")
}
err = txContext.TXSimulator.SetPrivateData(chaincodeName, collection, putState.Key, putState.Value)
} else {
err = txContext.TXSimulator.SetState(chaincodeName, putState.Key, putState.Value)
}
if err != nil {
return nil, errors.WithStack(err)
}
return &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_RESPONSE, Txid: msg.Txid, ChannelId: msg.ChannelId}, nil
}
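- txContext.TXSimulator.SetState(chaincodeName, putState.Key, putState.Value) stands out, doesn't it?
- That is essentially how the simulator flows through the whole process.
- Next, let's look at the simulator's internal implementation.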
TXSimulator
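The previous sections covered where the simulator comes from and where it goes, to give a picture of how a chaincode is actually simulated. Next we look at how the peer collects the read-write set through the simulator, starting with the simple cases.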
Read
func (h *queryHelper) getState(ns string, key string) ([]byte, []byte, error) {
if err := h.checkDone(); err != nil {
return nil, nil, err
}
versionedValue, err := h.txmgr.db.GetState(ns, key)
if err != nil {
return nil, nil, err
}
val, metadata, ver := decomposeVersionedValue(versionedValue)
if h.rwsetBuilder != nil {
h.rwsetBuilder.AddToReadSet(ns, key, ver)
}
return val, metadata, nil
}
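- As mentioned earlier, the read side of TXSimulator is handled by queryHelper
- Simulation or not, the read itself is real: the [key, value, version] is fetched from the underlying ledger
- h.rwsetBuilder.AddToReadSet is where the read is recorded in the read set.
- You may have noticed that what goes into the read set is ver, not val.
* What ultimately gets compared is the version of the key, in other words which transaction in which block last wrote it. This is deliberately strict: comparing values could not rule out the ABA problem, where a value is changed and then changed back.
type Height struct {
	BlockNum uint64
	TxNum    uint64
}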
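To make the ABA point concrete, here is a small sketch that compares a value-based freshness check with a version-based one. The `height` type copies the shape of the Height struct above; `entry`, `abaCheck` and the sample heights are invented for illustration.

```go
package main

import "fmt"

// height mirrors the shape of the Height struct quoted above.
type height struct {
	BlockNum uint64
	TxNum    uint64
}

// entry is a committed value together with the height that wrote it.
type entry struct {
	val string
	ver height
}

// abaCheck simulates a read, then two later commits that change the value
// away and back again. It reports whether a value-based check and a
// version-based check would each consider the read still fresh.
func abaCheck() (valueLooksFresh, versionLooksFresh bool) {
	state := map[string]entry{"k": {val: "100", ver: height{BlockNum: 1, TxNum: 0}}}

	// Simulation time: remember what was read.
	seen := state["k"]

	// Two other transactions commit before ours is validated.
	state["k"] = entry{val: "50", ver: height{BlockNum: 2, TxNum: 0}}
	state["k"] = entry{val: "100", ver: height{BlockNum: 2, TxNum: 1}}

	now := state["k"]
	return now.val == seen.val, now.ver == seen.ver
}

func main() {
	byValue, byVersion := abaCheck()
	fmt.Println(byValue, byVersion) // prints true false
}
```

The value check is fooled because the value returned to "100", while the height shows that the key was rewritten after the simulation read it.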
Write
func (s *lockBasedTxSimulator) SetState(ns string, key string, value []byte) error {
if err := s.checkWritePrecondition(key, value); err != nil {
return err
}
s.rwsetBuilder.AddToWriteSet(ns, key, value)
return nil
}
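- Writes, on the other hand, all go through the TxSimulator interface
- checkWritePrecondition mainly checks that the key and value are valid input.
- Nothing more to say here: the key and value go straight into the write set.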
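Put together, the read and write paths can be sketched as a tiny in-memory simulator. This is an illustration under assumptions, not Fabric's implementation: `simulator` and `committed` are hypothetical types, versions are plain integers, and the only precondition checked is a non-empty key.

```go
package main

import (
	"errors"
	"fmt"
)

// committed is a value in the state database with its version.
type committed struct {
	val []byte
	ver uint64
}

// simulator records reads and writes without touching the committed state.
type simulator struct {
	state    map[string]committed // committed state; never modified here
	readSet  map[string]uint64    // key -> version observed
	writeSet map[string][]byte    // key -> pending value
}

func newSimulator(state map[string]committed) *simulator {
	return &simulator{state: state, readSet: map[string]uint64{}, writeSet: map[string][]byte{}}
}

// GetState reads the committed value and records the version it saw.
func (s *simulator) GetState(key string) []byte {
	c := s.state[key]
	s.readSet[key] = c.ver
	return c.val
}

// SetState only records the intended write.
func (s *simulator) SetState(key string, val []byte) error {
	if key == "" {
		return errors.New("key must not be empty")
	}
	s.writeSet[key] = val
	return nil
}

func main() {
	sim := newSimulator(map[string]committed{"a": {val: []byte("100"), ver: 7}})
	_ = sim.SetState("a", []byte("90"))
	// The read still returns the committed value, not the pending write.
	fmt.Println(string(sim.GetState("a")), sim.readSet["a"], string(sim.writeSet["a"])) // prints 100 7 90
}
```

In this sketch a read after a write still sees the committed value, since reads go to the state database and writes only land in the write set.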
InvokeChaincode
The read and write paths above contain no special logic. Next, let's look at something more involved: a chaincode calling another chaincode. How is that implemented?
func (h *Handler) HandleInvokeChaincode(msg *pb.ChaincodeMessage, txContext *TransactionContext) (*pb.ChaincodeMessage, error) {
chaincodeLogger.Debugf("[%s] C-call-C", shorttxid(msg.Txid))
chaincodeSpec := &pb.ChaincodeSpec{}
err := proto.Unmarshal(msg.Payload, chaincodeSpec)
if err != nil {
return nil, errors.Wrap(err, "unmarshal failed")
}
// Get the chaincodeID to invoke. The chaincodeID to be called may
// contain composite info like "chaincode-name:version/channel-name".
// We are not using version now but default to the latest.
targetInstance := ParseName(chaincodeSpec.ChaincodeId.Name)
chaincodeSpec.ChaincodeId.Name = targetInstance.ChaincodeName
if targetInstance.ChainID == "" {
// use caller's channel as the called chaincode is in the same channel
targetInstance.ChainID = txContext.ChainID
}
chaincodeLogger.Debugf("[%s] C-call-C %s on channel %s", shorttxid(msg.Txid), targetInstance.ChaincodeName, targetInstance.ChainID)
err = h.checkACL(txContext.SignedProp, txContext.Proposal, targetInstance)
if err != nil {
chaincodeLogger.Errorf(
"[%s] C-call-C %s on channel %s failed check ACL [%v]: [%s]",
shorttxid(msg.Txid),
targetInstance.ChaincodeName,
targetInstance.ChainID,
txContext.SignedProp,
err,
)
return nil, errors.WithStack(err)
}
// Set up a new context for the called chaincode if on a different channel
// We grab the called channel's ledger simulator to hold the new state
txParams := &ccprovider.TransactionParams{
TxID: msg.Txid,
ChannelID: targetInstance.ChainID,
SignedProp: txContext.SignedProp,
Proposal: txContext.Proposal,
TXSimulator: txContext.TXSimulator,
HistoryQueryExecutor: txContext.HistoryQueryExecutor,
}
if targetInstance.ChainID != txContext.ChainID {
lgr := h.LedgerGetter.GetLedger(targetInstance.ChainID)
if lgr == nil {
return nil, errors.Errorf("failed to find ledger for channel: %s", targetInstance.ChainID)
}
sim, err := lgr.NewTxSimulator(msg.Txid)
if err != nil {
return nil, errors.WithStack(err)
}
defer sim.Done()
hqe, err := lgr.NewHistoryQueryExecutor()
if err != nil {
return nil, errors.WithStack(err)
}
txParams.TXSimulator = sim
txParams.HistoryQueryExecutor = hqe
}
chaincodeLogger.Debugf("[%s] getting chaincode data for %s on channel %s", shorttxid(msg.Txid), targetInstance.ChaincodeName, targetInstance.ChainID)
version := h.SystemCCVersion
if !h.SystemCCProvider.IsSysCC(targetInstance.ChaincodeName) {
// if its a user chaincode, get the details
cd, err := h.DefinitionGetter.ChaincodeDefinition(targetInstance.ChaincodeName, txParams.TXSimulator)
if err != nil {
return nil, errors.WithStack(err)
}
version = cd.CCVersion()
err = h.InstantiationPolicyChecker.CheckInstantiationPolicy(targetInstance.ChaincodeName, version, cd.(*ccprovider.ChaincodeData))
if err != nil {
return nil, errors.WithStack(err)
}
}
// Launch the new chaincode if not already running
chaincodeLogger.Debugf("[%s] launching chaincode %s on channel %s", shorttxid(msg.Txid), targetInstance.ChaincodeName, targetInstance.ChainID)
cccid := &ccprovider.CCContext{
Name: targetInstance.ChaincodeName,
Version: version,
}
// Execute the chaincode... this CANNOT be an init at least for now
responseMessage, err := h.Invoker.Invoke(txParams, cccid, chaincodeSpec.Input)
if err != nil {
return nil, errors.Wrap(err, "execute failed")
}
// payload is marshalled and sent to the calling chaincode's shim which unmarshals and
// sends it to chaincode
res, err := proto.Marshal(responseMessage)
if err != nil {
return nil, errors.Wrap(err, "marshal failed")
}
return &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_RESPONSE, Payload: res, Txid: msg.Txid, ChannelId: msg.ChannelId}, nil
}
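- This code looks familiar: it assembles TransactionParams again. The difference is that the called chaincode and the calling chaincode share the same TXSimulator. A call into a different channel is another matter, since it gets a new simulator from that channel's ledger.
- After that it is the same as before: with the TransactionParams in hand, the peer stores a txcontext for the new chaincode and goes through the Invoke flow again.
- In the end, the executions of both chaincodes are gathered into one simulator for output.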
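A minimal sketch of the same-channel case: both chaincodes write through one shared builder, and the results stay separated by namespace. `rwBuilder`, `caller`, `callee` and the chaincode names `ccA` and `ccB` are hypothetical.

```go
package main

import "fmt"

// rwBuilder collects writes grouped by namespace (chaincode name).
type rwBuilder struct {
	writes map[string]map[string]string
}

func (b *rwBuilder) addWrite(ns, key, val string) {
	if b.writes[ns] == nil {
		b.writes[ns] = map[string]string{}
	}
	b.writes[ns][key] = val
}

// callee stands for the invoked chaincode; it receives the caller's builder.
func callee(b *rwBuilder) { b.addWrite("ccB", "y", "2") }

// caller writes its own key and then performs a chaincode-to-chaincode call
// on the same channel, passing the same builder along.
func caller(b *rwBuilder) {
	b.addWrite("ccA", "x", "1")
	callee(b)
}

// simulate runs the caller against a fresh builder and returns the result.
func simulate() *rwBuilder {
	b := &rwBuilder{writes: map[string]map[string]string{}}
	caller(b)
	return b
}

func main() {
	b := simulate()
	fmt.Println(len(b.writes), b.writes["ccA"]["x"], b.writes["ccB"]["y"]) // prints 2 1 2
}
```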
Iterator
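- Next, let's look at how complex queries are designed. In Fabric they are all implemented by wrapping an iterator, so that each call to Next collects the record that was read. First, the two structs.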
type queryResultsItr struct {
DBItr statedb.ResultsIterator
RWSetBuilder *rwsetutil.RWSetBuilder
}
type resultsItr struct {
ns string
endKey string
dbItr statedb.ResultsIterator
rwSetBuilder *rwsetutil.RWSetBuilder
rangeQueryInfo *kvrwset.RangeQueryInfo
rangeQueryResultsHelper *rwsetutil.RangeQueryResultsHelper
}
- queryResultsItr is for CouchDB rich queries.
- resultsItr works with both LevelDB and CouchDB and is mainly for range queries.
- The two are similar: both iterate through an underlying statedb.ResultsIterator
- Both hold a rwsetutil.RWSetBuilder
queryResultsItr
func (h *queryHelper) executeQuery(namespace, query string) (commonledger.ResultsIterator, error) {
if err := h.checkDone(); err != nil {
return nil, err
}
dbItr, err := h.txmgr.db.ExecuteQuery(namespace, query)
if err != nil {
return nil, err
}
return &queryResultsItr{DBItr: dbItr, RWSetBuilder: h.rwsetBuilder}, nil
}
- queryResultsItr is designed specifically for CouchDB rich queries.
- Note that queryHelper was initialized with the read-write set builder, so queryHelper and queryResultsItr share the same read-write set.
Next
func (itr *queryResultsItr) Next() (commonledger.QueryResult, error) {
queryResult, err := itr.DBItr.Next()
if err != nil {
return nil, err
}
if queryResult == nil {
return nil, nil
}
versionedQueryRecord := queryResult.(*statedb.VersionedKV)
logger.Debugf("queryResultsItr.Next() returned a record:%s", string(versionedQueryRecord.Value))
if itr.RWSetBuilder != nil {
itr.RWSetBuilder.AddToReadSet(versionedQueryRecord.Namespace, versionedQueryRecord.Key, versionedQueryRecord.Version)
}
return &queryresult.KV{Namespace: versionedQueryRecord.Namespace, Key: versionedQueryRecord.Key, Value: versionedQueryRecord.Value}, nil
}
- itr.RWSetBuilder.AddToReadSet stands out, doesn't it?
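The wrapping pattern can be sketched with a plain slice standing in for the database iterator. The types `versionedKV`, `sliceIter` and `recordingIter` are made up for this illustration; the point is that only the records actually pulled through Next end up in the read set.

```go
package main

import "fmt"

// versionedKV is a query result with the version of the key.
type versionedKV struct {
	key string
	val string
	ver uint64
}

// sliceIter is a stand-in for the state database iterator.
type sliceIter struct {
	items []versionedKV
	pos   int
}

// Next returns the next item, or nil when the iterator is exhausted.
func (it *sliceIter) Next() *versionedKV {
	if it.pos >= len(it.items) {
		return nil
	}
	item := &it.items[it.pos]
	it.pos++
	return item
}

// recordingIter wraps an iterator and records each returned key's version.
type recordingIter struct {
	inner *sliceIter
	reads map[string]uint64
}

func (r *recordingIter) Next() *versionedKV {
	item := r.inner.Next()
	if item == nil {
		return nil
	}
	r.reads[item.key] = item.ver
	return item
}

// consumeTwo pulls two of three available records and returns the reads.
func consumeTwo() map[string]uint64 {
	it := &recordingIter{
		inner: &sliceIter{items: []versionedKV{{"a", "1", 3}, {"b", "2", 5}, {"c", "3", 8}}},
		reads: map[string]uint64{},
	}
	it.Next()
	it.Next()
	return it.reads
}

func main() {
	fmt.Println(consumeTwo()) // prints map[a:3 b:5]
}
```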
resultsItr
func newResultsItr(ns string, startKey string, endKey string, metadata map[string]interface{},
db statedb.VersionedDB, rwsetBuilder *rwsetutil.RWSetBuilder, enableHashing bool, maxDegree uint32) (*resultsItr, error) {
var err error
var dbItr statedb.ResultsIterator
if metadata == nil {
dbItr, err = db.GetStateRangeScanIterator(ns, startKey, endKey)
} else {
dbItr, err = db.GetStateRangeScanIteratorWithMetadata(ns, startKey, endKey, metadata)
}
if err != nil {
return nil, err
}
itr := &resultsItr{ns: ns, dbItr: dbItr}
// it's a simulation request so, enable capture of range query info
if rwsetBuilder != nil {
itr.rwSetBuilder = rwsetBuilder
itr.endKey = endKey
// just set the StartKey... set the EndKey later below in the Next() method.
itr.rangeQueryInfo = &kvrwset.RangeQueryInfo{StartKey: startKey}
resultsHelper, err := rwsetutil.NewRangeQueryResultsHelper(enableHashing, maxDegree)
if err != nil {
return nil, err
}
itr.rangeQueryResultsHelper = resultsHelper
}
return itr, nil
}
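- First, of course, it obtains the iterator for the range query
- Interestingly, this differs greatly from the queryResultsItr implementation above. Here rwsetBuilder does not collect the records that are read; it only serves as a flag for whether to initialize rangeQueryResultsHelper.
- The job of collecting the records is handed over to rangeQueryResultsHelper
func NewRangeQueryResultsHelper(enableHashing bool, maxDegree uint32) (*RangeQueryResultsHelper, error) {
	helper := &RangeQueryResultsHelper{pendingResults: nil, hashingEnabled: enableHashing, maxDegree: maxDegree, mt: nil}
	if enableHashing {
		var err error
		if helper.mt, err = newMerkleTree(maxDegree); err != nil {
			return nil, err
		}
	}
	return helper, nil
}
- When hashing is enabled, the results gathered by rangeQueryResultsHelper are not validated record by record against the read-write set; a more elegant and cheaper approach is used instead: comparing MerkleTree hashes.
- Let's step aside for a moment and look at the MerkleTree implementation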
MerkleTree
type RangeQueryResultsHelper struct {
pendingResults []*kvrwset.KVRead
mt *merkleTree
maxDegree uint32
hashingEnabled bool
}
type merkleTree struct {
tree map[MerkleTreeLevel][]Hash
maxLevel MerkleTreeLevel
maxDegree uint32
}
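- A few concepts need to be clear here: Level is the layer number in a multi-way tree, Degree is the number of child nodes, and tree is the hash tree itself
- Now let's look at the key algorithms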
update
func (m *merkleTree) update(nextLeafLevelHash Hash) error {
logger.Debugf("Before update() = %s", m)
defer logger.Debugf("After update() = %s", m)
m.tree[leafLevel] = append(m.tree[leafLevel], nextLeafLevelHash)
currentLevel := leafLevel
for {
currentLevelHashes := m.tree[currentLevel]
if uint32(len(currentLevelHashes)) <= m.maxDegree {
return nil
}
nextLevelHash, err := computeCombinedHash(currentLevelHashes)
if err != nil {
return err
}
delete(m.tree, currentLevel)
nextLevel := currentLevel + 1
m.tree[nextLevel] = append(m.tree[nextLevel], nextLevelHash)
if nextLevel > m.maxLevel {
m.maxLevel = nextLevel
}
currentLevel = nextLevel
}
}
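- When a new hash is added to the MerkleTree:
- Merkle tree algorithms are all much alike: hashes are added at the leaf level, and once the number of hashes at a level exceeds maxDegree, they are combined into one hash.
- The combined hash moves up to the next level, and so on up the tree; maxLevel grows whenever a new level is created.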
Done
func (m *merkleTree) done() error {
logger.Debugf("Before done() = %s", m)
defer logger.Debugf("After done() = %s", m)
currentLevel := leafLevel
var h Hash
var err error
for currentLevel < m.maxLevel {
currentLevelHashes := m.tree[currentLevel]
switch len(currentLevelHashes) {
case 0:
currentLevel++
continue
case 1:
h = currentLevelHashes[0]
default:
if h, err = computeCombinedHash(currentLevelHashes); err != nil {
return err
}
}
delete(m.tree, currentLevel)
currentLevel++
m.tree[currentLevel] = append(m.tree[currentLevel], h)
}
finalHashes := m.tree[m.maxLevel]
if uint32(len(finalHashes)) > m.maxDegree {
delete(m.tree, m.maxLevel)
m.maxLevel++
combinedHash, err := computeCombinedHash(finalHashes)
if err != nil {
return err
}
m.tree[m.maxLevel] = []Hash{combinedHash}
}
return nil
}
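- This is basically the roll-up. update only deals with how a newly added hash enters the tree, whereas done collapses the remaining hashes up to maxLevel. If the number of hashes at maxLevel ends up exceeding the degree, they are combined; otherwise they are returned as they are.
- The goal is that any difference in the sequence of updates produces a different tree, which is where a Merkle tree shines: any discrepancy in the result set leads to different final hashes. Remember that final validation only checks whether the result set is the same; it does not care which records it contains. Another benefit of this algorithm is that most of the hashes computed earlier are kept and do not need to be recomputed.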
getSummery
func (m *merkleTree) getSummery() *kvrwset.QueryReadsMerkleSummary {
return &kvrwset.QueryReadsMerkleSummary{MaxDegree: m.maxDegree,
MaxLevel: uint32(m.getMaxLevel()),
MaxLevelHashes: hashesToBytes(m.getMaxLevelHashes())}
}
- Oddly, this MerkleTree has no notion of a root hash. That is fine: final validation does not insist on one. Comparing without it still works, it is just a little awkward.
Example
maxdegree=3, resultset=1-9
L[1]:hash[1]
L[1]:hash[1],hash[2]
L[1]:hash[1],hash[2],hash[3]
L[2]:hash[1-4] <- update
L[2]:hash[1-4] L[1]:hash[5]
L[2]:hash[1-4] L[1]:hash[5],hash[6]
L[2]:hash[1-4] L[1]:hash[5],hash[6],hash[7]
L[2]:hash[1-4],hash[5-8] <- update
L[2]:hash[1-4],hash[5-8] L[1]:hash[9]
L[2]:hash[1-4],hash[5-8],hash[9] <- done
Next
func (itr *resultsItr) Next() (commonledger.QueryResult, error) {
queryResult, err := itr.dbItr.Next()
if err != nil {
return nil, err
}
itr.updateRangeQueryInfo(queryResult)
if queryResult == nil {
return nil, nil
}
versionedKV := queryResult.(*statedb.VersionedKV)
return &queryresult.KV{Namespace: versionedKV.Namespace, Key: versionedKV.Key, Value: versionedKV.Value}, nil
}
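- After the MerkleTree detour, back to the main thread: this is the Next method of resultsItr.
- Note that Next does not write to the read-write set directly; each result it fetches is handed to updateRangeQueryInfo instead.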
updateRangeQueryInfo
func (itr *resultsItr) updateRangeQueryInfo(queryResult statedb.QueryResult) {
if itr.rwSetBuilder == nil {
return
}
if queryResult == nil {
// caller scanned till the iterator got exhausted.
// So, set the endKey to the actual endKey supplied in the query
itr.rangeQueryInfo.ItrExhausted = true
itr.rangeQueryInfo.EndKey = itr.endKey
return
}
versionedKV := queryResult.(*statedb.VersionedKV)
itr.rangeQueryResultsHelper.AddResult(rwsetutil.NewKVRead(versionedKV.Key, versionedKV.Version))
// Set the end key to the latest key retrieved by the caller.
// Because, the caller may actually not invoke the Next() function again
itr.rangeQueryInfo.EndKey = versionedKV.Key
}
func (helper *RangeQueryResultsHelper) AddResult(kvRead *kvrwset.KVRead) error {
logger.Debug("Adding a result")
helper.pendingResults = append(helper.pendingResults, kvRead)
if helper.hashingEnabled && uint32(len(helper.pendingResults)) > helper.maxDegree {
logger.Debug("Processing the accumulated results")
if err := helper.processPendingResults(); err != nil {
return err
}
}
return nil
}
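In the end, every record returned by Next is appended to pendingResults, where it waits to be hashed into the MerkleTree. Once hashing is enabled and more than maxDegree results are pending, processPendingResults is triggered.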
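To make the bookkeeping in updateRangeQueryInfo easier to picture, here is a minimal sketch of an iterator that records whatever the caller actually consumes. The types kv, rangeInfo and recordingItr are hypothetical stand-ins and not Fabric types; the only behaviour carried over from the code above is that EndKey follows the last key returned, and falls back to the end key of the query once the iterator is exhausted.

```go
package main

import "fmt"

// kv is a hypothetical stand-in for statedb.VersionedKV.
type kv struct {
	Key     string
	Version uint64
}

// rangeInfo loosely mirrors the fields used on rangeQueryInfo above.
type rangeInfo struct {
	EndKey       string
	ItrExhausted bool
	Reads        []kv
}

// recordingItr walks a fixed result slice and records the caller's progress.
type recordingItr struct {
	results []kv
	pos     int
	endKey  string // end key supplied in the original query
	info    rangeInfo
}

func newRecordingItr(results []kv, endKey string) *recordingItr {
	return &recordingItr{results: results, endKey: endKey}
}

// Next returns the next result, or nil when exhausted, recording as it goes.
func (it *recordingItr) Next() *kv {
	if it.pos >= len(it.results) {
		// The caller scanned to the end, so the query's own end key applies.
		it.info.ItrExhausted = true
		it.info.EndKey = it.endKey
		return nil
	}
	r := it.results[it.pos]
	it.pos++
	it.info.Reads = append(it.info.Reads, r)
	// The caller may never call Next again, so remember the last key seen.
	it.info.EndKey = r.Key
	return &r
}

func main() {
	it := newRecordingItr([]kv{{"k1", 1}, {"k2", 1}, {"k3", 1}}, "k9")
	it.Next()
	it.Next()
	fmt.Println(it.info.EndKey, it.info.ItrExhausted) // prints "k2 false"
	for it.Next() != nil {
	}
	fmt.Println(it.info.EndKey, it.info.ItrExhausted) // prints "k9 true"
}
```

Recording only what was consumed matters because validation later has to re-run exactly the range the simulation observed, no more and no less.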
processPendingResults
func (helper *RangeQueryResultsHelper) processPendingResults() error {
var b []byte
var err error
if b, err = serializeKVReads(helper.pendingResults); err != nil {
return err
}
helper.pendingResults = nil
hash, err := bccspfactory.GetDefault().Hash(b, hashOpts)
if err != nil {
return err
}
helper.mt.update(hash)
return nil
}
- This one is simple: the pending results are serialized and hashed, and the resulting hash is added to the MerkleTree as a new leaf.
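The interplay between AddResult and processPendingResults can be sketched as a small batching helper. Everything here is illustrative: batcher, read and feed are hypothetical names, the text serialization replaces the real serializeKVReads, SHA-256 replaces the BCCSP call, and the hashingEnabled switch is left out.

```go
package main

import (
	"crypto/sha256"
	"fmt"
)

// read is a hypothetical stand-in for kvrwset.KVRead.
type read struct {
	Key     string
	Version uint64
}

// batcher collects reads and turns each full batch into one leaf hash.
type batcher struct {
	maxDegree int
	pending   []read
	leaves    [][]byte // stands in for the calls to mt.update(hash)
}

// add queues a read and flushes once more than maxDegree reads are pending.
func (b *batcher) add(r read) {
	b.pending = append(b.pending, r)
	if len(b.pending) > b.maxDegree {
		b.flush()
	}
}

// flush hashes the pending reads into a single leaf and clears the queue.
func (b *batcher) flush() {
	h := sha256.New()
	for _, r := range b.pending {
		// Ad hoc serialization, an assumption of this sketch.
		fmt.Fprintf(h, "%s@%d;", r.Key, r.Version)
	}
	b.pending = nil
	b.leaves = append(b.leaves, h.Sum(nil))
}

// feed adds n dummy reads to a fresh batcher.
func feed(maxDegree, n int) *batcher {
	b := &batcher{maxDegree: maxDegree}
	for i := 1; i <= n; i++ {
		b.add(read{Key: fmt.Sprintf("k%d", i), Version: 1})
	}
	return b
}

func main() {
	b := feed(2, 7)
	fmt.Println(len(b.leaves), len(b.pending)) // prints "2 1"
}
```

With maxDegree=2 a flush happens on every third read, so seven reads leave two leaf hashes and one read still pending.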
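Validation
- So far we have covered how the simulator gets involved and how it records the read-write set internally. One part remains: how is the read-write set eventually used for validation?
- Recall the life cycle of a transaction: the endorser simulates it, the client collects the endorsements and submits the transaction to the orderer, the orderer orders transactions into a block and delivers the block to the peers, and each peer finally commits the block to its local ledger. The validation discussed here is the check a peer performs on every transaction (tx) in the block just before writing it to the local ledger.
- To see how this keeps transactions correct and valid, think back to the example at the beginning of this article.
- Below we focus on the two forms of validation described above: individual key reads from the read-write set, and RangeQuery.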
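Before reading the real code, it may help to replay the opening example (T1 to T5) in a few lines. This is a deliberately simplified model and not how Fabric stores versions: state maps a key to an integer version, and tx, validateBlock and exampleBlock are hypothetical names. A transaction is valid when every version in its read set still matches the state; a valid transaction then bumps the version of each key it writes.

```go
package main

import "fmt"

// tx is a simplified transaction: versions it read, and keys it writes.
type tx struct {
	name   string
	reads  map[string]int // key -> version observed during simulation
	writes []string
}

// validateBlock checks the transactions in order against state (key -> version)
// and applies the writes of every valid transaction before moving on.
func validateBlock(state map[string]int, txs []tx) map[string]bool {
	result := map[string]bool{}
	for _, t := range txs {
		ok := true
		for k, v := range t.reads {
			if state[k] != v {
				ok = false
				break
			}
		}
		result[t.name] = ok
		if ok {
			for _, k := range t.writes {
				state[k]++
			}
		}
	}
	return result
}

// exampleBlock returns the world state and the five transactions from the
// example at the beginning of the article.
func exampleBlock() (map[string]int, []tx) {
	state := map[string]int{"k1": 1, "k2": 1, "k3": 1, "k4": 1, "k5": 1}
	txs := []tx{
		{"T1", nil, []string{"k1", "k2"}},
		{"T2", map[string]int{"k1": 1}, []string{"k3"}},
		{"T3", nil, []string{"k2"}},
		{"T4", map[string]int{"k2": 1}, []string{"k2"}},
		{"T5", map[string]int{"k5": 1}, []string{"k6"}},
	}
	return state, txs
}

func main() {
	state, txs := exampleBlock()
	res := validateBlock(state, txs)
	for _, t := range txs {
		fmt.Println(t.name, res[t.name])
	}
}
```

Running it marks T1, T3 and T5 as valid and T2 and T4 as invalid, and leaves k2 at version 3, which is the outcome described in the opening example.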
validateKVRead
func (v *Validator) validateKVRead(ns string, kvRead *kvrwset.KVRead, updates *privacyenabledstate.PubUpdateBatch) (bool, error) {
if updates.Exists(ns, kvRead.Key) {
return false, nil
}
committedVersion, err := v.db.GetVersion(ns, kvRead.Key)
if err != nil {
return false, err
}
logger.Debugf("Comparing versions for key [%s]: committed version=%#v and read version=%#v",
kvRead.Key, committedVersion, rwsetutil.NewVersion(kvRead.Version))
if !version.AreSame(committedVersion, rwsetutil.NewVersion(kvRead.Version)) {
logger.Debugf("Version mismatch for key [%s:%s]. Committed version = [%#v], Version in readSet [%#v]",
ns, kvRead.Key, committedVersion, kvRead.Version)
return false, nil
}
return true, nil
}
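- Clearly, each recorded read is checked by comparing the key's currently committed version with the version observed during simulation; the read is valid only if the two are the same.
- A word on updates: it holds the writes accumulated from transactions validated earlier in the same block, so you can simply think of it as a write set. If the key already appears there, validation fails right away without consulting the database.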
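The two steps of validateKVRead can be condensed into one small function. This is a sketch under assumptions: height and checkRead are hypothetical names, the pending map stands in for updates, the committed map stands in for the state database, and treating a nil read version as "the key did not exist during simulation" is how this sketch models a read of a missing key.

```go
package main

import "fmt"

// height is a hypothetical stand-in for a committed version
// (block number plus transaction number within the block).
type height struct{ Block, Tx uint64 }

// checkRead mirrors the two steps above: first the batch lookup,
// then the comparison against the committed version.
func checkRead(key string, readVer *height, pending map[string]bool, committed map[string]height) bool {
	if pending[key] {
		return false // an earlier transaction in this block already wrote the key
	}
	cur, exists := committed[key]
	if readVer == nil {
		return !exists // simulation saw no such key, so it must still be absent
	}
	return exists && cur == *readVer
}

func main() {
	committed := map[string]height{"k1": {1, 0}, "k2": {1, 0}}
	pending := map[string]bool{"k2": true}
	fmt.Println(checkRead("k1", &height{1, 0}, pending, committed)) // prints "true"
	fmt.Println(checkRead("k1", &height{0, 5}, pending, committed)) // prints "false"
	fmt.Println(checkRead("k2", &height{1, 0}, pending, committed)) // prints "false"
	fmt.Println(checkRead("k3", nil, pending, committed))           // prints "true"
}
```

The third call is the interesting one: the version read for k2 still matches the database, yet the read is rejected because a transaction earlier in the same block is about to change the key.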
validateRangeQuery
func (v *rangeQueryHashValidator) validate() (bool, error) {
itr := v.itr
lastMatchedIndex := -1
inMerkle := v.rqInfo.GetReadsMerkleHashes()
var merkle *kvrwset.QueryReadsMerkleSummary
logger.Debugf("inMerkle: %#v", inMerkle)
for {
var result statedb.QueryResult
var err error
if result, err = itr.Next(); err != nil {
return false, err
}
logger.Debugf("Processing result = %#v", result)
if result == nil {
if _, merkle, err = v.resultsHelper.Done(); err != nil {
return false, err
}
equals := inMerkle.Equal(merkle)
logger.Debugf("Combined iterator exhausted. merkle=%#v, equals=%t", merkle, equals)
return equals, nil
}
versionedKV := result.(*statedb.VersionedKV)
v.resultsHelper.AddResult(rwsetutil.NewKVRead(versionedKV.Key, versionedKV.Version))
merkle := v.resultsHelper.GetMerkleSummary()
if merkle.MaxLevel < inMerkle.MaxLevel {
logger.Debugf("Hashes still under construction. Noting to compare yet. Need more results. Continuing...")
continue
}
if lastMatchedIndex == len(merkle.MaxLevelHashes)-1 {
logger.Debugf("Need more results to build next entry [index=%d] at level [%d]. Continuing...",
lastMatchedIndex+1, merkle.MaxLevel)
continue
}
if len(merkle.MaxLevelHashes) > len(inMerkle.MaxLevelHashes) {
logger.Debugf("Entries exceeded from what are present in the incoming merkleSummary. Validation failed")
return false, nil
}
lastMatchedIndex++
if !bytes.Equal(merkle.MaxLevelHashes[lastMatchedIndex], inMerkle.MaxLevelHashes[lastMatchedIndex]) {
logger.Debugf("Hashes does not match at index [%d]. Validation failed", lastMatchedIndex)
return false, nil
}
}
}
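- As you can see, validation re-runs the query through an iterator and rebuilds the MerkleTree from the results.
- Whether the two summaries match is decided along several dimensions:
  - the two trees must have the same height
  - the top level must hold the same number of hashes
  - every hash at the top level must be identical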
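The three criteria above boil down to a comparison of two finished summaries. The sketch below shows just that comparison; summary and sameSummary are hypothetical names, and unlike validate above, which compares hashes incrementally so that it can fail early, this version only looks at the final result.

```go
package main

import (
	"bytes"
	"fmt"
)

// summary is a hypothetical stand-in for kvrwset.QueryReadsMerkleSummary.
type summary struct {
	MaxLevel       uint32
	MaxLevelHashes [][]byte
}

// sameSummary applies the three checks: equal height, equal number of
// top-level hashes, and pairwise equal top-level hashes.
func sameSummary(a, b summary) bool {
	if a.MaxLevel != b.MaxLevel {
		return false
	}
	if len(a.MaxLevelHashes) != len(b.MaxLevelHashes) {
		return false
	}
	for i := range a.MaxLevelHashes {
		if !bytes.Equal(a.MaxLevelHashes[i], b.MaxLevelHashes[i]) {
			return false
		}
	}
	return true
}

func main() {
	simulated := summary{2, [][]byte{[]byte("h1"), []byte("h2")}}
	rebuilt := summary{2, [][]byte{[]byte("h1"), []byte("h2")}}
	fmt.Println(sameSummary(simulated, rebuilt)) // prints "true"

	rebuilt.MaxLevelHashes[1] = []byte("hX")
	fmt.Println(sameSummary(simulated, rebuilt)) // prints "false"
}
```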
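Closing
I think that covers most of what there is to say about the simulator. There are of course many details along the way that I could not go into one by one for lack of space, but if this gives you a basic understanding, I am content.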
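Author: Pillar_Zhong
Link: https://www.jianshu.com/p/d54aa587716b
Source: Jianshu (简书)
Copyright belongs to the author. For commercial reuse, please contact the author for permission; for non-commercial reuse, please credit the source.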