微服务开发实战

GStreamer Probe 探针的妙用

2024-01-06  本文已影响0人  老瓦在霸都

探针 Probe 是一种可由应用程序安装的回调,可将数据流的状态及内容回调给应用程序,在实现中妙用无穷,既可以用来检查在 GStreamer Pipeline 中流传的 Data, Query 和 Event, 还可以用来检查和修改在回调函数中收到的数据,以及阻塞或传递在 pipeline 流传的数据。

先总结一下我在 GStreamer Probe 看到的内容,然后写一个简单的小程序演示一下探针的妙用。

用法

应用程序应该能够监视和控制 pad 上的数据流,有以下用法:

概述

函数 gst_pad_add_probe() 用于在 pad 上添加探针 probe. 它接受的参数有 probe type mask 和一个 callback 函数.

    gulong  gst_pad_add_probe    (GstPad *pad,
                                  GstPadProbeType mask,
                                  GstPadProbeCallback callback,
                                  gpointer user_data,
                                  GDestroyNotify destroy_data);

此函数返回 gulong 数据类型来标识这个探针,这个 probe_id 可以用 gst_pad_remove_probe() 来移除探针

    void    gst_pad_remove_probe (GstPad *pad, gulong id);

而 mask 掩码参数是以下 flags 的按位或操作 "bitwise or"

typedef enum
{
  GST_PAD_PROBE_TYPE_INVALID          = 0,

  /* flags to control blocking */
  GST_PAD_PROBE_TYPE_IDLE             = (1 << 0),
  GST_PAD_PROBE_TYPE_BLOCK            = (1 << 1),

  /* flags to select datatypes */
  GST_PAD_PROBE_TYPE_BUFFER           = (1 << 4),
  GST_PAD_PROBE_TYPE_BUFFER_LIST      = (1 << 5),
  GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM = (1 << 6),
  GST_PAD_PROBE_TYPE_EVENT_UPSTREAM   = (1 << 7),
  GST_PAD_PROBE_TYPE_EVENT_FLUSH      = (1 << 8),
  GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM = (1 << 9),
  GST_PAD_PROBE_TYPE_QUERY_UPSTREAM   = (1 << 10),

  /* flags to select scheduling mode */
  GST_PAD_PROBE_TYPE_PUSH             = (1 << 12),
  GST_PAD_PROBE_TYPE_PULL             = (1 << 13),
} GstPadProbeType;

添加带有 IDLE 或 BLOCK 标志的探针时,探针将成为阻塞探针 (blocking probe)。否则,探针将是数据探针 (Data Probe)。

数据类型和调度选择器标志用于选择在回调函数中可以使用哪种类型的数据类型和调度模式。

Blocking flags 必须与所触发的 probe 完全匹配。

探测回调定义为:


GstPadProbeReturn (*GstPadProbeCallback) (GstPad *pad, GstPadProbeInfo *info,
    gpointer user_data);

probe info 结构作为参数传递,其类型必须匹配于注册回调时使用的掩码。Probe info 中包含的数据项包含特定类型的数据,通常是被阻止的数据项或者是 NULL(当不存在数据项时)

探针可以返回以下任何返回值:

typedef enum
{
  GST_PAD_PROBE_DROP,
  GST_PAD_PROBE_OK,
  GST_PAD_PROBE_REMOVE,
  GST_PAD_PROBE_PASS,
  GST_PAD_PROBE_HANDLED
} GstPadProbeReturn;

阻塞探针

阻塞探针是设置了 BLOCKIDLE 标志的探针。它们将一直阻止数据流并按照以下规则触发回调:

IDLE 探针在执行动态链接时很有用,允许应用程序等待 "unlink/link" 操作的正确执行。由于探针是阻挡探针,它还将确保 pad 保持空闲状态,直到此探针被删除。

阻塞探针由 gst_pad_remove_probe() 函数移除, 或当探针的回调函数返回 GST_PAD_PROBE_REMOVE 时移除此探针。

在这两种情况下,如果这是 pad 上的最后一个阻塞探针,pad 就会被解锁 (unblocked),数据流可以继续。

非阻塞探针

非阻塞探针或数据探针是在 Pad 中数据流动时触发的探针。它们在阻塞探针运行之后调用,并且始终针对于数据。

推送数据流

推送探针在回调中设置了 GST_PAD_PROBE_TYPE_PUSH 标志。

在基于推送的调度中,首先使用数据项调用阻塞探针 (blocking probe)。然后,在对等 pad 的 chainevent 函数调用之前 调用数据探针 (data probe)

在对等的 pad 被检查之前会调用 data probe, 它允许链接 pad 以阻塞探针 (BLOCK probe) 或者数据探针 (DATA probe)

在对等的 pad 的 chain 或 event 函数调用之前, 对等的 pad 的阻塞探针 (BLOCK probe) 或者数据探针 (DATA probe) 会被调用。

最后,在数据发送给对等的 pad 之后 IDEL 探针被调用

推送数据流探针行为与数据缓冲和双向事件的行为相同

                    pad                           peerpad
                     |                               |
gst_pad_push() /     |                               |
gst_pad_push_event() |                               |
-------------------->O                               |
                     O                               |
       flushing?     O                               |
       FLUSHING      O                               |
       < - - - - - - O                               |
                     O-> do BLOCK probes             |
                     O                               |
                     O-> do DATA probes              |
        no peer?     O                               |
       NOT_LINKED    O                               |
       < - - - - - - O                               |
                     O   gst_pad_chain() /           |
                     O   gst_pad_send_event()        |
                     O------------------------------>O
                     O                   flushing?   O
                     O                   FLUSHING    O
                     O< - - - - - - - - - - - - - - -O
                     O                               O-> do BLOCK probes
                     O                               O
                     O                               O-> do DATA probes
                     O                               O
                     O                               O---> chainfunc /
                     O                               O     eventfunc
                     O< - - - - - - - - - - - - - - -O
                     O                               |
                     O-> do IDLE probes              |
                     O                               |
       < - - - - - - O                               |
                     |                               |

拉取数据流

拉取探针在回调中设置了 GST_PAD_PROBE_TYPE_PULL 标志。而 gst_pad_pull_range() 调用先触发 BLOCK 探针, 并不包含数据项。它允许对等的 pad 解析成功前进行连接此 pad, 还允许在回调函数中设置数据项

在对等的 pad 上调用阻塞探针 blocking probe 和 getrange 并有数据项传递之后,此 pad 上的 数据探针 Data Probe 将会被调用。

当控制权返回到 sink pad, IDLE 回调会被调用,没有数据项就会调用 IDLE 回调,所以如果发生错误时也会调用 IDLE 回调

如果有了有效的数据项, DATA 探针也会被调用

                srcpad                          sinkpad
                  |                               |
                  |                               | gst_pad_pull_range()
                  |                               O<---------------------
                  |                               O
                  |                               O  flushing?
                  |                               O  FLUSHING
                  |                               O - - - - - - - - - - >
                  |             do BLOCK probes <-O
                  |                               O   no peer?
                  |                               O  NOT_LINKED
                  |                               O - - - - - - - - - - >
                  |          gst_pad_get_range()  O
                  O<------------------------------O
                  O                               O
                  O flushing?                     O
                  O FLUSHING                      O
                  O- - - - - - - - - - - - - - - >O
do BLOCK probes <-O                               O
                  O                               O
 getrangefunc <---O                               O
                  O  flow error?                  O
                  O- - - - - - - - - - - - - - - >O
                  O                               O
 do DATA probes <-O                               O
                  O- - - - - - - - - - - - - - - >O
                  |                               O
                  |              do IDLE probes <-O
                  |                               O   flow error?
                  |                               O - - - - - - - - - - >
                  |                               O
                  |              do DATA probes <-O
                  |                               O - - - - - - - - - - >
                  |                               |

查询 Queries

查询探针 (Query probes) 在回调中设置了 GST_PAD_PROBE_TYPE_QUERY_* 标志.

                    pad                           peerpad
                     |                               |
gst_pad_peer_query() |                               |
-------------------->O                               |
                     O                               |
                     O-> do BLOCK probes             |
                     O                               |
                     O-> do QUERY | PUSH probes      |
        no peer?     O                               |
          FALSE      O                               |
       < - - - - - - O                               |
                     O   gst_pad_query()             |
                     O------------------------------>O
                     O                               O-> do BLOCK probes
                     O                               O
                     O                               O-> do QUERY | PUSH probes
                     O                               O
                     O                               O---> queryfunc
                     O                    error      O
       <- - - - - - - - - - - - - - - - - - - - - - -O
                     O                               O
                     O                               O-> do QUERY | PULL probes
                     O< - - - - - - - - - - - - - - -O
                     O                               |
                     O-> do QUERY | PULL probes      |
                     O                               |
       < - - - - - - O                               |
                     |                               |

对于查询 queries, 当查询到达了要回答这个查询的对象, ProbeType 设置为 PUSH
而当 query 中已经包含了答案 , ProbeType 将设置为 PULL

用例

预滚动部分管道

    .---------.      .---------.                .----------.
    | filesrc |      | demuxer |     .-----.    | decoder1 |
    |        src -> sink      src1 ->|queue|-> sink       src
    '---------'      |         |     '-----'    '----------' X
                     |         |                .----------.
                     |         |     .-----.    | decoder2 |
                     |        src2 ->|queue|-> sink       src
                     '---------'     '-----'    '----------' X

其目的是动态创建管道,一直连到解码器,但尚未将它们连接到接收器 sink, 并且不会丢失任何数据。

为此,解码器的 source pad 被阻止,以便没有 event 或 buffer 可以逃脱,并且我们也不会中断数据流。

当所有动态 pad 都被创建(分支点不再创建出新的 pad,即解复用器 demuxer 或队列 queue 已满)并且 pad 被阻止(收到 blocked callback )时,管道将完全预滚动 (preroll)。

然后应该可以在预滚动的管道上执行以下操作:

在一个 PLAYING 的管道中动态切换一个元件

 .----------.      .----------.      .----------.
 | element1 |      | element2 |      | element3 |
...        src -> sink       src -> sink       ...
 '----------'      '----------'      '----------'
                   .----------.
                   | element4 |
                  sink       src
                   '----------'

其目的是在 PLAYING 的管道中将 element4 替换为 element2

  1. 阻塞 element1 的 src pad.
  2. 在阻塞回调函数的内部,element1 和 element2 之间没有任何数据流动,并且在解除阻塞之前不会有任何数据流动。
  3. 将 element1 和 element2 解除连接
  4. 可选步骤:确保数据从 element2 中清除:
  1. 解除 element2 和 element3 的连接
  1. 连接 element4 和 element3
  2. 连接 element1 和 element4
  3. 确保 element4 与其余元素处于相同的状态。 该元素至少应该是 PAUSED 状态
  4. 解除 element1 src pad 的阻塞

相同的流程可用于替换“PAUSED”管道中的元素。 当然,在 “PAUSED” 管道中可能没有数据流,因此阻塞可能不会立即发生。

示例

1. 探查 pad 上的收到的 event, query 和 data

#include <gst/gst.h>
#include <string>
#include <iostream>
#include <sstream>
#include <iomanip>

static std::string bytes_to_hex(uint8_t* bytes, size_t len) {
    std::stringstream ss;

    // Ensure the output is in uppercase and has two characters for each byte
    ss << std::uppercase << std::setfill('0');
    for (size_t i = 0; i < len; ++i) {
        ss << std::setw(2) << std::hex << static_cast<int>(bytes[i]);
    }

    return ss.str();
}

static gboolean handoff_callback(GstElement *sink, GstBuffer *buffer, gpointer user_data)
{
  // This function will be called when the handoff signal is emitted
  g_print("Handoff Callback - Received buffer with size: %zu\n", gst_buffer_get_size(buffer));
  return TRUE;
}

GstPadProbeReturn cb_have_data_list(GstPad *pad, GstPadProbeInfo *info,
                                    gpointer user_data)
{

    GstBufferList *buflist = gst_pad_probe_info_get_buffer_list(info);
    if (buflist)
        g_print("cb_hava_data_list from %s\n", (char *)user_data);
    return GST_PAD_PROBE_OK;
}

GstPadProbeReturn cb_have_data(GstPad *pad, GstPadProbeInfo *info,
                                gpointer user_data)
{

    //GstBuffer *buf = gst_pad_probe_info_get_buffer(info);
    GstBuffer *buf = (GstBuffer *) info->data;
    GstMapInfo in_map_info;
    memset (&in_map_info, 0, sizeof (in_map_info));
    static uint32_t data_count = 0;

    if (buf) {
      g_print("cb_hava_data: offet=%lu from %s\n", buf->offset,
              (char *)user_data);
      if (gst_buffer_map (buf, &in_map_info, GST_MAP_READWRITE))
      {
        std::string hexstr = bytes_to_hex(in_map_info.data, in_map_info.size);
        std::cout << "cb_hava_data, count=" <<(++data_count) << ", size=" << in_map_info.size;
        if (data_count % 5 == 0) {
          std::cout << "receive data=" << hexstr << std::endl;
        }
        
      }
    }
    return GST_PAD_PROBE_OK;
}

GstPadProbeReturn cb_have_event(GstPad *pad, GstPadProbeInfo *info,
                                    gpointer user_data)
{

    GstEvent *event = gst_pad_probe_info_get_event(info);
    if (event)
      g_print("cb_hava_event: event type=%s from %s\n", GST_EVENT_TYPE_NAME(event),
              (char *)user_data);

    return GST_PAD_PROBE_OK;
}

GstPadProbeReturn cb_have_query(GstPad *pad, GstPadProbeInfo *info,
                                gpointer user_data)
{

    GstQuery *query = gst_pad_probe_info_get_query(info);

    g_print("cb_hava_query: query type=%s from %s\n", GST_QUERY_TYPE_NAME(query),
          (char *)user_data);

    return GST_PAD_PROBE_OK;
}

void add_pad_probe(GstPad *pad_to_probe, const char *evt_pad_name)
{
    // GST_PAD_PROBE_TYPE_BUFFER_LIST
    gulong probe_id_0 = gst_pad_add_probe(pad_to_probe,
                                          GST_PAD_PROBE_TYPE_BUFFER_LIST, cb_have_data_list,
                                          (gpointer)evt_pad_name, NULL);

    // GST_PAD_PROBE_TYPE_BUFFER
    gulong probe_id_1 = gst_pad_add_probe(pad_to_probe,
                                          GST_PAD_PROBE_TYPE_BUFFER, cb_have_data, (gpointer)evt_pad_name,
                                          NULL);

    // GST_PAD_PROBE_TYPE_EVENT_BOTH
    gulong probe_id_2 = gst_pad_add_probe(pad_to_probe,
                                          GST_PAD_PROBE_TYPE_EVENT_BOTH, cb_have_event,
                                          (gpointer)evt_pad_name, NULL);

    // GST_PAD_PROBE_TYPE_QUERY_BOTH
    gulong probe_id_3 = gst_pad_add_probe(pad_to_probe,
                                          GST_PAD_PROBE_TYPE_QUERY_BOTH, cb_have_query,
                                          (gpointer)evt_pad_name, NULL);

}

int main(int argc, char *argv[])
{

  GstElement *pipeline, *source, *sink;
  GstBus *bus;
  GstMessage *msg;
  GstStateChangeReturn ret;

  gst_init(&argc, &argv);

  /* Create the elements */
  source = gst_element_factory_make("audiotestsrc", "source");
  sink = gst_element_factory_make("fakesink", "sink");

  /* Create the empty pipeline */
  pipeline = gst_pipeline_new("test-pipeline");

  if (!pipeline || !source || !sink)
  {
    g_printerr("Not all elements could be created.\n");
    return -1;
  }

  /* Build the pipeline */
  gst_bin_add_many(GST_BIN(pipeline), source, sink, NULL);
  if (gst_element_link(source, sink) != TRUE)
  {
    g_printerr("Elements could not be linked.\n");
    gst_object_unref(pipeline);
    return -1;
  }

  /* Modify the source's properties */
  g_object_set(source, "num-buffers", 10, NULL);
  g_object_set(source, "wave", 5, NULL);

  // Connect the handoff signal to the callback function
  g_signal_connect(sink, "handoff", G_CALLBACK(handoff_callback), NULL);

  GstPad* audioSinkPad = gst_element_get_static_pad(sink, "sink");
  add_pad_probe(audioSinkPad, "fakesink");

  /* Start playing */
  ret = gst_element_set_state(pipeline, GST_STATE_PLAYING);
  if (ret == GST_STATE_CHANGE_FAILURE)
  {
    g_printerr("Unable to set the pipeline to the playing state.\n");
    gst_object_unref(pipeline);
    return -1;
  }

  /* Wait until error or EOS */
  bus = gst_element_get_bus(pipeline);
  msg = gst_bus_timed_pop_filtered(bus, GST_CLOCK_TIME_NONE,
                                 (GstMessageType)(GST_MESSAGE_ERROR | GST_MESSAGE_EOS));

  /* Parse message */
  if (msg != NULL)
  {
    GError *err;
    gchar *debug_info;

    switch (GST_MESSAGE_TYPE(msg))
    {
    case GST_MESSAGE_ERROR:
      gst_message_parse_error(msg, &err, &debug_info);
      g_printerr("Error received from element %s: %s\n",
                 GST_OBJECT_NAME(msg->src), err->message);
      g_printerr("Debugging information: %s\n",
                 debug_info ? debug_info : "none");
      g_clear_error(&err);
      g_free(debug_info);
      break;
    case GST_MESSAGE_EOS:
      g_print("End-Of-Stream reached.\n");
      break;
    default:
      /* We should not reach here because we only asked for ERRORs and EOS */
      g_printerr("Unexpected message received.\n");
      break;
    }
    gst_message_unref(msg);
  }

  /* Free resources */
  gst_object_unref(bus);
  gst_element_set_state(pipeline, GST_STATE_NULL);
  gst_object_unref(pipeline);
  return 0;
}


执行结果如下

../bin/gst-fakesink
cb_hava_event: event type=stream-start from fakesink
cb_hava_query: query type=caps from fakesink
cb_hava_query: query type=caps from fakesink
cb_hava_event: event type=caps from fakesink
cb_hava_query: query type=accept-caps from fakesink
cb_hava_query: query type=accept-caps from fakesink
cb_hava_query: query type=allocation from fakesink
cb_hava_event: event type=segment from fakesink
cb_hava_event: event type=tag from fakesink
cb_hava_data: offet=0 from fakesink
cb_hava_data, count=1, size=2048cb_hava_query: query type=latency from fakesink
cb_hava_query: query type=latency from fakesink
cb_hava_event: event type=latency from fakesink
cb_hava_data: offet=1024 from fakesink
cb_hava_data, count=2, size=2048cb_hava_data: offet=2048 from fakesink
cb_hava_data, count=3, size=2048cb_hava_data: offet=3072 from fakesink
cb_hava_data, count=4, size=2048cb_hava_data: offet=4096 from fakesink
cb_hava_data, count=5, size=2048receive data
cb_hava_data: offet=5120 from fakesink
cb_hava_data, count=6, size=2048cb_hava_data: offet=6144 from fakesink
cb_hava_data, count=7, size=2048cb_hava_data: offet=7168 from fakesink
cb_hava_data, count=8, size=2048cb_hava_data: offet=8192 from fakesink
cb_hava_data, count=9, size=2048cb_hava_data: offet=9216 from fakesink
cb_hava_data, count=10, size=2048receive data
cb_hava_event: event type=eos from fakesink
End-Of-Stream reached.

参考资料

上一篇下一篇

猜你喜欢

热点阅读