#include <glib.h>
#include <gst/app/gstappsink.h>
#include <gst/gst.h>
#include <gst/gstcapsfeatures.h>
#include <gst/rtsp/gstrtspconnection.h>
#include <gst/video/video.h>
#include <spdlog/spdlog.h>
#include <chrono>
#include <fstream>
#include <iostream>
#include <mutex>
#include <nlohmann/json.hpp>
#include <string>
#include <thread>
#include <zmq.hpp>
#include "ImagePipeline.h"
#include "VideoPipeline.h"
#include "spdlog/async.h"
#include "spdlog/sinks/rotating_file_sink.h"
#include "utils.h"
static GMutex mutex;
#define GST_CAPS_FEATURES_NVMM "memory:NVMM"
using json = nlohmann::json;
std::shared_ptr<spdlog::logger> rtsp_logger;
// Structure to hold pipeline configuration
struct PipelineConfig {
std::string rtsp_url;
int jpeg_quality = 85;
std::string zmq_endpoint;
int zmq_hwm = 10;
int camera_id = -1;
zmq::context_t* zmq_context = nullptr;
zmq::socket_t* zmq_publisher = nullptr;
GMutex pipeline_mutex;
std::shared_ptr<spdlog::logger> logger;
uint64_t frame_id = 0;
};
zmq::context_t* g_zmq_context = nullptr;
zmq::socket_t* g_zmq_publisher = nullptr;
PipelineConfig config;
struct BusCallbackData {
PipelineConfig config;
GstElement* pipeline;
GMainLoop* loop;
};
// Forward declarations
static void on_pad_added_dynamic(GstElement* element, GstPad* pad,
gpointer data);
static GstFlowReturn on_new_sample(GstAppSink* appsink, gpointer user_data);
static gboolean on_bus_message(GstBus* bus, GstMessage* msg, gpointer data);
bool load_config(const std::string& config_file, PipelineConfig& config) {
try {
std::ifstream file(config_file);
if (!file.is_open( )) {
rtsp_logger->error("[CONFIG] Failed to open config file: {}",
config_file);
return false;
}
json j;
file >> j;
rtsp_logger->info("[CONFIG] Parsing configuration file: {}",
config_file);
if (j.contains("rtsp")) {
auto& rtsp = j["rtsp"];
if (rtsp.contains("url")) config.rtsp_url = rtsp["url"];
rtsp_logger->info("[RTSP] URL: {}", config.rtsp_url);
if (rtsp.contains("camera_id"))
config.camera_id = rtsp["camera_id"];
rtsp_logger->info("[RTSP] Camera ID: {}", config.camera_id);
}
if (j.contains("jpeg") && j["jpeg"].contains("quality")) {
config.jpeg_quality = j["jpeg"]["quality"];
rtsp_logger->info("[JPEG] Quality: {}", config.jpeg_quality);
}
if (j.contains("zmq")) {
auto& zmq = j["zmq"];
if (zmq.contains("endpoint")) config.zmq_endpoint = zmq["endpoint"];
rtsp_logger->info("[ZMQ] Endpoint: {}", config.zmq_endpoint);
if (zmq.contains("hwm")) config.zmq_hwm = zmq["hwm"];
rtsp_logger->info("[ZMQ] HWM: {}", config.zmq_hwm);
}
rtsp_logger->info("[CONFIG] Configuration loaded successfully");
return true;
} catch (const std::exception& e) {
rtsp_logger->error("[CONFIG] Error parsing config file: {}", e.what( ));
return false;
}
}
bool init_zmq(const std::string& endpoint, int hwm) {
try {
g_zmq_context = new zmq::context_t(1);
g_zmq_publisher = new zmq::socket_t(*g_zmq_context, ZMQ_PUB);
g_zmq_publisher->set(zmq::sockopt::sndhwm, hwm);
g_zmq_publisher->bind(endpoint);
rtsp_logger->info("[ZMQ] Initialized on {} with HWM: {}", endpoint,
hwm);
return true;
} catch (const zmq::error_t& e) {
rtsp_logger->error("[ZMQ] Initialization error: {}", e.what( ));
return false;
}
}
void cleanup_zmq( ) {
if (g_zmq_publisher) {
g_zmq_publisher->close( );
delete g_zmq_publisher;
g_zmq_publisher = nullptr;
rtsp_logger->info("[ZMQ] Publisher socket closed and deleted");
}
if (g_zmq_context) {
g_zmq_context->close( );
delete g_zmq_context;
g_zmq_context = nullptr;
rtsp_logger->info("[ZMQ] Context closed and deleted");
}
}
bool create_rtsp_pipeline_dynamic(const PipelineConfig& config,
GMainLoop* loop) {
// Initialize GStreamer
gst_init(nullptr, nullptr);
config.logger->info("[RTSP Pipeline] Initializing GStreamer");
GstElement* pipeline = gst_pipeline_new("rtsp-pipeline");
GstElement* source = gst_element_factory_make("uridecodebin", "source");
GstElement* queue = gst_element_factory_make("queue", "queue");
GstElement* vidconv = gst_element_factory_make("nvvideoconvert", "convert");
if (!vidconv) {
vidconv = gst_element_factory_make("videoconvert", "convert");
config.logger->info("[RTSP Pipeline] Falling back to videoconvert");
}
GstElement* jpegenc = gst_element_factory_make("nvjpegenc", "jpegenc");
if (!jpegenc) {
jpegenc = gst_element_factory_make("jpegenc", "jpegenc");
config.logger->info("[RTSP Pipeline] Falling back to jpegenc");
}
GstElement* appsink = gst_element_factory_make("appsink", "sink");
if (!pipeline || !source || !queue || !vidconv || !jpegenc || !appsink) {
config.logger->error(
"[RTSP Pipeline] Failed to create one or more GStreamer elements.");
return false;
}
config.logger->info("[RTSP Pipeline] Configuring RTSP source: {}",
config.rtsp_url);
g_object_set(G_OBJECT(source), "uri", config.rtsp_url.c_str( ), NULL);
config.logger->info("[RTSP Pipeline] Configuring AppSink");
g_object_set(G_OBJECT(appsink), "emit-signals", TRUE, NULL);
g_object_set(G_OBJECT(appsink), "max-buffers", 1, NULL);
g_object_set(G_OBJECT(appsink), "drop", TRUE, NULL);
g_object_set(G_OBJECT(appsink), "sync", TRUE, NULL);
PipelineConfig* config_copy = new PipelineConfig(config);
GstAppSinkCallbacks callbacks = {nullptr};
callbacks.new_sample = on_new_sample;
callbacks.new_preroll = nullptr;
callbacks.new_event = nullptr;
config.logger->info("[RTSP Pipeline] Setting AppSink callbacks");
gst_app_sink_set_callbacks(
GST_APP_SINK(appsink), &callbacks, config_copy,
[](gpointer data) { delete static_cast<PipelineConfig*>(data); });
config.logger->info("[RTSP Pipeline] Adding elements to pipeline");
gst_bin_add_many(GST_BIN(pipeline), source, queue, vidconv, jpegenc,
appsink, NULL);
config.logger->info("[RTSP Pipeline] Linking elements in the pipeline");
if (!gst_element_link_many(queue, vidconv, jpegenc, appsink, NULL)) {
config.logger->error("Elements could not be linked.\n");
gst_object_unref(pipeline);
return false;
}
config.logger->info(
"[RTSP Pipeline] Connecting pad-added signal for RTSP source");
g_signal_connect(source, "pad-added", G_CALLBACK(on_pad_added_dynamic),
queue);
config.logger->info("[RTSP Pipeline] Setting pipeline to PLAYING state");
GstStateChangeReturn ret =
gst_element_set_state(pipeline, GST_STATE_PLAYING);
if (ret == GST_STATE_CHANGE_FAILURE) {
config.logger->error("[RTSP Pipeline] Failed to start pipeline.");
gst_object_unref(pipeline);
return false;
}
config.logger->info("[RTSP Pipeline] Setting up bus watch");
BusCallbackData* callback_data =
new BusCallbackData{config, pipeline, loop};
GstBus* bus = gst_element_get_bus(pipeline);
gst_bus_add_watch(bus, on_bus_message, callback_data);
gst_object_unref(bus);
config.logger->info("[RTSP Pipeline] Pipeline started successfully.");
return true;
}
static void on_pad_added_dynamic(GstElement* src, GstPad* pad, gpointer data) {
GstElement* queue = ( GstElement* ) data;
GstPad* sink_pad = gst_element_get_static_pad(queue, "sink");
if (gst_pad_is_linked(sink_pad)) {
gst_object_unref(sink_pad);
return;
}
// Check the new pad's type
GstCaps* new_pad_caps = gst_pad_get_current_caps(pad);
GstStructure* new_pad_struct = gst_caps_get_structure(new_pad_caps, 0);
const gchar* new_pad_type = gst_structure_get_name(new_pad_struct);
GstCapsFeatures* features = gst_caps_get_features(new_pad_caps, 0);
if (!strncmp(new_pad_type, "video", 5)) {
if (gst_caps_features_contains(features, GST_CAPS_FEATURES_NVMM)) {
rtsp_logger->info("Found Nvidia Decoder");
}
}
if (g_str_has_prefix(new_pad_type, "video/")) {
GstPadLinkReturn ret = gst_pad_link(pad, sink_pad);
if (GST_PAD_LINK_FAILED(ret)) {
rtsp_logger->error("Link failed.\n");
} else {
rtsp_logger->info("Link succeeded (type: %s).\n", new_pad_type);
}
} else {
rtsp_logger->error("It has type '%s' which is not video. Ignoring.\n",
new_pad_type);
}
if (new_pad_caps != NULL) {
gst_caps_unref(new_pad_caps);
}
gst_object_unref(sink_pad);
}
static GstFlowReturn on_new_sample(GstAppSink* appsink, gpointer user_data) {
g_mutex_lock(&mutex);
PipelineConfig* config = static_cast<PipelineConfig*>(user_data);
config->logger->info("[RTSP Pipeline] Config cam id: {}",
config->camera_id);
GstSample* sample = gst_app_sink_pull_sample(appsink);
if (!sample) {
config->logger->warn(
"[RTSP Pipeline] Failed to pull sample from appsink - skipping "
"frame");
return GST_FLOW_OK;
}
GstBuffer* buffer = gst_sample_get_buffer(sample);
if (!buffer) {
config->logger->warn(
"[RTSP Pipeline] Failed to get buffer from sample - skipping "
"frame");
gst_sample_unref(sample);
return GST_FLOW_OK;
}
// Current timestamp
auto now = std::chrono::system_clock::now( );
auto timestamp = std::chrono::duration_cast<std::chrono::milliseconds>(
now.time_since_epoch( ))
.count( );
GstMapInfo map;
if (!gst_buffer_map(buffer, &map, GST_MAP_READ)) {
config->logger->warn(
"[RTSP Pipeline] Failed to map GstBuffer - skipping frame");
gst_sample_unref(sample);
return GST_FLOW_OK;
}
if (!map.data || map.size == 0) {
config->logger->warn(
"[RTSP Pipeline] Invalid buffer data - skipping frame");
gst_buffer_unmap(buffer, &map);
gst_sample_unref(sample);
return GST_FLOW_OK;
}
if (config->zmq_publisher) {
try {
// Prepare string components
std::string camera_id =
"irm/cam" + std::to_string(config->camera_id) + "/stream";
std::string frame_id = "camera_" +
std::to_string(config->camera_id) + "_" +
std::to_string(config->frame_id);
std::string ts_str = std::to_string(timestamp);
std::string size_str = std::to_string(map.size);
// Part 1: Send Camera ID
zmq::message_t id_msg(camera_id.size( ));
memcpy(id_msg.data( ), camera_id.data( ), camera_id.size( ));
config->zmq_publisher->send(id_msg, zmq::send_flags::sndmore);
config->logger->info("[RTSP Pipeline] 1 Sent Camera ID: {}",
camera_id);
// Part 2: Send Frame ID
zmq::message_t frame_id_msg(frame_id.size( ));
memcpy(frame_id_msg.data( ), frame_id.data( ), frame_id.size( ));
config->zmq_publisher->send(frame_id_msg, zmq::send_flags::sndmore);
config->logger->info("[RTSP Pipeline] 2 Sent frame_id: {}",
frame_id);
config->frame_id++;
// Part 3: Send Timestamp
zmq::message_t timestamp_msg(ts_str.size( ));
memcpy(timestamp_msg.data( ), ts_str.data( ), ts_str.size( ));
config->zmq_publisher->send(timestamp_msg,
zmq::send_flags::sndmore);
config->logger->info("[RTSP Pipeline] 3 Sent Timestamp: {}",
ts_str);
// Part 4: Send Image Size (as string)
zmq::message_t image_size_msg(size_str.size( ));
memcpy(image_size_msg.data( ), size_str.data( ), size_str.size( ));
config->zmq_publisher->send(image_size_msg,
zmq::send_flags::sndmore);
config->logger->info("[RTSP Pipeline] 4 Sent Image Size: {} bytes",
map.size);
// Part 5: Send Image Data (binary)
zmq::message_t image_msg(map.size);
memcpy(image_msg.data( ), map.data, map.size);
config->zmq_publisher->send(image_msg, zmq::send_flags::none);
config->logger->info(
"[RTSP Pipeline] 5 Sent Image of Size: {} bytes", map.size);
config->logger->info("[RTSP Pipeline] Sent to {}",
config->zmq_endpoint);
} catch (const zmq::error_t& e) {
config->logger->warn(
"[RTSP Pipeline] ZMQ send error: {} - skipping frame",
e.what( ));
} catch (const std::exception& e) {
config->logger->warn(
"[RTSP Pipeline] Unexpected error: {} - skipping frame",
e.what( ));
} catch (...) {
config->logger->warn(
"[RTSP Pipeline] Unknown error during ZMQ send - skipping "
"frame");
}
}
gst_buffer_unmap(buffer, &map);
gst_sample_unref(sample);
g_mutex_unlock(&mutex);
return GST_FLOW_OK;
}
static gboolean on_bus_message(GstBus* bus, GstMessage* msg, gpointer data) {
BusCallbackData* callback_data = static_cast<BusCallbackData*>(data);
PipelineConfig config = callback_data->config;
GstElement* pipeline = callback_data->pipeline;
GMainLoop* loop = callback_data->loop;
switch (GST_MESSAGE_TYPE(msg)) {
case GST_MESSAGE_STATE_CHANGED: {
GstState old_state, new_state, pending;
gst_message_parse_state_changed(msg, &old_state, &new_state,
&pending);
if (true) {
config.logger->info(
"[RTSP Pipeline] Element {} state change: {} -> {} "
"(pending: {})",
GST_OBJECT_NAME(msg->src),
gst_element_state_get_name(old_state),
gst_element_state_get_name(new_state),
gst_element_state_get_name(pending));
}
break;
}
case GST_MESSAGE_ERROR: {
GError* err = nullptr;
gchar* debug_info = nullptr;
gst_message_parse_error(msg, &err, &debug_info);
config.logger->error(
"[RTSP Pipeline] Error Occurred!!, attempting to restart "
"pipeline...");
config.logger->error("[RTSP Pipeline] Error from element {}: {}",
GST_OBJECT_NAME(msg->src), err->message);
config.logger->error("[RTSP Pipeline] Debugging information: {}",
debug_info ? debug_info : "none");
g_error_free(err);
g_free(debug_info);
// Instead of just changing state, we'll completely recreate the
// pipeline
config.logger->info("[RTSP Pipeline] Destroying old pipeline...");
gst_element_set_state(pipeline, GST_STATE_NULL);
// Give a short delay to ensure cleanup
g_usleep(500000); // 500ms
GstBus* bus = gst_element_get_bus(pipeline);
gst_bus_remove_watch(bus);
gst_object_unref(bus);
// Unref old pipeline
gst_object_unref(pipeline);
config.logger->info("[RTSP Pipeline] Creating new pipeline...");
// Create a new pipeline
if (!create_rtsp_pipeline_dynamic(config, loop)) {
config.logger->error(
"[RTSP Pipeline] Failed to recreate pipeline, exiting.");
g_main_loop_quit(loop);
} else {
config.logger->info(
"[RTSP Pipeline] Pipeline successfully recreated and "
"started.");
}
delete callback_data;
// Update the callback data with the new pipeline
callback_data->pipeline = gst_pipeline_new("rtsp-pipeline");
// Since we've destroyed and recreated the pipeline, we no longer
// need this bus watch
return FALSE;
break;
}
case GST_MESSAGE_EOS: {
config.logger->info(
"[RTSP Pipeline] End of stream received, restarting pipeline");
// Clean up properly
config.logger->info("[RTSP Pipeline] Destroying old pipeline...");
gst_element_set_state(pipeline, GST_STATE_NULL);
g_usleep(500000);
GstBus* pipeline_bus = gst_element_get_bus(pipeline);
gst_bus_remove_watch(pipeline_bus);
gst_object_unref(pipeline_bus);
gst_object_unref(pipeline);
config.logger->info("[RTSP Pipeline] Creating new pipeline...");
if (!create_rtsp_pipeline_dynamic(config, loop)) {
config.logger->error(
"[RTSP Pipeline] Failed to recreate pipeline, exiting.");
g_main_loop_quit(loop);
} else {
config.logger->info(
"[RTSP Pipeline] Pipeline successfully recreated and "
"started.");
}
delete callback_data;
// Update the callback data with the new pipeline
callback_data->pipeline = gst_pipeline_new("rtsp-pipeline");
return FALSE;
}
default:
break;
}
return TRUE;
}
int main(int argc, char* argv[]) {
auto max_size = 1048576 * 100;
auto max_files = 100;
rtsp_logger = spdlog::rotating_logger_mt(
"ImgRcvM-RTSP", "logs/rtsp_pipeline_logs.txt", max_size, max_files);
rtsp_logger->flush_on(spdlog::level::info);
rtsp_logger->info("[Initiating] Starting Multi-Camera Receiver Module...");
// Printing Version on cli
for (int i = 1; i < argc; ++i) {
if (std::string(argv[i]) == "--version") {
print_version( );
return 0;
}
}
if (argc < 2) {
rtsp_logger->error("[Initiating] Usage: {} <config.json>", argv[0]);
return 1;
}
std::string config_file = argv[1];
gst_init(nullptr, nullptr);
g_mutex_init(&mutex);
std::ifstream file(config_file);
if (!file.is_open( )) {
rtsp_logger->error("[Initiating] Failed to open config file: {}",
config_file);
return 1;
}
json config_json;
try {
file >> config_json;
} catch (const std::exception& e) {
rtsp_logger->error("[Initiating] Error parsing JSON config: {}",
e.what( ));
return 1;
}
g_zmq_context = new zmq::context_t(1);
if (!g_zmq_context) {
rtsp_logger->error("[Initiating] Failed to initialize ZeroMQ context");
return 1;
}
g_zmq_publisher = new zmq::socket_t(*g_zmq_context, ZMQ_PUB);
std::string global_endpoint;
int global_hwm;
if (config_json.contains("zmq")) {
auto& zmq_json = config_json["zmq"];
if (zmq_json.contains("endpoint"))
global_endpoint = zmq_json["endpoint"];
if (zmq_json.contains("hwm")) global_hwm = zmq_json["hwm"];
}
g_zmq_publisher->set(zmq::sockopt::sndhwm, global_hwm);
g_zmq_publisher->set(zmq::sockopt::sndbuf, 10 * 1024 * 1024);
g_zmq_publisher->bind(global_endpoint);
rtsp_logger->info("[Initiating] Bound global ZMQ Publisher to {}",
global_endpoint);
std::vector<GMainLoop*> loops;
std::vector<zmq::socket_t*> publishers;
std::vector<GstElement*> pipelines;
std::vector<PipelineConfig> camera_configs;
// Video
rtsp_logger->info("[Initiating] Processing {} videos",
config_json["videos"].size( ));
std::vector<std::thread> video_threads;
for (size_t i = 0; i < config_json["videos"].size( ); i++) {
auto& video_json = config_json["videos"][i];
if (video_json.contains("location")) {
rtsp_logger->info("[Initiating] Video Source: {}",
video_json["location"]);
video_threads.emplace_back(VideoPipeline, video_json, i);
}
}
// Images
rtsp_logger->info("[Initiating] Processing {} image sources",
config_json["images"].size( ));
std::vector<std::thread> image_threads;
for (size_t i = 0; i < config_json["images"].size( ); i++) {
auto& image_json = config_json["images"][i];
if (image_json.contains("location")) {
rtsp_logger->info("[Initiating] Image Source: {}",
image_json["location"]);
image_threads.emplace_back(ImagePipeline, image_json, i);
}
}
// RTSP cameras
rtsp_logger->info("[Initiating] Processing {} cameras",
config_json["cameras"].size( ));
for (size_t i = 0; i < config_json["cameras"].size( ); i++) {
PipelineConfig camera_config;
auto& camera_json = config_json["cameras"][i];
g_mutex_init(&(camera_config.pipeline_mutex));
if (camera_json.contains("rtsp")) {
auto& rtsp = camera_json["rtsp"];
if (rtsp.contains("url")) camera_config.rtsp_url = rtsp["url"];
if (rtsp.contains("camera_id"))
camera_config.camera_id = rtsp["camera_id"];
}
if (camera_json.contains("jpeg") &&
camera_json["jpeg"].contains("quality")) {
camera_config.jpeg_quality = camera_json["jpeg"]["quality"];
} else if (config_json.contains("jpeg") &&
config_json["jpeg"].contains("quality")) {
camera_config.jpeg_quality = config_json["jpeg"]["quality"];
}
camera_config.zmq_publisher = g_zmq_publisher;
camera_config.zmq_endpoint = global_endpoint;
rtsp_logger->info("[Initiating] Camera {} RTSP URL: {} Camera ID: {}",
i, camera_config.rtsp_url, camera_config.camera_id);
rtsp_logger->info("[Initiating] Camera {} ZMQ endpoint: {}", i,
camera_config.zmq_endpoint);
std::string logger_name =
"ImgRcvM-RTSP_" + std::to_string(camera_config.camera_id);
std::string log_file_name = "logs/rtsp_pipeline_camera_id_" +
std::to_string(camera_config.camera_id) +
"_.txt";
camera_config.logger =
spdlog::rotating_logger_mt<spdlog::async_factory>(
logger_name, log_file_name, max_size, max_files);
camera_config.logger->flush_on(spdlog::level::info);
GMainLoop* loop = g_main_loop_new(NULL, FALSE);
loops.push_back(loop);
if (!create_rtsp_pipeline_dynamic(camera_config, loop)) {
rtsp_logger->error(
"[RTSP Pipeline] Failed to create pipeline for camera {}", i);
continue;
}
pipelines.push_back(
gst_pipeline_new(("pipeline-" + std::to_string(i)).c_str( )));
camera_configs.push_back(camera_config);
}
if (!loops.empty( )) {
g_main_loop_run(loops[0]);
}
for (auto loop : loops) {
g_main_loop_unref(loop);
}
for (auto publisher : publishers) {
publisher->close( );
delete publisher;
}
if (g_zmq_publisher) {
g_zmq_publisher->close( );
delete g_zmq_publisher;
g_zmq_publisher = nullptr;
}
for (auto& thread : video_threads) {
if (thread.joinable( )) {
thread.join( );
}
}
for (auto& thread : image_threads) {
if (thread.joinable( )) {
thread.join( );
}
}
return 0;
}
I have exhausted all of my options. Hence putting it here.
What I am trying to do is, if any camera goes down, the entire pipeline should not stop. Other cameras should keep playing and the one that got disconnected, pipeline should keep trying it.
Hi,
A couple of observations:
-
I think you don’t need to do
gst_initevery time in thecreate_rtsp_pipeline_dynamic. It is initialized in themainonce, and that should be sufficient. -
In the case of error and EOS, you are clearing the
callback_databut trying to access it again without allocation. Is it not supposed to crash because of invalid memory access, or am I missing something? Also, you create a new pipeline increate_rtsp_pipeline_dynamicso what is the point of creating another pipeline in the case of EOS and Error?
delete callback_data;
// Update the callback data with the new pipeline
callback_data->pipeline = gst_pipeline_new("rtsp-pipeline");