IM客户端开发(3)——接收线程
2019-12-06 本文已影响0人
Magic11
1、 接收线程负责发送心跳、读取消息、检查已发送的消息是否超时、检查网络是否还活着等
/**
 * Body of the receive thread.
 *
 * While the receive-loop flag stays set, every pass sends a heartbeat if one
 * is due, reads a pending message, expires timed-out requests and checks the
 * connection. Once the flag is cleared it polls for the send loop to report
 * exit, clears the connection context, resets the activity/heartbeat
 * timestamps and finally flags the receive loop as exited.
 */
void GSKCNet::recvLoop()
{
    SetRecvLoop(true);
    SetExitRecvLoop(false);

    while (true)
    {
        if (!GetRecvLoop())
        {
            break;
        }
        sendHeartMsg();     // queue a heartbeat when the conditions are met
        readMsg();          // read a message if the socket has data
        handleTimeoutMsg(); // report requests whose reply never arrived
        checkNetActive();   // reconnect if the link looks dead
    }

    // Give the send loop a chance to exit first so that every pending request
    // callback can be handled correctly. The original note budgets about
    // 200 ms; the loop below polls at most 21 times with sleep(10) in between.
    for (int polls = 1; !GetExitSendLoop(); ++polls)
    {
        glodon::sleep(10);
        if (polls > 20)
        {
            break;
        }
    }

    ClearConext();
    SetLastActiveTime(0);
    m_iLastSendHeartTime = 0;
    SetExitRecvLoop(true);
}
心跳发送的逻辑如下:
当发送队列为空且100秒内没有任何消息收发成功时,发送心跳
void GSKCNet::sendHeartMsg()
{
//当前时间秒数
time_t now = time(NULL);
//心跳开始、发送对列为空、100s没有任何消息收发(成功)
if (SendQueueIsEmpty() && m_bHeartSendStart
&& (now - GetLastActiveTime() >= m_iHeartInterval)
&& (now - m_iLastSendHeartTime >= m_iHeartInterval))
{
m_iLastSendHeartTime = now;
GSKPacket *pPacket = createPacket(CMD_HEART,0,"");
pPacket->setBody("");
GSKRequest* pRequest = new GSKRequest(pPacket, 1000,g_heartCallback);
PushToSendQueue(pRequest); //将心跳请求放到发送队列
}
}
接收消息的逻辑如下:
1) 如果是服务器发来的消息(聊天消息、透传协议),直接放到回调队列queue<GSKResponse*> m_hasCallbackMap
2) 如果是登录应答,启动心跳
3) 如果是登出应答,停止心跳
4) 如果是心跳应答,不做任何处理
5) 如果是发送请求应答,检查已发送消息的map m_hasSendedMap
代码逻辑如下:
void GSKCNet::readMsg()
{
//检查网络上是否有数据可以读取
int iSocketStatus = m_pGSKSocket->Select();
if (iSocketStatus == GSK_SELECT_ERROR)
{
m_iNetStatus = GSKNET_STATUS_DISCONNECT;
}
else if (iSocketStatus == GSK_SELECT_READ)
{
m_iNetStatus = GSKNET_STATUS_OK;
GSKResponse *stGSKResponse = new GSKResponse();
int iRet = HandleResponse(stGSKResponse);
if(iRet == 0)
{
//处理服务器发来的消息
SetLastActiveTime(time(NULL));
if(stGSKResponse->stHead.wCmd==SUBCMD_PUSH_CHATMSG ||
stGSKResponse->stHead.wCmd==SUBCMD_PUSH_SYSTEM_PROTOCOL ||
stGSKResponse->stHead.wCmd==SUBCMD_PUSH_CUSTOM_PROYOCOL)
{
stGSKResponse->code = iRet;
stGSKResponse->cb = g_pushCallback;
PushToCallbackQueue(stGSKResponse);
return;
}
else if(stGSKResponse->stHead.wCmd == CMD_HEART_ANSWER)
{
//do nothing
}
else if(stGSKResponse->stHead.wCmd == SUBCMD_LOGIN_ANSWER_SUCCEED)
{
StartHeartBeat();
SetIsLogged(true);
}
else if (stGSKResponse->stHead.wCmd == SUBCMD_RCODE_LOGIN_ANSWER_SUCCEED)
{
SetIsLogged(true);
}
else if(stGSKResponse->stHead.wCmd == SUBCMD_LOGIN_ANSWER_FAILED)
{
StopHeartBeat();
}
else if(stGSKResponse->stHead.wCmd == SUBCMD_LOGOUT_ANSWER)
{
StopHeartBeat();
}
updateSendedMap(stGSKResponse);
}
}
}
读取并解析一条消息的逻辑如下:
/**
 * Reads one message from the socket into m_recvBuffer and fills pGSKResponse.
 *
 * The first three bytes carry the start marker and the command. A heartbeat
 * answer is followed by a single end-marker byte; any other message is
 * followed by a 4-byte length field and is completed by HandleResp().
 *
 * Compared with the previous version, the repeated "malloc, Recv, check,
 * append, free" sequences are folded into one local helper that uses a small
 * stack buffer (so there is no unchecked malloc any more), and the two
 * branches that only differed in whether the 3-byte prefix was already
 * buffered are merged. Return codes are unchanged.
 *
 * @param pGSKResponse response object to populate; not owned by this function.
 * @return 0 on success;
 *         -1 when Recv() fails (handleRelogin() is called for negative
 *            results) or returns 0;
 *         -2 when the start marker is not STX;
 *         -3 when a heartbeat answer does not end with ETX, or fewer than
 *            7 header bytes are buffered;
 *         -4 when the declared message length is not positive;
 *         -5 / -7 when HandleResp() fails;
 *         -6 when the declared length is smaller than what is buffered.
 */
int GSKCNet::HandleResponse(GSKResponse* pGSKResponse)
{
    // Receives `wanted` bytes and appends them to m_recvBuffer.
    // Returns Recv()'s result; negative results trigger handleRelogin().
    auto recvIntoBuffer = [this](int wanted) -> int
    {
        char chunk[8]; // callers never request more than 4 bytes
        if (wanted <= 0 || wanted > static_cast<int>(sizeof(chunk)))
        {
            return 0;
        }
        const int iRet = m_pGSKSocket->Recv(chunk, wanted);
        if (iRet < 0)
        {
            handleRelogin(iRet);
        }
        else if (iRet > 0)
        {
            m_recvBuffer.append(chunk, iRet);
        }
        return iRet;
    };

    // Number of bytes already sitting in the receive buffer.
    const int bufferedBytes = m_recvBuffer.readableBytes();

    if (bufferedBytes <= 3)
    {
        // Complete the 3-byte prefix (start marker + command) if necessary.
        if (bufferedBytes < 3 && recvIntoBuffer(3 - bufferedBytes) <= 0)
        {
            return -1;
        }

        // Validate the start marker.
        if (m_recvBuffer.peekInt8() != STX)
        {
            m_recvBuffer.retrieveAll();
            return -2;
        }

        if (m_recvBuffer.peekHeartBeatCmd() == CMD_HEART_ANSWER)
        {
            // A heartbeat answer only has the end marker left to read.
            if (recvIntoBuffer(1) <= 0)
            {
                return -1;
            }
            if (m_recvBuffer.peekHeartBeatEnd() != ETX)
            {
                m_recvBuffer.retrieveAll();
                return -3;
            }
            m_recvBuffer.retrieveAll();
            pGSKResponse->stHead.wCmd = CMD_HEART_ANSWER;
            pGSKResponse->stHead.subCmd = 0;
            pGSKResponse->stHead.wClt = 3;
            pGSKResponse->stHead.seqid = 0;
            pGSKResponse->SetSequence(0);
            pGSKResponse->SetMsg("");
            return 0;
        }

        // Any other command: read the 4-byte total length.
        if (recvIntoBuffer(4) <= 0)
        {
            return -1;
        }
        if (m_recvBuffer.readableBytes() < 7)
        {
            return -3;
        }

        const int msgLen = m_recvBuffer.peekMsgLength();
        if (msgLen <= 0 /*|| msgLen > 2048000 */)
        {
            m_recvBuffer.retrieveAll();
            return -4;
        }
        // NOTE(review): when msgLen does not exceed the buffered header the
        // response is left unfilled and 0 is returned, as before - confirm
        // this is intended.
        if (msgLen > static_cast<int>(m_recvBuffer.readableBytes()))
        {
            if (HandleResp(msgLen, pGSKResponse) < 0)
            {
                return -5;
            }
        }
    }
    else
    {
        // More than the prefix is buffered: top the header up to 7 bytes.
        if (bufferedBytes < 7 && recvIntoBuffer(7 - bufferedBytes) <= 0)
        {
            return -1;
        }

        const int msgLen = m_recvBuffer.peekMsgLength();
        if (msgLen - static_cast<int>(m_recvBuffer.readableBytes()) < 0)
        {
            m_recvBuffer.retrieveAll();
            return -6;
        }
        if (HandleResp(msgLen, pGSKResponse) < 0)
        {
            return -7;
        }
    }

    m_recvBuffer.retrieveAll();
    return 0;
}
读取消息到缓冲区的过程如下:
/**
 * Receives exactly iBufLen bytes from the socket into szBuf.
 *
 * Each wait for readability is bounded by iTimeout seconds.
 *
 * Fixes over the previous version:
 *  - The fd_set and timeval are rebuilt before every select() call, because
 *    select() may modify both and their contents are unspecified afterwards.
 *  - recv() returning 0 is always treated as the peer having closed the
 *    connection. errno is not set in that case, so the old `errno == EINTR`
 *    check read a stale value and could spin forever.
 *
 * @param szBuf    destination buffer with room for at least iBufLen bytes.
 * @param iBufLen  number of bytes to read.
 * @param iTimeout timeout in seconds for each wait.
 * @return iBufLen on success; 0 when the socket is not connected (or
 *         iBufLen <= 0); -31 when select() fails; -32 on timeout (errno is
 *         set to ETIMEDOUT); -33 when recv() fails (socket is closed);
 *         -34 when the peer closed the connection (socket is closed).
 */
int GSKSocket::Recv( char* szBuf, int iBufLen , int iTimeout )
{
    if (!IsConnected())
    {
        return 0;
    }

    int iRecvBytes = 0;
    while (iRecvBytes < iBufLen)
    {
        // select() may clobber both arguments, so set them up on every pass.
        fd_set stFdSet;
        FD_ZERO(&stFdSet);
        FD_SET(m_iSocket, &stFdSet);
        struct timeval stTimeout;
        stTimeout.tv_sec = iTimeout;
        stTimeout.tv_usec = 0;

        const int iReady = select(m_iSocket + 1, &stFdSet, NULL, NULL, &stTimeout);
        if (iReady == -1)
        {
            // select socket for read failed
            return -31;
        }
        if (iReady == 0)
        {
            // select socket for read timeout
            errno = ETIMEDOUT;
            return -32;
        }

        const int iRet = recv(m_iSocket, szBuf + iRecvBytes, iBufLen - iRecvBytes, 0);
        if (iRet == -1)
        {
            if (errno == EINPROGRESS || errno == EWOULDBLOCK || errno == EINTR)
            {
                // Transient condition: wait for readability again.
                continue;
            }
            // recv data failed, link broken
            Close();
            return -33;
        }
        if (iRet == 0)
        {
            // Zero bytes means the peer closed the connection.
            Close();
            return -34;
        }
        iRecvBytes += iRet;
    }
    return iRecvBytes;
}
当recv的消息长度为0时,关闭当前socket
/**
 * Closes the socket if one is open and marks it invalid.
 * Calling it when the socket is already invalid does nothing.
 */
void GSKSocket::Close()
{
    const bool bAlreadyClosed = (m_iSocket == (int)INVALID_SOCKET);
    if (bAlreadyClosed)
    {
        return;
    }

    LOGI_GSKCNET_INFO("close socket= %d", m_iSocket);

    glodon::closeSocket(m_iSocket);
    glodon::unInitlizeSocket();

    // Remember that there is no live descriptor any more.
    m_iSocket = INVALID_SOCKET;
}
收到请求回应的消息后,需要按序列号在已发送消息的map(m_hasSendedMap)中查找对应的请求,然后调用相应的回调(发送成功);如果重试一段时间后仍然找不到对应的请求,则丢弃该回应,代码逻辑如下:
/**
 * Matches a response with its pending request in m_hasSendedMap and queues
 * the request's callback with GSKNET_SEND_MSG_SUCCEED_CODE.
 *
 * The response may arrive before the request has been inserted into the map,
 * so when the first lookup misses the lookup is retried up to 50 times with
 * sleep(10) in between (a bounded loop, so an unknown sequence cannot retry
 * forever). If no request shows up the response is deleted.
 *
 * Fix: the lookup and the erase now happen under a single lock. Previously
 * the mutex was released between find() and erase(iter), so a concurrent
 * insert could invalidate the iterator. The duplicated deliver code is also
 * folded into one path.
 *
 * @param stGSKResponse parsed response; ownership passes to the callback
 *                      queue on a match, otherwise it is deleted here.
 */
void GSKCNet::updateSendedMap(GSKResponse *stGSKResponse)
{
    GSKRequest* pSendRequest = nullptr;

    // Removes the request with the response's sequence from the map.
    // Returns true when an entry was found and removed.
    auto takeMatchingRequest = [this, stGSKResponse, &pSendRequest]() -> bool
    {
        bool bFound = false;
        m_sendedmapmutex.lock();
        std::unordered_map<unsigned long long, GSKRequest*>::iterator iter =
            m_hasSendedMap.find(stGSKResponse->GetSequence());
        if (iter != m_hasSendedMap.end())
        {
            pSendRequest = iter->second;
            m_hasSendedMap.erase(iter);
            bFound = true;
        }
        m_sendedmapmutex.unlock();
        return bFound;
    };

    bool bMatched = takeMatchingRequest();

    // Handle a reply that arrives before its request is in the map.
    for (int attempt = 0; !bMatched && attempt < 50; ++attempt)
    {
        glodon::sleep(10);
        bMatched = takeMatchingRequest();
    }

    if (!bMatched)
    {
        // Nobody is waiting for this sequence: drop the response.
        delete stGSKResponse;
        return;
    }

    stGSKResponse->code = GSKNET_SEND_MSG_SUCCEED_CODE;
    stGSKResponse->cb = pSendRequest->GetCallback();
    stGSKResponse->m_reqBody = pSendRequest->GetBody();
    PushToCallbackQueue(stGSKResponse);
    delete pSendRequest;
}
接收线程需要不断地遍历已发送消息的map,从而判断消息发送是否超时
void GSKCNet::handleTimeoutMsg()
{
for(std::unordered_map<unsigned long long, GSKRequest*>::iterator iter = m_hasSendedMap.begin();
iter != m_hasSendedMap.end();)
{
GSKRequest* pRequest = iter->second;
//超时
if(pRequest && (pRequest->GetTimeout() + pRequest->GetSendTime() < GSKRequest::GetCurrentTimeMsec()))
{
//回调函数
handleMsgCallBack(GSKNET_CLIENT_REQUEST_TIMEOUT_CODE, pRequest->GetSequence(),
pRequest->GetReqCmd(), pRequest->getPacket()->getWClt(),
pRequest->GetSubCmd(),pRequest->GetBody(),pRequest->GetCallback());
m_sendedmapmutex.lock();
m_hasSendedMap.erase(iter++);
m_sendedmapmutex.unlock();
delete pRequest;
pRequest = NULL;
}
else
{
iter++;
}
}
}
接收线程需要检测连接是否还活着,一旦发现连接断开,就尝试重连
/**
 * Tries to reconnect when the connection looks dead.
 *
 * A reconnect is attempted only when a last-activity time has been recorded,
 * more than (2 * m_iHeartInterval + 5) seconds have passed since then, and
 * more than 20 seconds have passed since the last connect time. If the
 * attempt fails the client is marked as not logged in.
 */
void GSKCNet::checkNetActive()
{
    if (GetLastActiveTime() == 0)
    {
        return;
    }
    if (time(NULL) - GetLastActiveTime() <= (m_iHeartInterval * 2 + 5))
    {
        return;
    }
    // Do not retry more often than once every 20 seconds.
    if (time(NULL) - GetLastConnectTime() <= 20)
    {
        return;
    }

    const bool bReconnected = TryConnectServer(5, CHECK_NET_ACTIVE_CNETWORK);
    if (!bReconnected)
    {
        LOGI_GSKCNET_INFO("%s", "checkNetActive()->TryConnectServer() is failed");
        SetIsLogged(false);
        return;
    }
    LOGI_GSKCNET_INFO("%s", "checkNetActive()->TryConnectServer() is succeed");
}
当接收线程退出时,它需要先等待发送线程退出,因为它需要处理已发送消息的map,针对这些已发送的消息,调用发送消息失败的回调
void GSKCNet::ClearConext()
{
m_recvBuffer.retrieveAll();
if (m_pGSKSocket)
{
m_pGSKSocket->Close();
}
if (g_netConnectCallback)
{
g_netConnectCallback(false);
}
for (std::unordered_map<unsigned long long, GSKRequest*>::iterator iter = m_hasSendedMap.begin();
iter != m_hasSendedMap.end();)
{
GSKRequest* pRequest = iter->second;
if (pRequest)
{
//回调函数
handleMsgCallBack(GSKNET_SEND_MSG_FAILED_CODE, pRequest->GetSequence(),
pRequest->GetReqCmd(), pRequest->getPacket()->getWClt(),
pRequest->GetSubCmd(), pRequest->GetBody(), pRequest->GetCallback());
m_sendedmapmutex.lock();
m_hasSendedMap.erase(iter++);
m_sendedmapmutex.unlock();
delete pRequest;
pRequest = NULL;
}
}
}