gstreamer分析-rtp流的接收
2018-06-08 本文已影响344人
help_youself
关于数据流的接收过程,[1]有很好的分析。这个问题,令人困惑,依然有些东西没有搞明白。gst_pad_start_task()创建任务,使得gst_base_src_loop()被循环调用。
gst_base_src_set_playing
{
if (start)
gst_pad_start_task (basesrc->srcpad, (GstTaskFunction) gst_base_src_loop,
basesrc->srcpad, NULL);
}
static void
gst_base_src_loop (GstPad * pad)
{
ret = gst_base_src_get_range (src, position, blocksize, &buf);
}
static GstFlowReturn
gst_base_src_get_range (GstBaseSrc * src, guint64 offset, guint length,
GstBuffer ** buf)
{
ret = bclass->create (src, offset, length, &res_buf);
//GstBaseClass中的函数指针klass->create = GST_DEBUG_FUNCPTR (gst_base_src_default_create);
if (src->priv->pending_bufferlist != NULL) {
ret = gst_pad_push_list (pad, src->priv->pending_bufferlist);
src->priv->pending_bufferlist = NULL;
} else {
ret = gst_pad_push (pad, buf);//数据在pipeline中流动,调用下一个element的chainfun,推送数据。
}
}
static GstFlowReturn
gst_base_src_default_create (GstBaseSrc * src, guint64 offset,
guint size, GstBuffer ** buffer)
{
ret = bclass->fill (src, offset, size, res_buf);
//这个函数在子类中被覆盖gstbasesrc_class->fill = GST_DEBUG_FUNCPTR (gst_push_src_fill);
}
static GstFlowReturn
gst_push_src_fill (GstBaseSrc * bsrc, guint64 offset, guint length,
GstBuffer * ret)
{
pclass = GST_PUSH_SRC_GET_CLASS (src);
if (pclass->fill)//GstPushSrcClass有同名的方法,与bclass->fill不同
fret = pclass->fill (src, ret);//这个方法再次被子类覆盖
}
//其中GstUDPSrcClass继承了GstPushSrc
//gstpushsrc_class->fill = gst_udpsrc_fill;
static GstFlowReturn
gst_udpsrc_fill (GstPushSrc * psrc, GstBuffer * outbuf)
{
//问题出现,向socket读取数据,数据流就一定到来了吗?
//g_socket_condition_timed_wait说明这个socket并不是一个阻塞的socket。那么要是没有数据该怎么处理?
res =
g_socket_receive_message (udpsrc->used_socket, p_saddr, ivec, 2,
p_msgs, &n_msgs, &flags, udpsrc->cancellable, &err);
}
udosrc怎么同rtpbin关联起来呢?gst-lpugins-good/tests/examples/client-rtpaux.c下有个例子,有个图:
example
udosrc和rtpbin都继承了element。gst_element_link_pads将两者连起来。
rtpSrc = gst_element_factory_make ("udpsrc", NULL);
gst_element_link_pads (rtpSrc, "src", rtpBin, padName);
rtpbin中的pad是如何接收数据呢?
static GstPad *
gst_rtp_bin_request_new_pad (GstElement * element,
GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
{
if (templ == gst_element_class_get_pad_template (klass, "recv_rtp_sink_%u")) {
result = create_recv_rtp (rtpbin, templ, pad_name);
}
}
static GstPad *
create_recv_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name)
{
recv_rtp_sink = complete_session_sink (rtpbin, session, TRUE);
}
static GstPad *
complete_session_sink (GstRtpBin * rtpbin, GstRtpBinSession * session,
gboolean bundle_demuxer_needed)
{
session->recv_rtp_sink =
gst_element_get_request_pad (session->session, "recv_rtp_sink");
}
static GstPad *
gst_rtp_session_request_new_pad (GstElement * element,
GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
{
/* figure out the template */
if (templ == gst_element_class_get_pad_template (klass, "recv_rtp_sink")) {
if (rtpsession->recv_rtp_sink != NULL)
goto exists;
result = create_recv_rtp_sink (rtpsession);
}
}
static GstPad *
create_recv_rtp_sink (GstRtpSession * rtpsession)
{
gst_pad_set_chain_function (rtpsession->recv_rtp_sink,
gst_rtp_session_chain_recv_rtp);
return rtpsession->recv_rtp_sink;
}
gst_pad_push中调用的chainfun就是这个gst_rtp_session_chain_recv_rtp。
接下来分析rtp数据流的处理,以on-feedback-rtcp这个类型的消息为例。
static GstFlowReturn
gst_rtp_session_chain_recv_rtp (GstPad * pad, GstObject * parent,
GstBuffer * buffer)
{
ret = rtp_session_process_rtp (priv->session, buffer, current_time,
running_time, ntpnstime);
}
GstFlowReturn
rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer,
GstClockTime current_time, GstClockTime running_time, guint64 ntpnstime)
{
/* update pinfo stats */
if (!update_packet_info (sess, &pinfo, FALSE, TRUE, FALSE, buffer,
current_time, running_time, ntpnstime)) {
GST_DEBUG ("invalid RTP packet received");
RTP_SESSION_UNLOCK (sess);
return rtp_session_process_rtcp (sess, buffer, current_time, ntpnstime);
}
}
GstFlowReturn
rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer,
GstClockTime current_time, guint64 ntpnstime)
{
switch (type) {
case GST_RTCP_TYPE_PSFB:
rtp_session_process_feedback (sess, &packet, &pinfo, current_time);
break;
}
}
static void
rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet,
RTPPacketInfo * pinfo, GstClockTime current_time)
{
if (g_signal_has_handler_pending (sess,
rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP], 0, TRUE)) {
g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP], 0,
type, fbtype, sender_ssrc, media_ssrc, fci_buffer);
}
}
//需要查找下程序哪里注册了"on-feedback-rtcp"这个信号。
//以此为例,信号的注册是这个形式g_signal_connect (rtpsession, "on-feedback-rtcp", G_CALLBACK (callback_fun), self);
//信号发射g_signal_emit,就会调用callback_fun进行处理。
//这样的注册函数在kurento里有。
static void
kms_remb_remote_on_feedback_rtcp (GObject *rtpsession,
guint type, guint fbtype, guint sender_ssrc, guint media_ssrc,
GstBuffer *fci)
{
GST_LOG_OBJECT (rtpsession, "Signal 'on-feedback-rtcp'");
if (type == GST_RTCP_TYPE_PSFB
&& fbtype == GST_RTCP_PSFB_TYPE_AFB) {
process_psfb_afb (rtpsession, sender_ssrc, fci);
}
}
前前后后一共浪费了六天时间,好奇心害死猫。以后应该研究一些数学密集型的程序,而不是逻辑密集型的程序。
剧终。
[1]GStreamer插件架构简析