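C++ High-Performance Server Network Framework Design Details (Part 2)
GitChat author: 范蠡
Original: C++ High-Performance Server Network Framework Design Details
Editor's note: this article is recommended for close reading.
Contents
C++ High-Performance Server Network Framework Design Details (Part 1)
C++ High-Performance Server Network Framework Design Details (Part 2)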
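II. Organizing the Server Program Structure
The six sections above covered many specific details; now it is time to discuss how to organize them. In my experience, the mainstream approach today is one thread one loop combined with the reactor pattern (the proactor pattern is also used). Put simply, each thread runs one loop: inside the thread function, it repeatedly performs a fixed sequence of tasks, such as detecting network events and unpacking data to drive business logic. Let's start with the simplest arrangement, in which some threads run a loop dedicated to network communication. The pseudocode is: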
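while (exit flag is not set)
{
// Use IO multiplexing to detect readable and error events on sockets
// (if there is data to send, also detect writable events)
// On a readable event: for a listening socket, accept the new connection;
// for an ordinary socket, receive its data and store it in the corresponding receive buffer; close the connection on error
// If there is data to send and a writable event fired, send the data
// On an error event, close the connection
}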
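Other threads are set up to process the received data, unpack it, and run the business logic; these can be regarded as business threads. The pseudocode is:
// Take data out of the receive buffer, unpack it, and dispatch it to the different business handlers
The structure above is the most common server logic structure today, but can it be simplified or consolidated further? Let's try. Consider this: suppose the machine has two CPUs (more precisely, two cores), with 2 network threads and 2 business logic threads. It may then happen that while the business threads are running, the network threads are not running and have to wait. If so, why create the two extra threads? Apart from making the program structure perhaps slightly clearer, they bring no real performance gain and waste CPU time slices on thread context switches. So we can merge the network threads with the business logic threads. After merging, the pseudocode looks like this: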
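while (exit flag is not set)
{
// Use IO multiplexing to detect readable and error events on sockets
// (if there is data to send, also detect writable events)
// On a readable event: for a listening socket, accept the new connection;
// for an ordinary socket, receive its data and store it in the corresponding receive buffer; close the connection on error
// If there is data to send and a writable event fired, send the data
// On an error event, close the connection
// Take data out of the receive buffer, unpack it, and dispatch it to the different business handlers
}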
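You read that right: it is just a simple merge. The merged version not only achieves the same effect as before, but also lets us promptly handle pending business logic when there are no network IO events, and it cuts unnecessary thread context switches.
Going one step further, we can add other tasks to this while loop, such as the program's logic task queue and timer events. The pseudocode is: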
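while (exit flag is not set)
{
// Handle timer events
// Use IO multiplexing to detect readable and error events on sockets
// (if there is data to send, also detect writable events)
// On a readable event: for a listening socket, accept the new connection;
// for an ordinary socket, receive its data and store it in the corresponding receive buffer; close the connection on error
// If there is data to send and a writable event fired, send the data
// On an error event, close the connection
// Take data out of the receive buffer, unpack it, and dispatch it to the different business handlers
// Program-defined task 1
// Program-defined task 2
}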
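Note: timer events are handled before network IO events are detected in order to keep timers from firing too late. If they were handled afterwards, the preceding work might take a while, and by the time the timers were processed a good part of the interval would already have passed. Even this ordering cannot guarantee that timers are 100% precise, but it gets as close as it can. Linux also provides timer objects such as timerfd, so that all timers can be handled uniformly as fds, just like sockets. This is very similar to the idea behind the network library libevent, which wraps sockets, timers, and signals into a unified object for processing.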
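The timer handling described in the note can be sketched as a small timer queue that the loop drains before it polls for IO. This is only an illustration under assumptions: `TimerQueue`, `addTimer`, `runExpired`, and `nextTimeoutMs` are hypothetical names, not part of muduo or libevent, and the queue is meant to be used from the loop thread only.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <queue>
#include <utility>
#include <vector>

// Hypothetical timer queue for a one-thread-one-loop design (not thread-safe).
class TimerQueue {
public:
    using Clock = std::chrono::steady_clock;
    using Callback = std::function<void()>;

    // Schedule cb to run once, at or after `when`.
    void addTimer(Clock::time_point when, Callback cb) {
        heap_.push(Entry{when, nextSeq_++, std::move(cb)});
    }

    // Run every timer whose deadline is <= now and return how many fired.
    // Intended to be called at the top of each loop iteration, before IO polling.
    int runExpired(Clock::time_point now) {
        int fired = 0;
        while (!heap_.empty() && heap_.top().when <= now) {
            Callback cb = heap_.top().cb;  // copy out before popping
            heap_.pop();
            cb();
            ++fired;
        }
        return fired;
    }

    // Timeout to pass to the IO multiplexer so it returns in time for the
    // earliest timer. Returns defaultMs when no timer is pending.
    int nextTimeoutMs(Clock::time_point now, int defaultMs) const {
        if (heap_.empty()) return defaultMs;
        if (heap_.top().when <= now) return 0;
        const auto remaining = std::chrono::duration_cast<std::chrono::milliseconds>(
            heap_.top().when - now).count();
        return remaining < defaultMs ? static_cast<int>(remaining) : defaultMs;
    }

private:
    struct Entry {
        Clock::time_point when;
        unsigned long seq;  // keeps insertion order for equal deadlines
        Callback cb;
        bool operator>(const Entry& other) const {
            return when != other.when ? when > other.when : seq > other.seq;
        }
    };
    // Min-heap ordered by deadline: the earliest timer is always on top.
    std::priority_queue<Entry, std::vector<Entry>, std::greater<Entry>> heap_;
    unsigned long nextSeq_ = 0;
};
```

In a loop like the pseudocode above, each iteration would call `runExpired(Clock::now())` first and then pass `nextTimeoutMs(...)` as the timeout of the IO multiplexing call, so a quiet network does not delay a due timer.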
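Enough theory; let's illustrate with a popular open-source network library, muduo (author: 陈硕). The original library is based on boost; I ported it to C++11 and fixed some bugs. My thanks to the original author, 陈硕.
The while loop of the core thread function described above lives in eventloop.cpp: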
void EventLoop::loop()
{
assert(!looping_);
assertInLoopThread();
looping_ = true;
quit_ = false; // FIXME: what if someone calls quit() before loop() ?
LOG_TRACE << "EventLoop " << this << " start looping";
while (!quit_)
{
activeChannels_.clear();
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
++iteration_;
if (Logger::logLevel() <= Logger::TRACE)
{
printActiveChannels();
}
// TODO sort channel by priority
eventHandling_ = true;
for (ChannelList::iterator it = activeChannels_.begin();
it != activeChannels_.end(); ++it)
{
currentActiveChannel_ = *it;
currentActiveChannel_->handleEvent(pollReturnTime_);
}
currentActiveChannel_ = NULL;
eventHandling_ = false;
doPendingFunctors();
if (frameFunctor_)
{
frameFunctor_();
}
}
LOG_TRACE << "EventLoop " << this << " stop looping";
looping_ = false;
}
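poller_->poll uses epoll to demultiplex network events, and the loop then handles the events it returned. Each client socket corresponds to one connection, that is, one TcpConnection and one Channel object. currentActiveChannel_->handleEvent(pollReturnTime_) invokes the handler matching the event type (readable, writable, or error). These handlers are all callbacks installed during program initialization: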
void Channel::handleEvent(Timestamp receiveTime)
{
std::shared_ptr<void> guard;
if (tied_)
{
guard = tie_.lock();
if (guard)
{
handleEventWithGuard(receiveTime);
}
}
else
{
handleEventWithGuard(receiveTime);
}
}
void Channel::handleEventWithGuard(Timestamp receiveTime)
{
eventHandling_ = true;
LOG_TRACE << reventsToString();
if ((revents_ & POLLHUP) && !(revents_ & POLLIN))
{
if (logHup_)
{
LOG_WARN << "Channel::handle_event() POLLHUP";
}
if (closeCallback_) closeCallback_();
}
if (revents_ & POLLNVAL)
{
LOG_WARN << "Channel::handle_event() POLLNVAL";
}
if (revents_ & (POLLERR | POLLNVAL))
{
if (errorCallback_) errorCallback_();
}
if (revents_ & (POLLIN | POLLPRI | POLLRDHUP))
{
//For a listening socket, readCallback_ points to Acceptor::handleRead
//For a client socket, it calls TcpConnection::handleRead
if (readCallback_) readCallback_(receiveTime);
}
if (revents_ & POLLOUT)
{
//For a socket that is still connecting, writeCallback_ points to Connector::handleWrite()
if (writeCallback_) writeCallback_();
}
eventHandling_ = false;
}
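Of course, this relies on the "polymorphism" of the Channel object: for an ordinary socket, a readable event invokes the callback set in advance; for a listening socket, it invokes the Acceptor object's handleRead() to accept the new connection: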
void Acceptor::handleRead()
{
loop_->assertInLoopThread();
InetAddress peerAddr;
//FIXME loop until no more
int connfd = acceptSocket_.accept(&peerAddr);
if (connfd >= 0)
{
// string hostport = peerAddr.toIpPort();
// LOG_TRACE << "Accepts of " << hostport;
//newConnectionCallback_ actually points to TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
if (newConnectionCallback_)
{
newConnectionCallback_(connfd, peerAddr);
}
else
{
sockets::close(connfd);
}
}
else
{
LOG_SYSERR << "in Acceptor::handleRead";
// Read the section named "The special problem of
// accept()ing when you can't" in libev's doc.
// By Marc Lehmann, author of libev.
if (errno == EMFILE)
{
::close(idleFd_);
idleFd_ = ::accept(acceptSocket_.fd(), NULL, NULL);
::close(idleFd_);
idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC);
}
}
}
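The callback-based "polymorphism" of Channel can be illustrated with a stripped-down sketch. The names below (`MiniChannel`, `kReadable`, `kWritable`, `kError`) are hypothetical and the event bits are made up for the example; muduo's real Channel works with the poll/epoll masks and has more states.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Illustrative event bits; a real implementation would use the poll()/epoll masks.
enum : unsigned { kReadable = 1u << 0, kWritable = 1u << 1, kError = 1u << 2 };

// A channel does not know what kind of socket it wraps. Its behavior is
// decided entirely by the callbacks installed at initialization time.
class MiniChannel {
public:
    using Callback = std::function<void()>;

    void setReadCallback(Callback cb) { readCb_ = std::move(cb); }
    void setWriteCallback(Callback cb) { writeCb_ = std::move(cb); }
    void setErrorCallback(Callback cb) { errorCb_ = std::move(cb); }

    // Dispatch the events reported by the poller to the matching callbacks.
    // Events without an installed callback are silently skipped.
    void handleEvent(unsigned revents) const {
        if ((revents & kError) && errorCb_) errorCb_();
        if ((revents & kReadable) && readCb_) readCb_();
        if ((revents & kWritable) && writeCb_) writeCb_();
    }

private:
    Callback readCb_;
    Callback writeCb_;
    Callback errorCb_;
};
```

A listening socket's channel would get a read callback that accepts connections, while a client socket's channel would get one that reads data; the event loop calls `handleEvent` on both in exactly the same way.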
The business logic handling in the main loop corresponds to:
doPendingFunctors();
if (frameFunctor_)
{
frameFunctor_();
}
void EventLoop::doPendingFunctors()
{
std::vector<Functor> functors;
callingPendingFunctors_ = true;
{
std::unique_lock<std::mutex> lock(mutex_);
functors.swap(pendingFunctors_);
}
for (size_t i = 0; i < functors.size(); ++i)
{
functors[i]();
}
callingPendingFunctors_ = false;
}
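Business logic is added here by registering callable objects (Functor) for the tasks to run. The added tasks are stored in the member variable pendingFunctors_, which is an array of callables (a vector); to execute them, each one is simply called in turn. The code above first swaps the contents of pendingFunctors_ into a stack variable and then works on that stack variable, which shortens the time the lock is held. Because pendingFunctors_ is also accessed when tasks are added, multiple threads are involved, so it must be protected by a lock. Tasks are added here: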
void EventLoop::queueInLoop(const Functor& cb)
{
{
std::unique_lock<std::mutex> lock(mutex_);
pendingFunctors_.push_back(cb);
}
if (!isInLoopThread() || callingPendingFunctors_)
{
wakeup();
}
}
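frameFunctor_ is simpler still: it is just a single callable that gets set. There is one trick here: so that a newly added task runs immediately, a wakeup mechanism is used. Writing a few bytes to an fd wakes up epoll and makes it return at once, even though no other socket has events at that moment, so the task just added is executed next.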
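The wakeup trick can be sketched with a POSIX pipe and poll(). This is a simplified stand-in under assumptions: `WakeupPipe` is a hypothetical name, a pipe is used here for portability rather than whatever fd type the library actually uses, and poll() on a single fd stands in for the epoll call that would normally watch this fd alongside the sockets.

```cpp
#include <cassert>
#include <fcntl.h>
#include <poll.h>
#include <unistd.h>

// Hypothetical wakeup helper: another thread calls wakeup(), and the loop
// thread, blocked in waitFor(), returns immediately.
class WakeupPipe {
public:
    WakeupPipe() {
        int fds[2] = {-1, -1};
        if (::pipe(fds) == 0) {
            readFd_ = fds[0];
            writeFd_ = fds[1];
            setNonBlocking(readFd_);
            setNonBlocking(writeFd_);
        }
    }
    ~WakeupPipe() {
        if (readFd_ >= 0) ::close(readFd_);
        if (writeFd_ >= 0) ::close(writeFd_);
    }
    WakeupPipe(const WakeupPipe&) = delete;
    WakeupPipe& operator=(const WakeupPipe&) = delete;

    bool valid() const { return readFd_ >= 0 && writeFd_ >= 0; }

    // Write one byte so that the read end becomes readable.
    void wakeup() {
        const char one = 1;
        if (::write(writeFd_, &one, 1) < 0) {
            // Pipe full or error: if it is full, a wakeup is already pending.
        }
    }

    // Block for up to timeoutMs. Returns true if woken up, false on timeout.
    bool waitFor(int timeoutMs) {
        struct pollfd pfd;
        pfd.fd = readFd_;
        pfd.events = POLLIN;
        pfd.revents = 0;
        const int n = ::poll(&pfd, 1, timeoutMs);
        if (n <= 0 || !(pfd.revents & POLLIN)) return false;
        char buf[64];
        while (::read(readFd_, buf, sizeof buf) > 0) {
            // Drain all pending wakeup bytes so the next wait blocks again.
        }
        return true;
    }

private:
    static void setNonBlocking(int fd) {
        const int flags = ::fcntl(fd, F_GETFL, 0);
        if (flags >= 0) ::fcntl(fd, F_SETFL, flags | O_NONBLOCK);
    }
    int readFd_ = -1;
    int writeFd_ = -1;
};
```

In a real loop the read end would be registered with the same epoll instance as the sockets, so a call to `wakeup()` from another thread makes the blocking wait return and the pending tasks run right away.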
Now let's look at the data receiving logic:
void TcpConnection::handleRead(Timestamp receiveTime)
{
loop_->assertInLoopThread();
int savedErrno = 0;
ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
if (n > 0)
{
//messageCallback_ points to CTcpSession::OnRead(const std::shared_ptr<TcpConnection>& conn, Buffer* pBuffer, Timestamp receiveTime)
messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
}
else if (n == 0)
{
handleClose();
}
else
{
errno = savedErrno;
LOG_SYSERR << "TcpConnection::handleRead";
handleError();
}
}
The received data is placed in the receive buffer, and next we unpack it:
void ClientSession::OnRead(const std::shared_ptr<TcpConnection>& conn, Buffer* pBuffer, Timestamp receivTime)
{
while (true)
{
//Not enough data for a packet header
if (pBuffer->readableBytes() < (size_t)sizeof(msg))
{
LOG_INFO << "buffer is not enough for a package header, pBuffer->readableBytes()=" << pBuffer->readableBytes() << ", sizeof(msg)=" << sizeof(msg);
return;
}
//Not enough data for a complete packet
msg header;
memcpy(&header, pBuffer->peek(), sizeof(msg));
if (pBuffer->readableBytes() < (size_t)header.packagesize + sizeof(msg))
return;
pBuffer->retrieve(sizeof(msg));
std::string inbuf;
inbuf.append(pBuffer->peek(), header.packagesize);
pBuffer->retrieve(header.packagesize);
if (!Process(conn, inbuf.c_str(), inbuf.length()))
{
LOG_WARN << "Process error, close TcpConnection";
conn->forceClose();
// Stop parsing once the connection has been closed
return;
}
}// end while-loop
}
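First check whether the receive buffer holds at least one packet header; if so, check whether it also holds the full body whose size the header specifies; if it does, the packet is handed to the Process function.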
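The header-then-body check can be sketched independently of muduo's Buffer class. The code below is a minimal illustration under assumptions: `PacketHeader`, `extractPackets`, and `makeFrame` are hypothetical names, the header holds only a body length in host byte order, and a `std::string` plays the role of the receive buffer. Unlike the excerpt above, it also rejects a negative or oversized length field, a check a real server would likely want.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

// Hypothetical fixed-size header; the article's real `msg` struct is not shown.
struct PacketHeader {
    std::int32_t bodySize;  // number of body bytes that follow the header
};

// Moves every complete packet body from `buffer` into `out` and leaves any
// incomplete tail in `buffer` for the next read.
// Returns false if a header announces an invalid size, in which case the
// caller would normally close the connection.
inline bool extractPackets(std::string& buffer, std::vector<std::string>& out,
                           std::int32_t maxBodySize) {
    while (true) {
        if (buffer.size() < sizeof(PacketHeader)) return true;  // wait for the header
        PacketHeader header;
        std::memcpy(&header, buffer.data(), sizeof(header));
        if (header.bodySize < 0 || header.bodySize > maxBodySize) return false;
        const std::size_t bodySize = static_cast<std::size_t>(header.bodySize);
        const std::size_t total = sizeof(header) + bodySize;
        if (buffer.size() < total) return true;  // wait for the rest of the body
        out.emplace_back(buffer, sizeof(header), bodySize);
        buffer.erase(0, total);
    }
}

// Builds header + body, only to feed the parser in examples and tests.
inline std::string makeFrame(const std::string& body) {
    PacketHeader header;
    header.bodySize = static_cast<std::int32_t>(body.size());
    std::string frame(reinterpret_cast<const char*>(&header), sizeof(header));
    frame += body;
    return frame;
}
```

Because TCP is a byte stream, one read may deliver half a packet or several packets at once; looping until the buffer no longer holds a complete packet handles both cases.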
Now the data sending logic:
void TcpConnection::sendInLoop(const void* data, size_t len)
{
loop_->assertInLoopThread();
ssize_t nwrote = 0;
size_t remaining = len;
bool faultError = false;
if (state_ == kDisconnected)
{
LOG_WARN << "disconnected, give up writing";
return;
}
// if no thing in output queue, try writing directly
if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0)
{
nwrote = sockets::write(channel_->fd(), data, len);
if (nwrote >= 0)
{
remaining = len - nwrote;
if (remaining == 0 && writeCompleteCallback_)
{
loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));
}
}
else // nwrote < 0
{
nwrote = 0;
if (errno != EWOULDBLOCK)
{
LOG_SYSERR << "TcpConnection::sendInLoop";
if (errno == EPIPE || errno == ECONNRESET) // FIXME: any others?
{
faultError = true;
}
}
}
}
assert(remaining <= len);
if (!faultError && remaining > 0)
{
size_t oldLen = outputBuffer_.readableBytes();
if (oldLen + remaining >= highWaterMark_
&& oldLen < highWaterMark_
&& highWaterMarkCallback_)
{
loop_->queueInLoop(std::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining));
}
outputBuffer_.append(static_cast<const char*>(data)+nwrote, remaining);
if (!channel_->isWriting())
{
channel_->enableWriting();
}
}
}
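If the remaining data (remaining) is greater than 0, channel_->enableWriting() is called to start watching for writable events. Writable events are handled as follows: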
void TcpConnection::handleWrite()
{
loop_->assertInLoopThread();
if (channel_->isWriting())
{
ssize_t n = sockets::write(channel_->fd(),
outputBuffer_.peek(),
outputBuffer_.readableBytes());
if (n > 0)
{
outputBuffer_.retrieve(n);
if (outputBuffer_.readableBytes() == 0)
{
channel_->disableWriting();
if (writeCompleteCallback_)
{
loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));
}
if (state_ == kDisconnecting)
{
shutdownInLoop();
}
}
}
else
{
LOG_SYSERR << "TcpConnection::handleWrite";
// if (state_ == kDisconnecting)
// {
// shutdownInLoop();
// }
}
}
else
{
LOG_TRACE << "Connection fd = " << channel_->fd()
<< " is down, no more writing";
}
}
Once all the data has been sent, channel_->disableWriting() is called to stop watching for writable events.
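The send path (write directly, buffer what is left, watch for writable, stop watching once drained) can be modeled without real sockets. The sketch below is an illustration under assumptions: `OutputQueue` and `WriteFn` are hypothetical names, and the injected `WriteFn` stands in for a non-blocking socket write that may accept fewer bytes than requested.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <utility>

// Stand-in for a non-blocking write: returns how many bytes were accepted.
using WriteFn = std::function<std::size_t(const char* data, std::size_t len)>;

class OutputQueue {
public:
    explicit OutputQueue(WriteFn writeFn) : write_(std::move(writeFn)) {}

    // Try a direct write when nothing is queued; buffer whatever remains and
    // start watching for writable events (like channel_->enableWriting()).
    void send(const std::string& data) {
        std::size_t written = 0;
        if (!watchingWritable_ && pending_.empty()) {
            written = std::min(write_(data.data(), data.size()), data.size());
        }
        if (written < data.size()) {
            pending_.append(data, written, std::string::npos);
            watchingWritable_ = true;
        }
    }

    // Called when the socket reports writable: flush as much as possible and
    // stop watching once the buffer is empty (like channel_->disableWriting()).
    void onWritable() {
        if (!watchingWritable_) return;
        const std::size_t n = write_(pending_.data(), pending_.size());
        pending_.erase(0, std::min(n, pending_.size()));
        if (pending_.empty()) watchingWritable_ = false;
    }

    bool watchingWritable() const { return watchingWritable_; }
    std::size_t pendingBytes() const { return pending_.size(); }

private:
    WriteFn write_;
    std::string pending_;  // plays the role of outputBuffer_
    bool watchingWritable_ = false;
};
```

Data sent while earlier bytes are still queued is appended behind them rather than written directly, which preserves ordering. Turning the writable watch off when the buffer is empty matters because a socket is writable most of the time, so leaving it on would make the loop spin.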
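Many readers may have been wanting to ask: the article said that unpacking data and handling logic is business code rather than network communication code, yet here they seem to be mixed together. Actually they are not. The real business processing happens inside the callbacks that the framework layer exposes; how to process the data is defined by the framework's user, namely the business layer.
To sum up, it all comes down to one loop inside one thread function. If you don't believe it, look at the code of a trading system server project I once worked on: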
void CEventDispatcher::Run()
{
m_bShouldRun = true;
while(m_bShouldRun)
{
DispatchIOs();
SyncTime();
CheckTimer();
DispatchEvents();
}
}
void CEpollReactor::DispatchIOs()
{
DWORD dwSelectTimeOut = SR_DEFAULT_EPOLL_TIMEOUT;
if (HandleOtherTask())
{
dwSelectTimeOut = 0;
}
struct epoll_event ev;
CEventHandlerIdMap::iterator itor = m_mapEventHandlerId.begin();
for(; itor!=m_mapEventHandlerId.end(); itor++)
{
CEventHandler *pEventHandler = (CEventHandler *)(*itor).first;
if(pEventHandler == NULL){
continue;
}
ev.data.ptr = pEventHandler;
ev.events = 0;
int nReadID, nWriteID;
pEventHandler->GetIds(&nReadID, &nWriteID);
if (nReadID > 0)
{
ev.events |= EPOLLIN;
}
if (nWriteID > 0)
{
ev.events |= EPOLLOUT;
}
epoll_ctl(m_fdEpoll, EPOLL_CTL_MOD, (*itor).second, &ev);
}
struct epoll_event events[EPOLL_MAX_EVENTS];
int nfds = epoll_wait(m_fdEpoll, events, EPOLL_MAX_EVENTS, dwSelectTimeOut/1000);
for (int i=0; i<nfds; i++)
{
struct epoll_event &evref = events[i];
CEventHandler *pEventHandler = (CEventHandler *)evref.data.ptr;
// Test the event bits with &; using | would make the condition always true
if ((evref.events&EPOLLIN)!=0 && m_mapEventHandlerId.find(pEventHandler)!=m_mapEventHandlerId.end())
{
pEventHandler->HandleInput();
}
if ((evref.events&EPOLLOUT)!=0 && m_mapEventHandlerId.find(pEventHandler)!=m_mapEventHandlerId.end())
{
pEventHandler->HandleOutput();
}
}
}
void CEventDispatcher::DispatchEvents()
{
CEvent event;
CSyncEvent *pSyncEvent;
while(m_queueEvent.PeekEvent(event))
{
int nRetval;
if(event.pEventHandler != NULL)
{
nRetval = event.pEventHandler->HandleEvent(event.nEventID, event.dwParam, event.pParam);
}
else
{
nRetval = HandleEvent(event.nEventID, event.dwParam, event.pParam);
}
if(event.pAdd != NULL) //synchronous message
{
pSyncEvent=(CSyncEvent *)event.pAdd;
pSyncEvent->nRetval = nRetval;
pSyncEvent->sem.UnLock();
}
}
}
Now look at the source of TeamTalk, open-sourced by 蘑菇街 (code download: https://github.com/baloonwj/TeamTalk):
void CEventDispatch::StartDispatch(uint32_t wait_timeout)
{
fd_set read_set, write_set, excep_set;
timeval timeout;
timeout.tv_sec = 0;
timeout.tv_usec = wait_timeout * 1000; // 10 millisecond
if(running)
return;
running = true;
while (running)
{
_CheckTimer();
_CheckLoop();
if (!m_read_set.fd_count && !m_write_set.fd_count && !m_excep_set.fd_count)
{
Sleep(MIN_TIMER_DURATION);
continue;
}
m_lock.lock();
memcpy(&read_set, &m_read_set, sizeof(fd_set));
memcpy(&write_set, &m_write_set, sizeof(fd_set));
memcpy(&excep_set, &m_excep_set, sizeof(fd_set));
m_lock.unlock();
int nfds = select(0, &read_set, &write_set, &excep_set, &timeout);
if (nfds == SOCKET_ERROR)
{
log("select failed, error code: %d", GetLastError());
Sleep(MIN_TIMER_DURATION);
continue; // select again
}
if (nfds == 0)
{
continue;
}
for (u_int i = 0; i < read_set.fd_count; i++)
{
//log("select return read count=%d\n", read_set.fd_count);
SOCKET fd = read_set.fd_array[i];
CBaseSocket* pSocket = FindBaseSocket((net_handle_t)fd);
if (pSocket)
{
pSocket->OnRead();
pSocket->ReleaseRef();
}
}
for (u_int i = 0; i < write_set.fd_count; i++)
{
//log("select return write count=%d\n", write_set.fd_count);
SOCKET fd = write_set.fd_array[i];
CBaseSocket* pSocket = FindBaseSocket((net_handle_t)fd);
if (pSocket)
{
pSocket->OnWrite();
pSocket->ReleaseRef();
}
}
for (u_int i = 0; i < excep_set.fd_count; i++)
{
//log("select return exception count=%d\n", excep_set.fd_count);
SOCKET fd = excep_set.fd_array[i];
CBaseSocket* pSocket = FindBaseSocket((net_handle_t)fd);
if (pSocket)
{
pSocket->OnClose();
pSocket->ReleaseRef();
}
}
}
}
Next, FileZilla: the server side of this FTP tool uses the Windows WSAAsyncSelect model:
//Processes event notifications sent by the sockets or the layers
static LRESULT CALLBACK WindowProc(HWND hWnd, UINT message, WPARAM wParam, LPARAM lParam)
{
if (message>=WM_SOCKETEX_NOTIFY)
{
//Verify parameters
ASSERT(hWnd);
CAsyncSocketExHelperWindow *pWnd=(CAsyncSocketExHelperWindow *)GetWindowLongPtr(hWnd, GWLP_USERDATA);
ASSERT(pWnd);
if (!pWnd)
return 0;
if (message < static_cast<UINT>(WM_SOCKETEX_NOTIFY+pWnd->m_nWindowDataSize)) //Index is within socket storage
{
//Lookup socket and verify if it's valid
CAsyncSocketEx *pSocket=pWnd->m_pAsyncSocketExWindowData[message - WM_SOCKETEX_NOTIFY].m_pSocket;
SOCKET hSocket = wParam;
if (!pSocket)
return 0;
if (hSocket == INVALID_SOCKET)
return 0;
if (pSocket->m_SocketData.hSocket != hSocket)
return 0;
int nEvent = lParam & 0xFFFF;
int nErrorCode = lParam >> 16;
//Dispatch notification
if (!pSocket->m_pFirstLayer) {
//Dispatch to CAsyncSocketEx instance
switch (nEvent)
{
case FD_READ:
#ifndef NOSOCKETSTATES
if (pSocket->GetState() == connecting && !nErrorCode)
{
pSocket->m_nPendingEvents |= FD_READ;
break;
}
else if (pSocket->GetState() == attached)
pSocket->SetState(connected);
if (pSocket->GetState() != connected)
break;
// Ignore further FD_READ events after FD_CLOSE has been received
if (pSocket->m_SocketData.onCloseCalled)
break;
#endif //NOSOCKETSTATES
#ifndef NOSOCKETSTATES
if (nErrorCode)
pSocket->SetState(aborted);
#endif //NOSOCKETSTATES
if (pSocket->m_lEvent & FD_READ) {
pSocket->OnReceive(nErrorCode);
}
break;
case FD_FORCEREAD: //Forceread does not check if there's data waiting
#ifndef NOSOCKETSTATES
if (pSocket->GetState() == connecting && !nErrorCode)
{
pSocket->m_nPendingEvents |= FD_FORCEREAD;
break;
}
else if (pSocket->GetState() == attached)
pSocket->SetState(connected);
if (pSocket->GetState() != connected)
break;
#endif //NOSOCKETSTATES
if (pSocket->m_lEvent & FD_READ)
{
#ifndef NOSOCKETSTATES
if (nErrorCode)
pSocket->SetState(aborted);
#endif //NOSOCKETSTATES
pSocket->OnReceive(nErrorCode);
}
break;
case FD_WRITE:
#ifndef NOSOCKETSTATES
if (pSocket->GetState() == connecting && !nErrorCode)
{
pSocket->m_nPendingEvents |= FD_WRITE;
break;
}
else if (pSocket->GetState() == attached && !nErrorCode)
pSocket->SetState(connected);
if (pSocket->GetState() != connected)
break;
#endif //NOSOCKETSTATES
if (pSocket->m_lEvent & FD_WRITE)
{
#ifndef NOSOCKETSTATES
if (nErrorCode)
pSocket->SetState(aborted);
#endif //NOSOCKETSTATES
pSocket->OnSend(nErrorCode);
}
break;
case FD_CONNECT:
#ifndef NOSOCKETSTATES
if (pSocket->GetState() == connecting)
{
if (nErrorCode && pSocket->m_SocketData.nextAddr)
{
if (pSocket->TryNextProtocol())
break;
}
pSocket->SetState(connected);
}
else if (pSocket->GetState() == attached && !nErrorCode)
pSocket->SetState(connected);
#endif //NOSOCKETSTATES
if (pSocket->m_lEvent & FD_CONNECT)
pSocket->OnConnect(nErrorCode);
#ifndef NOSOCKETSTATES
if (!nErrorCode)
{
if ((pSocket->m_nPendingEvents&FD_READ) && pSocket->GetState() == connected)
pSocket->OnReceive(0);
if ((pSocket->m_nPendingEvents&FD_FORCEREAD) && pSocket->GetState() == connected)
pSocket->OnReceive(0);
if ((pSocket->m_nPendingEvents&FD_WRITE) && pSocket->GetState() == connected)
pSocket->OnSend(0);
}
pSocket->m_nPendingEvents = 0;
#endif
break;
case FD_ACCEPT:
#ifndef NOSOCKETSTATES
if (pSocket->GetState() != listening && pSocket->GetState() != attached)
break;
#endif //NOSOCKETSTATES
if (pSocket->m_lEvent & FD_ACCEPT)
pSocket->OnAccept(nErrorCode);
break;
case FD_CLOSE:
#ifndef NOSOCKETSTATES
if (pSocket->GetState() != connected && pSocket->GetState() != attached)
break;
// If there are still bytes left to read, call OnReceive instead of
// OnClose and trigger a new OnClose
DWORD nBytes = 0;
if (!nErrorCode && pSocket->IOCtl(FIONREAD, &nBytes))
{
if (nBytes > 0)
{
// Just repeat message.
pSocket->ResendCloseNotify();
pSocket->m_SocketData.onCloseCalled = true;
pSocket->OnReceive(WSAESHUTDOWN);
break;
}
}
pSocket->SetState(nErrorCode ? aborted : closed);
#endif //NOSOCKETSTATES
pSocket->OnClose(nErrorCode);
break;
}
}
else //Dispatch notification to the lowest layer
{
if (nEvent == FD_READ)
{
// Ignore further FD_READ events after FD_CLOSE has been received
if (pSocket->m_SocketData.onCloseCalled)
return 0;
DWORD nBytes;
if (!pSocket->IOCtl(FIONREAD, &nBytes))
nErrorCode = WSAGetLastError();
if (pSocket->m_pLastLayer)
pSocket->m_pLastLayer->CallEvent(nEvent, nErrorCode);
}
else if (nEvent == FD_CLOSE)
{
// If there are still bytes left to read, call OnReceive instead of
// OnClose and trigger a new OnClose
DWORD nBytes = 0;
if (!nErrorCode && pSocket->IOCtl(FIONREAD, &nBytes))
{
if (nBytes > 0)
{
// Just repeat message.
pSocket->ResendCloseNotify();
if (pSocket->m_pLastLayer)
pSocket->m_pLastLayer->CallEvent(FD_READ, 0);
return 0;
}
}
pSocket->m_SocketData.onCloseCalled = true;
if (pSocket->m_pLastLayer)
pSocket->m_pLastLayer->CallEvent(nEvent, nErrorCode);
}
else if (pSocket->m_pLastLayer)
pSocket->m_pLastLayer->CallEvent(nEvent, nErrorCode);
}
}
return 0;
}
else if (message == WM_USER) //Notification event sent by a layer
{
//Verify parameters, lookup socket and notification message
//Verify parameters
ASSERT(hWnd);
CAsyncSocketExHelperWindow *pWnd=(CAsyncSocketExHelperWindow *)GetWindowLongPtr(hWnd, GWLP_USERDATA);
ASSERT(pWnd);
if (!pWnd)
return 0;
if (wParam >= static_cast<UINT>(pWnd->m_nWindowDataSize)) //Index is within socket storage
{
return 0;
}
CAsyncSocketEx *pSocket = pWnd->m_pAsyncSocketExWindowData[wParam].m_pSocket;
CAsyncSocketExLayer::t_LayerNotifyMsg *pMsg = (CAsyncSocketExLayer::t_LayerNotifyMsg *)lParam;
if (!pMsg || !pSocket || pSocket->m_SocketData.hSocket != pMsg->hSocket)
{
delete pMsg;
return 0;
}
int nEvent=pMsg->lEvent&0xFFFF;
int nErrorCode=pMsg->lEvent>>16;
//Dispatch to layer
if (pMsg->pLayer)
pMsg->pLayer->CallEvent(nEvent, nErrorCode);
else
{
//Dispatch to CAsyncSocketEx instance
switch (nEvent)
{
case FD_READ:
#ifndef NOSOCKETSTATES
if (pSocket->GetState() == connecting && !nErrorCode)
{
pSocket->m_nPendingEvents |= FD_READ;
break;
}
else if (pSocket->GetState() == attached && !nErrorCode)
pSocket->SetState(connected);
if (pSocket->GetState() != connected)
break;
#endif //NOSOCKETSTATES
if (pSocket->m_lEvent & FD_READ)
{
#ifndef NOSOCKETSTATES
if (nErrorCode)
pSocket->SetState(aborted);
#endif //NOSOCKETSTATES
pSocket->OnReceive(nErrorCode);
}
break;
case FD_FORCEREAD: //Forceread does not check if there's data waiting
#ifndef NOSOCKETSTATES
if (pSocket->GetState() == connecting && !nErrorCode)
{
pSocket->m_nPendingEvents |= FD_FORCEREAD;
break;
}
else if (pSocket->GetState() == attached && !nErrorCode)
pSocket->SetState(connected);
if (pSocket->GetState() != connected)
break;
#endif //NOSOCKETSTATES
if (pSocket->m_lEvent & FD_READ)
{
#ifndef NOSOCKETSTATES
if (nErrorCode)
pSocket->SetState(aborted);
#endif //NOSOCKETSTATES
pSocket->OnReceive(nErrorCode);
}
break;
case FD_WRITE:
#ifndef NOSOCKETSTATES
if (pSocket->GetState() == connecting && !nErrorCode)
{
pSocket->m_nPendingEvents |= FD_WRITE;
break;
}
else if (pSocket->GetState() == attached && !nErrorCode)
pSocket->SetState(connected);
if (pSocket->GetState() != connected)
break;
#endif //NOSOCKETSTATES
if (pSocket->m_lEvent & FD_WRITE)
{
#ifndef NOSOCKETSTATES
if (nErrorCode)
pSocket->SetState(aborted);
#endif //NOSOCKETSTATES
pSocket->OnSend(nErrorCode);
}
break;
case FD_CONNECT:
#ifndef NOSOCKETSTATES
if (pSocket->GetState() == connecting)
pSocket->SetState(connected);
else if (pSocket->GetState() == attached && !nErrorCode)
pSocket->SetState(connected);
#endif //NOSOCKETSTATES
if (pSocket->m_lEvent & FD_CONNECT)
pSocket->OnConnect(nErrorCode);
#ifndef NOSOCKETSTATES
if (!nErrorCode)
{
if (((pSocket->m_nPendingEvents&FD_READ) && pSocket->GetState() == connected) && (pSocket->m_lEvent & FD_READ))
pSocket->OnReceive(0);
if (((pSocket->m_nPendingEvents&FD_FORCEREAD) && pSocket->GetState() == connected) && (pSocket->m_lEvent & FD_READ))
pSocket->OnReceive(0);
if (((pSocket->m_nPendingEvents&FD_WRITE) && pSocket->GetState() == connected) && (pSocket->m_lEvent & FD_WRITE))
pSocket->OnSend(0);
}
pSocket->m_nPendingEvents = 0;
#endif //NOSOCKETSTATES
break;
case FD_ACCEPT:
#ifndef NOSOCKETSTATES
if ((pSocket->GetState() == listening || pSocket->GetState() == attached) && (pSocket->m_lEvent & FD_ACCEPT))
#endif //NOSOCKETSTATES
{
pSocket->OnAccept(nErrorCode);
}
break;
case FD_CLOSE:
#ifndef NOSOCKETSTATES
if ((pSocket->GetState() == connected || pSocket->GetState() == attached) && (pSocket->m_lEvent & FD_CLOSE))
{
pSocket->SetState(nErrorCode?aborted:closed);
#else
{
#endif //NOSOCKETSTATES
pSocket->OnClose(nErrorCode);
}
break;
}
}
delete pMsg;
return 0;
}
else if (message == WM_USER+1)
{
// WSAAsyncGetHostByName reply
// Verify parameters
ASSERT(hWnd);
CAsyncSocketExHelperWindow *pWnd = (CAsyncSocketExHelperWindow *)GetWindowLongPtr(hWnd, GWLP_USERDATA);
ASSERT(pWnd);
if (!pWnd)
return 0;
CAsyncSocketEx *pSocket = NULL;
for (int i = 0; i < pWnd->m_nWindowDataSize; ++i) {
pSocket = pWnd->m_pAsyncSocketExWindowData[i].m_pSocket;
if (pSocket && pSocket->m_hAsyncGetHostByNameHandle &&
pSocket->m_hAsyncGetHostByNameHandle == (HANDLE)wParam &&
pSocket->m_pAsyncGetHostByNameBuffer)
break;
}
if (!pSocket || !pSocket->m_pAsyncGetHostByNameBuffer)
return 0;
int nErrorCode = lParam >> 16;
if (nErrorCode) {
pSocket->OnConnect(nErrorCode);
return 0;
}
SOCKADDR_IN sockAddr{};
sockAddr.sin_family = AF_INET;
sockAddr.sin_addr.s_addr = ((LPIN_ADDR)((LPHOSTENT)pSocket->m_pAsyncGetHostByNameBuffer)->h_addr)->s_addr;
sockAddr.sin_port = htons(pSocket->m_nAsyncGetHostByNamePort);
BOOL res = pSocket->Connect((SOCKADDR*)&sockAddr, sizeof(sockAddr));
delete [] pSocket->m_pAsyncGetHostByNameBuffer;
pSocket->m_pAsyncGetHostByNameBuffer = 0;
pSocket->m_hAsyncGetHostByNameHandle = 0;
if (!res)
if (GetLastError() != WSAEWOULDBLOCK)
pSocket->OnConnect(GetLastError());
return 0;
}
else if (message == WM_USER + 2)
{
//Verify parameters, lookup socket and notification message
//Verify parameters
if (!hWnd)
return 0;
CAsyncSocketExHelperWindow *pWnd=(CAsyncSocketExHelperWindow *)GetWindowLongPtr(hWnd, GWLP_USERDATA);
if (!pWnd)
return 0;
if (wParam >= static_cast<UINT>(pWnd->m_nWindowDataSize)) //Index is within socket storage
return 0;
CAsyncSocketEx *pSocket = pWnd->m_pAsyncSocketExWindowData[wParam].m_pSocket;
if (!pSocket)
return 0;
// Process pending callbacks
std::list<t_callbackMsg> tmp;
tmp.swap(pSocket->m_pendingCallbacks);
pSocket->OnLayerCallback(tmp);
for (auto & cb : tmp) {
delete [] cb.str;
}
}
else if (message == WM_TIMER)
{
if (wParam != 1)
return 0;
ASSERT(hWnd);
CAsyncSocketExHelperWindow *pWnd=(CAsyncSocketExHelperWindow *)GetWindowLongPtr(hWnd, GWLP_USERDATA);
ASSERT(pWnd && pWnd->m_pThreadData);
if (!pWnd || !pWnd->m_pThreadData)
return 0;
if (pWnd->m_pThreadData->layerCloseNotify.empty())
{
KillTimer(hWnd, 1);
return 0;
}
CAsyncSocketEx* socket = pWnd->m_pThreadData->layerCloseNotify.front();
pWnd->m_pThreadData->layerCloseNotify.pop_front();
if (pWnd->m_pThreadData->layerCloseNotify.empty())
KillTimer(hWnd, 1);
if (socket)
PostMessage(hWnd, socket->m_SocketData.nSocketIndex + WM_SOCKETEX_NOTIFY, socket->m_SocketData.hSocket, FD_CLOSE);
return 0;
}
return DefWindowProc(hWnd, message, wParam, lParam);
}
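If you are not familiar with these projects, you probably have no interest in reading every line of the excerpts above. But you must understand the structure I have been describing: essentially all mainstream network frameworks today follow this principle. For example, FileZilla's network communication layer is also used in the well-known 电驴 (easyMule).
That concludes my introduction to the framework of a single server program. If you fully understand what I am trying to convey, I believe you too can build a high-performance server program.
In addition, a server framework can add many interesting details on top of the design above, such as flow control. Here is another example from a project I actually worked on:
In real projects, when there are many client connections and several sockets have data to process at the same time, the limited number of CPU cores combined with the detect-IO-events-then-handle-them flow above can cause the worker thread to keep servicing the first few sockets and only move on to the later ones after those are finished. It is like eating at a restaurant where everyone has ordered, but some tables keep getting dishes while others get none. That is clearly bad, so let's see how to avoid it: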
int CFtdEngine::HandlePackage(CFTDCPackage *pFTDCPackage, CFTDCSession *pSession)
{
//NET_IO_LOG0("CFtdEngine::HandlePackage\n");
FTDC_PACKAGE_DEBUG(pFTDCPackage);
if (pFTDCPackage->GetTID() != FTD_TID_ReqUserLogin)
{
if (!IsSessionLogin(pSession->GetSessionID()))
{
SendErrorRsp(pFTDCPackage, pSession, 1, "客户未登录"); //message text: "client not logged in"
return 0;
}
}
CalcFlux(pSession, pFTDCPackage->Length()); //account for traffic
REPORT_EVENT(LOG_DEBUG, "Front/Fgateway", "登录请求%0x", pFTDCPackage->GetTID()); //log text: "login request"
int nRet = 0;
switch(pFTDCPackage->GetTID())
{
case FTD_TID_ReqUserLogin:
///huwp:20070608: APIs with too high a version are not allowed to log in
if (pFTDCPackage->GetVersion()>FTD_VERSION)
{
SendErrorRsp(pFTDCPackage, pSession, 1, "Too High FTD Version");
return 0;
}
nRet = OnReqUserLogin(pFTDCPackage, (CFTDCSession *)pSession);
FTDRequestIndex.incValue();
break;
case FTD_TID_ReqCheckUserLogin:
nRet = OnReqCheckUserLogin(pFTDCPackage, (CFTDCSession *)pSession);
FTDRequestIndex.incValue();
break;
case FTD_TID_ReqSubscribeTopic:
nRet = OnReqSubscribeTopic(pFTDCPackage, (CFTDCSession *)pSession);
FTDRequestIndex.incValue();
break;
}
return 0;
}
When a socket has readable data, the data is received and unpacked, and then CalcFlux(pSession, pFTDCPackage->Length()) is called to account for the traffic:
void CFrontEngine::CalcFlux(CSession *pSession, const int nFlux)
{
TFrontSessionInfo *pSessionInfo = m_mapSessionInfo.Find(pSession->GetSessionID());
if (pSessionInfo != NULL)
{
//Flow control now counts packets
pSessionInfo->nCommFlux ++;
///If the traffic exceeds the limit, suspend reads for this session
if (pSessionInfo->nCommFlux >= pSessionInfo->nMaxCommFlux)
{
pSession->SuspendRead(true);
}
}
}
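This function first increments the number of packets handled for the connection session (Session), then checks whether it has reached the maximum packet count; if so, it sets the read-suspended flag: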
void CSession::SuspendRead(bool bSuspend)
{
m_bSuspendRead = bSuspend;
}
As a result, the socket is excluded from the list of sockets checked next time:
void CEpollReactor::RegisterIO(CEventHandler *pEventHandler)
{
int nReadID, nWriteID;
pEventHandler->GetIds(&nReadID, &nWriteID);
if (nWriteID != 0 && nReadID ==0)
{
nReadID = nWriteID;
}
if (nReadID != 0)
{
m_mapEventHandlerId[pEventHandler] = nReadID;
struct epoll_event ev;
ev.events = 0; // avoid an uninitialized mask; DispatchIOs() sets the real one via EPOLL_CTL_MOD
ev.data.ptr = pEventHandler;
if(epoll_ctl(m_fdEpoll, EPOLL_CTL_ADD, nReadID, &ev) != 0)
{
perror("epoll_ctl EPOLL_CTL_ADD");
}
}
}
void CSession::GetIds(int *pReadId, int *pWriteId)
{
m_pChannelProtocol->GetIds(pReadId,pWriteId);
if (m_bSuspendRead)
{
*pReadId = 0;
}
}
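In other words, the socket is no longer checked for readable data. A timer then resets the flag after 1 second, so that if the socket has data it can be detected again: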
const int SESSION_CHECK_TIMER_ID = 9;
const int SESSION_CHECK_INTERVAL = 1000;
SetTimer(SESSION_CHECK_TIMER_ID, SESSION_CHECK_INTERVAL);
void CFrontEngine::OnTimer(int nIDEvent)
{
if (nIDEvent == SESSION_CHECK_TIMER_ID)
{
CSessionMap::iterator itor = m_mapSession.Begin();
while (!itor.IsEnd())
{
TFrontSessionInfo *pFind = m_mapSessionInfo.Find((*itor)->GetSessionID());
if (pFind != NULL)
{
CheckSession(*itor, pFind);
}
itor++;
}
}
}
void CFrontEngine::CheckSession(CSession *pSession, TFrontSessionInfo *pSessionInfo)
{
///Restart traffic accounting
pSessionInfo->nCommFlux -= pSessionInfo->nMaxCommFlux;
if (pSessionInfo->nCommFlux < 0)
{
pSessionInfo->nCommFlux = 0;
}
///If the traffic still exceeds the limit, keep reads suspended for this session
pSession->SuspendRead(pSessionInfo->nCommFlux >= pSessionInfo->nMaxCommFlux);
}
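This is like a restaurant serving one table a few dishes to get them started, then pausing that table and serving the tables that are still empty; once everyone has something to eat, it comes back to continue serving the first table. Real restaurants do in fact work this way. The example above is a very good approach to implementing flow control within a single service: it ensures that every client is served evenly, rather than leaving some clients waiting a long time for a response. Of course, this technique is not suitable for businesses with ordering requirements, such as sales systems, which are generally first order, first served.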
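The counting scheme can be condensed into a few lines. The sketch below mirrors the logic of CalcFlux and CheckSession from the excerpts, but `SessionFlow`, `onPacket`, and `onTimerTick` are hypothetical names introduced only for illustration.

```cpp
#include <cassert>

// Per-session flow-control state (hypothetical struct for illustration).
struct SessionFlow {
    int packetCount = 0;        // packets handled in the current window
    int maxPackets = 0;         // per-window budget; expected to be > 0
    bool readSuspended = false; // when true, the socket is left out of read polling
};

// Called for every packet received on the session.
inline void onPacket(SessionFlow& s) {
    ++s.packetCount;
    if (s.packetCount >= s.maxPackets) {
        s.readSuspended = true;  // budget used up: stop reading this socket for now
    }
}

// Called from the periodic timer (every second in the article's example).
inline void onTimerTick(SessionFlow& s) {
    s.packetCount -= s.maxPackets;  // pay back one window's worth of budget
    if (s.packetCount < 0) {
        s.packetCount = 0;
    }
    // Resume reading unless the session is still over budget.
    s.readSuspended = s.packetCount >= s.maxPackets;
}
```

Subtracting the budget on each tick, rather than resetting the counter to zero, means a session that overshot its budget by a wide margin stays suspended for more than one window.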
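In addition, today's servers make heavy use of caching to speed up IO; caching is essentially a strategy of trading space for time. For information that is used repeatedly but changes infrequently, and that is expensive to load from its original location (for example from disk or from a database), we can use a cache. This is why in-memory databases and key-value stores such as redis, leveldb, and fastdb are so popular right now. If you want to work in server development, you need to master at least a few of them.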
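The space-for-time idea can be shown with a tiny in-process read-through cache. This is only a sketch under assumptions: `ReadThroughCache` and its `Loader` are hypothetical names, the loader stands in for the slow disk or database read, and the class is neither thread-safe nor size-bounded, unlike the products named above.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical read-through cache: the first lookup of a key pays for the slow
// load, and later lookups are served from memory.
class ReadThroughCache {
public:
    using Loader = std::function<std::string(const std::string& key)>;

    explicit ReadThroughCache(Loader loader) : loader_(std::move(loader)) {}

    // Return the cached value, loading it from the slow source on a miss.
    const std::string& get(const std::string& key) {
        auto it = cache_.find(key);
        if (it == cache_.end()) {
            it = cache_.emplace(key, loader_(key)).first;
        }
        return it->second;
    }

    // Drop a key after the underlying data changes so the next get() reloads it.
    void invalidate(const std::string& key) { cache_.erase(key); }

private:
    Loader loader_;
    std::unordered_map<std::string, std::string> cache_;
};
```

This fits data that is read often and changed rarely; the cost is memory plus the need to invalidate entries when the source changes.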
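This is my first article on gitchat. Space is limited, so many details could not be covered in depth, and I will not go into design techniques for distributed servers here; if circumstances allow, I will share more technical material later. My thanks also go to gitchat for providing a platform to exchange ideas with everyone.
Given the limits of my ability and experience, errors and omissions are inevitable; comments and suggestions are welcome.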