Multithreaded application with timed recording pipeline

I am trying to build a pipeline that can record for a specific duration (like 5 seconds) and then stop the processes and clean up elegantly. To do that I’m setting a timer in a separate thread, which sends an EOS to the pipeline when it ends:

void AutoRecorderSink::stop() {
  std::cout << "Stop called, checking running variable" << std::endl;
  if (running) {
    std::cout << "Stopping the pipeline" << std::endl;
    GstState current, pending;
    gst_element_get_state(pipeline, &current, &pending, GST_CLOCK_TIME_NONE);
    if (current != GST_STATE_PLAYING) {
      std::cerr << "Pipeline is not in PLAYING state, current state: "
                << gst_element_state_get_name(current) << std::endl;
    }
    gst_element_send_event(pipeline, gst_event_new_eos());
    std::cout << "EOS sent" << std::endl;
    // Wait for the EOS message to be received
    {
      std::unique_lock<std::mutex> lock(stop_mutex);
      stop_cv.wait(lock, [this] { return eos_received; });
      std::cout << "eos received" << std::endl;
    }
    // we can quit the main loop
    g_main_loop_quit(loop);
    g_main_loop_unref(loop);
    running = false;
  }
}

The problem I have is that I never seem to get an EOS message on the bus:

gboolean AutoRecorderSink::message_cb(GstBus *bus, GstMessage *message,
                                      gpointer user_data) {
  AutoRecorderSink *self = static_cast<AutoRecorderSink *>(user_data);
  switch (GST_MESSAGE_TYPE(message)) {
  case GST_MESSAGE_ERROR:
    GError *err;
    gchar *debug_info;
    gst_message_parse_error(message, &err, &debug_info);
    std::cerr << "Error received from element " << GST_OBJECT_NAME(message->src)
              << ": " << err->message << std::endl;
    std::cerr << "Debugging information: " << (debug_info ? debug_info : "none")
              << std::endl;
    g_clear_error(&err);
    g_free(debug_info);
    self->stop();
    break;
  case GST_MESSAGE_EOS:
    std::cout << "EOS Received - Stopping Recording.\n";
    {
      std::lock_guard<std::mutex> lock(self->stop_mutex);
      self->eos_received = true;
    }
    self->stop_cv.notify_one(); // continue with the stop() function
    break;
  default:
    break;
  }
  return TRUE;
}

Specifically I never get the message “EOS Received - Stopping Recording”.

Solutions I tried:
I tried setting the message-forward property on the pipeline so that the event is propagated to every element but that didn’t help.

I also tried running the stop function from a g_idle_add callback so that it runs in the main thread, but that didn’t help either. Here’s the code for the g_idle_add version:

gboolean AutoRecorderSink::stop_pipeline_idle(gpointer user_data) {
    AutoRecorderSink *recorder = static_cast<AutoRecorderSink *>(user_data);
    recorder->perform_stop();
    return G_SOURCE_REMOVE; // Remove the idle function after execution
}

void AutoRecorderSink::stop() {
    std::cout << "Stop called, checking running variable" << std::endl;
    if (running) {
        // Queue the stop logic to run in the main loop thread
        g_idle_add(stop_pipeline_idle, this);
    }
}

void AutoRecorderSink::perform_stop() {
    std::cout << "Stopping the pipeline" << std::endl;

    GstState current, pending;
    gst_element_get_state(pipeline, &current, &pending, GST_CLOCK_TIME_NONE);
    if (current != GST_STATE_PLAYING && current != GST_STATE_PAUSED) {
        std::cerr << "Pipeline is not in PLAYING or PAUSED state, current state: "
                  << gst_element_state_get_name(current) << std::endl;
    }

    // Send EOS to the pipeline
    if (!gst_element_send_event(pipeline, gst_event_new_eos())) {
        std::cerr << "Failed to send EOS event" << std::endl;
        return;
    }
    std::cout << "EOS sent" << std::endl;

    // Wait for the EOS message to be received on the bus
    {
        std::unique_lock<std::mutex> lock(stop_mutex);
        stop_cv.wait(lock, [this] { return eos_received; });
        std::cout << "EOS received" << std::endl;
    }

    // Clean up GMainLoop
    if (loop) {
        g_main_loop_quit(loop);
        g_main_loop_unref(loop);
        loop = nullptr;
    }
    running = false;
}

Note: The EOS does seem to be registered somehow, since the recording I get has the duration I passed as a parameter. However, it never gets to the bus for some reason.

Not sure what exactly goes wrong in your case, but I have a little example here that shows how to start/stop a recording branch with a timer (with a parallel display branch that continues), in case you find it useful: H.264 backlog recording example ($1760), a GitLab snippet on freedesktop.org.

Thank you for this answer. I see you are using probes, but I don’t understand why. I have never used probes before; are they necessary for this kind of use case? I feel like they add a lot of complexity, especially with manual buffer initialization…

If possible, I would really like to focus on just being able to read the message in my callback function, via the pipeline’s bus; I feel like that would be a much simpler way to deal with my pipelines. I just don’t seem to be able to get any kind of message on my bus…

The probes in the example are used to make the “backlog” recording part work:

  1. to block the output pad of the queue (so backlog accumulates in the queue and doesn’t flow out until unblocked)
  2. to filter out non-keyframes when the backlog queue pad is unblocked, so the recording starts cleanly with a keyframe.

If you don’t need that functionality you probably don’t need pad probes here.

(This is all from memory.)

I took your code as a reference for the bus watch and used guint bus_watch_id = gst_bus_add_watch(bus, (GstBusFunc)message_cb, this); and it fixed my problem, thank you!