网络编程魔法音视频开发经验之路Live555

Live555源码解析(4) - 鱼儿上钩来

2017-05-27  本文已影响238人  SniperPan

上一篇Live555源码解析(3) - 服务开启,愿者上钩中我们讲到RTSPServer创建后,带来了两项重要支持(旁注类比):

整理一下,其实服务器已经摆好姿势,严阵以待第一个吃鱼饵的鱼上钩了,那么接下来,我们就看鱼儿是怎么一步一步上钩来。

1. 鱼

钓鱼要有鱼,服务要有被服务者,Live555媒体服务器的服务对象就是支持RTSP/RTP协议的客户端。从官网对客户端介绍中我们可以看到,目前支持如下主流客户端:

要钓就钓大鱼,本篇就采用VLC播放器作为客户端,来探测一下咬钩引起的连锁反应。

2. 钩

先看一下服务器准备好后的命令行提示界面,如下图所示:


关键几个信息,说明如下:

因此,本篇中我们以ts文件类型为例,将bipbop-gear1-all.ts文件置于live555MediaServer可执行文件同一路径下。对于VLC而言,想要播放(点播)该文件,则其入口为:

rtsp://192.168.56.1/bipbop-gear1-all.ts

这也就是鱼所看到的钩,而同时,服务器正处于doEventLoop()的循环等待中,正如河边静气凝神握着钓竿的手。

3. 来

如图所示,VLC客户端打开网络串流rtsp://192.168.56.1/bipbop-gear1-all.ts,开始咬钩。

果不其然,这触发了doEventLoop()所调用的BasicTaskScheduler::SingleStep()中的如下代码。

int selectResult = select(fMaxNumSockets, &readSet, &writeSet, &exceptionSet, &tv_timeToDelay);
if(selectResult <0)
{
    if( GetLastError() != EINTR )
    {
        // 异常错误,视为严重故障;打印错误信息后退出
        print_Set_info();
        abort();                
    }
}
else //if(selectResult <0)
{
    HandlerIterator iter(*fHandlers);
    HandlerDescriptor* handler;
    if(fLastHandledSocketNum >= 0)
    {   
        // 如已处理过socket读写,则找到前次socket读写的下一个链表节点
        while((handler = iter.next()) != NULL)
        {
            if(handler->socketNum == fLastHandledSocketNum) break;
        }
        if(handler == NULL)
        {
            // 未找到,重置相关值
            fLastHandlerSocketNum = -1;
            iter.reset();
        }
    }
    while((handler = iter.next()) != NULL)
    {
        // 找到链表中合法节点,开始处理
        int sock = handler->socketNum;
        int resultConditionSet = 0;
        if(FD_ISSET(sock, &readSet) && FD_ISSET(sock, &fReadSet)) resultConditionSet |= SOCKET_READABLE;
        if(FD_ISSET(sock, &writeSet) && FD_ISSET(sock, &fWriteSet) resultConditionSet |= SOCKET_WRITEABLE;
        if(FD_ISSET(sock, &exceptionSet) && FD_ISSET(sock, &fExceptionSet) resultConditionSet |= SOCKET_EXCEPTION;
        if((resultConditionSet&handler->conditionSet) != 0 && handler->handlerProc != NULL)
        {
            // 保存当前处理节点socketNum
            fLastHandledSocketNum = sock;
            (*handler->handlerProc)(handler->clientData, resultConditionSet);
            break;
        }
    } // while((handler = iter.next()) != NULL)
    ...
}

代码已于 Live555源码解析(1) - Main 寻根问祖,留其筋骨中Section 3进行了详细说明,这里不再赘述。总之要注意的是,incomingConnectionHandler服务已经注册好,存放位置就是HandlerSet中(详见Live555源码解析(3) - 服务开启,愿者上钩
Section 2.1.1.1.4)。

3.1 incomingConnectionHandler

有朋自远方来,GenericMediaServer::incomingConnectionHandler终于粉墨登场。

void GenericMediaServer::incomingConnectionHandler(void* instance, int /*mask*/)
{
    GenericMediaServer* server = (GenericMediaServer*)instance;
    server->incomingConnectionHandler();
}

这里的instance归根结底注册时是在GenericMediaServer构造函数中用的this指针,因此调用的也就依然是自身无参的incomingConnectionHandler()。

void GenericMediaServer::incomingConnectionHandler()
{
    incomingConnectionHandlerOnSocket(fServerSocket);
}

还是一层封装,为了类接口的隐藏。

void GenericMediaServer::incomingConnectionHandlerOnSocket(int serverSocket)
{
    //@3.1.1 socket accept 
    struct sockaddr_in clientAddr;
    SOCKLEN_T clientAddrLen = sizeof clientAddr;
    int clientSocket = accept(serverSocket, (struct sockaddr*)&clientAddr, &clientAddrLen);
    if(clientSocket < 0)
    {
        int err = envir().getErrno();
        if(err != EWOULDBLOCK)
            envir().setResultErrMsg("accept() failed: ");
        return;
    }
    
    //@3.1.2 socket revise
    ignoreSigPipeOnSocket(clientSocket);
    makeSocketBlocking(clientSocket);
    increaseSendBufferTo(envir(), clientSocket, 50*1024);

    //@3.1.3 createNewClientConnection
    (void)createNewClientConnection(clientSocket, clientAddr);
}

@3.1.1 socket accept

这段代码其实并无多少好说的,如果你看过关于socket编程的书,那么这些就只是基础的socket accept套路。甚至,如果有需要的,比如显示客户端地址、端口信息,你也可以在套路上加上一些输出操作。

@3.1.2 socket revise

Live555源码解析(3) - 服务开启,愿者上钩 中介绍过的一样,忽略SIGPIPE是为了防止退出,非阻塞模式是为了支持同时多Socket,调整Buffer是为了配合重传。有兴趣的话可以详细阅读Live555源码解析(3) - 服务开启,愿者上钩
Section @1.1部分。

@3.1.3 createNewClientConnection

到了这里,才是真正的重头戏。这里实际调用的是RTSPServerSupportHTTPStreaming中的createNewClientConnection(),其代码如下:

GenericMediaServer::ClientConnection*
RTSPServerSupportingHTTPStreaming::createNewClientConnection(int clientSocket, struct sockaddr_in clientAddr)
{
        return new RTSPClientConnectionSupportingHTTPStreaming(*this, clientSocket, clientAddr);
}

其调用了RTSPClientConnectionSupportingHTTPStreaming构造函数。

RTSPServerSupportingHTTPStreaming::RTSPClientConnectionSupportingHTTPStreaming
::RTSPClientConnectionSupportingHTTPStreaming(RTSPServer& ourServer, int clientSocket, struct sockaddr_in clientAddr)
    : RTSPClientConnection(ourServer, clientSocket, clientAddr)
    , fClientSessionId(0), fStreamSource(NULL), fPlaylistSource(NULL), fTCPSink(NULL) 
{}

进一步调用了RTSPClientConnection()构造函数。

RTSPServer::RTSPClientConnection
::RTSPClientConnection(RTSPServer& ourServer, int clientSocket, struct sockaddr_in clientAddr)
    : GenericMediaServer::ClientConnection(ourServer, clientSocket, clientAddr)
    , fOurRTSPServer(ourServer), fClientInputSocket(fOurSocket)
    , fClientOutputSocket(fOurSocket), fIsActive(True)
    , fRecursionCount(0), fOurSessionCookie(NULL) 
{
        resetRequestBuffer();
}

resetRequestBuffer()真的就只是重设了请求Buffer(),没有其他操作。我们需要关注的是这里调用了GenericMediaServer::ClientConnection()函数。

GenericMediaServer::ClientConnection
::ClientConnection(GenericMediaServer& ourServer, int clientSocket, struct sockaddr_in clientAddr)
    : fOurServer(ourServer), fOurSocket(clientSocket), fClientAddr(clientAddr) 
{
    //@3.1.3.1 哈希表
    fOurServer.fClientConnections->Add((char const*)this, this);
    resetRequestBuffer();
    
    //@3.1.3.2 新服务incomingRequestHandler
    envir().taskScheduler().setBackgroundHandling(fOurSocket,   
                                    SOCKET_READABLE|SOCKET_EXCEPTION, 
                                    incomingRequestHandler, this);
}

@3.1.3.1 哈希表

如果你还记得Live555源码解析(3) - 服务开启,愿者上钩 中有提到,可修改ClientConnections哈希表的API之一就是ClientConnection()构造函数,那么这里就可以推出,咬钩的动作引起了数据的变化。该变化将永存于服务器生命周期内,直到有人将其从表中抹去。而抹去也只能由哈希表的另一API,~ClientConnection()析构函数完成。

也就是说,该连接,将从连接创建开始存在,将于连接销毁而逝去

@3.1.3.2 新服务incomingRequestHandler

程序是指令加上数据,数据固然重要,但必须指令将其盘活。代码到这里,开启了新的服务incomingRequestHandler,从名称上来看,应该是服务于客户端发出的RTSP请求,那么究竟是不是呢?就在下一小节继续跟踪进去。

3.2 新服务incomingRequestHandler

还是SingleStep()中那段调度代码,换了个主角,戏照样唱。这次轮到incomingRequestHandler。

void GenericMediaServer::ClientConnection::incomingRequestHandler(void* instance, int /*mask*/) 
{
    ClientConnection* connection = (ClientConnection*)instance;
    connection->incomingRequestHandler();
}

一层封装。

void GenericMediaServer::ClientConnection::incomingRequestHandler() 
{
    struct sockaddr_in dummy; 
    //@3.2.1 readSocket
    int bytesRead = readSocket(envir(), fOurSocket,                 &fRequestBuffer[fRequestBytesAlreadySeen], fRequestBufferBytesLeft, dummy);
    //@3.2.2 handleRequestBytes
    handleRequestBytes(bytesRead);
}

@3.2.1 readSocket

依然调用的是GroupsockHelper提供的帮助函数,其内部代码如下。

int readSocket(UsageEnvironment& env, int socket, 
        unsigned char* buffer, unsigned bufferSize, 
        struct sockaddr_in& fromAddress) 
{
    SOCKLEN_T addressSize = sizeof fromAddress;
    int bytesRead = recvfrom(socket, (char*)buffer, bufferSize, 0,
                            (struct sockaddr*)&fromAddress, &addressSize);
    if(bytesRead < 0)
    {
        int err = env.getErrno();
        if( err == 0 || err == EWOULDBLOCK // Windows
         || err == EAGAIN || err == 111 || err == 113 )// ECONNREFUSED(linux)
        {
            fromAddress.sin_addr.s_addr = 0;
            return 0;
        }
        socketErr(env, "recvfrom() error: ");
    }
    else if(bytesRead == 0)
        return -1;
    
    return bytesRead;
}

标准read套路,调用了winsock的recvfrom函数,对读取字节数进行校验。要么错误了清场报错,要么正确了返回。
稍加注意的是最后一个参数,也就是incomingRequestHandler调用中的dummy结构体,其用于存放请求发出者,也就是说客户端的地址,这里并没有实际用处。

@3.2.2 handleRequestBytes

void RTSPServer::RTSPClientConnection::handleRequestBytes(int newBytesRead)
{
    int numBytesRemaining = 0;
    ++fRecursionCount;
    
    do{
        RTSPServer::RTSPClientSession* clientSession = NULL;
        if(newBytesRead < 0 || (unsigned)newBytesRead>= RequestBufferBytesLeft) {
            //读取失败,或读取到错误信息,关闭连接
            fIsActive = False;
            break;
        }
        
        Boolean endOfMsg = False;
        unsigned char* ptr = &fRequestBuffer[fRequestBytesAlreadySeen];
        if(fClientOutputSocket != fClientInputSocket && numBytesRemaining == 0) {
            //去除空白字符
            unsigned toIndex = 0;
            for(int fromIndex = 0; fromIndex < newBytesRead; ++fromIndex) {
                char c = ptr[fromIndex];
                if(!(c == '' || c == '\t' || c == '\r' || c == '\n'))
                    ptr[toIndex++] = c;
            }
            newBytesRead = toIndex;
            
            //判定为RTSP-over-HTTP tunneling,其中内容可能使用Base64编码,
            //所以此处尽可能使用Base64解码
            unsigned numBytesToDecode = fBase64RemainderCount + newBytesRead;
            unsigned numBase64RemainderCount = numBytesToDecode % 4;
            numBytesToDecode -= newBase64RemainderCount;
            if(numBytesToDecode > 0) {
                ptr[newBytesRead] = '\0';
                unsigned decodedSize;
                unsigned char* decodedBytes = base64Decode((char const*)(ptr-fBase64RemainderCount), numBytesToDecode, decodedSize);
                
                unsigned char* to = ptr - fBase64RemainderCount;\
                for(unsigned i = 0; i < decodedSize; ++i)
                    *to++ = decodedBytes[i];
                    
                for(unsigned j=0; j < newBase64RemainderCount; ++j)
                    *to++ = (ptr-fBase64RemainderCount + numBytesToDecode)[j];
                
                newBytesRead = decodedSize - fBase64RemainderCount + newBase64RemainderCount;
                delete[] decodedBytes;
            }
            fBase64RemainderCount = newBase64RemainderCount;
        }       
        
        //@3.2.2.1 确保Request消息完整性
        unsigned char* tmpPtr = fLastCRLF + 2;
        if(fBase64RemainderCount == 0)
        {
            if(tmpPtr < fRequestBuffer)
                tmpPtr = fRequestBuffer;
            while(tmpPtr < &ptr[newBytesRead - 1])
            {
                //查找消息结尾标识符 <CR><LF><CR><LF>
                if(*tmpPtr == '\r' && *(tmpPtr + 1) == '\n')
                {
                    if(tmpPtr - fLastCRLF == 2)
                    {
                        endOfMsg = True;
                        break;
                    }
                    fLastCRLF = tmpPtr;
                }
                ++tmpPtr;
            }
        }
        
        fRequestBufferBytesLeft -= newBytesRead;
        fRequestBufferAlreadySeen += newBytesRead;
        // 确保Request完整性
        if(!endOfMsg) break;
        
        fRequestBuffer[fRequstBytesAlreadySeen] = '\0';
        char cmdName[RTSP_PARAM_STRING_MAX];
        char urlPreSuffix[RTSP_PARAM_STRING_MAX];
        char urlSuffix[RTSP_PARAM_STRING_MAX];
        char cseq[RTSP_PARAM_STRING_MAX];
        char sessionIdStr[RTSP_PARAM_STRING_MAX];
        unsigned contentLength = 0;
        
        fLastCRLF[2] = '\0';
        
        //@3.2.2.2 解析RTSP请求
        Boolean parseSucceeded = parseRTSPRequstString((char*)fRequestBuffer, fLastCRLF+2 - fRequestBuffer, cmdName, sizeof cmdName, urlPreSuffix, sizeof urlPreSuffix, urlSuffix, sizeof urlSuffix, cseq, sizeof cseq, sessionIdStr, sizeof sessionIdStr, contentLength);
        fLastCRLF[2] = '\r';
        
        Boolean playAfterSetup = False;
        if(parseSucceeded){
            //如头中存在Content-Length,则再次校验消息完整性
            if(ptr + newBytesRead < tmpPtr + 2 + contentLength) break;
            
            Boolean const requestIncludedSessionId = sessionIdStr[0] != '\0'; 
            if(requestIncludedSessionId){
                //如头中存在SessionID,则验证该会话是否存在,并确认其状态
                clientSession = (RTSPServer::RTSPClientSession*)(fOurRTSPServer.lookupClientSession(sessionIdStr));
                if(clientSession != NULL) clientSession->noteLiveness();
            }
            
            //@3.2.2.3 处理RTSP请求中方法
            fCurrentCSeq = cseq;
            if(strcmp(cmdName, "OPTIONS") == 0){
                if(requestIncludedSessionId && clientSession == NULL)
                    handleCmd_sessionNotFound();
                else
                    handleCmd_OPTIONS();
            }
            else if(urlPreSuffix[0] == '\0' && rlSuffix[0] == '*' && urlSuffix[1] == '\0'){
                if(strcmp(cmdName, "GET_PARAMETER") == 0)
                    handleCmd_GET_PARAMETER((char const*)fRequestBuffer);
                else  if(strcmp(cmdName, "SET_PARAMETER") == 0)
                    handleCmd_SET_PARAMETER((char const*)fRequestBuffer);
                else
                    handleCmd_notSupported();
            }
            else if(strcmp(cmdName, "DESCRIBE") == 0){
                handleCmd_DESCRIBE(urlPreSuffix, urlSuffix, (char const*)fRequestBuffer);
            }
            else if(strcmp(cmdName, "SETUP") == 0){
                Boolean areAuthenticated = True;
                if(!requestIncludedSessionId){
                    // 创建会话
                    char urlTotalSuffix[2*RTSP_PARAM_STRING_MAX];
                    urlTotalSuffix[0] = '\0';
                    if(urlPreSuffix[0] != '\0'){
                        strcat(urlTotalSuffix, urlPreSuffix);
                        strcat(urlTotalSuffix, "/");
                    }
                    strcat(urlTotalSuffix, urlSuffix);
                    if(authenticationOK("SETUP", urlTotalSuffix, (char const*)fRequestBuffer))
                        clientSession = (RTSPServer::RTSPClientSession*)fOurRTSPServer.createNewClientSessionWithId();
                    else 
                        areAuthenticated = False;
                }
                
                if (clientSession != NULL) 
                    clientSession->handleCmd_withinSession(this, cmdName, urlPreSuffix,urlSuffix, (char const*)fRequestBuffer);
                else
                    handleCmd_sessionNotFound();
            }
            else if (strcmp(cmdName, "TEARDOWN") == 0
                    || strcmp(cmdName, "PLAY") == 0
                    || strcmp(cmdName, "PAUSE") == 0
                    || strcmp(cmdName, "GET_PARAMETER") == 0
                    || strcmp(cmdName, "SET_PARAMETER") == 0) {
                if (clientSession != NULL)
                    clientSession->handleCmd_withinSession(this, cmdName, urlPreSuffix, urlSuffix, (char const*)fRequestBuffer);
                else
                    handleCmd_sessionNotFound();
            }
            else if(strcmp(cmdName, "REGISTER") == 0 || strcmp(cmdName, "DEREGISTER") == 0) {
                char* url = strDupSize((char*)fRequestBuffer);
                if (sscanf((char*)fRequestBuffer, "%*s %s", url) == 1) {
                    Boolean reuseConnection, deliverViaTCP;
                    char* proxyURLSuffix;
                    parseTransportHeaderForREGISTER((const char*)fRequestBuffer, reuseConnection, deliverViaTCP, proxyURLSuffix);
                    handleCmd_REGISTER(cmdName, url, urlSuffix, (char const*)fRequestBuffer, reuseConnection, deliverViaTCP, proxyURLSuffix);
                    delete[] proxyURLSuffix;
                } else {
                    handleCmd_bad();
                }
                delete[] url;
            } else {
                handleCmd_notSupported();
            }
        } else {
            // RTSP-over-HTTP tunnel
            char sessionCookie[RTSP_PARAM_STRING_MAX];
            char acceptStr[RTSP_PARAM_STRING_MAX];
            *fLastCRLF = '\0';
            parseSucceeded = parseHTTPRequestString(cmdName, sizeof cmdName,urlSuffix, sizeof urlPreSuffix,  sessionCookie, sizeof sessionCookie, acceptStr, sizeof acceptStr);
            *fLastCRLF = '\r';
            if (parseSucceeded) {
                // Check that the HTTP command is valid for RTSP-over-HTTP tunneling: There must be a 'session cookie'.
                Boolean isValidHTTPCmd = True;
                if (strcmp(cmdName, "OPTIONS") == 0) {
                    handleHTTPCmd_OPTIONS();
                } else if (sessionCookie[0] == '\0') {
                    if (strcmp(acceptStr, "application/x-rtsp-tunnelled") == 0) 
                        isValidHTTPCmd = False;
                    else
                        handleHTTPCmd_StreamingGET(urlSuffix, (char const*)fRequestBuffer);
                } else if (strcmp(cmdName, "GET") == 0){
                    handleHTTPCmd_TunnelingGET(sessionCookie);
                } else if (strcmp(cmdName, "POST") == 0) {
                    unsigned char const* extraData = fLastCRLF+4;
                    unsigned extraDataSize = &fRequestBuffer[fRequestBytesAlreadySeen] - extraData;
                    if (handleHTTPCmd_TunnelingPOST(sessionCookie, extraData, extraDataSize)) {
                        fIsActive = False;
                        break;
                    }
                }
                else 
                    isValidHTTPCmd = False;
                
                if (!isValidHTTPCmd)
                    handleHTTPCmd_notSupported();
                else
                    handleCmd_bad();
                
                send(fClientOutputSocket, (char const*)fResponseBuffer, strlen((char*)fResponseBuffer), 0);
                
                if (playAfterSetup) 
                    clientSession->handleCmd_withinSession(this, "PLAY", urlPreSuffix, urlSuffix, (char const*)fRequestBuffer);
                                
                unsigned requestSize = (fLastCRLF+4-fRequestBuffer) + contentLength;
                numBytesRemaining = fRequestBytesAlreadySeen - requestSize;
                resetRequestBuffer(); 
                
                if (numBytesRemaining > 0) {
                    memmove(fRequestBuffer, &fRequestBuffer[requestSize], numBytesRemaining);
                    newBytesRead = numBytesRemaining;
                }
            } while (numBytesRemaining > 0);

            --fRecursionCount;
            if(!fIsActive) {
                if(fRecursionCount > 0)
                    closeSockets();
                else
                    delete this;
            }
        }   
    }
}

@3.2.2.1 确保Request消息完整性

代码用于确保已完整接收Request消息,判断标注为是否能检测到消息结尾标志CRLF CRLF\r\n\r\n。如未检测到,退出循环,继续接收,直到完整为止。

@3.2.2.2 解析RTSP请求

函数parseRTSPRequestString()实现位置在RTSPCommon中,同样以全局函数形式存在。由于3.2.2中处理函数众多,如均一一展开,篇幅将过长过臭。因此这里仅列出其步骤及示例Request,如有兴趣,可自行阅读相关代码。

OPTIONS rtsp://192.168.56.1/bipbop-gear1-all.ts RTSP/1.0
CSeq : 2
User-Agent : LibVLC/2.2.6 (LIVE555 Streaming Media v2016.02.22)

补充说明 User-Agent
User-Agent用于标识应用类型、操作系统、软件版本、开发商等信息。例如

Mozilla/5.0 (iPhone; CPU iPhone OS 10_3_1 like Mac OS X) AppleWebKit/603.1.30 (KHTML, like Gecko) Version/10.0 Mobile/14E304 Safari/602.1
Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:47.0) Gecko/20100101 Firefox/47.0

此时如服务器端对不同类型客户端有做更优适配,如针对手机、电脑制作不同的网页布局,就可以更好地提升用户体验。

成功解析RTSP请求后,如其中存在SessionID,则需在哈希表中查找该ID值。如查找成功,进一步确认其状态。

if(requestIncludedSessionId)
{
    clientSession = (RTSPServer::RTSPClientSession*)(fOurRTSPServer.lookupClientSession(sessionIdStr));
    if(clientSession != NULL) clientSession->noteLiveness();                
}

lookupClientSession的源码就不放了,纯粹的查找HashTable而已,有兴趣的话可以阅读GenericMediaServer::lookupClientSession()并进一步跟踪。

关于noteLiveness()要稍微说明下,因为其可能引申出一个新的延时任务。

void GenericMediaServer::ClientSession::noteLiveness()
{
    // 使用默认实现,无其他操作,属虚张声势
    if(fOurServerMediaSession != NULL)
        fOurServerMediaSession->noteLiveness();
        
    // fReclamationSeconds>0时开启延时任务livenessTimeoutTask,延时时长为fReclamationSeconds
    if(fOurServer.fReclamationSeconds > 0)
        envir().taskScheduler().rescheduleDelayedTask(fLivenessCheckTask, 
                                fOurServer.fReclamationSeconds*1000000,
                                (TaskFunc*)livenessTimeoutTask, this));
}

fReclamationSeconds是由main函数中DynamicRTSPServer创建时传递参数而来,其值为0。因此此处并不会开启,至于什么时候会开启,只能说,本程序中不会开启。如果开启,且到达指定时长,则会删除clientSession。

@3.2.2.3 处理RTSP请求中方法

如对RTSP请求、回复不太熟悉,可先阅读Live555源码解析(2) - RTSP协议概述
RTSP请求中会存在几种方法,这里列出了所有支持的方法,各方法及相应处理如下:

4. 总结

综上所述,客户端与服务器进行连接过程实际上就是为RTSP会话交互过程,而其中会进一步产生连锁反应的步骤主要有:

篇幅所限,将根据这些线索展开下一篇。

上一篇 下一篇

猜你喜欢

热点阅读