以太坊原理解析

[以太坊源码分析][p2p网络07]:同步区块和交易

2019-02-04  本文已影响0人  jea的笔记本
eth07.jpg

同步,也就是区块链的数据的同步。这里分为两种同步方式,一是本地区块链与远程节点的区块链进行同步,二是将交易均匀的同步给相邻的节点。

0.索引

01.同步区块链
02.同步交易
03.总结

1.同步区块链

ProtocolManager协议管理中的go pm.syncer()协程。

func (pm *ProtocolManager) syncer() {
    // 启动fetcher,辅助同步区块。
    pm.fetcher.Start()
    defer pm.fetcher.Stop()
    defer pm.downloader.Terminate()

    // 等待不同的事件来触发同步操作。
    forceSync := time.NewTicker(forceSyncCycle) // 强制同步触发的计时器。
    defer forceSync.Stop()

    for {
        select {
        // 有新建的远程节点。
        case <-pm.newPeerCh:
            // 确保有节点可以选择,然后同步。数量最低为5个。
            if pm.peers.Len() < minDesiredPeerCount {
                break
            }
            go pm.synchronise(pm.peers.BestPeer())

        case <-forceSync.C:
            // 没有足够数量的节点,也进行强制同步。
            go pm.synchronise(pm.peers.BestPeer())

        case <-pm.noMorePeers:
            return
        }
    }
}

先启动了fetcher,辅助同步区块用的。然后等待不同的事件触发不同的同步方式。

同步的过程调用pm.synchronise方法来进行。

func (pm *ProtocolManager) synchronise(peer *peer) {
    ...
    // 确保远程节点比本地区块链要新。通过难度值td来做比较。
    currentBlock := pm.blockchain.CurrentBlock()
    td := pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())

    pHead, pTd := peer.Head()
    ...
    // 设置同步的方式。
    ...
    // 运行同步循环,与下载器同步。如果本地已有枢纽区块数据则取消快速同步的方式。
    if err := pm.downloader.Synchronise(peer.id, pHead, pTd, mode); err != nil {
        return
    }
    ...
    // 完成了同步之后,将本地区块链的高度最高的区块广播出去,给其他节点广播的是区块哈希。
    if head := pm.blockchain.CurrentBlock(); head.NumberU64() > 0 {
        go pm.BroadcastBlock(head, false)
    }
}

2.同步交易

ProtocolManager协议管理中的go pm.txsyncLoop()协程。

同步交易循环 txsyncLoop 分为三个部分的内容:

1.定义初始化的字段。
var (
        pending = make(map[enode.ID]*txsync)
        sending = false              
        pack    = new(txsync)         // the pack that is being sent
        done    = make(chan error, 1) // result of the send
    )
2.定义发送交易和挑选函数。

发送交易的函数。

send := func(s *txsync) {
    // 根据设置的长度,填充pack交易包。
    size := common.StorageSize(0)
    pack.p = s.p
    pack.txs = pack.txs[:0]
    for i := 0; i < len(s.txs) && size < txsyncPackSize; i++ {
        pack.txs = append(pack.txs, s.txs[i])
        size += s.txs[i].Size()
    }
    // 移除将要发送的交易。
    s.txs = s.txs[:copy(s.txs, s.txs[len(pack.txs):])]
    if len(s.txs) == 0 {
        delete(pending, s.p.ID())
    }
    // 后台发送交易包。
    s.p.Log().Trace("Sending batch of transactions", "count", len(pack.txs), "bytes", size)
    sending = true
    go func() { done <- pack.p.SendTransactions(pack.txs) }()
}

挑选函数。

pick := func() *txsync {
    if len(pending) == 0 {
        return nil
    }
    n := rand.Intn(len(pending)) + 1
    for _, s := range pending {
        if n--; n == 0 {
            return s
        }
    }
    return nil
}
3.执行发送交易的循环。
for {
    select {
    case s := <-pm.txsyncCh:
        pending[s.p.ID()] = s
        if !sending {
            send(s)
        }
    case err := <-done:
        sending = false
        if err != nil {
            pack.p.Log().Debug("Transaction send failed", "err", err)
            delete(pending, pack.p.ID())
        }
        // 计划下次发送。
        if s := pick(); s != nil {
            send(s)
        }
    case <-pm.quitSync:
        return
    }
}

三个监听协程的case

3.总结

上一篇下一篇

猜你喜欢

热点阅读