fabric源码分析之peer从orderer同步区块

2021-10-15  本文已影响0人  JC86

environment:
fabric v2.2.0

peer加入通道流程

peer&orderer deliver block流程图

peer_orderer_deliver_block.png

peer拉取区块和peer写入区块流程代码

在文件internal/peer/channel/join.go函数调用顺序为joinCmd=>join=>executeJoin,然后接着背书,调用CSCC系统链码。
CSCC链码在文件core/scc/cscc/configure.go,调用函数顺序Invoke=>InvokeNoShim=>joinChain

// joinChain joins this peer to the channel described by the given
// configuration block. Because that block is the first block of the chain
// (the genesis block carrying the channel configuration), it is handed to
// the peer's CreateChannel. A failure is reported as a shim error response
// carrying the error text; otherwise an empty success response is returned.
func (e *PeerConfiger) joinChain(
    channelID string,
    block *common.Block,
    deployedCCInfoProvider ledger.DeployedChaincodeInfoProvider,
    lr plugindispatcher.LifecycleResources,
    nr plugindispatcher.CollectionAndLifecycleResources,
) pb.Response {
    err := e.peer.CreateChannel(channelID, block, deployedCCInfoProvider, lr, nr)
    if err == nil {
        return shim.Success(nil)
    }

    return shim.Error(err.Error())
}

位于core/peer/peer.go的函数CreateChannel

// CreateChannel creates the ledger for channel cid from the block cb and then
// sets up and initializes the channel on this peer.
//
// It returns a wrapped error when the ledger cannot be created, or the
// unmodified error from createChannel when channel setup fails.
func (p *Peer) CreateChannel(
    cid string,
    cb *common.Block,
    deployedCCInfoProvider ledger.DeployedChaincodeInfoProvider,
    legacyLifecycleValidation plugindispatcher.LifecycleResources,
    newLifecycleValidation plugindispatcher.CollectionAndLifecycleResources,
) error {
    // Create the ledger for this channel from the supplied block.
    channelLedger, err := p.LedgerMgr.CreateLedger(cid, cb)
    if err != nil {
        return errors.WithMessage(err, "cannot create ledger from genesis block")
    }

    // Wire up the channel on top of the new ledger (createChannel is defined
    // elsewhere; per the article it starts gossip and block delivery).
    err = p.createChannel(cid, channelLedger, deployedCCInfoProvider, legacyLifecycleValidation, newLifecycleValidation)
    if err != nil {
        return err
    }

    p.initChannel(cid)
    return nil
}

这里主要看两个流程,对应上面流程图中peer写入区块流程:createChannel=>InitializeChannel=>NewGossipStateProvider=>deliverPayloads
和leader peer拉取区块流程(假设该节点为leader peer):createChannel=>InitializeChannel=>StartDeliverForChannel=>DeliverBlocks

peer写入区块服务

直接看位于gossip/state/state.go的函数deliverPayloads

// deliverPayloads moves blocks from the payload buffer into the ledger.
//
// It loops until stopCh fires. Each time the buffer signals readiness, every
// payload that can currently be popped is decoded and committed. Payloads that
// cannot be decoded, or that lack a header or data section, are logged and
// dropped. A VSCC execution failure during commit ends the loop; any other
// commit failure is logged via Panicf.
func (s *GossipStateProviderImpl) deliverPayloads() {
    for {
        select {
        case <-s.stopCh:
            s.logger.Debug("State provider has been stopped, finishing to push new blocks.")
            return

        // The buffer signals that the next expected sequence number has arrived.
        case <-s.payloads.Ready():
            s.logger.Debugf("[%s] Ready to transfer payloads (blocks) to the ledger, next block number is = [%d]", s.chainID, s.payloads.Next())

            // Keep popping until the buffer has nothing further to hand out.
            for {
                next := s.payloads.Pop()
                if next == nil {
                    break
                }

                blk := &common.Block{}
                if err := pb.Unmarshal(next.Data, blk); err != nil {
                    s.logger.Errorf("Error getting block with seqNum = %d due to (%+v)...dropping block", next.SeqNum, errors.WithStack(err))
                    continue
                }
                if blk.Header == nil || blk.Data == nil {
                    s.logger.Errorf("Block with claimed sequence %d has no header (%v) or data (%v)",
                        next.SeqNum, blk.Header, blk.Data)
                    continue
                }
                s.logger.Debugf("[%s] Transferring block [%d] with %d transaction(s) to the ledger", s.chainID, next.SeqNum, len(blk.Data.Data))

                // Decode the private data that accompanies the block, if any.
                var pvtData util.PvtDataCollections
                if next.PrivateData != nil {
                    if err := pvtData.Unmarshal(next.PrivateData); err != nil {
                        s.logger.Errorf("Wasn't able to unmarshal private data for block seqNum = %d due to (%+v)...dropping block", next.SeqNum, errors.WithStack(err))
                        continue
                    }
                }

                // Commit the block together with its private data.
                err := s.commitBlock(blk, pvtData)
                if err == nil {
                    continue
                }
                if vsccErr, ok := err.(*vsccErrors.VSCCExecutionFailureError); ok {
                    s.logger.Errorf("Failed executing VSCC due to %v. Aborting chain processing", vsccErr)
                    return
                }
                s.logger.Panicf("Cannot commit block to the ledger due to %+v", errors.WithStack(err))
            }
        }
    }
}

这个服务就是不断的等待payloadbuffer出现消息,如果有就从buffer拿出block写到本地账本
payloadbuffer接口的定义位于gossip/state/payloads_buffer.go

peer从orderer拉取区块服务

// DeliverBlocks pulls blocks from the ordering service and hands each one to
// processMsg. It loops, reconnecting after failures, until one of the
// following happens:
//   - DoneC becomes readable at the top of an iteration;
//   - the ledger height cannot be read or the seek request cannot be built;
//   - the accumulated retry sleep exceeds MaxRetryDuration while
//     YieldLeadership is set.
//
// Consecutive failures are spaced with an exponential backoff that starts at
// InitialRetryDelay, grows by backoffExponentBase per failure, and is capped
// at MaxRetryDelay. The failure counter is reset after every successfully
// processed message.
func (d *Deliverer) DeliverBlocks() {
    failureCounter := 0
    totalDuration := time.Duration(0)

    // Number of failures after which the backoff would exceed MaxRetryDelay:
    // InitialRetryDelay * backoffExponentBase^n > MaxRetryDelay
    // backoffExponentBase^n > MaxRetryDelay / InitialRetryDelay
    // n * log(backoffExponentBase) > log(MaxRetryDelay / InitialRetryDelay)
    // n > log(MaxRetryDelay / InitialRetryDelay) / log(backoffExponentBase)
    maxFailures := int(math.Log(float64(d.MaxRetryDelay)/float64(d.InitialRetryDelay)) / math.Log(backoffExponentBase))
    for {
        select {
        case <-d.DoneC:
            return
        default:
        }

        if failureCounter > 0 {
            var sleepDuration time.Duration
            if failureCounter-1 > maxFailures {
                sleepDuration = d.MaxRetryDelay
            } else {
                // Use the same base and initial delay that maxFailures was
                // derived from, so the curve and its cap stay consistent.
                sleepDuration = time.Duration(math.Pow(backoffExponentBase, float64(failureCounter-1)) * float64(d.InitialRetryDelay))
            }
            totalDuration += sleepDuration
            if totalDuration > d.MaxRetryDuration {
                if d.YieldLeadership {
                    d.Logger.Warningf("attempted to retry block delivery for more than %v, giving up", d.MaxRetryDuration)
                    return
                }
                d.Logger.Warningf("peer is a static leader, ignoring peer.deliveryclient.reconnectTotalTimeThreshold")
            }
            d.sleeper.Sleep(sleepDuration, d.DoneC)
        }

        // Read the current height of the local ledger.
        ledgerHeight, err := d.Ledger.LedgerHeight()
        if err != nil {
            d.Logger.Error("Did not return ledger height, something is critically wrong", err)
            return
        }

        // Build the envelope that requests blocks, based on the ledger height.
        seekInfoEnv, err := d.createSeekInfo(ledgerHeight)
        if err != nil {
            d.Logger.Error("Could not create a signed Deliver SeekInfo message, something is critically wrong", err)
            return
        }

        // Connect to the orderer's deliver service with the seek request.
        deliverClient, endpoint, cancel, err := d.connect(seekInfoEnv)
        if err != nil {
            d.Logger.Warningf("Could not connect to ordering service: %s", err)
            failureCounter++
            continue
        }

        connLogger := d.Logger.With("orderer-address", endpoint.Address)

        // The reader goroutine forwards responses on recv and closes it when
        // the stream errors out or DoneC fires.
        recv := make(chan *orderer.DeliverResponse)
        go func() {
            for {
                // Wait for the next response from the deliver stream.
                resp, err := deliverClient.Recv()
                if err != nil {
                    connLogger.Warningf("Encountered an error reading from deliver stream: %s", err)
                    close(recv)
                    return
                }
                select {
                case recv <- resp:
                case <-d.DoneC:
                    close(recv)
                    return
                }
            }
        }()

    RecvLoop: // Loop until the endpoint is refreshed, or there is an error on the connection
        for {
            select {
            case <-endpoint.Refreshed:
                connLogger.Infof("Ordering endpoints have been refreshed, disconnecting from deliver to reconnect using updated endpoints")
                break RecvLoop
            // A response (or the close of recv) arrived from the reader goroutine.
            case response, ok := <-recv:
                if !ok {
                    connLogger.Warningf("Orderer hung up without sending status")
                    failureCounter++
                    break RecvLoop
                }
                // Handle the response.
                err = d.processMsg(response)
                if err != nil {
                    connLogger.Warningf("Got error while attempting to receive blocks: %v", err)
                    failureCounter++
                    break RecvLoop
                }
                failureCounter = 0
            case <-d.DoneC:
                break RecvLoop
            }
        }

        // Cancel and wait for the reader goroutine to exit. A single receive
        // could pick up a response the goroutine was still trying to send, so
        // drain until recv is closed.
        cancel()
        for range recv {
        }
    }
}
// processMsg handles a single response from the orderer's deliver stream.
//
// A status response always yields an error: the seek is not expected to
// complete, so even a success status is unexpected here. A block response is
// verified, re-marshaled, added to the local payload buffer and then gossiped.
// Any other response type is logged and reported as an error.
//
// It returns nil only when a block was added to the payload buffer.
func (d *Deliverer) processMsg(msg *orderer.DeliverResponse) error {
    switch t := msg.Type.(type) {
    case *orderer.DeliverResponse_Status:
        if t.Status == common.Status_SUCCESS {
            return errors.Errorf("received success for a seek that should never complete")
        }

        return errors.Errorf("received bad status %v from orderer", t.Status)
    case *orderer.DeliverResponse_Block:
        // The response comes off the wire; reject it instead of panicking on
        // a nil dereference when the block or its header is missing.
        if t.Block == nil || t.Block.Header == nil {
            return errors.Errorf("received block with missing header from orderer")
        }
        blockNum := t.Block.Header.Number
        if err := d.BlockVerifier.VerifyBlock(gossipcommon.ChannelID(d.ChannelID), blockNum, t.Block); err != nil {
            return errors.WithMessage(err, "block from orderer could not be verified")
        }

        marshaledBlock, err := proto.Marshal(t.Block)
        if err != nil {
            return errors.WithMessage(err, "block from orderer could not be re-marshaled")
        }

        // Create payload with a block received
        payload := &gossip.Payload{
            Data:   marshaledBlock,
            SeqNum: blockNum,
        }

        // Use payload to create gossip message
        gossipMsg := &gossip.GossipMessage{
            Nonce:   0,
            Tag:     gossip.GossipMessage_CHAN_AND_ORG,
            Channel: []byte(d.ChannelID),
            Content: &gossip.GossipMessage_DataMsg{
                DataMsg: &gossip.DataMessage{
                    Payload: payload,
                },
            },
        }

        d.Logger.Debugf("Adding payload to local buffer, blockNum = [%d]", blockNum)
        // Add payload to local state payloads buffer
        if err := d.Gossip.AddPayload(d.ChannelID, payload); err != nil {
            d.Logger.Warningf("Block [%d] received from ordering service wasn't added to payload buffer: %v", blockNum, err)
            return errors.WithMessage(err, "could not add block as payload")
        }

        // Gossip the block to other peers (the message is tagged CHAN_AND_ORG).
        d.Logger.Debugf("Gossiping block [%d]", blockNum)
        d.Gossip.Gossip(gossipMsg)
        return nil
    default:
        d.Logger.Warningf("Received unknown: %v", t)
        return errors.Errorf("unknown message type '%T'", msg.Type)
    }
}

收到从orderer deliver返回的block数据,然后处理block数据

orderer deliver流程代码

orderer deliver grpc服务的入口函数Deliver位于orderer/common/server/server.go

// Deliver serves one Deliver stream: it wraps the stream in a deliver.Server
// (policy checker, traced receiver, response sender) and passes it to the
// deliver handler. A panic raised while serving is recovered and logged.
func (s *server) Deliver(srv ab.AtomicBroadcast_DeliverServer) error {
    logger.Debugf("Starting new Deliver handler")
    defer func() {
        if r := recover(); r != nil {
            logger.Criticalf("Deliver client triggered panic: %s\n%s", r, debug.Stack())
        }
        logger.Debugf("Closing Deliver stream")
    }()

    // checkReadPolicy applies a signature filter built from the channel
    // readers and orderer readers policies to the envelope.
    checkReadPolicy := func(env *cb.Envelope, channelID string) error {
        support := s.GetChain(channelID)
        if support == nil {
            return errors.Errorf("channel %s not found", channelID)
        }
        // In maintenance mode, we typically require the signature of /Channel/Orderer/Readers.
        // This will block Deliver requests from peers (which normally satisfy /Channel/Readers).
        filter := msgprocessor.NewSigFilter(policies.ChannelReaders, policies.ChannelOrdererReaders, support)
        return filter.Apply(env)
    }

    // Receiver that wraps the stream together with message-tracing settings.
    tracedReceiver := &deliverMsgTracer{
        Receiver: srv,
        msgTracer: msgTracer{
            debug:    s.debug,
            function: "Deliver",
        },
    }
    sender := &responseSender{
        AtomicBroadcast_DeliverServer: srv,
    }

    handlerInput := &deliver.Server{
        PolicyChecker:  deliver.PolicyCheckerFunc(checkReadPolicy),
        Receiver:       tracedReceiver,
        ResponseSender: sender,
    }
    return s.dh.Handle(srv.Context(), handlerInput)
}

接着看位于common/deliver/deliver.go的函数Handle

// Handle serves deliver requests arriving on srv until the stream ends.
//
// Each received envelope is passed to deliverBlocks and the resulting status
// is sent back to the client. The loop ends with nil on EOF, and with an
// error when reading fails, when deliverBlocks returns an error, or when the
// status cannot be sent. A non-success status also ends the loop, returning
// whatever the status send returned.
func (h *Handler) Handle(ctx context.Context, srv *Server) error {
    remote := util.ExtractRemoteAddress(ctx)
    logger.Debugf("Starting new deliver loop for %s", remote)
    h.Metrics.StreamsOpened.Add(1)
    defer h.Metrics.StreamsClosed.Add(1)

    for {
        logger.Debugf("Attempting to read seek info message from %s", remote)
        envelope, err := srv.Recv()
        switch {
        case err == io.EOF:
            logger.Debugf("Received EOF from %s, hangup", remote)
            return nil
        case err != nil:
            logger.Warningf("Error reading from %s: %s", remote, err)
            return err
        }

        // Serve the blocks requested by this envelope.
        status, err := h.deliverBlocks(ctx, srv, envelope)
        if err != nil {
            return err
        }

        // Report the final status of the request to the client.
        sendErr := srv.SendStatusResponse(status)
        if status != cb.Status_SUCCESS {
            return sendErr
        }
        if sendErr != nil {
            logger.Warningf("Error sending to %s: %s", remote, sendErr)
            return sendErr
        }

        logger.Debugf("Waiting for new SeekInfo from %s", remote)
    }
}
// deliverBlocks serves a single deliver request: it parses the envelope,
// checks access, and streams the requested block range to the client.
//
// The returned status is what the caller sends back to the client. A non-nil
// error is returned only when the context finishes before a block is
// retrieved or when sending a block fails; every other rejection is reported
// through the status alone with a nil error.
//
// Request and completion metrics are recorded per call. The deferred closure
// reads the named result `status`, so the completion metric reflects the
// status actually returned.
func (h *Handler) deliverBlocks(ctx context.Context, srv *Server, envelope *cb.Envelope) (status cb.Status, err error) {
    addr := util.ExtractRemoteAddress(ctx)
    payload, chdr, shdr, err := h.parseEnvelope(ctx, envelope) // parse the envelope into payload, channel header and signature header (parseEnvelope is defined elsewhere; presumably it also validates the headers)
    if err != nil {
        logger.Warningf("error parsing envelope from %s: %s", addr, err)
        return cb.Status_BAD_REQUEST, nil
    }
    // Look up the chain for the channel named in the channel header.
    chain := h.ChainManager.GetChain(chdr.ChannelId)
    if chain == nil {
        // Note, we log this at DEBUG because SDKs will poll waiting for channels to be created
        // So we would expect our log to be somewhat flooded with these
        logger.Debugf("Rejecting deliver for %s because channel %s not found", addr, chdr.ChannelId)
        return cb.Status_NOT_FOUND, nil
    }

    labels := []string{
        "channel", chdr.ChannelId,
        "filtered", strconv.FormatBool(isFiltered(srv)),
        "data_type", srv.DataType(),
    }
    h.Metrics.RequestsReceived.With(labels...).Add(1)
    defer func() {
        labels := append(labels, "success", strconv.FormatBool(status == cb.Status_SUCCESS))
        h.Metrics.RequestsCompleted.With(labels...).Add(1)
    }()

    seekInfo := &ab.SeekInfo{}
    if err = proto.Unmarshal(payload.Data, seekInfo); err != nil {
        logger.Warningf("[channel: %s] Received a signed deliver request from %s with malformed seekInfo payload: %s", chdr.ChannelId, addr, err)
        return cb.Status_BAD_REQUEST, nil
    }

    erroredChan := chain.Errored()
    if seekInfo.ErrorResponse == ab.SeekInfo_BEST_EFFORT {
        // In a 'best effort' delivery of blocks, we should ignore consenter errors
        // and continue to deliver blocks according to the client's request.
        erroredChan = nil
    }
    select {
    case <-erroredChan:
        logger.Warningf("[channel: %s] Rejecting deliver request for %s because of consenter error", chdr.ChannelId, addr)
        return cb.Status_SERVICE_UNAVAILABLE, nil
    default:
    }
    // Build the session access-control object for this request.
    accessControl, err := NewSessionAC(chain, envelope, srv.PolicyChecker, chdr.ChannelId, h.ExpirationCheckFunc)
    if err != nil {
        logger.Warningf("[channel: %s] failed to create access control object due to %s", chdr.ChannelId, err)
        return cb.Status_BAD_REQUEST, nil
    }
    // Check that the client is authorized for this channel (Evaluate is
    // defined elsewhere; presumably it also checks identity expiration via
    // ExpirationCheckFunc).
    if err := accessControl.Evaluate(); err != nil {
        logger.Warningf("[channel: %s] Client %s is not authorized: %s", chdr.ChannelId, addr, err)
        return cb.Status_FORBIDDEN, nil
    }
    // Both ends of the requested range must be present.
    if seekInfo.Start == nil || seekInfo.Stop == nil {
        logger.Warningf("[channel: %s] Received seekInfo message from %s with missing start or stop %v, %v", chdr.ChannelId, addr, seekInfo.Start, seekInfo.Stop)
        return cb.Status_BAD_REQUEST, nil
    }

    logger.Debugf("[channel: %s] Received seekInfo (%p) %v from %s", chdr.ChannelId, seekInfo, seekInfo, addr)

    // number is the block number the iterator starts from; stopNum is the
    // last block number to send, derived from the stop position below.
    cursor, number := chain.Reader().Iterator(seekInfo.Start)
    defer cursor.Close()
    var stopNum uint64
    switch stop := seekInfo.Stop.Type.(type) {
    case *ab.SeekPosition_Oldest:
        stopNum = number
    case *ab.SeekPosition_Newest:
        // when seeking only the newest block (i.e. starting
        // and stopping at newest), don't reevaluate the ledger
        // height as this can lead to multiple blocks being
        // sent when only one is expected
        if proto.Equal(seekInfo.Start, seekInfo.Stop) {
            stopNum = number
            break
        }
        stopNum = chain.Reader().Height() - 1
    case *ab.SeekPosition_Specified:
        stopNum = stop.Specified.Number
        if stopNum < number {
            logger.Warningf("[channel: %s] Received invalid seekInfo message from %s: start number %d greater than stop number %d", chdr.ChannelId, addr, number, stopNum)
            return cb.Status_BAD_REQUEST, nil
        }
    }

    for {
        if seekInfo.Behavior == ab.SeekInfo_FAIL_IF_NOT_READY {
            if number > chain.Reader().Height()-1 {
                return cb.Status_NOT_FOUND, nil
            }
        }

        // These locals shadow the named result `status` inside the loop body.
        var block *cb.Block
        var status cb.Status

        // Fetch the next block in a goroutine so the wait can be abandoned
        // when the context ends or the chain reports an error.
        iterCh := make(chan struct{})
        go func() {
            block, status = cursor.Next() // read the next block from the cursor; per the article this blocks until that block has been generated
            close(iterCh)
        }()

        select {
        case <-ctx.Done(): // the request context finished (e.g. the client disconnected)
            logger.Debugf("Context canceled, aborting wait for next block")
            return cb.Status_INTERNAL_SERVER_ERROR, errors.Wrapf(ctx.Err(), "context finished before block retrieved")
        case <-erroredChan:
            // TODO, today, the only user of the errorChan is the orderer consensus implementations.  If the peer ever reports
            // this error, we will need to update this error message, possibly finding a way to signal what error text to return.
            logger.Warningf("Aborting deliver for request because the backing consensus implementation indicates an error")
            return cb.Status_SERVICE_UNAVAILABLE, nil
        case <-iterCh: // the next block is available
            // Iterator has set the block and status vars
        }

        if status != cb.Status_SUCCESS {
            logger.Errorf("[channel: %s] Error reading from channel, cause was: %v", chdr.ChannelId, status)
            return status, nil
        }

        // increment block number to support FAIL_IF_NOT_READY deliver behavior
        number++
        // Re-check access control before sending each block.
        if err := accessControl.Evaluate(); err != nil {
            logger.Warningf("[channel: %s] Client authorization revoked for deliver request from %s: %s", chdr.ChannelId, addr, err)
            return cb.Status_FORBIDDEN, nil
        }

        logger.Debugf("[channel: %s] Delivering block [%d] for (%p) for %s", chdr.ChannelId, block.Header.Number, seekInfo, addr)

        signedData := &protoutil.SignedData{Data: envelope.Payload, Identity: shdr.Creator, Signature: envelope.Signature}
        // Send the block to the client.
        if err := srv.SendBlockResponse(block, chdr.ChannelId, chain, signedData); err != nil {
            logger.Warningf("[channel: %s] Error sending to %s: %s", chdr.ChannelId, addr, err)
            return cb.Status_INTERNAL_SERVER_ERROR, err
        }

        h.Metrics.BlocksSent.With(labels...).Add(1)

        // Stop once the last requested block has been sent.
        if stopNum == block.Header.Number {
            break
        }
    }

    logger.Debugf("[channel: %s] Done delivering to %s for (%p)", chdr.ChannelId, addr, seekInfo)

    return cb.Status_SUCCESS, nil
}

值得注意的是这里,如果是拉取n+1高度区块的话(当前是区块高度n),cursor.Next()会一直等待新的区块才返回

go func() {
  block, status = cursor.Next() // read the next block from the cursor; per the article this blocks until that block has been generated
  close(iterCh)
}()

文章github地址

参考:
菜鸟系列 Fabric 源码学习 - 区块同步
技术指南:Fabric中数据同步的实现
菜鸟系列Fabric源码学习 — 区块同步
Fabric v2.0 源码解析——典型的业务流程

上一篇下一篇

猜你喜欢

热点阅读