GStreamer Probe 探针的妙用
探针 Probe 是一种可由应用程序安装的回调,可将数据流的状态及内容回调给应用程序,在实现中妙用无穷,既可以用来检查在 GStreamer Pipeline 中流传的 Data, Query 和 Event, 还可以用来检查和修改在回调函数中收到的数据,以及阻塞或传递在 pipeline 流传的数据。
先总结一下我在 GStreamer Probe 看到的内容,然后写一个简单的小程序演示一下探针的妙用。
用法
应用程序应该能够监视和控制 pad 上的数据流,有以下用法:
- 当 pad 空闲时收到通知,并确保 pad 保持空闲。这对于能够实现元件的动态重新链接至关重要,从而不会中断数据流。
- 当数据 data、事件 event 或查询 query 被发送到 pad 上就会被通知,这样就可以在回调函数中检查和修改数据。
- 能够根据回调函数返回的结果,在 pad 上丢弃、传递和阻塞数据
- 能够在被阻塞的 pad 中由应用线程执行的方法丢弃,传递数据
概述
函数 gst_pad_add_probe()
用于在 pad 上添加探针 probe. 它接受的参数有 probe type mask 和一个 callback 函数.
gulong gst_pad_add_probe (GstPad *pad,
GstPadProbeType mask,
GstPadProbeCallback callback,
gpointer user_data,
GDestroyNotify destroy_data);
此函数返回 gulong 数据类型来标识这个探针,这个 probe_id 可以用 gst_pad_remove_probe()
来移除探针
void gst_pad_remove_probe (GstPad *pad, gulong id);
而 mask 掩码参数是以下 flags 的按位或操作 "bitwise or"
typedef enum
{
GST_PAD_PROBE_TYPE_INVALID = 0,
/* flags to control blocking */
GST_PAD_PROBE_TYPE_IDLE = (1 << 0),
GST_PAD_PROBE_TYPE_BLOCK = (1 << 1),
/* flags to select datatypes */
GST_PAD_PROBE_TYPE_BUFFER = (1 << 4),
GST_PAD_PROBE_TYPE_BUFFER_LIST = (1 << 5),
GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM = (1 << 6),
GST_PAD_PROBE_TYPE_EVENT_UPSTREAM = (1 << 7),
GST_PAD_PROBE_TYPE_EVENT_FLUSH = (1 << 8),
GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM = (1 << 9),
GST_PAD_PROBE_TYPE_QUERY_UPSTREAM = (1 << 10),
/* flags to select scheduling mode */
GST_PAD_PROBE_TYPE_PUSH = (1 << 12),
GST_PAD_PROBE_TYPE_PULL = (1 << 13),
} GstPadProbeType;
添加带有 IDLE 或 BLOCK 标志的探针时,探针将成为阻塞探针 (blocking probe)。否则,探针将是数据探针 (Data Probe)。
数据类型和调度选择器标志用于选择在回调函数中可以使用哪种类型的数据类型和调度模式。
Blocking flags 必须与所触发的 probe 完全匹配。
探测回调定义为:
GstPadProbeReturn (*GstPadProbeCallback) (GstPad *pad, GstPadProbeInfo *info,
gpointer user_data);
probe info 结构作为参数传递,其类型必须匹配于注册回调时使用的掩码。Probe info 中包含的数据项包含特定类型的数据,通常是被阻止的数据项或者是 NULL(当不存在数据项时)
探针可以返回以下任何返回值:
typedef enum
{
GST_PAD_PROBE_DROP,
GST_PAD_PROBE_OK,
GST_PAD_PROBE_REMOVE,
GST_PAD_PROBE_PASS,
GST_PAD_PROBE_HANDLED
} GstPadProbeReturn;
-
GST_PAD_PROBE_OK
是正常的返回值。_DROP
将丢弃目前正在探测的数据,GST_PAD_PROBE_REMOVE
将从探针列表中移除当前所正在执行的探针 -
GST_PAD_PROBE_PASS
与阻塞探针相关,并将暂时取消阻塞, 让数据项通过,然后它会再次阻塞下一个数据项。
阻塞探针
阻塞探针是设置了 BLOCK
或 IDLE
标志的探针。它们将一直阻止数据流并按照以下规则触发回调:
- 当
IDLE
标志设置后,一旦没有数据经过 pad,就会调用探针的回调函数。如果在探针注册时,pad 处于空闲状态, 回调函数会立即从当前线程调用。否则, 一旦 pad 的状态变为空闲 (Idle),就会在数据流线程中调用探针的回调函数。
IDLE
探针在执行动态链接时很有用,允许应用程序等待 "unlink/link" 操作的正确执行。由于探针是阻挡探针,它还将确保 pad 保持空闲状态,直到此探针被删除。
- 当
BLOCK
标志设置后,当有新数据时到达 Pad 时将调用探针的回调函数,也就在 pad 即将进入阻塞状态之前。因此只有当 pad 上有新数据时才会调用此探针的回调。
阻塞探针由 gst_pad_remove_probe()
函数移除, 或当探针的回调函数返回 GST_PAD_PROBE_REMOVE
时移除此探针。
在这两种情况下,如果这是 pad 上的最后一个阻塞探针,pad 就会被解锁 (unblocked),数据流可以继续。
非阻塞探针
非阻塞探针或数据探针是在 Pad 中数据流动时触发的探针。它们在阻塞探针运行之后调用,并且始终针对于数据。
推送数据流
推送探针在回调中设置了 GST_PAD_PROBE_TYPE_PUSH
标志。
在基于推送的调度中,首先使用数据项调用阻塞探针 (blocking probe)。然后,在对等 pad 的 chain
或 event
函数调用之前 调用数据探针 (data probe)
在对等的 pad 被检查之前会调用 data probe, 它允许链接 pad 以阻塞探针 (BLOCK probe) 或者数据探针 (DATA probe)
在对等的 pad 的 chain 或 event 函数调用之前, 对等的 pad 的阻塞探针 (BLOCK probe) 或者数据探针 (DATA probe) 会被调用。
最后,在数据发送给对等的 pad 之后 IDEL
探针被调用
推送数据流探针行为与数据缓冲和双向事件的行为相同
pad peerpad
| |
gst_pad_push() / | |
gst_pad_push_event() | |
-------------------->O |
O |
flushing? O |
FLUSHING O |
< - - - - - - O |
O-> do BLOCK probes |
O |
O-> do DATA probes |
no peer? O |
NOT_LINKED O |
< - - - - - - O |
O gst_pad_chain() / |
O gst_pad_send_event() |
O------------------------------>O
O flushing? O
O FLUSHING O
O< - - - - - - - - - - - - - - -O
O O-> do BLOCK probes
O O
O O-> do DATA probes
O O
O O---> chainfunc /
O O eventfunc
O< - - - - - - - - - - - - - - -O
O |
O-> do IDLE probes |
O |
< - - - - - - O |
| |
拉取数据流
拉取探针在回调中设置了 GST_PAD_PROBE_TYPE_PULL
标志。而 gst_pad_pull_range()
调用先触发 BLOCK
探针, 并不包含数据项。它允许对等的 pad 解析成功前进行连接此 pad, 还允许在回调函数中设置数据项
在对等的 pad 上调用阻塞探针 blocking probe 和 getrange
并有数据项传递之后,此 pad 上的 数据探针 Data Probe 将会被调用。
当控制权返回到 sink pad, IDLE
回调会被调用,没有数据项就会调用 IDLE
回调,所以如果发生错误时也会调用 IDLE
回调
如果有了有效的数据项, DATA
探针也会被调用
srcpad sinkpad
| |
| | gst_pad_pull_range()
| O<---------------------
| O
| O flushing?
| O FLUSHING
| O - - - - - - - - - - >
| do BLOCK probes <-O
| O no peer?
| O NOT_LINKED
| O - - - - - - - - - - >
| gst_pad_get_range() O
O<------------------------------O
O O
O flushing? O
O FLUSHING O
O- - - - - - - - - - - - - - - >O
do BLOCK probes <-O O
O O
getrangefunc <---O O
O flow error? O
O- - - - - - - - - - - - - - - >O
O O
do DATA probes <-O O
O- - - - - - - - - - - - - - - >O
| O
| do IDLE probes <-O
| O flow error?
| O - - - - - - - - - - >
| O
| do DATA probes <-O
| O - - - - - - - - - - >
| |
查询 Queries
查询探针 (Query probes) 在回调中设置了 GST_PAD_PROBE_TYPE_QUERY_*
标志.
pad peerpad
| |
gst_pad_peer_query() | |
-------------------->O |
O |
O-> do BLOCK probes |
O |
O-> do QUERY | PUSH probes |
no peer? O |
FALSE O |
< - - - - - - O |
O gst_pad_query() |
O------------------------------>O
O O-> do BLOCK probes
O O
O O-> do QUERY | PUSH probes
O O
O O---> queryfunc
O error O
<- - - - - - - - - - - - - - - - - - - - - - -O
O O
O O-> do QUERY | PULL probes
O< - - - - - - - - - - - - - - -O
O |
O-> do QUERY | PULL probes |
O |
< - - - - - - O |
| |
对于查询 queries, 当查询到达了要回答这个查询的对象, ProbeType 设置为 PUSH
而当 query 中已经包含了答案 , ProbeType 将设置为 PULL
用例
预滚动部分管道
.---------. .---------. .----------.
| filesrc | | demuxer | .-----. | decoder1 |
| src -> sink src1 ->|queue|-> sink src
'---------' | | '-----' '----------' X
| | .----------.
| | .-----. | decoder2 |
| src2 ->|queue|-> sink src
'---------' '-----' '----------' X
其目的是动态创建管道,一直连到解码器,但尚未将它们连接到接收器 sink, 并且不会丢失任何数据。
为此,解码器的 source pad 被阻止,以便没有 event 或 buffer 可以逃脱,并且我们也不会中断数据流。
当所有动态 pad 都被创建(分支点不再创建出新的 pad,即解复用器 demuxer 或队列 queue 已满)并且 pad 被阻止(收到 blocked callback )时,管道将完全预滚动 (preroll)。
然后应该可以在预滚动的管道上执行以下操作:
-
查询时长/位置
-
执行刷新搜索以预滚动到新位置
-
连接其他元件并疏通堵塞的 pads
在一个 PLAYING 的管道中动态切换一个元件
.----------. .----------. .----------.
| element1 | | element2 | | element3 |
... src -> sink src -> sink ...
'----------' '----------' '----------'
.----------.
| element4 |
sink src
'----------'
其目的是在 PLAYING
的管道中将 element4 替换为 element2
- 阻塞 element1 的 src pad.
- 在阻塞回调函数的内部,element1 和 element2 之间没有任何数据流动,并且在解除阻塞之前不会有任何数据流动。
- 将 element1 和 element2 解除连接
- 可选步骤:确保数据从 element2 中清除:
- 4a) 在 element2 src pad上添加事件探针
- 4b) 将
EOS
事件发送到 element2,这可确保 element2 清除其保存的最后一位数据。 - 4c) 等待
EOS
出现在探针中,丢弃EOS
事件 - 4d) 删除 pad 上的
EOS
事件探针。
- 解除 element2 和 element3 的连接
- 5a) 现在可以选择将 element2 设置为“NULL”和/或从管道中将其删除。
- 连接 element4 和 element3
- 连接 element1 和 element4
- 确保 element4 与其余元素处于相同的状态。 该元素至少应该是
PAUSED
状态 - 解除 element1 src pad 的阻塞
相同的流程可用于替换“PAUSED”管道中的元素。 当然,在 “PAUSED” 管道中可能没有数据流,因此阻塞可能不会立即发生。
示例
1. 探查 pad 上的收到的 event, query 和 data
#include <gst/gst.h>
#include <string>
#include <iostream>
#include <sstream>
#include <iomanip>
static std::string bytes_to_hex(uint8_t* bytes, size_t len) {
std::stringstream ss;
// Ensure the output is in uppercase and has two characters for each byte
ss << std::uppercase << std::setfill('0');
for (size_t i = 0; i < len; ++i) {
ss << std::setw(2) << std::hex << static_cast<int>(bytes[i]);
}
return ss.str();
}
static gboolean handoff_callback(GstElement *sink, GstBuffer *buffer, gpointer user_data)
{
// This function will be called when the handoff signal is emitted
g_print("Handoff Callback - Received buffer with size: %zu\n", gst_buffer_get_size(buffer));
return TRUE;
}
GstPadProbeReturn cb_have_data_list(GstPad *pad, GstPadProbeInfo *info,
gpointer user_data)
{
GstBufferList *buflist = gst_pad_probe_info_get_buffer_list(info);
if (buflist)
g_print("cb_hava_data_list from %s\n", (char *)user_data);
return GST_PAD_PROBE_OK;
}
GstPadProbeReturn cb_have_data(GstPad *pad, GstPadProbeInfo *info,
gpointer user_data)
{
//GstBuffer *buf = gst_pad_probe_info_get_buffer(info);
GstBuffer *buf = (GstBuffer *) info->data;
GstMapInfo in_map_info;
memset (&in_map_info, 0, sizeof (in_map_info));
static uint32_t data_count = 0;
if (buf) {
g_print("cb_hava_data: offet=%lu from %s\n", buf->offset,
(char *)user_data);
if (gst_buffer_map (buf, &in_map_info, GST_MAP_READWRITE))
{
std::string hexstr = bytes_to_hex(in_map_info.data, in_map_info.size);
std::cout << "cb_hava_data, count=" <<(++data_count) << ", size=" << in_map_info.size;
if (data_count % 5 == 0) {
std::cout << "receive data=" << hexstr << std::endl;
}
}
}
return GST_PAD_PROBE_OK;
}
GstPadProbeReturn cb_have_event(GstPad *pad, GstPadProbeInfo *info,
gpointer user_data)
{
GstEvent *event = gst_pad_probe_info_get_event(info);
if (event)
g_print("cb_hava_event: event type=%s from %s\n", GST_EVENT_TYPE_NAME(event),
(char *)user_data);
return GST_PAD_PROBE_OK;
}
GstPadProbeReturn cb_have_query(GstPad *pad, GstPadProbeInfo *info,
gpointer user_data)
{
GstQuery *query = gst_pad_probe_info_get_query(info);
g_print("cb_hava_query: query type=%s from %s\n", GST_QUERY_TYPE_NAME(query),
(char *)user_data);
return GST_PAD_PROBE_OK;
}
void add_pad_probe(GstPad *pad_to_probe, const char *evt_pad_name)
{
// GST_PAD_PROBE_TYPE_BUFFER_LIST
gulong probe_id_0 = gst_pad_add_probe(pad_to_probe,
GST_PAD_PROBE_TYPE_BUFFER_LIST, cb_have_data_list,
(gpointer)evt_pad_name, NULL);
// GST_PAD_PROBE_TYPE_BUFFER
gulong probe_id_1 = gst_pad_add_probe(pad_to_probe,
GST_PAD_PROBE_TYPE_BUFFER, cb_have_data, (gpointer)evt_pad_name,
NULL);
// GST_PAD_PROBE_TYPE_EVENT_BOTH
gulong probe_id_2 = gst_pad_add_probe(pad_to_probe,
GST_PAD_PROBE_TYPE_EVENT_BOTH, cb_have_event,
(gpointer)evt_pad_name, NULL);
// GST_PAD_PROBE_TYPE_QUERY_BOTH
gulong probe_id_3 = gst_pad_add_probe(pad_to_probe,
GST_PAD_PROBE_TYPE_QUERY_BOTH, cb_have_query,
(gpointer)evt_pad_name, NULL);
}
int main(int argc, char *argv[])
{
GstElement *pipeline, *source, *sink;
GstBus *bus;
GstMessage *msg;
GstStateChangeReturn ret;
gst_init(&argc, &argv);
/* Create the elements */
source = gst_element_factory_make("audiotestsrc", "source");
sink = gst_element_factory_make("fakesink", "sink");
/* Create the empty pipeline */
pipeline = gst_pipeline_new("test-pipeline");
if (!pipeline || !source || !sink)
{
g_printerr("Not all elements could be created.\n");
return -1;
}
/* Build the pipeline */
gst_bin_add_many(GST_BIN(pipeline), source, sink, NULL);
if (gst_element_link(source, sink) != TRUE)
{
g_printerr("Elements could not be linked.\n");
gst_object_unref(pipeline);
return -1;
}
/* Modify the source's properties */
g_object_set(source, "num-buffers", 10, NULL);
g_object_set(source, "wave", 5, NULL);
// Connect the handoff signal to the callback function
g_signal_connect(sink, "handoff", G_CALLBACK(handoff_callback), NULL);
GstPad* audioSinkPad = gst_element_get_static_pad(sink, "sink");
add_pad_probe(audioSinkPad, "fakesink");
/* Start playing */
ret = gst_element_set_state(pipeline, GST_STATE_PLAYING);
if (ret == GST_STATE_CHANGE_FAILURE)
{
g_printerr("Unable to set the pipeline to the playing state.\n");
gst_object_unref(pipeline);
return -1;
}
/* Wait until error or EOS */
bus = gst_element_get_bus(pipeline);
msg = gst_bus_timed_pop_filtered(bus, GST_CLOCK_TIME_NONE,
(GstMessageType)(GST_MESSAGE_ERROR | GST_MESSAGE_EOS));
/* Parse message */
if (msg != NULL)
{
GError *err;
gchar *debug_info;
switch (GST_MESSAGE_TYPE(msg))
{
case GST_MESSAGE_ERROR:
gst_message_parse_error(msg, &err, &debug_info);
g_printerr("Error received from element %s: %s\n",
GST_OBJECT_NAME(msg->src), err->message);
g_printerr("Debugging information: %s\n",
debug_info ? debug_info : "none");
g_clear_error(&err);
g_free(debug_info);
break;
case GST_MESSAGE_EOS:
g_print("End-Of-Stream reached.\n");
break;
default:
/* We should not reach here because we only asked for ERRORs and EOS */
g_printerr("Unexpected message received.\n");
break;
}
gst_message_unref(msg);
}
/* Free resources */
gst_object_unref(bus);
gst_element_set_state(pipeline, GST_STATE_NULL);
gst_object_unref(pipeline);
return 0;
}
执行结果如下
../bin/gst-fakesink
cb_hava_event: event type=stream-start from fakesink
cb_hava_query: query type=caps from fakesink
cb_hava_query: query type=caps from fakesink
cb_hava_event: event type=caps from fakesink
cb_hava_query: query type=accept-caps from fakesink
cb_hava_query: query type=accept-caps from fakesink
cb_hava_query: query type=allocation from fakesink
cb_hava_event: event type=segment from fakesink
cb_hava_event: event type=tag from fakesink
cb_hava_data: offet=0 from fakesink
cb_hava_data, count=1, size=2048cb_hava_query: query type=latency from fakesink
cb_hava_query: query type=latency from fakesink
cb_hava_event: event type=latency from fakesink
cb_hava_data: offet=1024 from fakesink
cb_hava_data, count=2, size=2048cb_hava_data: offet=2048 from fakesink
cb_hava_data, count=3, size=2048cb_hava_data: offet=3072 from fakesink
cb_hava_data, count=4, size=2048cb_hava_data: offet=4096 from fakesink
cb_hava_data, count=5, size=2048receive data
cb_hava_data: offet=5120 from fakesink
cb_hava_data, count=6, size=2048cb_hava_data: offet=6144 from fakesink
cb_hava_data, count=7, size=2048cb_hava_data: offet=7168 from fakesink
cb_hava_data, count=8, size=2048cb_hava_data: offet=8192 from fakesink
cb_hava_data, count=9, size=2048cb_hava_data: offet=9216 from fakesink
cb_hava_data, count=10, size=2048receive data
cb_hava_event: event type=eos from fakesink
End-Of-Stream reached.
- 源代码: https://github.com/walterfan/gstreamer-cookbook/blob/master/example/gst-fakesink.cpp
- 编译脚本: https://github.com/walterfan/gstreamer-cookbook/blob/master/example/CMakeLists.txt