
Implementing http-flv / rtsp / hls streaming with boost asio

2017-03-23  little_wang

github: https://github.com/wangdxh/Desert-Eagle/
Only video handling is implemented. rtsp only supports RTP over RTSP.

A quick word on why asio. I first implemented the http-flv streaming server in Go, about 300 lines in total, and productivity was very high; local tests were basically fine. But when testing across the LAN, the buffer recycling scheme caused the chan to drop messages in one-to-many forwarding, and memory usage was nothing to be proud of either.

I also considered libevent plus hand-rolled reference counting of the buffers. It would work, but productivity is low and maintenance cost is high. In the end I adopted asio's proactor model, smart pointers to reference-count and automatically release the stream buffers, and C++11 lambdas; the resulting streaming server code almost reads like synchronous coroutine-style programming. The whole server is about 1650 lines of C++: in productivity it is no worse than Go, and it wins outright on memory and efficiency.

For one-to-many real-time forwarding Go is not such a good fit, so asio it is. Go has its own strengths, of course. Implementing a streaming protocol is in the end just engineering with no real tricks to it, so when choosing the implementation language the main considerations are productivity and elegance. (Maintainability is not really a separate criterion: more productive languages need less code, much of the functionality is built into the language itself, and the more widely a language is used, the more stable its features and the easier the code is to maintain.)

Comparing Go and asio for one-to-many forwarding: Go's chan is essentially an STL deque, goroutine scheduling is in practice triggered by network events, automatic memory reclamation can be replaced by smart pointers, and the cost of frequent new in C++ can be reduced with jemalloc. Seen that way, could one design a DSL whose syntax is as concise as Go's, but which is first translated into boost asio calls and then compiled and executed?

(This feels doable. The DSL translation step could be implemented in Python; see pyparsing implementing letrec-style recursive function calls.)

boost asio basics

asio asynchronous accept

async_accept is given a socket and a lambda. You issue it once; when it completes successfully the handler runs with the socket set to the accepted connection, you hand the connection off for processing, and then you issue the next asynchronous accept.
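A minimal standalone sketch of this accept loop (illustrative only; the project's actual, templated tcp_server appears in the tcp server section below):

// Minimal sketch of the asio accept loop; not the project's tcp_server.
void start_accept(boost::asio::ip::tcp::acceptor& acceptor,
                  boost::asio::ip::tcp::socket& socket)
{
    acceptor.async_accept(socket,
        [&acceptor, &socket](boost::system::error_code ec)
    {
        if (!ec)
        {
            // socket now refers to the accepted connection; move it into a session
            auto peer = std::make_shared<boost::asio::ip::tcp::socket>(std::move(socket));
            // ... start a session object that owns *peer ...
        }
        start_accept(acceptor, socket);   // issue the next asynchronous accept
    });
}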

asio asynchronous read/write

Two read functions are used to read network data asynchronously.
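A minimal sketch, assuming boost::asio::async_read for a fixed-length header and the socket's async_read_some for data of arbitrary length (the names and buffer sizes here are illustrative, not taken from the project):

// async_read completes only after exactly header_len bytes have been received.
void do_read_header(boost::asio::ip::tcp::socket& socket, uint8_t* buf, std::size_t header_len)
{
    boost::asio::async_read(socket, boost::asio::buffer(buf, header_len),
        [](boost::system::error_code ec, std::size_t /*length*/)
    {
        if (!ec)
        {
            // parse the header here, then issue the next read for the body
        }
    });
}
// socket.async_read_some(boost::asio::buffer(buf, len), handler) would instead
// complete as soon as any amount of data is available.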

Writing with async_write is also asynchronous, and there are a couple of things to watch: only one async_write may be in flight on a socket at a time, which is why deliver() below queues messages in write_msgs_ and only calls do_write() when no write is in progress; and the buffers handed to async_write must stay valid until the completion handler runs, which is why shared_const_buffer_flv keeps the stream data in a shared_ptr.

shared_const_buffer_flv

shared_const_buffer_flv implements the ConstBufferSequence requirements:

// Implement the ConstBufferSequence requirements.
typedef boost::asio::const_buffer value_type;
typedef const boost::asio::const_buffer* const_iterator;
const boost::asio::const_buffer* begin() const { return m_abuffer; }
const boost::asio::const_buffer* end() const { return m_abuffer + FLV_ASIO_BUFFER; }
boost::asio::const_buffer m_abuffer[FLV_ASIO_BUFFER];

FLV_ASIO_BUFFER is 3, so there are three const_buffers: presumably [0] holds the chunk header, [1] the stream data, and [2] the chunk trailer (see setchunk in do_write below):

m_streamdata = std::shared_ptr<uint8_t>(new uint8_t[dwtotallen],
                                  []( uint8_t *p ) { delete[] p; });
m_abuffer[1] = boost::asio::buffer(m_streamdata.get(), dwtotallen);
typedef std::deque<shared_const_buffer_flv> stream_message_queue;
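The point of satisfying ConstBufferSequence is that one async_write can scatter-gather several buffers in a single operation. A minimal sketch of the idea (the names chunk_header, payload and crlf are illustrative, not taken from the project):

static const char crlf[] = "\r\n";
std::array<boost::asio::const_buffer, 3> bufs = {{
    boost::asio::buffer(chunk_header, header_len),    // hex chunk-size line (plus flv tag header)
    boost::asio::buffer(payload.get(), payload_len),  // shared_ptr-owned stream data
    boost::asio::buffer(crlf, 2)                      // chunk trailer
}};
boost::asio::async_write(sock, bufs, handler);        // one call, three buffers on the wire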

streampushclient (stream pushing)

The code is in the streampushclient folder.

tcp server

The code is in the streamserver folder.

boost::asio::io_service io_service;
tcp_server<stream_rtsp_to> server_rtsp_to(io_service, tcp::endpoint(tcp::v4(), 554));        // rtsp clients
tcp_server<stream_httpflv_to> server_Httpflv_to(io_service, tcp::endpoint(tcp::v4(), 1984)); // http-flv clients
tcp_server<stream_flv_from> server_flv_from(io_service, tcp::endpoint(tcp::v4(), 1985));     // flv push (ingest)
io_service.run();
void do_accept()
    {
        //a new connection
        acceptor_.async_accept(socket_,
            [this](boost::system::error_code ec)
        {
            if (!ec)
            {               
                std::make_shared<T>(std::move(socket_))->start();//session
            }

            do_accept();
        });
    }

stream_hub

typedef std::shared_ptr<stream_hub> stream_hub_ptr;
std::map< std::string, stream_hub_ptr > g_map_stream_hubs;
std::set<stream_session_ptr> http_flv_sessions_;
std::set<stream_session_ptr> rtsp_sessions_;
copyed_buffer m_buf_header;
std::string m_strname;
void join_http_flv(stream_session_ptr participant)
void leave_http_flv(stream_session_ptr participant)
void join_rtsp(stream_session_ptr participant)
void leave_rtsp(stream_session_ptr participant)
void deliver(const boost::asio::mutable_buffer& msg, bool isheader = false)
class stream_session
{
public:
    virtual ~stream_session() {}
    virtual void deliver(const shared_const_buffer_flv& msg) = 0; 
//participant should deliver message    
};

stream_session defines the uniform interface for delivering stream data to the clients that requested it.

stream_flv_from

class stream_flv_from : public std::enable_shared_from_this<stream_flv_from>
{
    void start()
    {   
        do_read_header();
    }
};
boost::asio::mutable_buffer steambuf (m_bufmsg, length);
if (false == m_bget_stream_name)
  {
    // first message from the pusher: the stream name, used to create the hub
    m_bget_stream_name = true;
    m_bufmsg[length] = '\0';
    m_streamname = (char*)m_bufmsg;
    room_ = create_stream_hub(m_streamname);
  }
  else if(false == m_bget_flv_header)
  {
     // second message: the flv header / metadata, kept by the hub for late joiners
     m_bget_flv_header = true;
     room_->setmetadata(steambuf);
  }
  else
  {
     // everything after that is stream data, fanned out to the hub's sessions
     room_->deliver(steambuf);
  }

stream_httpflv_to

class stream_httpflv_to: public stream_session,
void start()
{           
    do_read_header();
}
"HTTP/1.1 200 OK\r\n
Connection: close\r\n
Content-Type: video/x-flv\r\n
Transfer-Encoding: chunked\r\n
Access-Control-Allow-Origin: *\r\n\r\n";

Content-Type: video/x-flv says the payload is FLV; Transfer-Encoding: chunked says the stream body is sent in chunked framing; Access-Control-Allow-Origin: * must be present to allow cross-origin access, because our http-flv service is exposed on port 1984 (a different origin from the page playing it), so this HTTP header has to be added.
The HTTP response is appended to the send queue and a write is initiated.

http chunked transfer
Each chunk has a header part and a body part:
1. The chunk header gives the size of the body that follows as a hexadecimal number (no leading zeros), optionally followed by a unit (normally omitted, meaning bytes).
2. The body is exactly that many bytes of actual content; both the chunk header and the body are terminated by CRLF.
The last chunk has length 0; what follows it is called the footer, a few extra header fields (usually simply ignored).
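As a generic illustration (standard HTTP/1.1 chunked framing, independent of this project), a chunk carrying 0x25 (37) bytes of data looks like this on the wire:

25\r\n                  chunk size in hex
<37 bytes of data>\r\n  chunk body, terminated by CRLF
...
0\r\n                   final zero-length chunk
\r\n                    end of the (empty) footer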

room_ = get_stream_hub(m_streamname);
room_->join_http_flv(shared_from_this());
stream_hub::
    void join_http_flv(stream_session_ptr participant)
    {
        http_flv_sessions_.insert(participant);//add a client       
        if (!m_buf_header.isnull())
        {
            shared_const_buffer_flv flvheader(m_buf_header.m_buffer, shared_const_buffer_flv::em_http_flv);// send flv header
            flvheader.setisflvheader(true);
            flvheader.setisflvstream(true);
            participant->deliver(flvheader);
        }
    }
stream_hub::
void deliver(const boost::asio::mutable_buffer& msg, bool isheader = false)
    {
        if (http_flv_sessions_.size() > 0)
        {
            shared_const_buffer_flv flvbuf(msg, shared_const_buffer_flv::em_http_flv);
            flvbuf.setisflvheader(isheader);
            flvbuf.setisflvstream(true);
            for (auto session: http_flv_sessions_)
                session->deliver(flvbuf);
        }
  }
void deliver(const shared_const_buffer_flv& msg)
   {       
       if (msg.isflvstream() && !msg.isflvheader())
       {
           // all flv info need chunked
           if (false == m_bfirstkeycoming )
           {
               if (!msg.iskeyframe())
               {
                   printf("flvdata keyframe is not coming %s  \r\n", m_szendpoint);
                   return;
               }
               else
               {
                   m_bfirstkeycoming = true;
               }                           
           }
           // just drop the stream data but not the head and protocol
           if (write_msgs_.size() > MAX_STREAM_BUFFER_NUMS)
           {
               //buffer is full, do not need p-frame,so wait the I-frame
               m_bfirstkeycoming = false;
               printf("the buffer over the max number %d, %s\r\n", MAX_STREAM_BUFFER_NUMS, m_szendpoint);
               return;
           }
       }
       
       bool write_in_progress = !write_msgs_.empty();
       write_msgs_.push_back(msg);// queue the message first
       if (!write_in_progress)
       {
           //write message
           do_write();
       }
   }
void do_write()
   {
       shared_const_buffer_flv& ptagflvbuf = write_msgs_.front();
       if (ptagflvbuf.isflvstream())
       {
           const boost::asio::const_buffer* pbuffer = ptagflvbuf.getstreamdata();
           int nsize = boost::asio::buffer_size(*pbuffer);
           const char* pdata = boost::asio::buffer_cast<const char*>(*pbuffer);            

           int nLen;
           //memset(m_szchunkbuf, sizeof(m_szchunkbuf), 0);
           if (pdata[0] == 0x17 || pdata[0] == 0x27)
           {
               int ntaglen = nsize -4;
               nLen = sprintf(m_szchunkbuf, "%x\r\n", nsize+11);
               m_szchunkbuf[nLen+0] = 9; //video
               m_szchunkbuf[nLen+1] = (ntaglen >> 16) & 0xff;
               m_szchunkbuf[nLen+2] = (ntaglen >> 8) & 0xff;
               m_szchunkbuf[nLen+3] = ntaglen & 0xff;

               // nb timestamp
               m_szchunkbuf[nLen+4] = (m_dwtime>> 16) & 0xff;
               m_szchunkbuf[nLen+5] = (m_dwtime>> 8) & 0xff;
               m_szchunkbuf[nLen+6] = m_dwtime& 0xff;
               m_szchunkbuf[nLen+7] = (m_dwtime>> 24) & 0xff;
               m_szchunkbuf[nLen+8] = 0;
               m_szchunkbuf[nLen+9] = 0;
               m_szchunkbuf[nLen+10] = 0;             
                             
               nLen += 11;
               m_dwtime += 40;
           }
           else
           {
               nLen = sprintf(m_szchunkbuf, "%x\r\n", nsize);               
           }
           ptagflvbuf.setchunk(m_szchunkbuf, nLen, m_szchunkend, 2);            
       }

       boost::asio::async_write(socket_,// this session's socket
           ptagflvbuf,
           [this, self](boost::system::error_code ec, std::size_t length/*length*/)// self presumably holds shared_from_this() to keep the session alive
       {
           if (!ec)
           {
               write_msgs_.pop_front();
               if (!write_msgs_.empty())
               {
                   do_write();
               }
           }        
       });
   }
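For reference, the 11 bytes written right after the hex chunk-size line are a standard FLV tag header; the layout below is the standard FLV format, not anything project-specific. m_dwtime advances 40 ms per tag, which presumably assumes roughly 25 fps, and ntaglen = nsize - 4 presumably excludes a trailing 4-byte PreviousTagSize already present in the buffered data.

TagType             1 byte    9 = video
DataSize            3 bytes   length of the tag payload (ntaglen)
Timestamp           3 bytes   lower 24 bits of the timestamp in milliseconds
TimestampExtended   1 byte    upper 8 bits of the timestamp
StreamID            3 bytes   always 0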

stream_rtsp_to

At the stream level, rtsp differs a little from http-flv.

stream_rtsp_to also derives from stream_session, and its business logic likewise starts in start().

class stream_rtsp_to: public stream_session,
    void start()
    {
        do_read_header();
    }
stream_hub::
void join_rtsp(stream_session_ptr participant)
{
    rtsp_sessions_.insert(participant);
    // rtsp do not need the flv header
}
shared_const_buffer_flv(const boost::asio::const_buffer& buff, em_buffertype etype, uint64_t dwtimestamp, uint16_t& dwsequence)
    {
        const uint8_t* pData = boost::asio::buffer_cast<const uint8_t*>(buff);
        ...                
        int nLen = boost::asio::buffer_size(buff);      
        uint32_t dwtotallen = nLen;
        if (em_rtsp == etype)
        {
            // The RTP timestamps sent to each rtsp client do not have to start at 0, and when
            // h264 is packed into rtp the rtp headers are interleaved inside the data, so the
            // timestamp is incremented here in the hub; whichever rtp timestamp a client sees
            // first simply becomes its starting point, and playback is not affected.
            if (0x17 == pData[0] || 0x27 == pData[0])
            {
                uint32_t dwnumnalus;
                get_rtsp_rtp_video_total_len(pData, nLen, dwtotallen, dwnumnalus);    
                m_streamdata = std::shared_ptr<uint8_t>(new uint8_t[dwtotallen], []( uint8_t *p ) { delete[] p; });                
                bool bret = generate_rtp_info_over_rtsp(pData, nLen, m_streamdata.get(), dwtotallen,
                                                        dwnumnalus, dwtimestamp, dwsequence);                
            }
        }        
        m_abuffer[1] = boost::asio::buffer(m_streamdata.get(), dwtotallen);
    }
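Because only RTP over RTSP is supported, each RTP packet produced by generate_rtp_info_over_rtsp goes back over the RTSP TCP connection wrapped in the standard interleaved frame from RFC 2326. The layout below is that standard framing; the channel numbers are an assumption, not taken from the project:

'$' (0x24)    1 byte    interleaved-frame marker
channel       1 byte    e.g. 0 for video RTP, 1 for the matching RTCP
length        2 bytes   big-endian size of the RTP packet that follows
RTP packet    length bytes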

hls support

HLS support lives inside stream_hub: each stream maps to one m3u8 playlist and a rolling set of ts segments for live playback. The generated m3u8 and ts files are served over HTTP by the web server, and clients play from there.
A generated live m3u8 file looks like this:

#EXTM3U
#EXT-X-ALLOW-CACHE:NO
#EXT-X-TARGETDURATION:2
#EXT-X-MEDIA-SEQUENCE:8
#EXTINF:1
http://x.x.x.x/static/123abcdef32153421-8.ts
#EXTINF:2
http://x.x.x.x/static/123abcdef32153421-9.ts
#EXTINF:1
http://x.x.x.x/static/123abcdef32153421-10.ts
#EXTINF:1
http://x.x.x.x/static/123abcdef32153421-11.ts

In the playlist, #EXT-X-TARGETDURATION is the maximum segment duration in seconds, #EXT-X-MEDIA-SEQUENCE is the sequence number of the first segment listed, and each #EXTINF gives the duration of the segment on the following line. The code keeps a sliding window of m_dw_ts_slice_num segments and deletes the oldest ts file once the window is exceeded. The logic that generates the m3u8 and ts files:

stream_hub::
        m_cur_file_num = 1;
        m_cur_hls_sequence = 1;   
        m_dw_ts_slice_num = 4;        
        m_m3u8_ts_directory = "D:\\github\\Desert-Eagle\\webserver\\static\\";
        //std::string strdirectory = "D:\\nginx-1.10.3\\html\\";
        m_m3u8_ts_prefix = "http://x.x.x.x/static/";
---------
       uint8_t* pData = boost::asio::buffer_cast<uint8_t*>(msg);
        int nLen = boost::asio::buffer_size(msg);
        if (isheader == false || pData[0] == 0x17 || pData[0] == 0x27)
        {
            // generate m3u8 and ts file to the directory
            change_flv_h264_buffer_to_0001_buffer(pData+5, nLen-9);
        }
        if (m_bsupporthls)
        {
            if (isheader == false)
            {
                bool bkeyframe = false;
                if (pData[0] == 0x17)
                {
                    bkeyframe = true;
                    if (nullptr != m_file_ts)
                    {
                        fflush(m_file_ts);
                        fclose(m_file_ts);

                        uint64_t utemp = m_a_file_duration[(m_cur_file_num-1)%3];
                        m_a_file_duration[(m_cur_file_num-1)%3] = (m_u64timestamp - utemp)/90000;// now is duration second
                        printf("ts file is %d duration is %llu\r\n", m_cur_file_num, m_a_file_duration[(m_cur_file_num-1)%3]);
                                                
                        // file number add 1
                        m_cur_file_num++;
                        if (m_cur_file_num - m_cur_hls_sequence  > m_dw_ts_slice_num)
                        {   
                            char szfilepath[256] = {0};
                            sprintf(szfilepath, "%s%s-%d.ts", m_m3u8_ts_directory.c_str(), m_strname.c_str(), m_cur_hls_sequence);
                            m_cur_hls_sequence++;
                            ::remove(szfilepath);                            
                        }
                        
                        uint64_t maxduration = 0;
                        for(int inx = m_cur_hls_sequence; inx < m_cur_file_num; inx++)
                        {
                            maxduration = std::max(maxduration, m_a_file_duration[(inx-1)%3]);
                        }
                        printf("update m3u8 file max duration is %llu\r\n", maxduration);

                        // write m3u8 file
                        std::stringstream strm3u8;
                        strm3u8 << "#EXTM3U\r\n"
                            << "#EXT-X-ALLOW-CACHE:NO\r\n"
                            << "#EXT-X-TARGETDURATION:" <<maxduration <<"\r\n"
                            << "#EXT-X-MEDIA-SEQUENCE:" << m_cur_hls_sequence << "\r\n";
                        for(int inx = m_cur_hls_sequence; inx < m_cur_file_num; inx++)
                        {
                            strm3u8 << "#EXTINF:" << m_a_file_duration[(inx-1)%3] <<"\r\n"
                                << m_m3u8_ts_prefix  << m_strname.c_str() << "-"<< inx << ".ts\r\n" ;
                        }                        
                        std::string strtemp = strm3u8.str();
                        std::string strm3u8filepath = m_m3u8_ts_directory + m_strname +".m3u8";
                        FILE* file_m3u8 = fopen(strm3u8filepath.c_str(), "wb");
                        fwrite(strtemp.c_str(), strtemp.length(), 1, file_m3u8);            
                        fflush(file_m3u8);
                        fclose(file_m3u8);
                        printf("after update m3u8 hls_sequence is %d, cur_file_num is %d\r\n", m_cur_hls_sequence, m_cur_file_num);
                    }                    
                    char szfilepath[256] = {0};
                    sprintf(szfilepath, "%s%s-%d.ts", m_m3u8_ts_directory.c_str(), m_strname.c_str(), m_cur_file_num);
                    m_file_ts = fopen(szfilepath, "wb");
                    m_a_file_duration[(m_cur_file_num-1)%3] = m_u64timestamp;// start time
                }
                uint32_t dwtotallen = 0;
                m_ts.get_ts_frame_totallen(pData+5, nLen-9, bkeyframe, dwtotallen);
                m_ts.generate_ts_frame(pData+5, nLen-9, m_ts_stream_buff.get(), dwtotallen, bkeyframe, m_u64timestamp);
                fwrite(m_ts_stream_buff.get(), dwtotallen, 1, m_file_ts);                
            }

webserver

The webserver uses Python to provide the HTTP service that serves the generated m3u8 and ts files.

Testing

For the playback test steps of each protocol, see the README on GitHub.
https://github.com/wangdxh/Desert-Eagle/blob/master/README.md

The streamserver code is roughly 1650 lines.

