HyperLedgerFabric源码解读(5)-channel

2018-11-23  本文已影响0人  神奇的考拉

关于channel的补充

// 在hyperledger fabric中 通道channel其实就是在至少两个成员(members)或组织(organization)间专门为私人或机密进行的交易而建立的私有“子网”.
// 一个通道主要包含:成员-member(组织-organization)、每个成员的锚节点(anchor peer)、共享账本(sharing ledger)、应用链码(application chaincode)、排序服务节点(orderer peer)
// 网络中的每笔交易(transaction)都在指定的通道channel中执行,每个通信方必须经过身份验证并授权在该通道channel上进行交易。而加入channel的每个peer都具有成员服务提供商(members service provider MSP)提供的身份
// 1、创建channel:通过客户端SDK调用configuration system chaincode以及应用属性(锚点、成员[组织]等)。发起的请求为channel ledger创建一个创世区块(genesis block),存储有关channel的策略、成员、锚点等配置信息
// 当将新成员添加到现有的channel时,Genesis block或最近被配置的区块block分享给新成员
// 2、leader election: channel中每个成员的leading peer的选举决定了哪个peer代表成员或组织与ordering service进行通信。(若是没有指定leader 则使用算法来指定leader)
// 共识算法将交易排序并以一个block的形式发送给一个leader,然后再由leader分发给其他peer,并用gossip协议进行跨链channel通信
// 在实际情况中任意一个锚节点可以属于多个通道,并维护了多个账本,但不会有任何账本数据从一个通道channel传到另一个通道channel
// 主要是由于账本的分离是基于通道来的,而这种分离又是由配置链码chaincode、成员标识服务和gossip协议来定义和实现的
// (1)、数据的传播,包括交易的信息,账本状态和通道成员等都在通道内受限制的验证成员身份的节点之间,是根据通道对节点和账本数据进行隔离,允许网络成员可以在同一个区块链网络中请求私有的和保密的交易给业务上的竞争对手和其他受限的成员。

2、源码开始

// Config is a configuration item
// of the channel store.
// It carries the per-channel tunables that a gossipChannel obtains through
// Adapter.GetConf().
type Config struct {
    ID                          string                          // identifier handed to the logger and to the block puller configuration (NOTE(review): the original note calls this the channel id — confirm)
    PublishStateInfoInterval    time.Duration                   // period of the ticker that triggers publishing of state info
    MaxBlockCountToStore        int                             // handed to the block message comparator (presumably the size of the block buffer — confirm)
    PullPeerNum                 int                             // number of peers selected for block pulls and for state info requests
    PullInterval                time.Duration                   // interval handed to the block puller
    RequestStateInfoInterval    time.Duration                   // period of the ticker that triggers state info requests
    BlockExpirationInterval     time.Duration                   // TTL handed to the block message store
    StateInfoCacheSweepInterval time.Duration                   // sweep interval handed to the state info cache
}
// GossipChannel defines an object that deals with all channel-related messages
type GossipChannel interface {

    // Self returns a StateInfoMessage about the peer
    Self() *proto.SignedGossipMessage

    // GetPeers returns a list of peers with metadata as published by them
    GetPeers() []discovery.NetworkMember

    // PeerFilter receives a SubChannelSelectionCriteria and returns a RoutingFilter that selects
    // only peer identities that match the given criteria
    PeerFilter(api.SubChannelSelectionCriteria) filter.RoutingFilter

    // IsMemberInChan checks whether the given member is eligible to be in the channel
    IsMemberInChan(member discovery.NetworkMember) bool

    // UpdateLedgerHeight updates the ledger height the peer
    // publishes to other peers in the channel
    UpdateLedgerHeight(height uint64)

    // UpdateChaincodes updates the chaincodes the peer publishes
    // to other peers in the channel
    UpdateChaincodes(chaincode []*proto.Chaincode)

    // IsOrgInChannel returns whether the given organization is in the channel
    IsOrgInChannel(membersOrg api.OrgIdentityType) bool

    // EligibleForChannel returns whether the given member should get blocks
    // for this channel
    EligibleForChannel(member discovery.NetworkMember) bool

    // HandleMessage processes a message sent by a remote peer
    HandleMessage(proto.ReceivedMessage)

    // AddToMsgStore adds a given GossipMessage to the message store
    AddToMsgStore(msg *proto.SignedGossipMessage)

    // ConfigureChannel (re)configures the list of organizations
    // that are eligible to be in the channel.
    // NOTE(review): per the original author's note, a JoinChannelMessage maintains
    // the creation/modification of the member list and is propagated among
    // peers — confirm against the api package.
    ConfigureChannel(joinMsg api.JoinChannelMessage)

    // LeaveChannel makes the peer leave the channel
    LeaveChannel()

    // Stop stops the channel's activity
    Stop()
}
// Adapter enables the gossipChannel
// to communicate with gossipServiceImpl.
type Adapter interface {
    // Sign signs the given gossip message, returning the signed message
    // or an error
    Sign(msg *proto.GossipMessage) (*proto.SignedGossipMessage, error)

    // GetConf returns the configuration that this GossipChannel will possess
    GetConf() Config

    // Gossip gossips a message in the channel
    Gossip(message *proto.SignedGossipMessage)

    // Forward sends a message to the next hops
    Forward(message proto.ReceivedMessage)

    // DeMultiplex de-multiplexes an item to subscribers
    DeMultiplex(interface{})

    // GetMembership returns the known alive peers and their information
    GetMembership() []discovery.NetworkMember

    // Lookup returns a network member, or nil if not found
    Lookup(PKIID common.PKIidType) *discovery.NetworkMember

    // Send sends a (signed) message to a list of peers
    Send(msg *proto.SignedGossipMessage, peers ...*comm.RemotePeer)

    // ValidateStateInfoMessage returns an error if a message
    // hasn't been signed correctly, nil otherwise.
    ValidateStateInfoMessage(message *proto.SignedGossipMessage) error

    // GetOrgOfPeer returns the organization ID of a given peer PKI-ID
    GetOrgOfPeer(pkiID common.PKIidType) api.OrgIdentityType

    // GetIdentityByPKIID returns an identity of a peer with a certain
    // pkiID, or nil if not found
    GetIdentityByPKIID(pkiID common.PKIidType) api.PeerIdentityType
}
// gossipChannel is the GossipChannel implementation returned by
// NewGossipChannel. It embeds the Adapter it was created with and a
// sync.RWMutex that guards its mutable fields.
type gossipChannel struct {
    Adapter                                             // embedded adapter; its methods are callable directly on gossipChannel
    sync.RWMutex
    shouldGossipStateInfo     int32                     // accessed atomically; publishStateInfo does nothing while it is 0
    mcs                       api.MessageCryptoService  // message crypto service, used to verify blocks and channel signatures
    pkiID                     common.PKIidType          // PKI-ID of this peer
    selfOrg                   api.OrgIdentityType       // organization of this peer
    stopChan                  chan struct{}            // carries the stop signal to the periodic goroutines
    stateInfoMsg              *proto.SignedGossipMessage // this peer's signed state info message (returned by Self)
    orgs                      []api.OrgIdentityType      // organizations in the channel, set by ConfigureChannel
    joinMsg                   api.JoinChannelMessage     // most recently applied join-channel message
    blockMsgStore             msgstore.MessageStore      // store of block (data) messages
    stateInfoMsgStore         *stateInfoCache            // cache of state info messages
    leaderMsgStore            msgstore.MessageStore      // store of leadership messages
    chainID                   common.ChainID             // id of the channel
    blocksPuller              pull.Mediator              // pull mediator used to fetch blocks
    logger                    util.Logger
    stateInfoPublishScheduler *time.Ticker                // ticker driving publishStateInfo
    stateInfoRequestScheduler *time.Ticker                // ticker driving requestStateInfo
    memFilter                 *membershipFilter           // membership filter handed to the block puller
    ledgerHeight              uint64                      // ledger height; read under the read lock by the block puller's digest filter
    incTime                   uint64                      // set to the creation time (UnixNano) in NewGossipChannel
    leftChannel               int32                       // accessed atomically; 1 once LeaveChannel has been called
}
// membershipFilter narrows the membership reported by the adapter down to the
// peers that pass gossipChannel.eligibleForChannelAndSameOrg.
type membershipFilter struct {
    adapter Adapter                // source of the unfiltered membership
    *gossipChannel                 // channel whose eligibility rules are applied
}
// GetMembership returns the known alive peers and their information,
// keeping only those that are eligible for the channel and belong to
// this peer's organization. It returns nil once the channel has been left.
func (mf *membershipFilter) GetMembership() []discovery.NetworkMember {
    if mf.hasLeftChannel() {
        // Nothing to report for a channel we are no longer part of
        return nil
    }

    var eligible []discovery.NetworkMember
    candidates := mf.adapter.GetMembership()
    for i := range candidates {
        if !mf.eligibleForChannelAndSameOrg(candidates[i]) {
            continue
        }
        eligible = append(eligible, candidates[i])
    }
    return eligible
}
// NewGossipChannel creates a new GossipChannel.
//
// It builds the gossipChannel, wires its membership filter, block puller and
// the three message stores (blocks, state info, leadership), applies joinMsg
// through ConfigureChannel and finally starts two goroutines that periodically
// publish and request state info.
func NewGossipChannel(pkiID common.PKIidType,    // PKI-ID of this peer
    org api.OrgIdentityType,                     // organization of this peer
    mcs api.MessageCryptoService,                // message crypto service
    chainID common.ChainID,                      // id of the channel
    adapter Adapter,                             // adapter to the gossip service
    joinMsg api.JoinChannelMessage) GossipChannel {  // initial join-channel message, applied via ConfigureChannel
    gc := &gossipChannel{                            // the channel object itself
        incTime:                   uint64(time.Now().UnixNano()),
        selfOrg:                   org,
        pkiID:                     pkiID,
        mcs:                       mcs,
        Adapter:                   adapter,
        logger:                    util.GetLogger(util.LoggingChannelModule, adapter.GetConf().ID),
        stopChan:                  make(chan struct{}, 1),
        shouldGossipStateInfo:     int32(0),
        stateInfoPublishScheduler: time.NewTicker(adapter.GetConf().PublishStateInfoInterval),
        stateInfoRequestScheduler: time.NewTicker(adapter.GetConf().RequestStateInfoInterval),
        orgs:                      []api.OrgIdentityType{},
        chainID:                   chainID,
    }
    // Membership filter; must be set before createBlockPuller, which reads gc.memFilter
    gc.memFilter = &membershipFilter{adapter: gc.Adapter, gossipChannel: gc}
    // Comparator (message replacing policy) for the block store, built from MaxBlockCountToStore
    comparator := proto.NewGossipMessageComparator(adapter.GetConf().MaxBlockCountToStore)
    // Block puller
    gc.blocksPuller = gc.createBlockPuller()
    // Renders the sequence number of a block message as a decimal string
    seqNumFromMsg := func(m interface{}) string {
        return fmt.Sprintf("%d", m.(*proto.SignedGossipMessage).GetDataMsg().Payload.SeqNum)
    }
    // Block message store with expiration. Both callbacks remove the block's
    // digest from the block puller.
    // NOTE(review): per the original author's note the arguments are, in order:
    // replacing policy, invalidation trigger, message TTL, external lock,
    // external unlock, expiration callback — confirm against msgstore.
    gc.blockMsgStore = msgstore.NewMessageStoreExpirable(comparator,  // replacing policy
        func(m interface{}) {                                       // presumably the invalidation trigger
            gc.blocksPuller.Remove(seqNumFromMsg(m))
    }, gc.GetConf().BlockExpirationInterval,                          // message TTL
    nil,                                                // presumably the external lock: none
    nil,                                              // presumably the external unlock: none
    func(m interface{}) {                                           // presumably the callback for expired messages
        gc.blocksPuller.Remove(seqNumFromMsg(m))
    })

    // Reports whether the peer that authored a state info message is no
    // longer known: extracts the PKI-ID from the message and looks it up.
    hashPeerExpiredInMembership := func(o interface{}) bool {
        pkiID := o.(*proto.SignedGossipMessage).GetStateInfo().PkiId
        return gc.Lookup(pkiID) == nil
    }
    // Verifies a state info message against the channel
    verifyStateInfoMsg := func(msg *proto.SignedGossipMessage, orgs ...api.OrgIdentityType) bool {
        // State info carried by the message
        si := msg.GetStateInfo()
        // No point in verifying ourselves
        if bytes.Equal(gc.pkiID, si.PkiId) {  // the message was authored by this peer
            return true
        }
        peerIdentity := adapter.GetIdentityByPKIID(si.PkiId)  // identity of the authoring peer
        if len(peerIdentity) == 0 {
            gc.logger.Warning("Identity for peer", si.PkiId, "doesn't exist")
            return false
        }

        // Reports whether the organization is in the channel
        isOrgInChan := func(org api.OrgIdentityType) bool {
            if len(orgs) == 0 {     // no explicit org list given: consult the channel's configured orgs
                if !gc.IsOrgInChannel(org) {  // not part of the channel
                    return false
                }
            } else {                // explicit org list given: search it
                found := false
                for _, chanMember := range orgs {
                    if bytes.Equal(chanMember, org) {
                        found = true
                        break
                    }
                }
                if !found {
                    return false
                }
            }
            return true
        }

        // Organization of the authoring peer
        org := gc.GetOrgOfPeer(si.PkiId)
        if !isOrgInChan(org) {  // the peer's organization is not in the channel
            gc.logger.Warning("peer", peerIdentity, "'s organization(", string(org), ") isn't in the channel", string(chainID))
            return false
        }
        if err := gc.mcs.VerifyByChannel(chainID, peerIdentity, msg.Signature, msg.Payload); err != nil { // signature check in the context of the channel
            gc.logger.Warningf("Peer %v isn't eligible for channel %s : %+v", peerIdentity, string(chainID), errors.WithStack(err))
            return false
        }
        return true
    }
    // State info cache, built from the sweep interval and the two predicates above
    gc.stateInfoMsgStore = newStateInfoCache(gc.GetConf().StateInfoCacheSweepInterval, hashPeerExpiredInMembership, verifyStateInfoMsg)
    // TTL of leadership messages
    ttl := election.GetMsgExpirationTimeout()
    // Comparator for leadership messages
    pol := proto.NewGossipMessageComparator(0)
    // Leadership message store: no-op trigger, no external lock and no expiration callback
    gc.leaderMsgStore = msgstore.NewMessageStoreExpirable(pol,   // replacing policy
        msgstore.Noop,                                           // presumably the invalidation trigger
        ttl,                                                     // message TTL
        nil,                                         // presumably the external lock: none
        nil,                                     // presumably the external unlock: none
        nil)                                         // presumably the expiration callback: none

    gc.ConfigureChannel(joinMsg)                                 // apply the initial list of organizations

    // Periodically publish state info
    go gc.periodicalInvocation(gc.publishStateInfo, gc.stateInfoPublishScheduler.C)

    // Periodically request state info
    go gc.periodicalInvocation(gc.requestStateInfo, gc.stateInfoRequestScheduler.C)
    return gc
}
// Stop stop the channel operations.
// It places a stop token on stopChan for the periodic goroutines and then
// stops the block puller, both tickers and the three message stores.
func (gc *gossipChannel) Stop() {
    gc.stopChan <- struct{}{}       // stop signal for the periodic goroutines
    gc.blocksPuller.Stop()           // stop the block puller
    gc.stateInfoPublishScheduler.Stop() // stop the state info publish ticker
    gc.stateInfoRequestScheduler.Stop() // stop the state info request ticker
    gc.leaderMsgStore.Stop()            // stop the leadership message store
    gc.stateInfoMsgStore.Stop()         // stop the state info cache
    gc.blockMsgStore.Stop()             // stop the block message store
}
// periodicalInvocation runs fn every time a tick arrives on c, until a stop
// token is received on stopChan. The token is put back on the channel before
// returning, so that another goroutine selecting on stopChan sees it too.
func (gc *gossipChannel) periodicalInvocation(fn func(), c <-chan time.Time) {
    for {
        select {
        case <-gc.stopChan:
            // Hand the stop token on before exiting
            gc.stopChan <- struct{}{}
            return
        case <-c:
            fn()
        }
    }
}
// Self returns a StateInfoMessage about the peer.
// The message is read while holding the read lock.
func (gc *gossipChannel) Self() *proto.SignedGossipMessage {
    gc.RLock()
    own := gc.stateInfoMsg
    gc.RUnlock()
    return own
}
// LeaveChannel makes the peer leave the channel.
// It raises the leftChannel flag and republishes the peer's properties with
// the "left channel" marker set, carrying over the ledger height and
// chaincodes of the previous state info message when there is one.
func (gc *gossipChannel) LeaveChannel() {
    gc.Lock()
    defer gc.Unlock()

    atomic.StoreInt32(&gc.leftChannel, 1)

    var (
        height     uint64
        chaincodes []*proto.Chaincode
    )
    if gc.stateInfoMsg != nil {
        // Preserve what was last published
        previous := gc.stateInfoMsg.GetStateInfo().Properties
        chaincodes = previous.Chaincodes
        height = previous.LedgerHeight
    }
    gc.updateProperties(height, chaincodes, true)
}
// hasLeftChannel reports whether the leftChannel flag has been set to 1.
// The flag is read atomically.
func (gc *gossipChannel) hasLeftChannel() bool {
    flag := atomic.LoadInt32(&gc.leftChannel)
    return flag == 1
}
// GetPeers returns a list of peers with metadata as published by them.
// A peer is listed only if it is eligible for the channel, a state info
// message from it is stored, and that message does not mark the peer as
// having left the channel. Nothing is returned once this peer itself has
// left the channel.
func (gc *gossipChannel) GetPeers() []discovery.NetworkMember {
    var peers []discovery.NetworkMember
    if gc.hasLeftChannel() {
        return peers
    }

    for _, peer := range gc.GetMembership() {
        if !gc.EligibleForChannel(peer) {
            continue
        }
        published := gc.stateInfoMsgStore.MsgByID(peer.PKIid)
        if published == nil {
            // No state info known for this peer
            continue
        }
        if props := published.GetStateInfo().Properties; props == nil || !props.LeftChannel {
            // Attach what the peer published about itself
            peer.Properties = published.GetStateInfo().Properties
            peer.Envelope = published.Envelope
            peers = append(peers, peer)
        }
    }
    return peers
}
// requestStateInfo creates a state info request and sends it to up to
// PullPeerNum peers selected from the membership by IsMemberInChan.
// If the request cannot be created, a warning is logged and nothing is sent.
func (gc *gossipChannel) requestStateInfo() {
    request, err := gc.createStateInfoRequest()
    if err != nil {
        gc.logger.Warningf("Failed creating SignedGossipMessage: %+v", errors.WithStack(err))
        return
    }

    peerCount := gc.GetConf().PullPeerNum
    membership := gc.GetMembership()
    targets := filter.SelectPeers(peerCount, membership, gc.IsMemberInChan)
    gc.Send(request, targets...)
}
// eligibleForChannelAndSameOrg applies the combination of two routing
// filters to member: EligibleForChannel, and a check that the member's
// organization equals this peer's own organization.
func (gc *gossipChannel) eligibleForChannelAndSameOrg(member discovery.NetworkMember) bool {
    inOwnOrg := func(peer discovery.NetworkMember) bool {
        peerOrg := gc.GetOrgOfPeer(peer.PKIid)
        return bytes.Equal(peerOrg, gc.selfOrg)
    }
    combined := filter.CombineRoutingFilters(gc.EligibleForChannel, inOwnOrg)
    return combined(member)
}
// publishStateInfo gossips this peer's state info message when the
// shouldGossipStateInfo flag is raised. The flag is lowered again only if
// the membership is non-empty after gossiping, so publishing is retried on
// the next invocation while no members are known.
func (gc *gossipChannel) publishStateInfo() {
    if pending := atomic.LoadInt32(&gc.shouldGossipStateInfo); pending == int32(0) {
        return
    }

    // Snapshot the message under the read lock
    gc.RLock()
    msg := gc.stateInfoMsg
    gc.RUnlock()

    gc.Gossip(msg)
    if len(gc.GetMembership()) == 0 {
        return
    }
    atomic.StoreInt32(&gc.shouldGossipStateInfo, int32(0))
}
// createBlockPuller builds the pull mediator used for block messages.
// The mediator sends through gc, takes its membership from gc.memFilter,
// identifies a block by its sequence number and hands pulled messages to
// DeMultiplex. Incoming digests are filtered so that only sequence numbers
// at or above the current ledger height are kept.
func (gc *gossipChannel) createBlockPuller() pull.Mediator {
    settings := pull.Config{
        MsgType:           proto.PullMsgType_BLOCK_MSG,
        Channel:           []byte(gc.chainID),
        ID:                gc.GetConf().ID,
        PeerCountToSelect: gc.GetConf().PullPeerNum,
        PullInterval:      gc.GetConf().PullInterval,
        Tag:               proto.GossipMessage_CHAN_AND_ORG,
    }

    // The digest of a block message is its sequence number in decimal
    blockDigest := func(msg *proto.SignedGossipMessage) string {
        data := msg.GetDataMsg()
        if data != nil && data.Payload != nil {
            return fmt.Sprintf("%d", data.Payload.SeqNum)
        }
        gc.logger.Warning("Non-data block or with no payload")
        return ""
    }

    // Keeps only the digests whose sequence number is not below the ledger
    // height; unparsable digests are logged and dropped. The digest message
    // is modified in place.
    dropOldDigests := func(digestMsg *proto.DataDigest) *proto.DataDigest {
        gc.RLock()
        height := gc.ledgerHeight
        gc.RUnlock()

        offered := digestMsg.Digests
        digestMsg.Digests = nil
        for _, digest := range offered {
            seqNum, err := strconv.ParseUint(string(digest), 10, 64)
            if err != nil {
                gc.logger.Warningf("Can't parse digest %s : %+v", digest, errors.WithStack(err))
                continue
            }
            if seqNum < height {
                continue
            }
            digestMsg.Digests = append(digestMsg.Digests, digest)
        }
        return digestMsg
    }

    pullAdapter := &pull.PullAdapter{
        Sndr:        gc,
        MemSvc:      gc.memFilter,
        IdExtractor: blockDigest,
        MsgCons: func(msg *proto.SignedGossipMessage) {
            gc.DeMultiplex(msg)
        },
    }
    pullAdapter.IngressDigFilter = dropOldDigests

    return pull.NewPullMediator(settings, pullAdapter)
}
// IsMemberInChan checks whether the given member is eligible to be in the channel.
// A member whose organization cannot be resolved (nil) is not eligible;
// otherwise the answer is whether that organization is in the channel.
func (gc *gossipChannel) IsMemberInChan(member discovery.NetworkMember) bool {
    if memberOrg := gc.GetOrgOfPeer(member.PKIid); memberOrg != nil {
        return gc.IsOrgInChannel(memberOrg)
    }
    return false
}
// PeerFilter receives a SubChannelSelectionCriteria and returns a RoutingFilter that selects
// only peer identities that match the given criteria.
// The returned filter rejects members whose identity is unknown or for
// which no state info message is stored; for the rest it evaluates the
// criteria on the stored message's payload and signature together with the
// peer identity.
func (gc *gossipChannel) PeerFilter(messagePredicate api.SubChannelSelectionCriteria) filter.RoutingFilter {
    return func(member discovery.NetworkMember) bool {
        identity := gc.GetIdentityByPKIID(member.PKIid)
        if len(identity) == 0 {
            return false
        }

        stateInfo := gc.stateInfoMsgStore.MembershipStore.MsgByID(member.PKIid)
        if stateInfo == nil {
            return false
        }

        signed := api.PeerSignature{
            Message:      stateInfo.Payload,
            Signature:    stateInfo.Signature,
            PeerIdentity: identity,
        }
        return messagePredicate(signed)
    }
}
// IsOrgInChannel returns whether the given organization is in the channel.
// The channel's organization list is scanned under the read lock.
func (gc *gossipChannel) IsOrgInChannel(membersOrg api.OrgIdentityType) bool {
    gc.RLock()
    defer gc.RUnlock()

    for i := range gc.orgs {
        if bytes.Equal(gc.orgs[i], membersOrg) {
            return true
        }
    }
    return false
}
// EligibleForChannel returns whether the given member should get blocks
// for this channel.
// The member must have a known identity (a warning is logged otherwise) and
// a state info message stored for its PKI-ID.
func (gc *gossipChannel) EligibleForChannel(member discovery.NetworkMember) bool {
    if identity := gc.GetIdentityByPKIID(member.PKIid); len(identity) == 0 {
        gc.logger.Warning("Identity for peer", member.PKIid, "doesn't exist")
        return false
    }
    return gc.stateInfoMsgStore.MsgByID(member.PKIid) != nil
}
// AddToMsgStore adds a given GossipMessage to the message store.
// Data (block) messages go to both the block message store and the block
// puller; state info messages go to the state info cache. Messages of any
// other kind are ignored.
func (gc *gossipChannel) AddToMsgStore(msg *proto.SignedGossipMessage) {
    if isBlock := msg.IsDataMsg(); isBlock {
        gc.blockMsgStore.Add(msg)
        gc.blocksPuller.Add(msg)
    }

    if isStateInfo := msg.IsStateInfoMsg(); isStateInfo {
        gc.stateInfoMsgStore.Add(msg)
    }
}
// ConfigureChannel (re)configures the list of organizations
// that are eligible to be in the channel.
// A join message with no members is ignored, as is one whose sequence
// number is lower than that of the join message already held. Otherwise the
// message's members become the channel's organizations and the state info
// cache is validated against them.
func (gc *gossipChannel) ConfigureChannel(joinMsg api.JoinChannelMessage) {
    gc.Lock()
    defer gc.Unlock()

    if len(joinMsg.Members()) == 0 {
        gc.logger.Warning("Received join channel message with empty set of members")
        return
    }

    // First configuration: nothing to compare against yet
    if gc.joinMsg == nil {
        gc.joinMsg = joinMsg
    }

    if stale := gc.joinMsg.SequenceNumber() > (joinMsg.SequenceNumber()); stale {
        gc.logger.Warning("Already have a more updated JoinChannel message(", gc.joinMsg.SequenceNumber(), ") than", joinMsg.SequenceNumber())
        return
    }

    gc.orgs = joinMsg.Members()
    gc.joinMsg = joinMsg
    gc.stateInfoMsgStore.validate(joinMsg.Members())
}
// HandleMessage processes a message sent by a remote peer.
//
// The message is dropped unless it passes verifyMsg, is channel-restricted
// and comes from a peer whose organization is known and in the channel.
// It is then dispatched by type:
//   - state info pull request: answered with a state info snapshot;
//   - state info snapshot: handed to handleStateInfSnapshot;
//   - data (block) or state info message: stored, and if newly added,
//     forwarded, de-multiplexed and (for blocks) given to the block puller;
//   - block pull message: checked for eligibility, its data update (if any)
//     reduced to the blocks that were newly stored, then passed to the block
//     puller;
//   - leadership message: stored and, if newly added, de-multiplexed.
func (gc *gossipChannel) HandleMessage(msg proto.ReceivedMessage) {
    if !gc.verifyMsg(msg) {
        // verifyMsg fails (and logs) when msg or its content is nil, so the
        // content may only be dereferenced for logging when it exists.
        if msg != nil {
            if failed := msg.GetGossipMessage(); failed != nil {
                gc.logger.Warning("Failed verifying message:", failed.GossipMessage)
            }
        }
        return
    }
    m := msg.GetGossipMessage()
    // Only per-channel messages are handled here
    if !m.IsChannelRestricted() {
        gc.logger.Warning("Got message", msg.GetGossipMessage(), "but it's not a per-channel message, discarding it")
        return
    }
    orgID := gc.GetOrgOfPeer(msg.GetConnectionInfo().ID) // organization of the sending peer
    if len(orgID) == 0 {
        gc.logger.Debug("Couldn't find org identity of peer", msg.GetConnectionInfo())
        return
    }
    if !gc.IsOrgInChannel(orgID) { // sender's organization is not part of the channel
        gc.logger.Warning("Point to point message came from", msg.GetConnectionInfo(),
            ", org(", string(orgID), ") but it's not eligible for the channel", string(gc.chainID))
        return
    }

    if m.IsStateInfoPullRequestMsg() {
        msg.Respond(gc.createStateInfoSnapshot(orgID))
        return
    }

    if m.IsStateInfoSnapshot() {
        gc.handleStateInfSnapshot(m.GossipMessage, msg.GetConnectionInfo().ID)
        return
    }

    if m.IsDataMsg() || m.IsStateInfoMsg() {
        added := false

        if m.IsDataMsg() {
            if m.GetDataMsg().Payload == nil {
                gc.logger.Warning("Payload is empty, got it from", msg.GetConnectionInfo().ID)
                return
            }
            // Would this block go into the message store if it was verified?
            if !gc.blockMsgStore.CheckValid(msg.GetGossipMessage()) {
                return
            }
            if !gc.verifyBlock(m.GossipMessage, msg.GetConnectionInfo().ID) {
                gc.logger.Warning("Failed verifying block", m.GetDataMsg().Payload.SeqNum)
                return
            }
            added = gc.blockMsgStore.Add(msg.GetGossipMessage())
        } else { // StateInfoMsg verification should be handled in a layer above
            //  since we don't have access to the id mapper here
            added = gc.stateInfoMsgStore.Add(msg.GetGossipMessage())
        }

        if added {
            // Forward the message
            gc.Forward(msg)
            // DeMultiplex to local subscribers
            gc.DeMultiplex(m)

            if m.IsDataMsg() {
                gc.blocksPuller.Add(msg.GetGossipMessage())
            }
        }
        return
    }

    if m.IsPullMsg() && m.GetPullMsgType() == proto.PullMsgType_BLOCK_MSG {
        if gc.hasLeftChannel() {
            gc.logger.Info("Received Pull message from", msg.GetConnectionInfo().Endpoint, "but left the channel", string(gc.chainID))
            return
        }
        // If we don't have a StateInfo message from the peer,
        // no way of validating its eligibility in the channel.
        if gc.stateInfoMsgStore.MsgByID(msg.GetConnectionInfo().ID) == nil {
            gc.logger.Debug("Don't have StateInfo message of peer", msg.GetConnectionInfo())
            return
        }
        if !gc.eligibleForChannelAndSameOrg(discovery.NetworkMember{PKIid: msg.GetConnectionInfo().ID}) {
            gc.logger.Warning(msg.GetConnectionInfo(), "isn't eligible for pulling blocks of", string(gc.chainID))
            return
        }
        if m.IsDataUpdate() {
            // Iterate over the envelopes, and filter out blocks
            // that we already have in the blockMsgStore, or blocks that
            // are too far in the past.
            filteredEnvelopes := []*proto.Envelope{}
            for _, item := range m.GetDataUpdate().Data {
                gMsg, err := item.ToGossipMessage()
                if err != nil {
                    gc.logger.Warningf("Data update contains an invalid message: %+v", errors.WithStack(err))
                    return
                }
                if !bytes.Equal(gMsg.Channel, []byte(gc.chainID)) {
                    gc.logger.Warning("DataUpdate message contains item with channel", gMsg.Channel, "but should be", gc.chainID)
                    return
                }
                // Would this block go into the message store if it was verified?
                // The check must be made on the block carried by this item, not
                // on the enclosing pull message; a block the store would reject
                // is skipped (like a block that fails to be added below) without
                // paying for verification and without discarding the other items.
                if !gc.blockMsgStore.CheckValid(gMsg) {
                    continue
                }
                if !gc.verifyBlock(gMsg.GossipMessage, msg.GetConnectionInfo().ID) {
                    return
                }
                added := gc.blockMsgStore.Add(gMsg)
                if !added {
                    // If this block doesn't need to be added, it means it either already
                    // exists in memory or that it is too far in the past
                    continue
                }
                filteredEnvelopes = append(filteredEnvelopes, item)
            }
            // Replace the update message with just the blocks that should be processed
            m.GetDataUpdate().Data = filteredEnvelopes
        }
        gc.blocksPuller.HandleMessage(msg)
    }

    if m.IsLeadershipMsg() {
        // Handling leadership message
        added := gc.leaderMsgStore.Add(m)
        if added {
            gc.DeMultiplex(m)
        }
    }
}
// handleStateInfSnapshot processes a state info snapshot received from sender.
// Each element must decode to a state info message, come from a peer whose
// organization is known and in the channel, carry the expected channel MAC
// and pass ValidateStateInfoMessage; the first element failing any of these
// checks aborts processing of the whole snapshot. Elements of peers that can
// no longer be looked up are skipped; all others are added to the state
// info cache.
func (gc *gossipChannel) handleStateInfSnapshot(m *proto.GossipMessage, sender common.PKIidType) {
    chanName := string(gc.chainID)
    elements := m.GetStateSnapshot().Elements
    for i := range elements {
        stateInf, err := elements[i].ToGossipMessage()
        if err != nil {
            gc.logger.Warningf("Channel %s : StateInfo snapshot contains an invalid message: %+v", chanName, errors.WithStack(err))
            return
        }
        if !stateInf.IsStateInfoMsg() {
            gc.logger.Warning("Channel", chanName, ": Element of StateInfoSnapshot isn't a StateInfoMessage:",
                stateInf, "message sent from", sender)
            return
        }

        si := stateInf.GetStateInfo()
        peerOrg := gc.GetOrgOfPeer(si.PkiId)
        if peerOrg == nil {
            gc.logger.Debug("Channel", chanName, ": Couldn't find org identity of peer",
                string(si.PkiId), "message sent from", string(sender))
            return
        }
        if !gc.IsOrgInChannel(peerOrg) {
            gc.logger.Warning("Channel", chanName, ": Peer", stateInf.GetStateInfo().PkiId,
                "is not in an eligible org, can't process a stateInfo from it, sent from", sender)
            return
        }

        // The MAC is derived from the peer's PKI-ID and the channel id
        if expectedMAC := GenerateMAC(si.PkiId, gc.chainID); !bytes.Equal(si.Channel_MAC, expectedMAC) {
            gc.logger.Warning("Channel", chanName, ": StateInfo message", stateInf,
                ", has an invalid MAC. Expected", expectedMAC, ", got", si.Channel_MAC, ", sent from", sender)
            return
        }

        if err := gc.ValidateStateInfoMessage(stateInf); err != nil {
            gc.logger.Warningf("Channel %s: Failed validating state info message: %v sent from %v : %+v", chanName, stateInf, sender, errors.WithStack(err))
            return
        }

        // Skip StateInfo messages that belong to peers
        // that have been expired
        if gc.Lookup(si.PkiId) != nil {
            gc.stateInfoMsgStore.Add(stateInf)
        }
    }
}
// verifyBlock reports whether msg is a data message with a non-nil payload
// whose block passes the message crypto service's VerifyBlock for the
// message's channel and sequence number. Each failure is logged with the
// sender that delivered the message.
func (gc *gossipChannel) verifyBlock(msg *proto.GossipMessage, sender common.PKIidType) bool {
    if !msg.IsDataMsg() {
        gc.logger.Warning("Received from ", sender, "a DataUpdate message that contains a non-block GossipMessage:", msg)
        return false
    }

    block := msg.GetDataMsg().Payload
    if block == nil {
        gc.logger.Warning("Received empty payload from", sender)
        return false
    }

    if err := gc.mcs.VerifyBlock(msg.Channel, block.SeqNum, block.Data); err != nil {
        gc.logger.Warningf("Received fabricated block from %v in DataUpdate: %+v", sender, errors.WithStack(err))
        return false
    }
    return true
}
// createStateInfoSnapshot builds a state info snapshot message for a
// requester of the given organization from the stored state info messages.
// Nothing is filtered when the requester is in this peer's organization.
// For a requester of another organization, messages of peers in this peer's
// own organization are included only if the peer can be looked up and has a
// non-empty Endpoint; messages of peers in other organizations are always
// included.
func (gc *gossipChannel) createStateInfoSnapshot(requestersOrg api.OrgIdentityType) *proto.GossipMessage {
    requesterInOurOrg := bytes.Equal(gc.selfOrg, requestersOrg)
    stored := gc.stateInfoMsgStore.Get()
    elements := make([]*proto.Envelope, 0, len(stored))
    for i := range stored {
        stateInfoMsg := stored[i].(*proto.SignedGossipMessage)
        peerID := stateInfoMsg.GetStateInfo().PkiId
        fromOurOrg := bytes.Equal(gc.GetOrgOfPeer(peerID), gc.selfOrg)
        if !requesterInOurOrg && fromOurOrg {
            // The requester is in a different org, so disclose only StateInfo
            // messages of our own peers that have an endpoint set
            peer := gc.Lookup(peerID)
            if peer == nil || peer.Endpoint == "" {
                continue
            }
        }
        elements = append(elements, stateInfoMsg.Envelope)
    }

    snapshot := &proto.StateInfoSnapshot{Elements: elements}
    return &proto.GossipMessage{
        Channel: gc.chainID,
        Tag:     proto.GossipMessage_CHAN_OR_ORG,
        Nonce:   0,
        Content: &proto.GossipMessage_StateSnapshot{
            StateSnapshot: snapshot,
        },
    }
}
// verifyMsg reports whether a received message may be handled by this channel.
//
// A nil message, a message without gossip content, or one whose connection
// info carries a nil ID is rejected. After that the check depends on the
// message kind:
//   - StateInfo: Channel_MAC must equal GenerateMAC(PkiId in the message, gc.chainID);
//   - StateInfoPullRequest: Channel_MAC must equal GenerateMAC(connection ID, gc.chainID);
//   - anything else: the Channel field must equal gc.chainID.
//
// Every rejection is logged as a warning.
func (gc *gossipChannel) verifyMsg(msg proto.ReceivedMessage) bool {
    if msg == nil {
        gc.logger.Warning("Messsage is nil")
        return false
    }

    m := msg.GetGossipMessage()
    if m == nil {
        gc.logger.Warning("Message content is empty")
        return false
    }

    if msg.GetConnectionInfo().ID == nil {
        gc.logger.Warning("Message has nil PKI-ID")
        return false
    }

    // macMatches compares the MAC found in a message against the expected one
    // and logs the mismatch, if any.
    macMatches := func(expected, actual []byte) bool {
        if bytes.Equal(expected, actual) {
            return true
        }
        gc.logger.Warning("Message contains wrong channel MAC(", actual, "), expected", expected)
        return false
    }

    switch {
    case m.IsStateInfoMsg():
        stateInfo := m.GetStateInfo()
        return macMatches(GenerateMAC(stateInfo.PkiId, gc.chainID), stateInfo.Channel_MAC)
    case m.IsStateInfoPullRequestMsg():
        pullReq := m.GetStateInfoPullReq()
        return macMatches(GenerateMAC(msg.GetConnectionInfo().ID, gc.chainID), pullReq.Channel_MAC)
    default:
        if bytes.Equal(m.Channel, []byte(gc.chainID)) {
            return true
        }
        gc.logger.Warning("Message contains wrong channel(", m.Channel, "), expected", gc.chainID)
        return false
    }
}
// createStateInfoRequest builds a StateInfoPullRequest gossip message whose
// Channel_MAC is GenerateMAC(gc.pkiID, gc.chainID) and returns the result of
// calling NoopSign on it.
func (gc *gossipChannel) createStateInfoRequest() (*proto.SignedGossipMessage, error) {
    pullReq := &proto.StateInfoPullRequest{
        Channel_MAC: GenerateMAC(gc.pkiID, gc.chainID),
    }
    request := &proto.GossipMessage{
        Tag:     proto.GossipMessage_CHAN_OR_ORG,
        Nonce:   0,
        Content: &proto.GossipMessage_StateInfoPullReq{StateInfoPullReq: pullReq},
    }
    return request.NoopSign()
}
// UpdateLedgerHeight updates the ledger height the peer
// publishes to other peers in the channel.
//
// The chaincodes and the left-channel flag are carried over from the current
// StateInfo message when one exists; otherwise they keep their zero values.
// The channel lock is held for the duration of the call.
func (gc *gossipChannel) UpdateLedgerHeight(height uint64) {
    gc.Lock()
    defer gc.Unlock()

    var (
        carriedChaincodes []*proto.Chaincode
        hasLeft           bool
    )
    if gc.stateInfoMsg != nil {
        current := gc.stateInfoMsg.GetStateInfo().Properties
        hasLeft, carriedChaincodes = current.LeftChannel, current.Chaincodes
    }
    gc.updateProperties(height, carriedChaincodes, hasLeft)
}
// UpdateChaincodes updates the chaincodes the peer publishes
// to other peers in the channel.
//
// The ledger height and the left-channel flag are carried over from the
// current StateInfo message when one exists; without one the height defaults
// to 1 and the flag to false. The channel lock is held for the duration of
// the call.
func (gc *gossipChannel) UpdateChaincodes(chaincodes []*proto.Chaincode) {
    gc.Lock()
    defer gc.Unlock()

    height := uint64(1)
    hasLeft := false
    if gc.stateInfoMsg != nil {
        current := gc.stateInfoMsg.GetStateInfo().Properties
        height, hasLeft = current.LedgerHeight, current.LeftChannel
    }
    gc.updateProperties(height, chaincodes, hasLeft)
}
// updateStateInfo installs msg as this channel's StateInfo message
// that is periodically published.
//
// The message is added to the state info store, its ledger height is recorded
// in gc.ledgerHeight, it becomes gc.stateInfoMsg, and the
// shouldGossipStateInfo flag is atomically set to 1.
func (gc *gossipChannel) updateStateInfo(msg *proto.SignedGossipMessage) {
    gc.stateInfoMsgStore.Add(msg)

    published := msg.GetStateInfo().Properties
    gc.ledgerHeight = published.LedgerHeight
    gc.stateInfoMsg = msg

    atomic.StoreInt32(&gc.shouldGossipStateInfo, 1)
}
// updateProperties assembles a fresh StateInfo message for this peer from the
// given ledger height, chaincodes and left-channel flag, signs it with
// gc.Sign and hands the signed message to updateStateInfo.
//
// The message timestamp uses gc.incTime as the incarnation number and the
// current wall-clock time in nanoseconds as the sequence number. If signing
// fails the error is logged and the current state is left untouched.
func (gc *gossipChannel) updateProperties(ledgerHeight uint64, chaincodes []*proto.Chaincode, leftChannel bool) {
    stamp := &proto.PeerTime{
        IncNum: gc.incTime,
        SeqNum: uint64(time.Now().UnixNano()),
    }
    props := &proto.Properties{
        LeftChannel:  leftChannel,
        LedgerHeight: ledgerHeight,
        Chaincodes:   chaincodes,
    }
    unsigned := &proto.GossipMessage{
        Nonce: 0,
        Tag:   proto.GossipMessage_CHAN_OR_ORG,
        Content: &proto.GossipMessage_StateInfo{
            StateInfo: &proto.StateInfo{
                Channel_MAC: GenerateMAC(gc.pkiID, gc.chainID),
                PkiId:       gc.pkiID,
                Timestamp:   stamp,
                Properties:  props,
            },
        },
    }

    signed, err := gc.Sign(unsigned)
    if err != nil {
        gc.logger.Error("Failed signing message:", err)
        return
    }
    gc.updateStateInfo(signed)
}
// newStateInfoCache creates a stateInfoCache and starts its background sweeper.
//
// Parameters:
//   - sweepInterval: delay between two consecutive sweeps, measured from the
//     end of one sweep to the start of the next.
//   - hasExpired: predicate handed to Purge on every sweep.
//   - verifyFunc: membership predicate stored in the cache and consulted by
//     Add and validate.
//
// The message store is given a callback that removes the PKI-ID of a message
// from the membership index, so both views are kept in sync when the store
// invokes it.
//
// The sweeper goroutine runs until Stop is called. It reuses a single timer
// instead of allocating a new one per iteration via time.After, and stops
// that timer on exit so that no pending timer outlives the cache.
func newStateInfoCache(sweepInterval time.Duration, hasExpired func(interface{}) bool, verifyFunc membershipPredicate) *stateInfoCache {
    membershipStore := util.NewMembershipStore()
    pol := proto.NewGossipMessageComparator(0)

    s := &stateInfoCache{
        verify:          verifyFunc,
        MembershipStore: membershipStore,
        stopChan:        make(chan struct{}),
    }
    // Callback passed to the message store: drop the index entry of the
    // message the store reports.
    invalidationTrigger := func(m interface{}) {
        pkiID := m.(*proto.SignedGossipMessage).GetStateInfo().PkiId
        membershipStore.Remove(pkiID)
    }
    s.MessageStore = msgstore.NewMessageStore(pol, invalidationTrigger)

    go func() {
        sweepTimer := time.NewTimer(sweepInterval)
        defer sweepTimer.Stop()
        for {
            select {
            case <-s.stopChan:
                return
            case <-sweepTimer.C:
                s.Purge(hasExpired)
                // The timer has fired and its channel was drained by the
                // receive above, so Reset is safe here.
                sweepTimer.Reset(sweepInterval)
            }
        }
    }()
    return s
}
// membershipPredicate receives a StateInfoMessage and optionally a slice of organization identifiers
// and returns whether the peer that signed the given StateInfoMessage is eligible
// to the channel or not.
// Within stateInfoCache it is invoked without organizations by Add and with
// the caller-supplied organizations by validate.
type membershipPredicate func(msg *proto.SignedGossipMessage, orgs ...api.OrgIdentityType) bool

// stateInfoCache is actually a messageStore
// that also indexes messages that are added
// so that they could be extracted later.
// The index is keyed by the PKI-ID found in each StateInfo message.
type stateInfoCache struct {
    // verify decides whether a message's signer is eligible; consulted by Add and validate.
    verify membershipPredicate
    // Embedded index of messages by PKI-ID (see Put/Remove calls in Add and delete).
    *util.MembershipStore
    // Embedded store that holds the messages themselves.
    msgstore.MessageStore
    // stopChan tells the sweeper goroutine started by newStateInfoCache to exit.
    stopChan chan struct{}
}
// validate re-checks every cached message against the verify predicate,
// passing along the given organizations, and deletes each message that no
// longer passes.
func (cache *stateInfoCache) validate(orgs []api.OrgIdentityType) {
    for _, entry := range cache.Get() {
        signed := entry.(*proto.SignedGossipMessage)
        if cache.verify(signed, orgs...) {
            continue
        }
        cache.delete(signed)
    }
}
// Add attempts to add the given message to the stateInfoCache,
// and if the message was added, also indexes it by its PKI-ID.
// Message must be a StateInfo message.
//
// It returns false without storing anything when the message store's
// CheckValid rejects the message, when the verify predicate rejects it, or
// when the message store itself declines to add it.
func (cache *stateInfoCache) Add(msg *proto.SignedGossipMessage) bool {
    // CheckValid runs first; verify is consulted only for messages that pass it.
    if !cache.MessageStore.CheckValid(msg) || !cache.verify(msg) {
        return false
    }
    if !cache.MessageStore.Add(msg) {
        return false
    }
    cache.MembershipStore.Put(msg.GetStateInfo().PkiId, msg)
    return true
}
// delete removes every stored message whose PKI-ID equals that of msg,
// and then removes that PKI-ID from the membership index.
func (cache *stateInfoCache) delete(msg *proto.SignedGossipMessage) {
    targetID := msg.GetStateInfo().PkiId

    sameSigner := func(stored interface{}) bool {
        storedID := stored.(*proto.SignedGossipMessage).GetStateInfo().PkiId
        return bytes.Equal(storedID, targetID)
    }
    cache.Purge(sameSigner)
    cache.Remove(targetID)
}

// Stop tells the sweeper goroutine started by newStateInfoCache to exit.
// stopChan is unbuffered, so the call blocks until that goroutine receives
// the signal.
func (cache *stateInfoCache) Stop() {
    var stopSignal struct{}
    cache.stopChan <- stopSignal
}

// GenerateMAC returns a byte slice that is derived from the peer's PKI-ID
// and a channel name: the result of ComputeSHA256 over (PKI-ID || channel ID).
//
// The pre-image is assembled in a freshly allocated buffer. Appending directly
// onto []byte(pkiID) would reuse the caller's backing array whenever pkiID has
// spare capacity (a slice-to-slice conversion does not copy), overwriting the
// bytes behind it and racing with concurrent callers sharing the same PKI-ID.
func GenerateMAC(pkiID common.PKIidType, channelID common.ChainID) []byte {
    // Hash is computed on (PKI-ID || channel ID)
    preImage := make([]byte, 0, len(pkiID)+len(channelID))
    preImage = append(preImage, []byte(pkiID)...)
    preImage = append(preImage, []byte(channelID)...)
    return common_utils.ComputeSHA256(preImage)
}
上一篇 下一篇

猜你喜欢

热点阅读