Control queue size before dropping frames

Hello,

We have a use case where we want to control the queue size before dropping some frames.

we used:

  1. max-size-time=0, max-size-bytes=0. we set them by zero to disable their maximum.
  2. max-size-buffers=50. it’s set to 50 before dropping
  3. leaky=2. to drop frames downstream.
  4. we added an identity element and attached a handoff callback to mimic the latency of the downstream elements.

Results

  • playback is affected by the latency (slow).
  • observed queue size did not go beyond 4 buffers.

Questions

  • According to the documentation: the queue separates the downstream processing from the upstream processing (since it spawns a thread), why can’t the queue reach a size of 50 buffers ?

  • If there is any alternative solution, please mention.

gstreamer used is 1.16.3

What do you have upstream and downstream ?

If they both produce/consume at the same rate, it’s expected that the queue would never reach a level of 50 buffers stored.

Here is the code sample:

#include <gst/gst.h>

#include <chrono>
#include <thread>

#ifdef __APPLE__
#include <TargetConditionals.h>
#endif

/* Shared application state: the pipeline, its elements, and the main
 * loop, passed as user_data to every callback so handlers can reach
 * the pieces they need (e.g. the queue for level queries, the loop to
 * quit on error/EOS). */
typedef struct _CustomData {
  GstElement *pipeline;      /* top-level pipeline bin */
  GMainLoop *loop;           /* GLib main loop driving bus/timeout callbacks */
  GstElement *source;        /* uridecodebin; pads linked in on_pad_added() */
  GstElement *videoconvert;  /* colorspace conversion before the sink */
  GstElement *queue;         /* leaky queue under test (max-size-buffers=50) */
  GstElement *identity;      /* identity with handoff used to inject latency */
  GstElement *sink;          /* autovideosink, sync=FALSE */
} CustomData;

/*
 * "overrun" signal handler: fired by the queue when it hits one of its
 * max-size-* limits.  With leaky=downstream the queue drops the oldest
 * buffers at this point.
 *
 * NOTE: this signal is emitted from the queue's streaming thread, so
 * keep the handler cheap.  If you want to log fill levels here, read
 * "current-level-buffers"/"current-level-bytes" into a guint and
 * "current-level-time" into a guint64 — the types must match the
 * property types exactly or g_object_get() corrupts the stack.
 */
static void on_queue_overrun(GstElement *queue, gpointer user_data) {
  g_print("queue reached its max, dropping some frames!\n");
}

static bool print_queue_status(gpointer user_data) {
  auto custom_data = (CustomData *)user_data;
  int current_level_buffers, current_level_time, current_level_bytes;
  g_object_get(G_OBJECT(custom_data->queue), "current-level-buffers", &current_level_buffers, NULL);
  g_object_get(G_OBJECT(custom_data->queue), "current-level-time", &current_level_time, NULL);
  g_object_get(G_OBJECT(custom_data->queue), "current-level-bytes", &current_level_bytes, NULL);
  g_print("current-level-buffers: %d\n current-level-time: %d\n current-level-bytes: %d\n", current_level_buffers,
          current_level_time, current_level_bytes);
  return 1;
}

/*
 * Bus watch callback: reacts to pipeline messages.
 *  - ERROR: print the error, stop the pipeline, quit the loop.
 *  - EOS:   stop the pipeline, quit the loop.
 *  - CLOCK_LOST: bounce PAUSED -> PLAYING so the pipeline picks a new clock.
 * All other message types are ignored.
 */
static void cb_message(GstBus *bus, GstMessage *msg, CustomData *data) {
  const GstMessageType msg_type = GST_MESSAGE_TYPE(msg);

  if (msg_type == GST_MESSAGE_ERROR) {
    GError *error = NULL;
    gchar *debug_info = NULL;

    gst_message_parse_error(msg, &error, &debug_info);
    g_print("Error: %s\n", error->message);
    g_error_free(error);
    g_free(debug_info);

    gst_element_set_state(data->pipeline, GST_STATE_READY);
    g_main_loop_quit(data->loop);
  } else if (msg_type == GST_MESSAGE_EOS) {
    /* end-of-stream */
    gst_element_set_state(data->pipeline, GST_STATE_READY);
    g_main_loop_quit(data->loop);
  } else if (msg_type == GST_MESSAGE_CLOCK_LOST) {
    /* Get a new clock */
    gst_element_set_state(data->pipeline, GST_STATE_PAUSED);
    gst_element_set_state(data->pipeline, GST_STATE_PLAYING);
  }
  /* Unhandled messages fall through silently. */
}

// Callback function for the "pad-added" signal
static void on_pad_added(GstElement *element, GstPad *pad, gpointer data) {
  // Check if the pad is an audio or video pad and link it to decodebin accordingly
  auto customData = (CustomData *)data;
  GstCaps *caps = gst_pad_query_caps(pad, NULL);
  const gchar *pad_type = gst_structure_get_name(gst_caps_get_structure(caps, 0));
  g_print("pad type %s\n", pad_type);
  gst_caps_unref(caps);

  if (g_str_has_prefix(pad_type, "video/x-raw")) {
    GstPad *sinkpad = gst_element_get_static_pad(customData->videoconvert, "sink");
    if (gst_pad_link(pad, sinkpad) != GST_PAD_LINK_OK) {
      g_print("cannot link src to converter!\n");
    } else {
      g_print("pad type %s linked succefully\n", pad_type);
    }
    gst_object_unref(sinkpad);
  }
}

// identity "handoff" handler: blocks the streaming thread for 250 ms per
// buffer to simulate a slow downstream consumer (caps throughput at ~4 fps).
static void handoff_callback(GstElement *identity, GstBuffer *buffer, gpointer udata) {
  const auto simulated_latency = std::chrono::milliseconds(250);
  std::this_thread::sleep_for(simulated_latency);
}

int tutorial_main(int argc, char *argv[]) {
  CustomData customData;
  GstBus *bus;

  /* Initialize GStreamer */
  gst_init(&argc, &argv);

  // Create pipeline and elements
  customData.pipeline = gst_pipeline_new("my-pipeline");
  customData.source = gst_element_factory_make("uridecodebin", "source");
  customData.videoconvert = gst_element_factory_make("videoconvert", "videoconvert");
  customData.queue = gst_element_factory_make("queue", "queue");
  customData.identity = gst_element_factory_make("identity", "identity");
  customData.sink = gst_element_factory_make("autovideosink", "autovideosink");

  // set props
  g_object_set(customData.source, "uri", "file:////home/user/testFolder/test.mkv", NULL);
  g_object_set(customData.queue, "leaky", 2, "max-size-buffers", 50, "max-size-bytes", 0, "max-size-time", 0, NULL);
  g_object_set(customData.sink, "sync", FALSE, NULL);

  // Check if elements were created successfully
  if (!customData.pipeline || !customData.source || !customData.videoconvert || !customData.identity ||
      !customData.queue || !customData.sink) {
    g_print("Elements could not be created. Exiting.\n");
    return -1;
  }

  // Add elements to the pipeline
  gst_bin_add_many(GST_BIN(customData.pipeline), customData.source, customData.videoconvert, customData.identity,
                   customData.queue, customData.sink, NULL);

  // Link the elements
  if (!gst_element_link_many(customData.videoconvert, customData.identity, customData.queue, customData.sink, NULL)) {
    g_print("Elements could not be linked. Exiting.\n");
    return -1;
  }

  // Set up the "pad-added" signal handler for the source element
  g_signal_connect(customData.source, "pad-added", G_CALLBACK(on_pad_added), &customData);

  // connect signal with callback
  g_signal_connect(G_OBJECT(customData.queue), "overrun", G_CALLBACK(on_queue_overrun), NULL);
  g_signal_connect(G_OBJECT(customData.identity), "handoff", G_CALLBACK(handoff_callback), NULL);

  // Start playing
  gst_element_set_state(customData.pipeline, GST_STATE_PLAYING);

  /* Wait until error or EOS */
  bus = gst_element_get_bus(customData.pipeline);

  GMainLoop *main_loop = g_main_loop_new(NULL, FALSE);
  customData.loop = main_loop;

  gst_bus_add_watch(GST_ELEMENT_BUS(customData.pipeline), (GstBusFunc)cb_message, &customData);

  g_timeout_add(250, (GSourceFunc)print_queue_status, &customData);

  g_main_loop_run(main_loop);

  /* Free resources */
  g_main_loop_unref(main_loop);
  /* Free resources */
  gst_object_unref(bus);
  gst_element_set_state(customData.pipeline, GST_STATE_NULL);
  gst_object_unref(customData.pipeline);
  return 0;
}

int main(int argc, char *argv[]) {
  /* On macOS the pipeline must be driven through gst_macos_main() so the
   * Cocoa event loop runs on the first (main) thread; elsewhere the
   * tutorial entry point is called directly. */
#if defined(__APPLE__) && TARGET_OS_MAC && !TARGET_OS_IPHONE
  return gst_macos_main(tutorial_main, argc, argv, NULL);
#else
  return tutorial_main(argc, argv);
#endif
}

Also, I want to mention that the production rate is, for example, 25 fps, while the consumption rate, forced by the plugin, is 5 fps.