btcd的p2p网络(2)-连接ConnMgr

2019-01-25  本文已影响7人  豆瓣奶茶

p2p网络从底层到上层可以分为3层,地址 连接 节点,每一层都有自己的功能
声明:文章代码和源码有不一致地方
这篇文章简单记录下连接conn

三个主要的结构体

1、连接管理

// ConnManager providers a manager to handle network connections.
type ConnManager struct {
    // the following variables must only be used atomically
    // 记录自己主动连接其他节点的连接数量
    connReqCount uint64
    // 标识connmgr已经启动
    start int32
    // 标识connmgr已经结束
    stop int32

    // 设定相关的配置
    cfg Config
    // 用于同步connmgr的退出状态,调用方可以阻塞等待connmgr的工作协程退出
    wg sync.WaitGroup
    // 某个连接失败后,connmgr尝试选择新的peer地址连接的总次数
    failedAttempts uint64
    // 用于与connmgr工作协程通信的管道
    requests chan interface{}
    // 用于通知工作协程退出
    quit chan struct{}
}

2、Config,配置参数
其实就是connmgr配置,本身就是connmgr的一个字段。

// Config holds the configuration options related to the connection manager.
type Config struct {
    // Listeners define a slice of listeners for which the connection manager
    // will take ownership of(取得所有权) and accept connections. when a connection
    // is accepted,the OnAccept handler will be invoked with the connection. since
    // the connection manager takes ownership of these listeners,they will be closed
    // when the connection manager is stoped.

    // this field will not have any effect if the onAccept field is not also specified.
    // It may be nil if the caller does not wish to listen for
    // incoming connection

    Listeners []net.Listener //节点上所有等待外部连接的监听点;
    // OnAccept is a callback that is fired when an inbound connection is
    // accepted.  It is the caller's responsibility(责任、义务) to close the connection.
    // Failure to close the connection will result in the connection manager
    // believing the connection is still active and thus have undesirable
    // side effects such as still counting toward maximum connection limits.
    //
    // This field will not have any effect if the Listeners field is not
    // also specified since there couldn't possibly be any accepted
    // connections in that case.
    OnAccept func(net.Conn) // 节点应答并接受外部连接后的回调函数
    // TargetOutbound is the number of outbound network connections to
    // maintain. Defaults to 8.
    TargetOutbound uint32 // 节点主动向外连接peer的最大个数
    // RetryDuration is the duration to wait before retrying connection
    // requests. Defaults to 5s.
    RetryDuration time.Duration // 连接失败后发起重连的等待时间
    // OnConnection is a callback that is fired when a new outbound
    // connection is established.
    OnConnection func(*ConnReq, net.Conn) // 连接建立成功后的回调函数
    // OnDisconnection is a callback that is fired when an outbound
    // connection is disconnected.
    OnDisconnection func(*ConnReq) // 连接关闭后的回调函数
    // GetNewAddress is a way to get an address to make a network connection
    // to.  If nil, no new connections will be made automatically.
    // 连接失败后,ConnMgr可能会选择新的peer地址进行连接
    // GetNewAddress函数提供了获取新的peer地址的方法,它最终会调用addrManager中
    // 的GetAddress()来分配新地址。
    GetNewAddress func() (net.Addr, error)
    // Dial connects to the address on the named network.It cannot be nil.
    // 定义建立TCP连接的方式,是直接连接还是通过代理连接。
    Dial func(net.Addr) (net.Conn, error)
}

3、ConnReq 描述了一个连接

// ConnReq is the connection request to a network address. If permanent, the
// connection will be retried on disconnection.
// ConnReq 描述了一个连接
type ConnReq struct {
    // The following variables must only be used atomically.
    // 连接的序号,用于索引
    id uint64
    // 连接的目的地址
    Addr      net.Addr
    // 标识是否与Peer保持永久连接,如果为true,
    // 则连接失败后,继续尝试与该Peer连接,而不是选择新的Peer地址重新连接
    Permanent bool
    // 连接成功后,真实的net.Conn对象;
    conn       net.Conn
    // 连接的状态,有ConnPending、ConnEstablished、ConnDisconnected及ConnFailed等;
    state      ConnState
    // stateMtx: 保护state状态的读写锁;
    stateMtx   sync.RWMutex
    //retryCount: 如果Permanent为true,retryCount记录该连接重复重连的次数;
    retryCount uint32
}

结合起来说,就是连接管理器connmgr按照自身的配置config,管理着一些连接connReq

启动ConnMgr

我们先看start()函数

// Start: launches(发起、发动)the connection manager and begins conecting to the network.
func (cm *ConnManager) Start() {
    // already started ?
    if atomic.AddInt32(&cm.start, 1) != 1 {
        return
    }
    log.Trace("Connection manager started")
    cm.wg.Add(1)
    // 启动工作协程
    go cm.connHandler()
    // Start all the listeners so long as the caller requested
    // them and provided a callback to be invoked when connections are accepted.
    if cm.cfg.OnAccept != nil {
        for _, listenr := range cm.cfg.Listeners {
            cm.wg.Add(1)
            // 启动监听协程listenHandler,等待其他节点连接;
            go cm.listenHandler(listenr)
        }
    }
    // 启动建立连接的协程,选择Peer地址并主动连接;
    for i := atomic.LoadUint64(&cm.connReqCount); i < uint64(cm.cfg.TargetOutbound); i++ {
        go cm.NewConnReq()
    }
}

主要是启动工作协程cm.connHandler(),
然后一方面监听其他节点的连接,go cm.listenHandler(listenr)这里面做的事情就是我们普通的tcp地址监听。
一方面主动去连接其他的节点。
主动连接其他节点cm.NewConnReq()

// NewConnReq creates a new connection request and connects to the
// corresponding(对应的) address.
// 创建一个连接请求,然后连接对应的地址
func (cm *ConnManager) NewConnReq() {
    if atomic.LoadInt32(&cm.stop) != 0 {
        return
    }
    if cm.cfg.GetNewAddress == nil {
        return
    }
    c := &ConnReq{}
    atomic.StoreUint64(&c.id, atomic.AddUint64(&cm.connReqCount, 1))
    // Submit a request of a pending connection attempt to the connection
    // manager. By registering the id before the connection is even established,
    // we'll be able to later cancel the connection via the Remove method.
    done := make(chan struct{})
    select {
    case cm.requests <- registerPending{c, done}:
    case <-cm.quit:
        return
    }

    // wait for the registration to successfully add the pending conn req
    // to the conn manager's internal state.
    select {
    case <-done:
    case <-cm.quit:
        return
    }
    addr,err := cm.cfg.GetNewAddress()
    if err != nil {
        select {
        case cm.requests <- handleFailed{c, err}:
        case <-cm.quit:
        }
        return
    }
    c.Addr = addr
    cm.Connect(c)
}

首先构造一个ConnReq,c := &ConnReq{},然后生成registerPending{c, done},
registerPending写入到connmgr的通道case cm.requests <- registerPending{c, done}

这里的registerPending结构体中还有一个通道done,cm.requests这个通道肯定有人会从里面读数据,处理完后会通过通道done返回信息。下面的case <-done:就是在等待返回的信息。
谁在通道的另外一头读呢?go cm.connHandler(),下面这个图就是他们工作概况

然后cm.cfg.GetNewAddress()得到一个连接的地址(这里用到了addrMgr),然后连接连接cm.Connect(c)

// Connection assigns an id and dials a connection to the address of the connection request
func (cm *ConnManager) Connect(c *ConnReq){
    if atomic.LoadInt32(&cm.stop) != 0{
        return
    }
    // TODO 再次检查一遍,相当于重复了NewConnReq()的工作
    log.Debugf("Attempting to connect to %v", c)
    conn,err := cm.cfg.Dial(c.Addr)
    if err != nil {
        select {
        case cm.requests <- handleFailed{c, err}:
        case <-cm.quit:
        }
        return
    }

    select {
    case cm.requests <- handleConnected{c, conn}:
    case <-cm.quit:
    }
}

连接主要就是这句代码conn,err := cm.cfg.Dial(c.Addr),这个Dial就是在普通的tcp连接外包了一层,让我们有个选择,比如可以通过代理进行连接。

不论是连接成功还是失败,handleConnected{c, conn}:handleFailed{c, err}:这两个结构体都被构建,并且发送到cm.requests

有连接就有断开

func (cm *ConnManager) Disconnect(id uint64) {
    if atomic.LoadInt32(&cm.stop) != 0 {
        return
    }

    select {
    case cm.requests <- handleDisconnected{id, true}:
    case <-cm.quit:
    }
}

connect也差不多,都是向cm.requests发了一个请求。

看来,连接或者断开连接的主要处理逻辑在connHandler中,我们来看看它的实现:

// connHandler handles all connection related requests.  It must be run as a
// goroutine.
//
// The connection handler makes sure that we maintain a pool of active outbound
// connections so that we remain connected to the network.  Connection requests
// are processed and mapped by their assigned ids.
func (cm *ConnManager) connHandler() {
    // pending holds all registered conn requests that hava yet to succeed.
    var pending = make(map[uint64]*ConnReq)
    // conns represents the set of all actively connected peers.
    var conns = make(map[uint64]*ConnReq, cm.cfg.TargetOutbound) // make map时,size可以省略,当你知道大小时,最好加上

out:
    for {
        select {
        case req := <- cm.requests:
            switch msg:=req.(type) {
            case registerPending:
                // TODO
            case handleConnected:
                connReq := msg.c

                if _, ok := pending[connReq.id]; !ok {
                    if msg.conn != nil {
                        msg.conn.Close()
                    }
                    log.Debugf("Ignoring connection for "+
                        "canceled connreq=%v", connReq)
                    continue
                }

                connReq.updateState(ConnEstablished)
                connReq.conn = msg.conn
                conns[connReq.id] = connReq
                log.Debugf("Connected to %v", connReq)
                connReq.retryCount = 0
                cm.failedAttempts = 0

                delete(pending, connReq.id)

                if cm.cfg.OnConnection != nil {
                    go cm.cfg.OnConnection(connReq, msg.conn)
                }
            case handleDisconnected:
                // TODO
            case handleFailed:
                // TODO
            }

        case <-cm.quit:
            break out
        }
    }
    cm.wg.Done()
    log.Trace("Connection handler done")
}

在这里不停的处理cm.requests通道中的信息。我们看下连接成功的处理
起初创建了两个变量

// pending holds all registered conn requests that hava yet to succeed.
    var pending = make(map[uint64]*ConnReq)
    // conns represents the set of all actively connected peers.
    var conns = make(map[uint64]*ConnReq, cm.cfg.TargetOutbound) // make map时,size可以省略,当你知道大小时,最好加上

连接成功后

  1. map``pending中找有没有这个连接请求,如果没有则表明这不要我们要的连接。断开
  2. 更新connReq的状态,然后添加到map conns
  3. 调用go cm.cfg.OnConnection(connReq, msg.conn)

两个peer之间的连接conn,还需要考虑其他的很多方面。但是还好,到现在我们至少可以简单的创建一个连接了。
至于cm.cfg.OnConnection()要干什么,我们后面再分析了。


参考
https://www.jianshu.com/p/d6484e5710ad

上一篇下一篇

猜你喜欢

热点阅读