超级账本HyperLeder

Hyperledger-Fabric源码分析(orderer-d

2020-04-07  本文已影响0人  小蜗牛爬楼梯

前面我们讲过Orderer的broadcast服务,是为了收集事件消息,之后会根据这些消息来生成block。那问题来了,block怎么散播出去呢?deliver就是干这个的。

接受事件

在Orderer启动的时候就会初始化deliver的grpc server来接受客户端的请求. 大部分情况是peer来拉取block列表,其他的场景待分析。

func (h *Handler) Handle(ctx context.Context, srv *Server) error {
    addr := util.ExtractRemoteAddress(ctx)
    logger.Debugf("Starting new deliver loop for %s", addr)
    h.Metrics.StreamsOpened.Add(1)
    defer h.Metrics.StreamsClosed.Add(1)
    for {
    
        envelope, err := srv.Recv()

        status, err := h.deliverBlocks(ctx, srv, envelope)

        err = srv.SendStatusResponse(status)
    }
}

一个典型的grpc server的请求处理流程。最终deliver请求会流转到deliverBlocks来处理。

消息处理

deliverBlocks

erroredChan := chain.Errored()
    select {
    case <-erroredChan:
        return cb.Status_SERVICE_UNAVAILABLE, nil
    default:
    }

前面一大推标准的envelope校验流程,这里就不展开了.

这里会监听erroredChan的通知,那收到通知代表什么呢?有兴趣的可以进去看看,都是要命的地方会触发,说明Orderer后端的共识服务有问题。进一步该Orderer的账本就很有可能没有及时同步的,所以也就没有资格给别人提供服务了。所以这里立即返回。

    cursor, number := chain.Reader().Iterator(seekInfo.Start)
    defer cursor.Close()
    var stopNum uint64
    switch stop := seekInfo.Stop.Type.(type) {
    case *ab.SeekPosition_Oldest:
        stopNum = number
    case *ab.SeekPosition_Newest:
        stopNum = chain.Reader().Height() - 1
    case *ab.SeekPosition_Specified:
        stopNum = stop.Specified.Number
        if stopNum < number {
            return cb.Status_BAD_REQUEST, nil
        }
    }    
  1. SeekInfo是deliver请求的关键参数,这里决定了我是要哪个范围的block列表。
  2. 这里主要的目的就是根据position的类型来分别设置start和stop下标。
    接下来正式开始循环遍历cursor
for {
        if seekInfo.Behavior == ab.SeekInfo_FAIL_IF_NOT_READY {
            if number > chain.Reader().Height()-1 {
                return cb.Status_NOT_FOUND, nil
            }
        }

        var block *cb.Block
        var status cb.Status

        iterCh := make(chan struct{})
        go func() {
            block, status = cursor.Next()
            close(iterCh)
        }()
        
        select {
        case <-ctx.Done():
            logger.Debugf("Context canceled, aborting wait for next block")
            return cb.Status_INTERNAL_SERVER_ERROR, errors.Wrapf(ctx.Err(), "context finished before block retrieved")
        case <-erroredChan:
            logger.Warningf("Aborting deliver for request because of background error")
            return cb.Status_SERVICE_UNAVAILABLE, nil
        case <-iterCh:
            // Iterator has set the block and status vars
        }

        number++

        if err := srv.SendBlockResponse(block); err != nil {
            logger.Warningf("[channel: %s] Error sending to %s: %s", chdr.ChannelId, addr, err)
            return cb.Status_INTERNAL_SERVER_ERROR, err
        }

        if stopNum == block.Header.Number {
            break
        }
    }

1.如果当前请求的block,本地还没有ready,如果是SeekInfo_FAIL_IF_NOT_READY模式,立即返回。
2.换句话说如果是SeekInfo_BLOCK_UNTIL_READY,难道会一直等到ready么?别急下面一起来看。
3.首先cursor.next会一直等待,直到有新的block已经ready。当拿到后,会通知iterCh,不然会一直等在case <-iterCh这里。
4.到这里后,表示已经收到了新的block,当然start要+1啦。
5.循环给请求方SendBlockResponse
6.如果到达stop,循环收工。
7.另外要注意的是,这里是从Orderer本地查询block列表。

func (rs *responseSender) SendBlockResponse(block *cb.Block) error {
    response := &ab.DeliverResponse{
        Type: &ab.DeliverResponse_Block{Block: block},
    }
    return rs.Send(response)
}

将block组装成DeliverResponse,返回给Client

最后

这里主要是Orderer部分的deliver是怎么服务的,至于各个Client请求完后是怎么使用的,这个之后单独来讲。

上一篇 下一篇

猜你喜欢

热点阅读