Does anyone know how to unit test GStreamer c++ app?

Hello, I built a GStreamer C++ app, and I’m wondering how I could build unit test of my components. I can’t figure out how to test with a video stream. Especially since I can get different input format etc… Maybe there is a way to mock and if the mock corresponds to the ouput I get ?

For inputs for example I have the following code:

#include "POSIXShmPipelineStrategy.hpp"
#include "Logger.hpp"
#include "StreamThread.hpp"

#include <fcntl.h>
#include <gst/app/gstappsrc.h>
#include <sys/mman.h>
#include <unistd.h>

static GstPadProbeReturn
event_probe_callback(GstPad *pad, GstPadProbeInfo *info, gpointer user_data) {
  if (GST_PAD_PROBE_INFO_TYPE(info) & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
    GstEvent *event = gst_pad_probe_info_get_event(info);
    if (GST_EVENT_TYPE(event) == GST_EVENT_SEGMENT) {
      const GstSegment *segment;
      gst_event_parse_segment(event, &segment);
      Logger::getInstance().log(Logger::INFO,
                                std::string("Segment format: ") +
                                    gst_format_get_name(segment->format));
    }
  }
  return GST_PAD_PROBE_OK;
}

// Add the probe to the appropriate pad

void POSIXShmPipelineStrategy::init_elements(StreamThread &streamThread) {
  Logger::getInstance().log(
      Logger::INFO, "Initializing elements of POSIXShmPipelineStrategy");

  // Open shared memory
  streamThread.shm_fd =
      shm_open(streamThread.getVideoSource().source.c_str(), O_RDONLY, 0666);
  if (streamThread.shm_fd == -1) {
    Logger::getInstance().log(Logger::ERROR,
                              "Error: Failed to open shared memory.");
    throw std::runtime_error("Shared memory open failed");
  }

  Logger::getInstance().log(Logger::INFO, "Shared memory opened successfully");

  // Map shared memory to process address space
  streamThread.shm_ptr =
      (unsigned char *)mmap(0,
                            (streamThread.getVideoSource().width *
                             streamThread.getVideoSource().height) *
                                3,
                            PROT_READ, MAP_SHARED, streamThread.shm_fd, 0);
  if (streamThread.shm_ptr == MAP_FAILED) {
    Logger::getInstance().log(Logger::ERROR,
                              "Error: Failed to map shared memory.");
    close(streamThread.shm_fd);
    throw std::runtime_error("Shared memory mapping failed");
  }
  Logger::getInstance().log(Logger::INFO, "Shared memory mapped successfully");

  // Initialize GStreamer elements
  streamThread.source = gst_element_factory_make("appsrc", "mysource");
  streamThread.convert = gst_element_factory_make("videoconvert", "convert");
  streamThread.sink = gst_element_factory_make("tee", "sink");

  if (!streamThread.source || !streamThread.convert || !streamThread.sink) {
    Logger::getInstance().log(Logger::ERROR,
                              "Not all elements could be created.");
    throw std::runtime_error("Element creation failed");
  }

  // Create the GStreamer pipeline
  streamThread.pipeline = gst_pipeline_new("shm-pipeline");
  if (!streamThread.pipeline) {
    Logger::getInstance().log(Logger::ERROR, "Pipeline could not be created.");
    throw std::runtime_error("Pipeline creation failed");
  }
}

void POSIXShmPipelineStrategy::build_pipeline(StreamThread &streamThread) {
  // Configure appsrc (shared memory source)
  g_object_set(streamThread.source, "stream-type", GST_APP_STREAM_TYPE_STREAM,
               "is-live", TRUE, "block", FALSE, "format", GST_FORMAT_TIME,
               NULL);

  GstPad *pad = gst_element_get_static_pad(streamThread.source, "src");
  gst_pad_add_probe(pad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM,
                    event_probe_callback, NULL, NULL);
  gst_object_unref(pad);

  // Set the caps for appsrc
  GstCaps *caps = gst_caps_new_simple(
      "video/x-raw", "format", G_TYPE_STRING, "BGR", "width", G_TYPE_INT,
      streamThread.getVideoSource().width, "height", G_TYPE_INT,
      streamThread.getVideoSource().height, "framerate", GST_TYPE_FRACTION, 20,
      1, NULL);
  gst_app_src_set_caps(GST_APP_SRC(streamThread.source), caps);
  gst_caps_unref(caps);

  // Add elements to the pipeline (source, convert, tee)
  gst_bin_add_many(GST_BIN(streamThread.pipeline), streamThread.source,
                   streamThread.convert, streamThread.sink, NULL);

  // Link the elements
  if (gst_element_link_many(streamThread.source, streamThread.convert,
                            streamThread.sink, NULL) != TRUE) {
    Logger::getInstance().log(Logger::ERROR, "Elements could not be linked.");
    throw std::runtime_error("Pipeline linking failed");
  }
  Logger::getInstance().log(Logger::INFO, "Elements linked successfully");

  // Set up the GStreamer bus for error handling
  GstBus *bus = gst_pipeline_get_bus(GST_PIPELINE(streamThread.pipeline));
  gst_bus_add_watch(bus, (GstBusFunc)StreamThread::message_cb, &streamThread);
  gst_object_unref(bus);

  // Attach additional branches using dataflow function
  if (!streamThread.dataflow(streamThread.sink)) {
    Logger::getInstance().log(Logger::ERROR, "Dataflow connection failed.");
    throw std::runtime_error("Failed to attach additional branches via tee.");
  }
  if (!streamThread.shared_original_flow(streamThread.sink)) {
    Logger::getInstance().log(Logger::ERROR,
                              "Failed to add original flow to the pipeline.");
    throw std::runtime_error("Adding original flow failed");
  }

  // Connect signals
  g_signal_connect(streamThread.source, "need-data", G_CALLBACK(start_feed),
                   &streamThread);
  g_signal_connect(streamThread.source, "enough-data", G_CALLBACK(stop_feed),
                   &streamThread);
}

gboolean POSIXShmPipelineStrategy::read_data(StreamThread *streamThread) {
  if (!streamThread->is_active.load()) {
    // Push end-of-stream if the thread is no longer active
    gst_app_src_end_of_stream(GST_APP_SRC(streamThread->source));
    return GST_FLOW_OK;
  }
  GstFlowReturn ret;
  gboolean ok = TRUE;

  GstBuffer *buffer =
      gst_buffer_new_allocate(nullptr,
                              ((streamThread->getVideoSource().width *
                                streamThread->getVideoSource().height) *
                               3),
                              nullptr);
  GstMapInfo map;
  gst_buffer_map(buffer, &map, GST_MAP_WRITE);

  // Copy frame data from shared memory to the GStreamer buffer
  memcpy(map.data, streamThread->shm_ptr,
         ((streamThread->getVideoSource().width *
           streamThread->getVideoSource().height) *
          3));

  gst_buffer_unmap(buffer, &map);

  // Push buffer to appsrc
  g_signal_emit_by_name(streamThread->source, "push-buffer", buffer, &ret);
  gst_buffer_unref(buffer);

  if (ret != GST_FLOW_OK) {
    Logger::getInstance().log(Logger::ERROR,
                              "Error occurred while pushing buffer to appsrc");
    ok = FALSE;
  }

  return ok;
}

void POSIXShmPipelineStrategy::start_feed(GstElement *pipeline, guint size,
                                          StreamThread *streamThread) {
  if (streamThread->sourceid == 0) {
    streamThread->sourceid = g_idle_add((GSourceFunc)read_data, streamThread);
  }
}

void POSIXShmPipelineStrategy::stop_feed(GstElement *pipeline,
                                         StreamThread *streamThread) {
  if (streamThread->sourceid != 0) {
    g_source_remove(streamThread->sourceid);
    streamThread->sourceid = 0;
  }
}

And for output I have this kind of code:

#include "AutoVideoSink.hpp"
#include "Logger.hpp"
#include "StreamThread.hpp"

#include <iostream>

AutoVideoSink::AutoVideoSink(StreamThread &streamThread,
                             const std::string &pipeline_id,
                             const std::string &channel_name)
    : PipelineInterface(pipeline_id), pipeline(nullptr), intervideosrc(nullptr),
      videosink(nullptr), bus(nullptr), loop(nullptr), running(false) {

  type = "Autovideo Sink";
  pipeline = gst_pipeline_new("consumer-pipeline");

  this->addObserver(&streamThread);

  // Create the elements
  intervideosrc = gst_element_factory_make("intervideosrc", "intervideosrc");
  queue = gst_element_factory_make("queue", "queue");
  videosink = gst_element_factory_make("autovideosink", "videosink");
  videoconvert = gst_element_factory_make("videoconvert", "videoconvert");
  capsfilter = gst_element_factory_make("capsfilter", "capsfilter");

  // Check elements creation
  if (!pipeline || !intervideosrc || !videosink || !queue || !videoconvert ||
      !capsfilter) {
    throw std::runtime_error("Failed to create GStreamer elements");
  }

  // Set the channel property to match the producer's intervideosink
  g_object_set(intervideosrc, "channel", channel_name.c_str(), NULL);

  // Add elements to the pipeline
  gst_bin_add_many(GST_BIN(pipeline), intervideosrc, queue, videosink,
                   videoconvert, capsfilter, NULL);
  GstCaps *caps =
      gst_caps_new_simple("video/x-raw", "format", G_TYPE_STRING, "I420", NULL);
  g_object_set(capsfilter, "caps", caps, NULL);
  gst_caps_unref(caps);

  // Link elements
  if (!gst_element_link_many(intervideosrc, queue, videoconvert, capsfilter,
                             videosink, NULL)) {
    throw std::runtime_error("Elements could not be linked");
  }

  bus = gst_element_get_bus(pipeline);
  if (!bus) {
    Logger::getInstance().log(Logger::ERROR,
                              "Failed to get bus from pipeline.");
  }
  // Add a watch to the bus
  guint bus_watch_id = gst_bus_add_watch(bus, (GstBusFunc)message_cb, this);
  if (bus_watch_id == 0) {
    Logger::getInstance().log(Logger::ERROR, "Failed to add watch to bus");
    gst_object_unref(bus);
    return;
  }
}

AutoVideoSink::~AutoVideoSink() {
  Logger::getInstance().log(Logger::INFO, "AutoVideoSink destructor called");
  gst_object_unref(bus);
  gst_element_set_state(pipeline, GST_STATE_NULL);
  gst_object_unref(pipeline);
}

bool AutoVideoSink::start() {
  ret = gst_element_set_state(pipeline, GST_STATE_PLAYING);
  if (ret == GST_STATE_CHANGE_FAILURE) {
    Logger::getInstance().log(
        Logger::ERROR,
        "Unable to set the AutoVideoSink pipeline to the playing state.");
    return false;
  }

  running = true;

  // Run the main loop in a separate thread
  loop = g_main_loop_new(NULL, FALSE);
  gst_thread = std::thread(run_main_loop_thread, loop);
  gst_thread.detach();

  return true;
}

void AutoVideoSink::stop() {
  if (running) {
    g_main_loop_quit(loop);
    g_main_loop_unref(loop);
    running = false;
    this->notifyPipelineStopped(this->getPipelineId());
  }
}

gboolean AutoVideoSink::message_cb(GstBus *bus, GstMessage *message,
                                   gpointer user_data) {
  AutoVideoSink *self = static_cast<AutoVideoSink *>(user_data);
  // Handle GStreamer messages (e.g., error, EOS)
  switch (GST_MESSAGE_TYPE(message)) {
  case GST_MESSAGE_ERROR:
    GError *err;
    gchar *debug_info;
    gst_message_parse_error(message, &err, &debug_info);
    std::cerr << "Error received from element " << GST_OBJECT_NAME(message->src)
              << ": " << err->message << std::endl;
    std::cerr << "Debugging information: " << (debug_info ? debug_info : "none")
              << std::endl;
    g_clear_error(&err);
    g_free(debug_info);
    self->stop();
    break;
  case GST_MESSAGE_EOS:
    Logger::getInstance().log(Logger::INFO, "End-Of-Stream reached.");
    self->stop();
    break;
  default:
    break;
  }
  return TRUE;
}

void AutoVideoSink::run_main_loop_thread(GMainLoop *loop) {
  g_main_loop_run(loop);
}

std::string AutoVideoSink::getType() const { return type; }