以太坊(ethereum)实现研究

ethereum p2p Kademlia的实现之三

2018-04-18  本文已影响16人  古则

ethereum p2p Kademlia的实现之一
ethereum p2p Kademlia的实现之二

1.初始化,seednode的添加

//p2p/discover/table.go
func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, nodeDBPath string, bootnodes []*Node) (*Table, error) 
=>
tab.loadSeedNodes(false)

// loadSeedNodes adds seed candidates to the table. The candidates are the
// nodes returned by tab.db.querySeeds(seedCount, seedMaxAge) followed by the
// bootstrap nodes in tab.nursery. When bond is true the candidates are first
// passed through tab.bondall and only the nodes it returns are added.
// newTable calls this with bond=false; doRefresh calls it with bond=true.
func (tab *Table) loadSeedNodes(bond bool) {
	candidates := append(tab.db.querySeeds(seedCount, seedMaxAge), tab.nursery...)
	if bond {
		candidates = tab.bondall(candidates)
	}
	for idx := 0; idx < len(candidates); idx++ {
		// Fresh variable per iteration, so the lazy closure below refers to
		// this node even if the logger evaluates it later.
		node := candidates[idx]
		age := log.Lazy{Fn: func() interface{} {
			return time.Since(tab.db.bondTime(node.ID))
		}}
		log.Debug("Found seed node in database", "id", node.ID, "addr", node.addr(), "age", age)
		tab.add(node)
	}
}

先从数据库中查询符合条件的种子节点(最多 seedCount 个,且不超过 seedMaxAge),再连同 bootnode(即 tab.nursery)一起添加到 k 桶中。

// add places a node into the table while holding tab.mutex.
//
// The target bucket is chosen by tab.bucket(new.sha). tab.bumpOrAdd is tried
// first. If it returns false (the node is not in the table), the node is
// recorded in that bucket's replacement list via tab.addReplacement instead.
func (tab *Table) add(new *Node) {
	tab.mutex.Lock()
	defer tab.mutex.Unlock()

	if b := tab.bucket(new.sha); !tab.bumpOrAdd(b, new) {
		// Not placed among the bucket's entries: keep it as a replacement candidate.
		tab.addReplacement(b, new)
	}
}

// bucket returns the bucket for the given node ID hash, i.e. which k-bucket
// a node with that hash belongs in.
//
// With d the log distance between tab.self.sha and sha, every d up to and
// including bucketMinDistance maps to buckets[0]. Larger distances map to
// buckets[d-bucketMinDistance-1].
func (tab *Table) bucket(sha common.Hash) *bucket {
	switch d := logdist(tab.self.sha, sha); {
	case d > bucketMinDistance:
		return tab.buckets[d-bucketMinDistance-1]
	default:
		return tab.buckets[0]
	}
}

由此可见每个 k 桶中 replacements 列表的作用:当 bumpOrAdd 无法把节点放入桶中时,该节点会被加入这个桶的 replacements 备选列表。

2.K桶的维护(检查,刷新等操作)

调用过程

//p2p/discover/table.go
func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, nodeDBPath string, bootnodes []*Node) (*Table, error) 
=>
func (tab *Table) loop()

下面是对loop()方法的分析

// loop schedules refresh, revalidate runs and coordinates shutdown.
//
// It is driven by three timers:
//   - refresh (ticker, refreshInterval): starts doRefresh unless one is
//     already running;
//   - revalidate (one-shot timer): starts doRevalidate; the timer is re-armed
//     with nextRevalidateTime only after that run reports completion;
//   - copyNodes (ticker, copyNodesInterval): starts copyBondedNodes.
//
// Channels received on tab.refreshReq are queued in waiting and closed once
// the running refresh finishes (starting one if needed). tab.initDone is
// queued up front, so it is closed after the initial refresh.
//
// On tab.closeReq the loop closes tab.net, then waits for any in-flight
// doRefresh and doRevalidate. After that it closes every waiting channel,
// closes tab.db and finally closes tab.closed.
func (tab *Table) loop() {
	var (
		revalidate     = time.NewTimer(tab.nextRevalidateTime())
		refresh        = time.NewTicker(refreshInterval)
		copyNodes      = time.NewTicker(copyNodesInterval)
		revalidateDone chan struct{}                   // non-nil only while doRevalidate runs
		refreshDone    = make(chan struct{})           // where doRefresh reports completion
		waiting        = []chan struct{}{tab.initDone} // holds waiting callers while doRefresh runs
	)
	defer refresh.Stop()
	defer revalidate.Stop()
	defer copyNodes.Stop()

	// Start initial refresh.
	go tab.doRefresh(refreshDone)

loop:
	for {
		select {
		case <-refresh.C:
			tab.seedRand()
			if refreshDone == nil {
				refreshDone = make(chan struct{})
				go tab.doRefresh(refreshDone)
			}
		case req := <-tab.refreshReq:
			waiting = append(waiting, req)
			if refreshDone == nil {
				refreshDone = make(chan struct{})
				go tab.doRefresh(refreshDone)
			}
		case <-refreshDone:
			for _, ch := range waiting {
				close(ch)
			}
			waiting, refreshDone = nil, nil
		case <-revalidate.C:
			// A fresh channel per run, so shutdown can tell whether a
			// revalidation is still in flight.
			revalidateDone = make(chan struct{})
			go tab.doRevalidate(revalidateDone)
		case <-revalidateDone:
			// Receiving from a nil channel never proceeds, so this case is
			// only live while a revalidation is running.
			revalidateDone = nil
			revalidate.Reset(tab.nextRevalidateTime())
		case <-copyNodes.C:
			go tab.copyBondedNodes()
		case <-tab.closeReq:
			break loop
		}
	}

	if tab.net != nil {
		tab.net.close()
	}
	if refreshDone != nil {
		<-refreshDone
	}
	// Previously a running doRevalidate was not awaited. Its completion signal
	// then had no receiver, and it could still be running when tab.db was
	// closed. Drain it here before closing the database.
	if revalidateDone != nil {
		<-revalidateDone
	}
	for _, ch := range waiting {
		close(ch)
	}
	// NOTE(review): copyBondedNodes goroutines are not tracked. If they use
	// tab.db they may still race with the close below; confirm.
	tab.db.close()
	close(tab.closed)
}

这个函数主要包含三个定时器

分别定时执行doRefresh,doRevalidate,copyBondedNodes等三个函数

2.1doRefresh

// doRefresh tries to keep the buckets full and closes done when it returns.
//
// It always runs three steps:
//  1. loadSeedNodes(true): seeds from the database plus the bootstrap nodes
//     are bonded with and then added to the table.
//  2. A lookup for our own ID, to discover new neighbor nodes.
//  3. Three lookups for random targets.
//
// The Kademlia paper refreshes by looking up an ID that falls into the least
// recently used bucket. That is not done here. The findnode target is a
// 512-bit node ID, while distances are measured on its Keccak256 hash, and
// producing a preimage that lands in a chosen bucket is not practical. Random
// targets are used instead.
func (tab *Table) doRefresh(done chan struct{}) {
	defer close(done)

	tab.loadSeedNodes(true)

	tab.lookup(tab.self.ID, false)

	const randomLookups = 3
	for remaining := randomLookups; remaining > 0; remaining-- {
		var target NodeID
		crand.Read(target[:]) // error not checked; the random target is best-effort
		tab.lookup(target, false)
	}
}

主要分三步:第一步调用 tab.loadSeedNodes,前文已经分析过;另外两步都调用了 lookup 方法,只是参数不同——一次以自身 ID 为目标,另外三次以随机 ID 为目标。

// Run self lookup to discover new neighbor nodes.
    tab.lookup(tab.self.ID, false)

var target NodeID
        crand.Read(target[:])
        tab.lookup(target, false)

下面分析lookup方法

//loopup方法的目的是找到接近targetID的节点
//参数targetID不一定是一个真实存在的节点id
func (tab *Table) lookup(targetID NodeID, refreshIfEmpty bool) []*Node {
    var (
        target         = crypto.Keccak256Hash(targetID[:])
        asked          = make(map[NodeID]bool)
        seen           = make(map[NodeID]bool)
        reply          = make(chan []*Node, alpha)
        pendingQueries = 0
        result         *nodesByDistance
    )
    // don't query further if we hit ourself.
    // unlikely to happen often in practice.
    asked[tab.self.ID] = true

    for {
        tab.mutex.Lock()
        // generate initial result set
####
从ntab中获得接近target的节点,存入result中,最多bucketSize个
####
        result = tab.closest(target, bucketSize)
        tab.mutex.Unlock()
        if len(result.entries) > 0 || !refreshIfEmpty {
            break
        }
        // The result set is empty, all nodes were dropped, refresh.
        // We actually wait for the refresh to complete here. The very
        // first query will hit this case and run the bootstrapping
        // logic.
        <-tab.refresh()
        refreshIfEmpty = false
    }
####
向results节点(接近target的节点)发出findnode消息
对返回的节点进行bond(ping pong)
####
    for {
        // ask the alpha closest nodes that we haven't asked yet
        for i := 0; i < len(result.entries) && pendingQueries < alpha; i++ {
            n := result.entries[i]
            if !asked[n.ID] {
                asked[n.ID] = true
                pendingQueries++
                go func() {
                    // Find potential neighbors to bond with
                    r, err := tab.net.findnode(n.ID, n.addr(), targetID)
                    if err != nil {
                        // Bump the failure counter to detect and evacuate non-bonded entries
                        fails := tab.db.findFails(n.ID) + 1
                        tab.db.updateFindFails(n.ID, fails)
                        log.Trace("Bumping findnode failure counter", "id", n.ID, "failcount", fails)

                        if fails >= maxFindnodeFailures {
                            log.Trace("Too many findnode failures, dropping", "id", n.ID, "failcount", fails)
                            tab.delete(n)
                        }
                    }
                    reply <- tab.bondall(r)
                }()
            }
        }
        if pendingQueries == 0 {
            // we have asked all closest nodes, stop the search
            break
        }
        // wait for the next reply
        for _, n := range <-reply {
            if n != nil && !seen[n.ID] {
                seen[n.ID] = true
                result.push(n, bucketSize)
            }
        }
        pendingQueries--
    }
    return result.entries
}


closest 方法:从 tab 的所有 k 桶中获取距离 target 最近的节点,最多 nresults 个(lookup 中传入的是 bucketSize)。
// closest returns up to nresults nodes from the table, ordered by distance to
// target. It offers every entry of every bucket to nodesByDistance.push, which
// keeps only the nresults closest. This is simple and obviously correct
// rather than efficient. lookup calls it with tab.mutex held.
func (tab *Table) closest(target common.Hash, nresults int) *nodesByDistance {
	nearest := &nodesByDistance{target: target}
	for bi := range tab.buckets {
		for _, node := range tab.buckets[bi].entries {
			nearest.push(node, nresults)
		}
	}
	return nearest
}


// nodesByDistance is a list of nodes, ordered by
// distance to target.
type nodesByDistance struct {
    entries []*Node     // closest to target first; order and size are maintained by push
    target  common.Hash // the hash that distances are measured against (via distcmp)
}

// push inserts n into h.entries, keeping the list ordered by distance to
// h.target (closest first) and at most maxElems long.
//
// If the list has room, it grows by one. If the list is already full, n is
// dropped when it is farther than every entry. Otherwise n is inserted and
// the previously last (farthest) entry falls off the end.
func (h *nodesByDistance) push(n *Node, maxElems int) {
	// ix is the position of the first entry farther from target than n.
	ix := sort.Search(len(h.entries), func(i int) bool {
		return distcmp(h.target, h.entries[i].sha, n.sha) > 0
	})
	if len(h.entries) < maxElems {
		// Room left: grow by one. n's final slot is written below.
		h.entries = append(h.entries, n)
	}
	if ix < len(h.entries) {
		// Slide entries from ix onward one slot toward the end, dropping the
		// last one, and place n at ix. If we just appended, this overwrites
		// the tail copy of n.
		copy(h.entries[ix+1:], h.entries[ix:])
		h.entries[ix] = n
	}
	// Otherwise the list was full and n is farther than every entry, so it
	// is not kept.
}

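可知 lookup 的作用如下:先从本地 k 桶中取出距离 target 最近的最多 bucketSize 个节点作为初始结果集。之后每一轮,并发地向其中至多 alpha 个尚未询问过的节点发送 findnode 请求,对返回的节点执行 bond(ping/pong),并把新节点按距离插入结果集。当结果集中的节点都已询问过时,查找结束并返回结果集。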

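然后可知 doRefresh 的作用如下:先加载数据库中的种子节点和 bootnode 并与之 bond,再以自身 ID 为目标执行一次 lookup,最后以 3 个随机 ID 为目标各执行一次 lookup,从而让 k 桶保持充满。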

上一篇下一篇

猜你喜欢

热点阅读