Dynamically restarting a live source

Hi.
I have an application with a live camera source; downstream, the images are processed and the resulting insights are saved to a database. Sometimes the camera connection is unstable, and this can cause the entire application to crash.

I am trying to build my application so that, if the camera crashes, the pipeline keeps running (so the downstream elements keep working), and a function restarts the camera and starts streaming again (by destroying the camera element and creating a new one in its place).

I am getting very mixed results with this. It seems to work OK if I set the pipeline to NULL before destroying/unlinking the source, but I would rather leave the pipeline running (or paused) so that I don't lose the data downstream and have to restart those elements (they take a while to spin up).

I have also played around with blocking the source pad and then sending an EOS to catch, but the documentation and examples here are not very clear on what this achieves and why, so I am unsure where exactly I should use pad blocking and EOS (i.e. do I block the dead source element? The first downstream element? Does the EOS keep propagating and kill all my other elements?).

Does anyone have guidance or steps I should follow to restart a live source? Any help would be greatly appreciated.

For reference, here is some test code I was trying to run (it doesn't work, but it does if I set the pipeline to NULL first):

// Simple script which will start an aravis pipeline, then restart ONLY the camera
#include <chrono>
#include <condition_variable>
#include <cstdlib>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>

#include <gst/gst.h>


int main(int argc, char *argv[])
{
    gst_init (&argc, &argv);

    GstElement *pipeline = gst_pipeline_new ("aravis_pipeline");
    GstElement *source = gst_element_factory_make ("aravissrc", "source");
    if (!source) {
        std::cerr << "Failed to create the source element." << std::endl;
        return -1;  // non-zero exit code; "false" would convert to 0 (success)
    }

    #define cfg_OffsetY ((1200-1184)/2)
    const char *cam1 = "10.42.0.154";

    g_object_set(G_OBJECT(source),
            "camera-name", cam1,
            "num-arv-buffers", 64,
            "packet-resend", false,
            "packet-size", 9000,
            "auto-packet-size", false,
            NULL);
    
    std::this_thread::sleep_for(std::chrono::seconds(5));


    GstElement *sink = gst_element_factory_make ("fakesink", "fake_sink");

    if (!pipeline || !source || !sink) {
        g_printerr ("Not all elements could be created.\n");
        return -1;
    }

    gst_bin_add_many (GST_BIN (pipeline), source, sink, NULL);

    if (!gst_element_link (source, sink)) {
        g_printerr ("Elements could not be linked.\n");
        gst_object_unref (pipeline);
        return -1;
    }

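    GMainLoop *main_loop = g_main_loop_new(NULL, FALSE);
    // Note: no bus watch is installed, so messages posted on the pipeline bus
    // are never popped, and each one keeps a reference to the object that posted it.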
    // CustomData data = {pipeline, main_loop};
    std::cout << "Starting the pipeline." << std::endl;
    gst_element_set_state(pipeline, GST_STATE_PLAYING);

    std::this_thread::sleep_for(std::chrono::seconds(10));

    std::cout << "Restarting the camera source." << std::endl;
    // gst_element_set_state(pipeline, GST_STATE_PAUSED);
    // Restart the camera source

    // Stop camera
    gst_element_set_state(source, GST_STATE_NULL);

    // Wait for data to stop flowing
    std::this_thread::sleep_for(std::chrono::seconds(10));

    std::cout << "Camera source stopped." << std::endl;

    // Remove the source from the pipeline
    gst_element_unlink(source, sink);

    gst_bin_remove(GST_BIN(pipeline), source);

    std::this_thread::sleep_for(std::chrono::seconds(10));

    std::cout << "Create a new source." << std::endl;

    // Create a new source
    GstElement *new_source = gst_element_factory_make ("aravissrc", "new_source");
    if (!new_source) {
        std::cerr << "Failed to create the source element." << std::endl;
        return -1;
    }

    g_object_set(G_OBJECT(new_source),
        "camera-name", cam1,
        "num-arv-buffers", 64,
        "packet-resend", false,
        "packet-size", 9000,
        "auto-packet-size", false,
        NULL);
    
    std::this_thread::sleep_for(std::chrono::seconds(5));
    std::cout << "Add to pipeline." << std::endl;


    gst_bin_add(GST_BIN(pipeline), new_source);

    std::this_thread::sleep_for(std::chrono::seconds(5));
    std::cout << "Link to pipeline." << std::endl; 
    if (!gst_element_link(new_source, sink)) {
        g_printerr("New source could not be linked.\n");
        return -1;
    }

    std::this_thread::sleep_for(std::chrono::seconds(5));
    std::cout << "sync to pipeline." << std::endl; 

    gst_element_sync_state_with_parent(new_source);

    std::this_thread::sleep_for(std::chrono::seconds(10));

    std::cout << "Starting the pipeline." << std::endl;

    gst_element_set_state(new_source, GST_STATE_PLAYING);


    g_main_loop_run (main_loop);

    /* Free resources */
    g_main_loop_unref (main_loop);
    // gst_object_unref (bus);
    gst_element_set_state (pipeline, GST_STATE_NULL);
    gst_object_unref (pipeline);
}

I don’t know this particular source element, but with a normal live capture source it should be perfectly fine to restart just the source element when it throws an error (I assume that’s what you mean by “it crashes the pipeline”?). A live source always timestamps the incoming data with the pipeline clock’s current running time, so the rest of the pipeline can simply keep running.

You could also have a look at the fallbacksrc element from the Rust plugins set, which can automatically switch over to a fallback stream whilst trying to restart the main source element (and switch back to the main stream once it comes back).

Thank you for the response. My main follow-up question, then, is whether there’s a “correct” way to delete and create a new live source. Is the way I do it in my app normally fine, i.e. just leaving the pipeline running and setting the source to NULL? If so, that would mean there is possibly a bug in the plugin that prevents it from being deleted correctly if the pipeline is not set to NULL.

Thanks

As a side question, is there any documentation where I can better understand the difference between setting a pipeline to NULL and setting an element to NULL?

Cheers

OK, I made a lot of progress on this. The problem is that when I just call gst_element_set_state(source, GST_STATE_NULL), not all references to the element are released, so the element’s finalize method is never called and my source is not destroyed. I have checked the plugin methods and they are not creating dangling references.

So far it seems like when I call:

gst_element_set_state(source, GST_STATE_NULL);

This will create 4 extra references!?

Can someone please help me with this? What might be creating the references? Some kind of bus call I need to free?

Thanks again

Setting an element’s state to GST_STATE_NULL only changes the operational GStreamer state, it doesn’t destroy it or free it or anything.

It’s still inside the pipeline, and the pipeline will still hold a ref to it (as it’s a child element which it manages).

If you remove it from the pipeline that ref will go away.

The 4 extra refs probably come from bus messages that get posted as part of the change state to NULL. Each message will contain a reference to the element that triggered the message, and those references will go away once your main application thread’s pipeline bus message handler has seen and processed these messages.

As for restarting the source, you shouldn’t have to destroy it and re-create it. It should be enough to set it to GST_STATE_NULL and then back to GST_STATE_PLAYING to e.g. make it reconnect to some server or close/open some device etc. (assuming the element is correctly implemented).