区块链技术

比特币源码分析-网络(二)

2018-03-09  本文已影响94人  wolf4j

众所周知,比特币网络是采用的P2P网络体系,所以,没有明显的客户端与服务端的区别或者是概念,每一个节点既是自身的客户端,又是其它节点的服务端。

sync.h中,定义了 CSemaphore,它包装了系统底层的信号量机制,对wait(), try_wait(),post()实现了封装,代码如下:

class CSemaphore {
private:
    boost::condition_variable condition;
    boost::mutex mutex;
    int value;
public:
    CSemaphore(int init) : value(init) {}
    void wait() {}
    bool try_wait() {}
    void post() {}
};

用于控制网络连接时的最大数量,每一个网络节点的最大连接数受限于信号量所允许的最大值。

下面我们按照一个网络连接从发送到接收到请求返回的这么个思路,来梳理代码逻辑。

004.png

CNode

CNode定义在bitcoin.cpp中,是比较重要的也是较为复杂的一个类,节点的所有信息都包含在内:

class CNode {
    SOCKET sock; //用来连接的socket句柄
    CDataStream vSend; //发送消息
    CDataStream vRecv; //接收消息
    uint32_t nHeaderStart; //头信息开始
    uint32_t nMessageStart; 
    int nVersion; //版本信息
    std::string strSubVer; 
    int nStartingHeight; //起始高度
    std::vector<CAddress> *vAddr; //ip地址(网络上节点的连接信息)
    int ban; 
    int64_t doneAfter; 
    CAddress you;
};

在上述定义中,最主要的是 std::vector<CAddress> *vAddr; 它包含了连接的所有节点,如果有节点连接进来,就加入到这个vector中;如果某个节点断开连接,就从这个vector中删除。

net.h中,对CNode进行了详细的定义(所有关于节点的信息,都进行了详细罗列),由于篇幅较长,只罗列其中的一些关键结构:

/** Information about a peer */
class CNode {
    friend class CConnman;

public:
    SOCKET hSocket; //连接的socket句柄
    size_t nSendSize; //所有vSendMsg条目的总大小。
    size_t nSendOffset; //已经发送的第一个vSendMsg内的偏移量。
    std::deque<std::vector<uint8_t>> vSendMsg;//发送消息的数组
     ...
    const CAddress addr;//节点地址信息
    std::atomic<int> nVersion;//版本信息
    CBloomFilter *pfilter;//海量过滤器
    const NodeId id;//节点ID

protected:
    mapMsgCmdSize mapSendBytesPerMsgCmd;
    mapMsgCmdSize mapRecvBytesPerMsgCmd;

public:
    uint256 hashContinue;
    std::atomic<int> nStartingHeight;
private:
    std::list<CNetMessage> vRecvMsg;//接收消息的数组
public:
    //用来解析接收到的消息数据
    bool ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool &complete);

    //用来设置接收版本
    void SetRecvVersion(int nVersionIn) { nRecvVersion = nVersionIn; }
    int GetRecvVersion() { return nRecvVersion; }
    void SetSendVersion(int nVersionIn);
    int GetSendVersion() const;

    //用来发送地址
    void PushAddress(const CAddress &_addr, FastRandomContext &insecure_rand) {
        // Known checking here is only to save space from duplicates.
        // SendMessages will filter it again for knowns that were added
        // after addresses were pushed.
        if (_addr.IsValid() && !addrKnown.contains(_addr.GetKey())) {
            if (vAddrToSend.size() >= MAX_ADDR_TO_SEND) {
                vAddrToSend[insecure_rand.randrange(vAddrToSend.size())] =
                    _addr;
            } else {
                vAddrToSend.push_back(_addr);
            }
        }
    }

    //用来发送inventory消息
    void PushInventory(const CInv &inv) {
        LOCK(cs_inventory);
        if (inv.type == MSG_TX) {
            if (!filterInventoryKnown.contains(inv.hash)) {
                setInventoryTxToSend.insert(inv.hash);
            }
        } else if (inv.type == MSG_BLOCK) {
            vInventoryBlockToSend.push_back(inv.hash);
        }
    }

    void PushBlockHash(const uint256 &hash) {
        LOCK(cs_inventory);
        vBlockHashesToAnnounce.push_back(hash);
    }
};

发送消息

CDataStream这个类主要是包装了一个带有双向缓冲区的接口, 它重载了 >> 和 <<,使用上述序列化读取和写入未格式化的数据模板,以线性时间填充数据;

class CDataStream {
protected:
    typedef CSerializeData vector_type;
    vector_type vch;
    unsigned int nReadPos;

    int nType;
    int nVersion;

public:
    template <typename T> CDataStream &operator<<(const T &obj) {
        // Serialize to this stream
        ::Serialize(*this, obj);
        return (*this);
    }

    template <typename T> CDataStream &operator>>(T &obj) {
        // Unserialize from this stream
        ::Unserialize(*this, obj);
        return (*this);
    }
    
    void read(char *pch, size_t nSize) {
        if (nSize == 0) {
            return;
        }

        // Read from the beginning of the buffer
        unsigned int nReadPosNext = nReadPos + nSize;
        if (nReadPosNext >= vch.size()) {
            if (nReadPosNext > vch.size()) {
                throw std::ios_base::failure(
                    "CDataStream::read(): end of data");
            }
            memcpy(pch, &vch[nReadPos], nSize);
            nReadPos = 0;
            vch.clear();
            return;
        }
        memcpy(pch, &vch[nReadPos], nSize);
        nReadPos = nReadPosNext;
    }

    void write(const char *pch, size_t nSize) {
        // Write to the end of the buffer
        vch.insert(vch.end(), pch, pch + nSize);
    }

所以,当我们需要发送消息时,首先会把数据放到CDataStream的数据流中,构造好完整的消息,但此时的消息格式是网络无法识别的,下一步,将构造好的消息放入到CSerializeData(类似一个消息队列)进行序列化,序列化之后,我们就可以把消息放到SocketSendData中发送出去。

CSerializeData 的格式如下:

// Byte-vector that clears its contents before deletion.
typedef std::vector<char, zero_after_free_allocator<char>> CSerializeData;

SocketSendData 的定义如下:

size_t CConnman::SocketSendData(CNode *pnode) const {
    AssertLockHeld(pnode->cs_vSend);
    size_t nSentSize = 0;
    size_t nMsgCount = 0;

    for (const auto &data : pnode->vSendMsg) {
        assert(data.size() > pnode->nSendOffset);
        int nBytes = 0;
         ...       
    }
    pnode->vSendMsg.erase(pnode->vSendMsg.begin(),
                          pnode->vSendMsg.begin() + nMsgCount);
    if (pnode->vSendMsg.empty()) {
        assert(pnode->nSendOffset == 0);
        assert(pnode->nSendSize == 0);
    }
    return nSentSize;
}

接收消息

接收消息的工作,主要是由 ThreadSocketHandler 来完成的,

if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify)) {
     pnode->CloseSocketDisconnect();
}

随后,通过 ReceiveMsgBytes 把从其它节点接收到的数据解析为单个数据,然后放回到消息队列,最后由ThreadMessageHandler来进行最后的处理。

ReceiveMsgBytes解析数据的主要流程如下,调用的是CNetMessage下的readHeader和readData方法,随后,使用complete()进行一次判定,看解析是否完成:

// Absorb network data.
        int handled;
        if (!msg.in_data) {
            handled = msg.readHeader(pch, nBytes);
        } else {
            handled = msg.readData(pch, nBytes);
        }

readHeader

readHeader 主要用来解析消息头,由上一篇文章我们能够知道,一个消息头,至少24字节,如果小于24字节直接退出,如果满足这个条件,先把接收到的数据的开始部分复制到消息头数据流中(hdrbuf),再反格式化成消息头(hdr)。消息数据最大为MAX_SIZE(0x02000000),如果大于这个值,证明出错,直接退出。

int CNetMessage::readHeader(const char *pch, unsigned int nBytes) {
    // copy data to temporary parsing buffer
    unsigned int nRemaining = 24 - nHdrPos;
    unsigned int nCopy = std::min(nRemaining, nBytes);

    memcpy(&hdrbuf[nHdrPos], pch, nCopy);
    nHdrPos += nCopy;

    // if header incomplete, exit
    if (nHdrPos < 24) {
        return nCopy;
    }

    // deserialize to CMessageHeader
    try {
        hdrbuf >> hdr;
    } catch (const std::exception &) {
        return -1;
    }

    // reject messages larger than MAX_SIZE
    if (hdr.nMessageSize > MAX_SIZE) {
        return -1;
    }

    // switch state to reading message data
    in_data = true;

    return nCopy;
}

readData

readData 主要用来解析消息体,消息的数据部分复制到消息数据流中(vRecv)来处理,如果 vRecv 的空间不够,会进行扩容,但最多分配256 KB,不能超过总消息大小。

int CNetMessage::readData(const char *pch, unsigned int nBytes) {
    unsigned int nRemaining = hdr.nMessageSize - nDataPos;
    unsigned int nCopy = std::min(nRemaining, nBytes);

    if (vRecv.size() < nDataPos + nCopy) {
        // Allocate up to 256 KiB ahead, but never more than the total message
        // size.
        vRecv.resize(std::min(hdr.nMessageSize, nDataPos + nCopy + 256 * 1024));
    }

    hasher.Write((const uint8_t *)pch, nCopy);
    memcpy(&vRecv[nDataPos], pch, nCopy);
    nDataPos += nCopy;

    return nCopy;
}

缓冲区

在 net.h 文件中,我们能够看到如下定义:

//接收消息缓冲区
static const size_t DEFAULT_MAXRECEIVEBUFFER = 5 * 1000;
//发送消息缓冲区
static const size_t DEFAULT_MAXSENDBUFFER = 1 * 1000;

我们将接收或者发送的数据放入到缓冲区,我们可以通过如下函数,分别对他们调用,加速我们的处理过程:

unsigned int CConnman::GetReceiveFloodSize() const {
    return nReceiveFloodSize;
}
unsigned int CConnman::GetSendBufferSize() const {
    return nSendBufferMaxSize;
}
上一篇下一篇

猜你喜欢

热点阅读