IM客户端开发(3)——接收线程

2019-12-06  本文已影响0人  Magic11

1、 接收线程负责发送心跳、读取消息、检查已发送的消息是否超时、检查网络是否还活着等

/**
 * Body of the receive thread.
 *
 * While the receive-loop flag stays set, every pass sends a heartbeat when
 * one is due, reads a pending message, expires timed-out requests and
 * checks whether the link is still alive. Once the flag is cleared the
 * function waits a bounded time for the send thread, tears down the
 * connection context and finally reports that the receive loop has exited.
 */
void GSKCNet::recvLoop()
{
    SetRecvLoop(true);
    SetExitRecvLoop(false);

    while (GetRecvLoop())
    {
        sendHeartMsg();      // send a heartbeat if its conditions are met
        readMsg();           // read a message if the socket has data
        handleTimeoutMsg();  // fail requests whose reply never arrived
        checkNetActive();    // reconnect if the link looks dead
    }

    // Give the send thread a bounded chance to exit first so that every
    // outstanding request callback can be handled correctly. The wait is
    // capped at 21 sleeps of 10 (about 200 ms per the original note).
    for (int waited = 1; !GetExitSendLoop(); ++waited)
    {
        glodon::sleep(10);
        if (waited > 20)
        {
            break;
        }
    }

    ClearConext();
    SetLastActiveTime(0);
    m_iLastSendHeartTime = 0;
    SetExitRecvLoop(true);
}

心跳发送的逻辑如下:
当心跳已启动、发送队列为空,并且距最近一次消息收发成功、距上一次发送心跳都已达到心跳间隔 m_iHeartInterval(本文按100秒)时,发送心跳

void GSKCNet::sendHeartMsg()
{
    //当前时间秒数
    time_t now = time(NULL);
    //心跳开始、发送对列为空、100s没有任何消息收发(成功)
    if (SendQueueIsEmpty() && m_bHeartSendStart 
        && (now - GetLastActiveTime() >= m_iHeartInterval) 
        && (now - m_iLastSendHeartTime >= m_iHeartInterval))
    {
        m_iLastSendHeartTime = now;

        GSKPacket *pPacket = createPacket(CMD_HEART,0,"");
        pPacket->setBody("");

        GSKRequest* pRequest = new GSKRequest(pPacket, 1000,g_heartCallback);

        PushToSendQueue(pRequest);   //将心跳请求放到发送队列
    }
}

接收消息的逻辑如下:
1) 如果是服务器发来的消息(聊天消息、透传协议),直接放到回调队列queue<GSKResponse*> m_hasCallbackMap
2) 如果是登录成功应答,启动心跳并标记为已登录;如果是登录失败应答,停止心跳
3) 如果是登出应答,停止心跳
4) 如果是心跳应答,不做任何处理
5) 如果是发送请求应答,检查已发送消息的map m_hasSendedMap
代码逻辑如下:

void GSKCNet::readMsg()
{
    //检查网络上是否有数据可以读取
    int iSocketStatus = m_pGSKSocket->Select();
    if (iSocketStatus == GSK_SELECT_ERROR)
    {
        m_iNetStatus = GSKNET_STATUS_DISCONNECT;
    }
    else if (iSocketStatus == GSK_SELECT_READ)
    {
        m_iNetStatus = GSKNET_STATUS_OK;
        GSKResponse *stGSKResponse = new GSKResponse();
        int iRet = HandleResponse(stGSKResponse);
        if(iRet == 0)
        {
            //处理服务器发来的消息
            SetLastActiveTime(time(NULL));
            if(stGSKResponse->stHead.wCmd==SUBCMD_PUSH_CHATMSG ||
                stGSKResponse->stHead.wCmd==SUBCMD_PUSH_SYSTEM_PROTOCOL ||
                stGSKResponse->stHead.wCmd==SUBCMD_PUSH_CUSTOM_PROYOCOL)
            {
                stGSKResponse->code = iRet;
                stGSKResponse->cb = g_pushCallback;

                PushToCallbackQueue(stGSKResponse);
                return;
            }
            else if(stGSKResponse->stHead.wCmd == CMD_HEART_ANSWER)
            {
                //do nothing
            }
            else if(stGSKResponse->stHead.wCmd == SUBCMD_LOGIN_ANSWER_SUCCEED)
            {
                StartHeartBeat();
                SetIsLogged(true);
            }
            else if (stGSKResponse->stHead.wCmd == SUBCMD_RCODE_LOGIN_ANSWER_SUCCEED)
            {
                SetIsLogged(true);
            }
            else if(stGSKResponse->stHead.wCmd == SUBCMD_LOGIN_ANSWER_FAILED)
            {
                StopHeartBeat();
            }
            else if(stGSKResponse->stHead.wCmd == SUBCMD_LOGOUT_ANSWER)
            {
                StopHeartBeat();
            }

            updateSendedMap(stGSKResponse);
        }
    }
}

读取并解析一条消息的逻辑如下:

/**
 * Read and parse one message from the socket into pGSKResponse.
 *
 * Bytes left in m_recvBuffer by an earlier call are reused. The first three
 * buffered bytes are checked for the STX start marker and the heartbeat
 * command; a heartbeat answer needs one more byte (ETX), any other message
 * needs four more bytes before peekMsgLength() is consulted
 * (NOTE(review): presumably a 7-byte fixed header - confirm against the
 * buffer implementation). The remainder is read by HandleResp().
 *
 * Compared with the original, the two duplicated header branches are merged
 * and the small malloc/free scratch buffers (never checked for NULL) are
 * replaced by a stack buffer. Return codes and call order are unchanged.
 *
 * NOTE(review): on the short-header path a declared length that is not
 * larger than the buffered byte count returns 0 without calling
 * HandleResp(); this mirrors the original and should be confirmed.
 *
 * @param pGSKResponse  Response object to fill.
 * @return  0  a message (or heartbeat answer) was consumed;
 *         -1  socket read failed or returned no data;
 *         -2  start marker is not STX (buffer discarded);
 *         -3  heartbeat end marker is not ETX, or the header is incomplete;
 *         -4  declared message length is not positive (buffer discarded);
 *         -5  HandleResp() failed on the short-header path;
 *         -6  declared length is smaller than the buffered byte count;
 *         -7  HandleResp() failed on the buffered-header path.
 */
int GSKCNet::HandleResponse(GSKResponse* pGSKResponse)
{
    // Reads `count` bytes (never more than 4 in this function) from the
    // socket and appends them to m_recvBuffer. A negative Recv result also
    // triggers handleRelogin(); false means the caller must return -1.
    auto recvIntoBuffer = [this](int count) -> bool
    {
        char chunk[8];
        const int iRet = m_pGSKSocket->Recv(chunk, count);
        if (iRet < 0)
        {
            handleRelogin(iRet);
            return false;
        }
        if (iRet == 0)
        {
            return false;
        }
        m_recvBuffer.append(chunk, iRet);
        return true;
    };

    // Bytes already buffered from a previous call.
    const int readedBytes = m_recvBuffer.readableBytes();

    if (readedBytes <= 3)
    {
        // Complete the first three bytes (start marker and command).
        if (readedBytes < 3 && !recvIntoBuffer(3 - readedBytes))
        {
            return -1;
        }

        // Validate the start marker.
        if (m_recvBuffer.peekInt8() != STX)
        {
            m_recvBuffer.retrieveAll();
            return -2;
        }

        if (m_recvBuffer.peekHeartBeatCmd() == CMD_HEART_ANSWER)
        {
            // Heartbeat answer: only the end marker is still missing.
            if (!recvIntoBuffer(1))
            {
                return -1;
            }
            if (m_recvBuffer.peekHeartBeatEnd() != ETX)
            {
                m_recvBuffer.retrieveAll();
                return -3;
            }
            m_recvBuffer.retrieveAll();

            pGSKResponse->stHead.wCmd = CMD_HEART_ANSWER;
            pGSKResponse->stHead.subCmd = 0;
            pGSKResponse->stHead.wClt = 3;
            pGSKResponse->stHead.seqid = 0;
            pGSKResponse->SetSequence(0);
            pGSKResponse->SetMsg("");
            return 0;
        }

        // Regular message: fetch the total-length field.
        if (!recvIntoBuffer(4))
        {
            return -1;
        }

        if (m_recvBuffer.readableBytes() < 7)
        {
            return -3;
        }

        // Declared message length.
        const int msgLen = m_recvBuffer.peekMsgLength();
        if (msgLen <= 0 /*|| msgLen > 2048000 */)
        {
            m_recvBuffer.retrieveAll();
            return -4;
        }
        if (msgLen > static_cast<int>(m_recvBuffer.readableBytes())
            && HandleResp(msgLen, pGSKResponse) < 0)
        {
            return -5;
        }
    }
    else
    {
        // More than three bytes are buffered: top the header up to 7 bytes.
        if (readedBytes < 7 && !recvIntoBuffer(7 - readedBytes))
        {
            return -1;
        }

        // Declared message length.
        const int msgLen = m_recvBuffer.peekMsgLength();
        if (msgLen - static_cast<int>(m_recvBuffer.readableBytes()) < 0)
        {
            m_recvBuffer.retrieveAll();
            return -6;
        }

        if (HandleResp(msgLen, pGSKResponse) < 0)
        {
            return -7;
        }
    }

    m_recvBuffer.retrieveAll();

    return 0;
}

读取消息到缓冲区的过程如下:

/**
 * Receive exactly iBufLen bytes from the socket into szBuf.
 *
 * Each wait for readability uses select() with a timeout of iTimeout
 * seconds. Fixes relative to the original:
 *  - the fd_set and timeval are rebuilt before every select() call, because
 *    select() may modify both, so reusing them across iterations is not
 *    portable; as a result the timeout now applies to each wait;
 *  - recv() returning 0 is always treated as a closed connection. errno is
 *    only meaningful after a -1 return, so the old `errno == EINTR` check
 *    read a stale value and could loop forever on a closed peer.
 *
 * @param szBuf     Destination buffer of at least iBufLen bytes.
 * @param iBufLen   Number of bytes to read.
 * @param iTimeout  select() timeout in seconds for each wait.
 * @return iBufLen on success; 0 if the socket is not connected;
 *         -31 select() failed; -32 select() timed out (errno = ETIMEDOUT);
 *         -33 recv() failed (socket closed);
 *         -34 peer closed the connection (socket closed).
 */
int GSKSocket::Recv( char* szBuf, int iBufLen , int iTimeout ) 
{
    if (!IsConnected())
    {
        return 0;
    }

    int iRecvBytes = 0;
    while (iRecvBytes < iBufLen)
    {
        // Rebuild the select() arguments for every wait (see above).
        fd_set stFdSet;
        FD_ZERO(&stFdSet);
        FD_SET(m_iSocket, &stFdSet);

        struct timeval stTimeout;
        stTimeout.tv_sec = iTimeout;
        stTimeout.tv_usec = 0;

        const int iReady = select(m_iSocket + 1, &stFdSet, NULL, NULL, &stTimeout);
        if (iReady == -1)
        {
            // select socket for read failed
            return -31;
        }
        if (iReady == 0)
        {
            // select socket for read timeout
            errno = ETIMEDOUT;
            return -32;
        }

        const int iRet = static_cast<int>(
            recv(m_iSocket, szBuf + iRecvBytes, iBufLen - iRecvBytes, 0));
        if (iRet == -1)
        {
            // Transient conditions: wait for readability again.
            if (errno == EINPROGRESS || errno == EWOULDBLOCK || errno == EINTR)
            {
                continue;
            }
            // recv data failed, link broken
            Close();
            return -33;
        }
        if (iRet == 0)
        {
            // Zero bytes read means the peer closed the link: close our side.
            Close();
            return -34;
        }

        iRecvBytes += iRet;
    }

    return iRecvBytes;
}

当recv的消息长度为0时,关闭当前socket

/**
 * Close the socket if it is currently open.
 *
 * Logs the descriptor, closes it through the glodon wrappers and marks the
 * member as INVALID_SOCKET, so calling Close() again is a no-op.
 */
void GSKSocket::Close() 
{
    // Already closed: nothing to release.
    if (m_iSocket == static_cast<int>(INVALID_SOCKET))
    {
        return;
    }

    LOGI_GSKCNET_INFO("close socket= %d", m_iSocket);
    glodon::closeSocket(m_iSocket);
    glodon::unInitlizeSocket();
    m_iSocket = INVALID_SOCKET;
}

收到请求回应的消息后需要遍历已发送的消息的map(m_hasSendedMap),然后调用相应的回调,发送成功或失败,代码逻辑如下:

/**
 * Match a server answer against the request that produced it.
 *
 * If m_hasSendedMap holds a request with the answer's sequence number, the
 * request is removed, the response is completed with the request's callback
 * and body and pushed to the callback queue, and the request is deleted.
 *
 * The answer can arrive before the request has been inserted into the map,
 * so a miss is retried up to 50 times with a sleep of 10 between attempts.
 * A loop is used instead of recursion so that an unknown sequence cannot
 * recurse forever. If no request ever shows up the response is deleted.
 *
 * Fix relative to the original: lookup and erase now happen under a single
 * hold of m_sendedmapmutex. The original released the mutex between find()
 * and erase(), so a concurrent insert could rehash the map and invalidate
 * the iterator before it was used.
 *
 * @param stGSKResponse  Parsed answer; ownership is taken by this function.
 */
void GSKCNet::updateSendedMap(GSKResponse *stGSKResponse)
{
    // Tries once to claim the matching request and deliver the response.
    // Returns true when the response was handed to the callback queue.
    auto deliverIfSent = [this, stGSKResponse]() -> bool
    {
        m_sendedmapmutex.lock();
        auto iter = m_hasSendedMap.find(stGSKResponse->GetSequence());
        if (iter == m_hasSendedMap.end())
        {
            m_sendedmapmutex.unlock();
            return false;
        }
        GSKRequest* pSendRequest = iter->second;
        m_hasSendedMap.erase(iter);
        m_sendedmapmutex.unlock();

        // The request is now owned exclusively by this thread.
        stGSKResponse->code = GSKNET_SEND_MSG_SUCCEED_CODE;
        stGSKResponse->cb = pSendRequest->GetCallback();
        stGSKResponse->m_reqBody = pSendRequest->GetBody();

        PushToCallbackQueue(stGSKResponse);
        delete pSendRequest;
        return true;
    };

    if (deliverIfSent())
    {
        return;
    }

    // The answer may have overtaken the map insertion: poll for a while.
    for (int num = 0; num < 50; ++num)
    {
        glodon::sleep(10);
        if (deliverIfSent())
        {
            return;
        }
    }

    // No request ever matched this answer: drop it.
    delete stGSKResponse;
}

接收线程需要不断的遍历已发送消息的map,从而判断消息发送是否超时

void GSKCNet::handleTimeoutMsg()
{
    for(std::unordered_map<unsigned long long, GSKRequest*>::iterator iter = m_hasSendedMap.begin();
        iter != m_hasSendedMap.end();)
    {
        GSKRequest* pRequest = iter->second;
        //超时
        if(pRequest && (pRequest->GetTimeout() + pRequest->GetSendTime() < GSKRequest::GetCurrentTimeMsec()))
        {
            //回调函数
            handleMsgCallBack(GSKNET_CLIENT_REQUEST_TIMEOUT_CODE, pRequest->GetSequence(), 
                                pRequest->GetReqCmd(), pRequest->getPacket()->getWClt(),
                              pRequest->GetSubCmd(),pRequest->GetBody(),pRequest->GetCallback());

            m_sendedmapmutex.lock();
            m_hasSendedMap.erase(iter++);
            m_sendedmapmutex.unlock();
            delete pRequest;
            pRequest = NULL;
        }
        else
        {
            iter++;
        }
    }
}

接收线程需要检测连接是否还活着,一旦发现连接断开,就尝试重连

/**
 * Detect a dead link and try to reconnect.
 *
 * The link counts as dead when activity has been recorded before
 * (last-active time is non-zero) but nothing has happened for more than
 * two heartbeat intervals plus 5 seconds. Reconnect attempts are made only
 * when the last connect is more than 20 seconds old; a failed attempt
 * clears the logged-in flag.
 */
void GSKCNet::checkNetActive()
{
    // No activity recorded yet: nothing to judge.
    if (GetLastActiveTime() == 0)
    {
        return;
    }
    // Still within the allowed silence window.
    if (time(NULL) - GetLastActiveTime() <= (m_iHeartInterval * 2 + 5))
    {
        return;
    }
    // A connect attempt was made recently: do not retry yet.
    if (time(NULL) - GetLastConnectTime() <= 20)
    {
        return;
    }

    const bool bReconnected = TryConnectServer(5, CHECK_NET_ACTIVE_CNETWORK);
    if (bReconnected)
    {
        LOGI_GSKCNET_INFO("%s", "checkNetActive()->TryConnectServer() is succeed");
        return;
    }

    LOGI_GSKCNET_INFO("%s", "checkNetActive()->TryConnectServer() is failed");
    SetIsLogged(false);
}

当接收线程退出时,它需要先等待发送线程退出,因为它需要处理已发送消息的map,针对这些已发送的消息,调用发送消息失败的回调

void GSKCNet::ClearConext()
{
    m_recvBuffer.retrieveAll();
    if (m_pGSKSocket)
    {
        m_pGSKSocket->Close();
    }
    if (g_netConnectCallback)
    {
        g_netConnectCallback(false);
    }

    for (std::unordered_map<unsigned long long, GSKRequest*>::iterator iter = m_hasSendedMap.begin();
        iter != m_hasSendedMap.end();)
    {
        GSKRequest* pRequest = iter->second;
        if (pRequest)
        {
            //回调函数
            handleMsgCallBack(GSKNET_SEND_MSG_FAILED_CODE, pRequest->GetSequence(), 
                pRequest->GetReqCmd(), pRequest->getPacket()->getWClt(),
                pRequest->GetSubCmd(), pRequest->GetBody(), pRequest->GetCallback());

            m_sendedmapmutex.lock();
            m_hasSendedMap.erase(iter++);
            m_sendedmapmutex.unlock();

            delete pRequest;
            pRequest = NULL;
        }
    }
}
上一篇下一篇

猜你喜欢

热点阅读