How can I implement Reconnection In the following pipeline

#include <glib.h>
#include <gst/app/gstappsink.h>
#include <gst/gst.h>
#include <gst/gstcapsfeatures.h>
#include <gst/rtsp/gstrtspconnection.h>
#include <gst/video/video.h>
#include <spdlog/spdlog.h>

#include <chrono>
#include <fstream>
#include <iostream>
#include <mutex>
#include <nlohmann/json.hpp>
#include <string>
#include <thread>
#include <zmq.hpp>

#include "ImagePipeline.h"
#include "VideoPipeline.h"
#include "spdlog/async.h"
#include "spdlog/sinks/rotating_file_sink.h"
#include "utils.h"
static GMutex mutex;
#define GST_CAPS_FEATURES_NVMM "memory:NVMM"
using json = nlohmann::json;
std::shared_ptr<spdlog::logger> rtsp_logger;

// Structure to hold pipeline configuration
struct PipelineConfig {
    std::string                     rtsp_url;
    int                             jpeg_quality         = 85;
    std::string                     zmq_endpoint;
    int                             zmq_hwm       = 10;
    int                             camera_id     = -1;
    zmq::context_t*                 zmq_context   = nullptr;
    zmq::socket_t*                  zmq_publisher = nullptr;
    GMutex                          pipeline_mutex;
    std::shared_ptr<spdlog::logger> logger;
    uint64_t                        frame_id = 0;
};

zmq::context_t* g_zmq_context   = nullptr;
zmq::socket_t*  g_zmq_publisher = nullptr;
PipelineConfig  config;

struct BusCallbackData {
    PipelineConfig config;
    GstElement*    pipeline;
    GMainLoop*     loop;
};

// Forward declarations
static void          on_pad_added_dynamic(GstElement* element, GstPad* pad,
                                          gpointer data);
static GstFlowReturn on_new_sample(GstAppSink* appsink, gpointer user_data);
static gboolean on_bus_message(GstBus* bus, GstMessage* msg, gpointer data);

bool load_config(const std::string& config_file, PipelineConfig& config) {
    try {
        std::ifstream file(config_file);
        if (!file.is_open( )) {
            rtsp_logger->error("[CONFIG] Failed to open config file: {}",
                               config_file);
            return false;
        }

        json j;
        file >> j;
        rtsp_logger->info("[CONFIG] Parsing configuration file: {}",
                          config_file);

        if (j.contains("rtsp")) {
            auto& rtsp = j["rtsp"];
            if (rtsp.contains("url")) config.rtsp_url = rtsp["url"];
            rtsp_logger->info("[RTSP] URL: {}", config.rtsp_url);
            if (rtsp.contains("camera_id"))
                config.camera_id = rtsp["camera_id"];
            rtsp_logger->info("[RTSP] Camera ID: {}", config.camera_id);
        }

        if (j.contains("jpeg") && j["jpeg"].contains("quality")) {
            config.jpeg_quality = j["jpeg"]["quality"];
            rtsp_logger->info("[JPEG] Quality: {}", config.jpeg_quality);
        }

        if (j.contains("zmq")) {
            auto& zmq = j["zmq"];
            if (zmq.contains("endpoint")) config.zmq_endpoint = zmq["endpoint"];
            rtsp_logger->info("[ZMQ] Endpoint: {}", config.zmq_endpoint);

            if (zmq.contains("hwm")) config.zmq_hwm = zmq["hwm"];
            rtsp_logger->info("[ZMQ] HWM: {}", config.zmq_hwm);
        }

        rtsp_logger->info("[CONFIG] Configuration loaded successfully");
        return true;
    } catch (const std::exception& e) {
        rtsp_logger->error("[CONFIG] Error parsing config file: {}", e.what( ));
        return false;
    }
}

bool init_zmq(const std::string& endpoint, int hwm) {
    try {
        g_zmq_context   = new zmq::context_t(1);
        g_zmq_publisher = new zmq::socket_t(*g_zmq_context, ZMQ_PUB);
        g_zmq_publisher->set(zmq::sockopt::sndhwm, hwm);
        g_zmq_publisher->bind(endpoint);

        rtsp_logger->info("[ZMQ] Initialized on {} with HWM: {}", endpoint,
                          hwm);
        return true;
    } catch (const zmq::error_t& e) {
        rtsp_logger->error("[ZMQ] Initialization error: {}", e.what( ));
        return false;
    }
}

void cleanup_zmq( ) {
    if (g_zmq_publisher) {
        g_zmq_publisher->close( );
        delete g_zmq_publisher;
        g_zmq_publisher = nullptr;
        rtsp_logger->info("[ZMQ] Publisher socket closed and deleted");
    }

    if (g_zmq_context) {
        g_zmq_context->close( );
        delete g_zmq_context;
        g_zmq_context = nullptr;
        rtsp_logger->info("[ZMQ] Context closed and deleted");
    }
}

bool create_rtsp_pipeline_dynamic(const PipelineConfig& config,
                                  GMainLoop*            loop) {
    // Initialize GStreamer
    gst_init(nullptr, nullptr);

    config.logger->info("[RTSP Pipeline] Initializing GStreamer");

    GstElement* pipeline = gst_pipeline_new("rtsp-pipeline");
    GstElement* source   = gst_element_factory_make("uridecodebin", "source");
    GstElement* queue    = gst_element_factory_make("queue", "queue");
    GstElement* vidconv = gst_element_factory_make("nvvideoconvert", "convert");
    if (!vidconv) {
        vidconv = gst_element_factory_make("videoconvert", "convert");
        config.logger->info("[RTSP Pipeline] Falling back to videoconvert");
    }

    GstElement* jpegenc = gst_element_factory_make("nvjpegenc", "jpegenc");
    if (!jpegenc) {
        jpegenc = gst_element_factory_make("jpegenc", "jpegenc");
        config.logger->info("[RTSP Pipeline] Falling back to jpegenc");
    }

    GstElement* appsink = gst_element_factory_make("appsink", "sink");

    if (!pipeline || !source || !queue || !vidconv || !jpegenc || !appsink) {
        config.logger->error(
            "[RTSP Pipeline] Failed to create one or more GStreamer elements.");
        return false;
    }

    config.logger->info("[RTSP Pipeline] Configuring RTSP source: {}",
                        config.rtsp_url);
    g_object_set(G_OBJECT(source), "uri", config.rtsp_url.c_str( ), NULL);

    config.logger->info("[RTSP Pipeline] Configuring AppSink");
    g_object_set(G_OBJECT(appsink), "emit-signals", TRUE, NULL);
    g_object_set(G_OBJECT(appsink), "max-buffers", 1, NULL);
    g_object_set(G_OBJECT(appsink), "drop", TRUE, NULL);
    g_object_set(G_OBJECT(appsink), "sync", TRUE, NULL);

    PipelineConfig*     config_copy = new PipelineConfig(config);
    GstAppSinkCallbacks callbacks   = {nullptr};
    callbacks.new_sample            = on_new_sample;
    callbacks.new_preroll           = nullptr;
    callbacks.new_event             = nullptr;
    config.logger->info("[RTSP Pipeline] Setting AppSink callbacks");
    gst_app_sink_set_callbacks(
        GST_APP_SINK(appsink), &callbacks, config_copy,
        [](gpointer data) { delete static_cast<PipelineConfig*>(data); });

    config.logger->info("[RTSP Pipeline] Adding elements to pipeline");
    gst_bin_add_many(GST_BIN(pipeline), source, queue, vidconv, jpegenc,
                     appsink, NULL);

    config.logger->info("[RTSP Pipeline] Linking elements in the pipeline");
    if (!gst_element_link_many(queue, vidconv, jpegenc, appsink, NULL)) {
        config.logger->error("Elements could not be linked.\n");
        gst_object_unref(pipeline);
        return false;
    }

    config.logger->info(
        "[RTSP Pipeline] Connecting pad-added signal for RTSP source");
    g_signal_connect(source, "pad-added", G_CALLBACK(on_pad_added_dynamic),
                     queue);

    config.logger->info("[RTSP Pipeline] Setting pipeline to PLAYING state");
    GstStateChangeReturn ret =
        gst_element_set_state(pipeline, GST_STATE_PLAYING);
    if (ret == GST_STATE_CHANGE_FAILURE) {
        config.logger->error("[RTSP Pipeline] Failed to start pipeline.");
        gst_object_unref(pipeline);
        return false;
    }

    config.logger->info("[RTSP Pipeline] Setting up bus watch");
    BusCallbackData* callback_data =
        new BusCallbackData{config, pipeline, loop};

    GstBus* bus = gst_element_get_bus(pipeline);
    gst_bus_add_watch(bus, on_bus_message, callback_data);
    gst_object_unref(bus);

    config.logger->info("[RTSP Pipeline] Pipeline started successfully.");
    return true;
}

static void on_pad_added_dynamic(GstElement* src, GstPad* pad, gpointer data) {
    GstElement* queue    = ( GstElement* ) data;
    GstPad*     sink_pad = gst_element_get_static_pad(queue, "sink");

    if (gst_pad_is_linked(sink_pad)) {
        gst_object_unref(sink_pad);
        return;
    }
    // Check the new pad's type
    GstCaps*         new_pad_caps   = gst_pad_get_current_caps(pad);
    GstStructure*    new_pad_struct = gst_caps_get_structure(new_pad_caps, 0);
    const gchar*     new_pad_type   = gst_structure_get_name(new_pad_struct);
    GstCapsFeatures* features       = gst_caps_get_features(new_pad_caps, 0);

    if (!strncmp(new_pad_type, "video", 5)) {
        if (gst_caps_features_contains(features, GST_CAPS_FEATURES_NVMM)) {
            rtsp_logger->info("Found Nvidia Decoder");
        }
    }

    if (g_str_has_prefix(new_pad_type, "video/")) {
        GstPadLinkReturn ret = gst_pad_link(pad, sink_pad);
        if (GST_PAD_LINK_FAILED(ret)) {
            rtsp_logger->error("Link failed.\n");
        } else {
            rtsp_logger->info("Link succeeded (type: %s).\n", new_pad_type);
        }
    } else {
        rtsp_logger->error("It has type '%s' which is not video. Ignoring.\n",
                           new_pad_type);
    }

    if (new_pad_caps != NULL) {
        gst_caps_unref(new_pad_caps);
    }
    gst_object_unref(sink_pad);
}

static GstFlowReturn on_new_sample(GstAppSink* appsink, gpointer user_data) {
    g_mutex_lock(&mutex);

    PipelineConfig* config = static_cast<PipelineConfig*>(user_data);
    config->logger->info("[RTSP Pipeline] Config cam id: {}",
                         config->camera_id);

    GstSample* sample = gst_app_sink_pull_sample(appsink);
    if (!sample) {
        config->logger->warn(
            "[RTSP Pipeline] Failed to pull sample from appsink - skipping "
            "frame");
        return GST_FLOW_OK;
    }

    GstBuffer* buffer = gst_sample_get_buffer(sample);
    if (!buffer) {
        config->logger->warn(
            "[RTSP Pipeline] Failed to get buffer from sample - skipping "
            "frame");
        gst_sample_unref(sample);
        return GST_FLOW_OK;
    }

    // Current timestamp
    auto now       = std::chrono::system_clock::now( );
    auto timestamp = std::chrono::duration_cast<std::chrono::milliseconds>(
                         now.time_since_epoch( ))
                         .count( );

    GstMapInfo map;
    if (!gst_buffer_map(buffer, &map, GST_MAP_READ)) {
        config->logger->warn(
            "[RTSP Pipeline] Failed to map GstBuffer - skipping frame");
        gst_sample_unref(sample);
        return GST_FLOW_OK;
    }

    if (!map.data || map.size == 0) {
        config->logger->warn(
            "[RTSP Pipeline] Invalid buffer data - skipping frame");
        gst_buffer_unmap(buffer, &map);
        gst_sample_unref(sample);
        return GST_FLOW_OK;
    }

    if (config->zmq_publisher) {
        try {
            // Prepare string components
            std::string camera_id =
                "irm/cam" + std::to_string(config->camera_id) + "/stream";
            std::string frame_id = "camera_" +
                                   std::to_string(config->camera_id) + "_" +
                                   std::to_string(config->frame_id);
            std::string ts_str   = std::to_string(timestamp);
            std::string size_str = std::to_string(map.size);

            // Part 1: Send Camera ID
            zmq::message_t id_msg(camera_id.size( ));
            memcpy(id_msg.data( ), camera_id.data( ), camera_id.size( ));
            config->zmq_publisher->send(id_msg, zmq::send_flags::sndmore);
            config->logger->info("[RTSP Pipeline] 1 Sent Camera ID: {}",
                                 camera_id);

            // Part 2: Send Frame ID
            zmq::message_t frame_id_msg(frame_id.size( ));
            memcpy(frame_id_msg.data( ), frame_id.data( ), frame_id.size( ));
            config->zmq_publisher->send(frame_id_msg, zmq::send_flags::sndmore);
            config->logger->info("[RTSP Pipeline] 2 Sent frame_id: {}",
                                 frame_id);
            config->frame_id++;

            // Part 3: Send Timestamp
            zmq::message_t timestamp_msg(ts_str.size( ));
            memcpy(timestamp_msg.data( ), ts_str.data( ), ts_str.size( ));
            config->zmq_publisher->send(timestamp_msg,
                                        zmq::send_flags::sndmore);
            config->logger->info("[RTSP Pipeline] 3 Sent Timestamp: {}",
                                 ts_str);

            // Part 4: Send Image Size (as string)
            zmq::message_t image_size_msg(size_str.size( ));
            memcpy(image_size_msg.data( ), size_str.data( ), size_str.size( ));
            config->zmq_publisher->send(image_size_msg,
                                        zmq::send_flags::sndmore);
            config->logger->info("[RTSP Pipeline] 4 Sent Image Size: {} bytes",
                                 map.size);

            // Part 5: Send Image Data (binary)
            zmq::message_t image_msg(map.size);
            memcpy(image_msg.data( ), map.data, map.size);
            config->zmq_publisher->send(image_msg, zmq::send_flags::none);
            config->logger->info(
                "[RTSP Pipeline] 5 Sent Image of Size: {} bytes", map.size);

            config->logger->info("[RTSP Pipeline] Sent to {}",
                                 config->zmq_endpoint);

        } catch (const zmq::error_t& e) {
            config->logger->warn(
                "[RTSP Pipeline] ZMQ send error: {} - skipping frame",
                e.what( ));
        } catch (const std::exception& e) {
            config->logger->warn(
                "[RTSP Pipeline] Unexpected error: {} - skipping frame",
                e.what( ));
        } catch (...) {
            config->logger->warn(
                "[RTSP Pipeline] Unknown error during ZMQ send - skipping "
                "frame");
        }
    }

    gst_buffer_unmap(buffer, &map);
    gst_sample_unref(sample);
    g_mutex_unlock(&mutex);

    return GST_FLOW_OK;
}

static gboolean on_bus_message(GstBus* bus, GstMessage* msg, gpointer data) {
    BusCallbackData* callback_data = static_cast<BusCallbackData*>(data);

    PipelineConfig config   = callback_data->config;
    GstElement*    pipeline = callback_data->pipeline;
    GMainLoop*     loop     = callback_data->loop;

    switch (GST_MESSAGE_TYPE(msg)) {
        case GST_MESSAGE_STATE_CHANGED: {
            GstState old_state, new_state, pending;
            gst_message_parse_state_changed(msg, &old_state, &new_state,
                                            &pending);
            if (true) {
                config.logger->info(
                    "[RTSP Pipeline] Element {} state change: {} -> {} "
                    "(pending: {})",
                    GST_OBJECT_NAME(msg->src),
                    gst_element_state_get_name(old_state),
                    gst_element_state_get_name(new_state),
                    gst_element_state_get_name(pending));
            }
            break;
        }
        case GST_MESSAGE_ERROR: {
            GError* err        = nullptr;
            gchar*  debug_info = nullptr;
            gst_message_parse_error(msg, &err, &debug_info);
            config.logger->error(
                "[RTSP Pipeline] Error Occurred!!, attempting to restart "
                "pipeline...");
            config.logger->error("[RTSP Pipeline] Error from element {}: {}",
                                 GST_OBJECT_NAME(msg->src), err->message);
            config.logger->error("[RTSP Pipeline] Debugging information: {}",
                                 debug_info ? debug_info : "none");

            g_error_free(err);
            g_free(debug_info);

            // Instead of just changing state, we'll completely recreate the
            // pipeline
            config.logger->info("[RTSP Pipeline] Destroying old pipeline...");
            gst_element_set_state(pipeline, GST_STATE_NULL);

            // Give a short delay to ensure cleanup
            g_usleep(500000);  // 500ms

            GstBus* bus = gst_element_get_bus(pipeline);
            gst_bus_remove_watch(bus);
            gst_object_unref(bus);

            // Unref old pipeline
            gst_object_unref(pipeline);

            config.logger->info("[RTSP Pipeline] Creating new pipeline...");

            // Create a new pipeline
            if (!create_rtsp_pipeline_dynamic(config, loop)) {
                config.logger->error(
                    "[RTSP Pipeline] Failed to recreate pipeline, exiting.");
                g_main_loop_quit(loop);
            } else {
                config.logger->info(
                    "[RTSP Pipeline] Pipeline successfully recreated and "
                    "started.");
            }

            delete callback_data;

            // Update the callback data with the new pipeline
            callback_data->pipeline = gst_pipeline_new("rtsp-pipeline");

            // Since we've destroyed and recreated the pipeline, we no longer
            // need this bus watch
            return FALSE;
            break;
        }

        case GST_MESSAGE_EOS: {
            config.logger->info(
                "[RTSP Pipeline] End of stream received, restarting pipeline");

            // Clean up properly
            config.logger->info("[RTSP Pipeline] Destroying old pipeline...");
            gst_element_set_state(pipeline, GST_STATE_NULL);

            g_usleep(500000);

            GstBus* pipeline_bus = gst_element_get_bus(pipeline);
            gst_bus_remove_watch(pipeline_bus);
            gst_object_unref(pipeline_bus);

            gst_object_unref(pipeline);

            config.logger->info("[RTSP Pipeline] Creating new pipeline...");
            if (!create_rtsp_pipeline_dynamic(config, loop)) {
                config.logger->error(
                    "[RTSP Pipeline] Failed to recreate pipeline, exiting.");
                g_main_loop_quit(loop);
            } else {
                config.logger->info(
                    "[RTSP Pipeline] Pipeline successfully recreated and "
                    "started.");
            }

            delete callback_data;

            // Update the callback data with the new pipeline
            callback_data->pipeline = gst_pipeline_new("rtsp-pipeline");

            return FALSE;
        }

        default:
            break;
    }
    return TRUE;
}

int main(int argc, char* argv[]) {
    auto max_size  = 1048576 * 100;
    auto max_files = 100;
    rtsp_logger    = spdlog::rotating_logger_mt(
           "ImgRcvM-RTSP", "logs/rtsp_pipeline_logs.txt", max_size, max_files);
    rtsp_logger->flush_on(spdlog::level::info);
    rtsp_logger->info("[Initiating] Starting Multi-Camera Receiver Module...");

    // Printing Version on cli
    for (int i = 1; i < argc; ++i) {
        if (std::string(argv[i]) == "--version") {
            print_version( );
            return 0;
        }
    }

    if (argc < 2) {
        rtsp_logger->error("[Initiating] Usage: {} <config.json>", argv[0]);
        return 1;
    }

    std::string config_file = argv[1];

    gst_init(nullptr, nullptr);
    g_mutex_init(&mutex);

    std::ifstream file(config_file);
    if (!file.is_open( )) {
        rtsp_logger->error("[Initiating] Failed to open config file: {}",
                           config_file);
        return 1;
    }

    json config_json;
    try {
        file >> config_json;
    } catch (const std::exception& e) {
        rtsp_logger->error("[Initiating] Error parsing JSON config: {}",
                           e.what( ));
        return 1;
    }

    g_zmq_context = new zmq::context_t(1);
    if (!g_zmq_context) {
        rtsp_logger->error("[Initiating] Failed to initialize ZeroMQ context");
        return 1;
    }

    g_zmq_publisher = new zmq::socket_t(*g_zmq_context, ZMQ_PUB);
    std::string global_endpoint;
    int         global_hwm;

    if (config_json.contains("zmq")) {
        auto& zmq_json = config_json["zmq"];
        if (zmq_json.contains("endpoint"))
            global_endpoint = zmq_json["endpoint"];
        if (zmq_json.contains("hwm")) global_hwm = zmq_json["hwm"];
    }

    g_zmq_publisher->set(zmq::sockopt::sndhwm, global_hwm);
    g_zmq_publisher->set(zmq::sockopt::sndbuf, 10 * 1024 * 1024);
    g_zmq_publisher->bind(global_endpoint);
    rtsp_logger->info("[Initiating] Bound global ZMQ Publisher to {}",
                      global_endpoint);

    std::vector<GMainLoop*>     loops;
    std::vector<zmq::socket_t*> publishers;
    std::vector<GstElement*>    pipelines;
    std::vector<PipelineConfig> camera_configs;

    // Video
    rtsp_logger->info("[Initiating] Processing {} videos",
                      config_json["videos"].size( ));
    std::vector<std::thread> video_threads;
    for (size_t i = 0; i < config_json["videos"].size( ); i++) {
        auto& video_json = config_json["videos"][i];
        if (video_json.contains("location")) {
            rtsp_logger->info("[Initiating] Video Source: {}",
                              video_json["location"]);
            video_threads.emplace_back(VideoPipeline, video_json, i);
        }
    }

    // Images
    rtsp_logger->info("[Initiating] Processing {} image sources",
                      config_json["images"].size( ));
    std::vector<std::thread> image_threads;
    for (size_t i = 0; i < config_json["images"].size( ); i++) {
        auto& image_json = config_json["images"][i];
        if (image_json.contains("location")) {
            rtsp_logger->info("[Initiating] Image Source: {}",
                              image_json["location"]);
            image_threads.emplace_back(ImagePipeline, image_json, i);
        }
    }

    // RTSP cameras
    rtsp_logger->info("[Initiating] Processing {} cameras",
                      config_json["cameras"].size( ));
    for (size_t i = 0; i < config_json["cameras"].size( ); i++) {
        PipelineConfig camera_config;
        auto&          camera_json = config_json["cameras"][i];
        g_mutex_init(&(camera_config.pipeline_mutex));

        if (camera_json.contains("rtsp")) {
            auto& rtsp = camera_json["rtsp"];
            if (rtsp.contains("url")) camera_config.rtsp_url = rtsp["url"];
            if (rtsp.contains("camera_id"))
                camera_config.camera_id = rtsp["camera_id"];
        }

        if (camera_json.contains("jpeg") &&
            camera_json["jpeg"].contains("quality")) {
            camera_config.jpeg_quality = camera_json["jpeg"]["quality"];
        } else if (config_json.contains("jpeg") &&
                   config_json["jpeg"].contains("quality")) {
            camera_config.jpeg_quality = config_json["jpeg"]["quality"];
        }

        camera_config.zmq_publisher = g_zmq_publisher;
        camera_config.zmq_endpoint  = global_endpoint;

        rtsp_logger->info("[Initiating] Camera {} RTSP URL: {} Camera ID: {}",
                          i, camera_config.rtsp_url, camera_config.camera_id);
        rtsp_logger->info("[Initiating] Camera {} ZMQ endpoint: {}", i,
                          camera_config.zmq_endpoint);
        std::string logger_name =
            "ImgRcvM-RTSP_" + std::to_string(camera_config.camera_id);
        std::string log_file_name = "logs/rtsp_pipeline_camera_id_" +
                                    std::to_string(camera_config.camera_id) +
                                    "_.txt";
        camera_config.logger =
            spdlog::rotating_logger_mt<spdlog::async_factory>(
                logger_name, log_file_name, max_size, max_files);
        camera_config.logger->flush_on(spdlog::level::info);

        GMainLoop* loop = g_main_loop_new(NULL, FALSE);
        loops.push_back(loop);

        if (!create_rtsp_pipeline_dynamic(camera_config, loop)) {
            rtsp_logger->error(
                "[RTSP Pipeline] Failed to create pipeline for camera {}", i);
            continue;
        }

        pipelines.push_back(
            gst_pipeline_new(("pipeline-" + std::to_string(i)).c_str( )));

        camera_configs.push_back(camera_config);
    }

    if (!loops.empty( )) {
        g_main_loop_run(loops[0]);
    }

    for (auto loop : loops) {
        g_main_loop_unref(loop);
    }

    for (auto publisher : publishers) {
        publisher->close( );
        delete publisher;
    }

    if (g_zmq_publisher) {
        g_zmq_publisher->close( );
        delete g_zmq_publisher;
        g_zmq_publisher = nullptr;
    }

    for (auto& thread : video_threads) {
        if (thread.joinable( )) {
            thread.join( );
        }
    }

    for (auto& thread : image_threads) {
        if (thread.joinable( )) {
            thread.join( );
        }
    }

    return 0;
}
I have exhausted all of my options. Hence putting it here. 

What I am trying to do is, if any camera goes down, the entire pipeline should not stop. Other cameras should keep playing and the one that got disconnected, pipeline should keep trying it.

Hi,

A couple of observations:

  1. I think you don’t need to do gst_init every time in the create_rtsp_pipeline_dynamic. It is initialized in the main once, and that should be sufficient.

  2. In the case of error and EOS, you are clearing the callback_data but trying to access it again without allocation. Is it not supposed to crash because of invalid memory access, or am I missing something? Also, you create a new pipeline in create_rtsp_pipeline_dynamic so what is the point of creating another pipeline in the case of EOS and Error?

            delete callback_data;

            // Update the callback data with the new pipeline
            callback_data->pipeline = gst_pipeline_new("rtsp-pipeline");