Hello, I built a GStreamer C++ app, and I’m wondering how I could write unit tests for my components. I can’t figure out how to test with a video stream, especially since I can receive different input formats, etc. Maybe there is a way to mock the stream and check whether the output I get corresponds to the mock?
For inputs, for example, I have the following code:
#include "POSIXShmPipelineStrategy.hpp"
#include "Logger.hpp"
#include "StreamThread.hpp"
#include <fcntl.h>
#include <gst/app/gstappsrc.h>
#include <sys/mman.h>
#include <unistd.h>
/**
 * Pad probe that logs the format of every downstream SEGMENT event.
 *
 * @param pad        Pad the probe is installed on (unused).
 * @param info       Probe info describing the intercepted data.
 * @param user_data  Opaque pointer supplied at registration (unused).
 * @return Always GST_PAD_PROBE_OK: the probe only observes the event.
 */
static GstPadProbeReturn
event_probe_callback(GstPad *pad, GstPadProbeInfo *info, gpointer user_data) {
  if (!(GST_PAD_PROBE_INFO_TYPE(info) & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM)) {
    return GST_PAD_PROBE_OK;
  }

  GstEvent *event = gst_pad_probe_info_get_event(info);
  if (event == nullptr || GST_EVENT_TYPE(event) != GST_EVENT_SEGMENT) {
    return GST_PAD_PROBE_OK;
  }

  const GstSegment *segment = nullptr;
  gst_event_parse_segment(event, &segment);
  if (segment == nullptr) {
    return GST_PAD_PROBE_OK;
  }

  // gst_format_get_name() returns NULL for a format it does not know, and
  // appending a null const char* to std::string is undefined behaviour.
  const gchar *format_name = gst_format_get_name(segment->format);
  Logger::getInstance().log(Logger::INFO,
                            std::string("Segment format: ") +
                                (format_name ? format_name : "unknown"));
  return GST_PAD_PROBE_OK;
}
// The probe above is attached to the appsrc "src" pad in build_pipeline().
/**
 * Opens and maps the POSIX shared-memory frame source, then creates the
 * GStreamer elements and the pipeline used by this strategy.
 *
 * Results are stored on streamThread: shm_fd, shm_ptr, source (appsrc),
 * convert (videoconvert), sink (tee) and pipeline.
 *
 * @param streamThread  Stream whose video-source description (source name,
 *                      width, height) is read and whose handles are filled in.
 * @throws std::runtime_error if the shared memory cannot be opened or
 *         mapped, or if an element or the pipeline cannot be created.
 *
 * NOTE(review): when element or pipeline creation fails, the descriptor and
 * the mapping are left open here; presumably the StreamThread owner releases
 * them -- confirm.
 */
void POSIXShmPipelineStrategy::init_elements(StreamThread &streamThread) {
  auto &&logger = Logger::getInstance();
  logger.log(Logger::INFO, "Initializing elements of POSIXShmPipelineStrategy");

  // --- Shared memory: open read-only ---------------------------------------
  const auto &video = streamThread.getVideoSource();
  streamThread.shm_fd = shm_open(video.source.c_str(), O_RDONLY, 0666);
  if (streamThread.shm_fd == -1) {
    logger.log(Logger::ERROR, "Error: Failed to open shared memory.");
    throw std::runtime_error("Shared memory open failed");
  }
  logger.log(Logger::INFO, "Shared memory opened successfully");

  // --- Shared memory: map one frame into this process ----------------------
  // width * height * 3 bytes; presumably 3 bytes per pixel to match the BGR
  // caps set in build_pipeline() -- confirm against the producer.
  const auto frame_bytes = (video.width * video.height) * 3;
  void *const mapping = mmap(nullptr, frame_bytes, PROT_READ, MAP_SHARED,
                             streamThread.shm_fd, 0);
  streamThread.shm_ptr = static_cast<unsigned char *>(mapping);
  if (streamThread.shm_ptr == MAP_FAILED) {
    logger.log(Logger::ERROR, "Error: Failed to map shared memory.");
    close(streamThread.shm_fd);
    throw std::runtime_error("Shared memory mapping failed");
  }
  logger.log(Logger::INFO, "Shared memory mapped successfully");

  // --- GStreamer elements ---------------------------------------------------
  streamThread.source = gst_element_factory_make("appsrc", "mysource");
  streamThread.convert = gst_element_factory_make("videoconvert", "convert");
  streamThread.sink = gst_element_factory_make("tee", "sink");

  const bool all_created =
      streamThread.source && streamThread.convert && streamThread.sink;
  if (!all_created) {
    logger.log(Logger::ERROR, "Not all elements could be created.");
    throw std::runtime_error("Element creation failed");
  }

  // --- Pipeline container ---------------------------------------------------
  streamThread.pipeline = gst_pipeline_new("shm-pipeline");
  if (streamThread.pipeline == nullptr) {
    logger.log(Logger::ERROR, "Pipeline could not be created.");
    throw std::runtime_error("Pipeline creation failed");
  }
}
/**
 * Configures the appsrc, assembles source -> convert -> tee inside the
 * pipeline, installs the bus watch, attaches the extra branches to the tee
 * and hooks up the appsrc feeding signals.
 *
 * @param streamThread  Stream whose elements were created by init_elements().
 * @throws std::runtime_error if linking fails or a branch cannot be attached.
 */
void POSIXShmPipelineStrategy::build_pipeline(StreamThread &streamThread) {
  auto &&logger = Logger::getInstance();

  // appsrc behaves as a live, non-blocking, time-based stream source.
  g_object_set(streamThread.source,
               "stream-type", GST_APP_STREAM_TYPE_STREAM,
               "is-live", TRUE,
               "block", FALSE,
               "format", GST_FORMAT_TIME,
               nullptr);

  // Observe downstream events leaving the appsrc (segment format logging).
  GstPad *src_pad = gst_element_get_static_pad(streamThread.source, "src");
  gst_pad_add_probe(src_pad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM,
                    event_probe_callback, nullptr, nullptr);
  gst_object_unref(src_pad);

  // Raw BGR frames at the configured resolution, fixed at 20 fps.
  GstCaps *src_caps = gst_caps_new_simple(
      "video/x-raw",
      "format", G_TYPE_STRING, "BGR",
      "width", G_TYPE_INT, streamThread.getVideoSource().width,
      "height", G_TYPE_INT, streamThread.getVideoSource().height,
      "framerate", GST_TYPE_FRACTION, 20, 1,
      nullptr);
  gst_app_src_set_caps(GST_APP_SRC(streamThread.source), src_caps);
  gst_caps_unref(src_caps);

  // Hand the three elements to the pipeline, then chain them together.
  gst_bin_add_many(GST_BIN(streamThread.pipeline), streamThread.source,
                   streamThread.convert, streamThread.sink, nullptr);
  const gboolean linked = gst_element_link_many(
      streamThread.source, streamThread.convert, streamThread.sink, nullptr);
  if (!linked) {
    logger.log(Logger::ERROR, "Elements could not be linked.");
    throw std::runtime_error("Pipeline linking failed");
  }
  logger.log(Logger::INFO, "Elements linked successfully");

  // Bus messages are routed to StreamThread::message_cb.
  GstBus *pipeline_bus =
      gst_pipeline_get_bus(GST_PIPELINE(streamThread.pipeline));
  gst_bus_add_watch(pipeline_bus,
                    reinterpret_cast<GstBusFunc>(StreamThread::message_cb),
                    &streamThread);
  gst_object_unref(pipeline_bus);

  // Extra branches hang off the tee (stored in streamThread.sink).
  const bool dataflow_attached = streamThread.dataflow(streamThread.sink);
  if (!dataflow_attached) {
    logger.log(Logger::ERROR, "Dataflow connection failed.");
    throw std::runtime_error("Failed to attach additional branches via tee.");
  }
  const bool original_attached =
      streamThread.shared_original_flow(streamThread.sink);
  if (!original_attached) {
    logger.log(Logger::ERROR, "Failed to add original flow to the pipeline.");
    throw std::runtime_error("Adding original flow failed");
  }

  // appsrc asks for data / signals back-pressure through these two signals.
  g_signal_connect(streamThread.source, "need-data", G_CALLBACK(start_feed),
                   &streamThread);
  g_signal_connect(streamThread.source, "enough-data", G_CALLBACK(stop_feed),
                   &streamThread);
}
/**
 * Idle callback that copies one frame from shared memory into a new
 * GstBuffer and pushes it into the appsrc.
 *
 * Registered through g_idle_add() by start_feed(). Whenever this function
 * asks GLib to remove the idle source it also resets
 * streamThread->sourceid to 0, so stop_feed() does not try to remove a
 * source that no longer exists and start_feed() can re-arm feeding.
 *
 * @param streamThread  Stream providing the mapped frame (shm_ptr), the
 *                      frame dimensions and the appsrc element (source).
 * @return G_SOURCE_CONTINUE to be called again, or G_SOURCE_REMOVE when the
 *         stream is inactive (end-of-stream is sent) or a step failed.
 */
gboolean POSIXShmPipelineStrategy::read_data(StreamThread *streamThread) {
  GstAppSrc *appsrc = GST_APP_SRC(streamThread->source);

  if (!streamThread->is_active.load()) {
    // Push end-of-stream if the thread is no longer active.
    gst_app_src_end_of_stream(appsrc);
    streamThread->sourceid = 0;
    return G_SOURCE_REMOVE;
  }

  // Compute the frame size once, in gsize, so the multiplication cannot
  // overflow int for large resolutions.
  const gsize frame_size =
      static_cast<gsize>(streamThread->getVideoSource().width) *
      static_cast<gsize>(streamThread->getVideoSource().height) * 3;

  GstBuffer *buffer = gst_buffer_new_allocate(nullptr, frame_size, nullptr);
  if (buffer == nullptr) {
    Logger::getInstance().log(Logger::ERROR,
                              "Failed to allocate GStreamer buffer");
    streamThread->sourceid = 0;
    return G_SOURCE_REMOVE;
  }

  GstMapInfo map;
  if (!gst_buffer_map(buffer, &map, GST_MAP_WRITE)) {
    Logger::getInstance().log(Logger::ERROR,
                              "Failed to map GStreamer buffer for writing");
    gst_buffer_unref(buffer);
    streamThread->sourceid = 0;
    return G_SOURCE_REMOVE;
  }

  // Copy frame data from shared memory to the GStreamer buffer.
  memcpy(map.data, streamThread->shm_ptr, frame_size);
  gst_buffer_unmap(buffer, &map);

  // gst_app_src_push_buffer() takes ownership of the buffer, so no unref is
  // needed afterwards, and it always yields a defined flow return.
  const GstFlowReturn ret = gst_app_src_push_buffer(appsrc, buffer);
  if (ret != GST_FLOW_OK) {
    Logger::getInstance().log(Logger::ERROR,
                              "Error occurred while pushing buffer to appsrc");
    streamThread->sourceid = 0;
    return G_SOURCE_REMOVE;
  }
  return G_SOURCE_CONTINUE;
}
/**
 * "need-data" handler: schedules read_data() as an idle source unless one
 * is already registered (sourceid != 0).
 *
 * @param pipeline      Element that emitted the signal (unused).
 * @param size          Amount of data requested by appsrc (unused).
 * @param streamThread  Stream whose sourceid tracks the idle source.
 */
void POSIXShmPipelineStrategy::start_feed(GstElement *pipeline, guint size,
                                          StreamThread *streamThread) {
  const bool already_feeding = streamThread->sourceid != 0;
  if (already_feeding) {
    return;
  }
  streamThread->sourceid =
      g_idle_add(reinterpret_cast<GSourceFunc>(read_data), streamThread);
}
/**
 * "enough-data" handler: removes the idle source registered by start_feed()
 * (if any) and clears sourceid.
 *
 * @param pipeline      Element that emitted the signal (unused).
 * @param streamThread  Stream whose sourceid tracks the idle source.
 */
void POSIXShmPipelineStrategy::stop_feed(GstElement *pipeline,
                                         StreamThread *streamThread) {
  if (streamThread->sourceid == 0) {
    return;  // Nothing is feeding right now.
  }
  g_source_remove(streamThread->sourceid);
  streamThread->sourceid = 0;
}
And for outputs, I have this kind of code:
#include "AutoVideoSink.hpp"
#include "Logger.hpp"
#include "StreamThread.hpp"
#include <iostream>
/**
 * Builds the consumer pipeline
 *   intervideosrc -> queue -> videoconvert -> capsfilter(I420) -> autovideosink
 * and installs a bus watch that dispatches to message_cb().
 *
 * @param streamThread  Registered as an observer of this pipeline.
 * @param pipeline_id   Identifier forwarded to PipelineInterface.
 * @param channel_name  intervideosrc "channel"; must match the producer's
 *                      intervideosink channel.
 * @throws std::runtime_error if an element cannot be created or the
 *         elements cannot be linked. Everything created so far is released
 *         before throwing, because the destructor does not run for a
 *         constructor that throws.
 */
AutoVideoSink::AutoVideoSink(StreamThread &streamThread,
                             const std::string &pipeline_id,
                             const std::string &channel_name)
    : PipelineInterface(pipeline_id), pipeline(nullptr), intervideosrc(nullptr),
      videosink(nullptr), bus(nullptr), loop(nullptr), running(false) {
  type = "Autovideo Sink";
  pipeline = gst_pipeline_new("consumer-pipeline");
  this->addObserver(&streamThread);

  // Create the elements
  intervideosrc = gst_element_factory_make("intervideosrc", "intervideosrc");
  queue = gst_element_factory_make("queue", "queue");
  videosink = gst_element_factory_make("autovideosink", "videosink");
  videoconvert = gst_element_factory_make("videoconvert", "videoconvert");
  capsfilter = gst_element_factory_make("capsfilter", "capsfilter");

  // Check elements creation
  if (!pipeline || !intervideosrc || !videosink || !queue || !videoconvert ||
      !capsfilter) {
    // Nothing has been added to the pipeline yet, so every object that was
    // created still has to be released individually.
    const auto release = [](gpointer object) {
      if (object != nullptr) {
        gst_object_unref(object);
      }
    };
    release(intervideosrc);
    release(queue);
    release(videosink);
    release(videoconvert);
    release(capsfilter);
    release(pipeline);
    throw std::runtime_error("Failed to create GStreamer elements");
  }

  // Set the channel property to match the producer's intervideosink
  g_object_set(intervideosrc, "channel", channel_name.c_str(), NULL);

  // Add elements to the pipeline (the pipeline now owns them)
  gst_bin_add_many(GST_BIN(pipeline), intervideosrc, queue, videosink,
                   videoconvert, capsfilter, NULL);

  GstCaps *caps =
      gst_caps_new_simple("video/x-raw", "format", G_TYPE_STRING, "I420", NULL);
  g_object_set(capsfilter, "caps", caps, NULL);
  gst_caps_unref(caps);

  // Link elements
  if (!gst_element_link_many(intervideosrc, queue, videoconvert, capsfilter,
                             videosink, NULL)) {
    // Dropping the pipeline also releases the elements it owns.
    gst_object_unref(pipeline);
    throw std::runtime_error("Elements could not be linked");
  }

  bus = gst_element_get_bus(pipeline);
  if (!bus) {
    Logger::getInstance().log(Logger::ERROR,
                              "Failed to get bus from pipeline.");
    // Without a bus there is nothing to watch.
    return;
  }

  // Add a watch to the bus
  guint bus_watch_id = gst_bus_add_watch(bus, (GstBusFunc)message_cb, this);
  if (bus_watch_id == 0) {
    Logger::getInstance().log(Logger::ERROR, "Failed to add watch to bus");
    gst_object_unref(bus);
    // Clear the member so the destructor does not unref the bus again.
    bus = nullptr;
    return;
  }
}
/**
 * Tears the consumer pipeline down.
 *
 * Quits a still-running main loop, removes the bus watch so message_cb()
 * is not scheduled again with this object, and then releases the bus and
 * the pipeline. Observers are not notified from here.
 */
AutoVideoSink::~AutoVideoSink() {
  Logger::getInstance().log(Logger::INFO, "AutoVideoSink destructor called");

  // The loop thread is detached; without this it would keep running and the
  // loop would never be released.
  if (running && loop) {
    g_main_loop_quit(loop);
    g_main_loop_unref(loop);
    loop = nullptr;
    running = false;
  }

  if (bus) {
    // The watch was registered with `this` as user data, so it has to go
    // before the object does (gst_bus_remove_watch needs GStreamer >= 1.14).
    gst_bus_remove_watch(bus);
    gst_object_unref(bus);
    bus = nullptr;
  }

  if (pipeline) {
    gst_element_set_state(pipeline, GST_STATE_NULL);
    gst_object_unref(pipeline);
    pipeline = nullptr;
  }
}
/**
 * Sets the pipeline to PLAYING and runs a GLib main loop on a detached
 * thread so the bus watch gets dispatched.
 *
 * Calling start() while already running is a no-op; previously a second
 * call leaked the first GMainLoop and spawned another loop thread.
 *
 * @return true if the pipeline is (now) running, false if the state change
 *         to PLAYING failed.
 */
bool AutoVideoSink::start() {
  if (running) {
    return true;
  }

  ret = gst_element_set_state(pipeline, GST_STATE_PLAYING);
  if (ret == GST_STATE_CHANGE_FAILURE) {
    Logger::getInstance().log(
        Logger::ERROR,
        "Unable to set the AutoVideoSink pipeline to the playing state.");
    return false;
  }

  // Create the loop before flagging the sink as running, so `running`
  // is never true while `loop` still holds a stale value.
  loop = g_main_loop_new(NULL, FALSE);
  running = true;

  // Run the main loop in a separate thread
  gst_thread = std::thread(run_main_loop_thread, loop);
  gst_thread.detach();
  return true;
}
/**
 * Stops the main loop started by start() and notifies observers that this
 * pipeline has stopped. Does nothing when the sink is not running.
 *
 * The loop pointer is cleared after it is released so no later code can
 * touch the freed loop.
 */
void AutoVideoSink::stop() {
  if (!running) {
    return;
  }
  if (loop) {
    g_main_loop_quit(loop);
    g_main_loop_unref(loop);
    loop = nullptr;
  }
  running = false;
  this->notifyPipelineStopped(this->getPipelineId());
}
/**
 * Bus watch callback: reports errors on stderr, logs end-of-stream, and
 * stops the sink in both cases. Other message types are ignored.
 *
 * @param bus        Bus the message came from (unused).
 * @param message    Message to handle.
 * @param user_data  The AutoVideoSink that installed the watch.
 * @return Always TRUE, which keeps the watch installed.
 */
gboolean AutoVideoSink::message_cb(GstBus *bus, GstMessage *message,
                                   gpointer user_data) {
  auto *sink = static_cast<AutoVideoSink *>(user_data);
  const GstMessageType message_type = GST_MESSAGE_TYPE(message);

  if (message_type == GST_MESSAGE_ERROR) {
    GError *error = nullptr;
    gchar *details = nullptr;
    gst_message_parse_error(message, &error, &details);

    std::cerr << "Error received from element " << GST_OBJECT_NAME(message->src)
              << ": " << error->message << std::endl;
    const gchar *shown_details = details ? details : "none";
    std::cerr << "Debugging information: " << shown_details << std::endl;

    g_clear_error(&error);
    g_free(details);
    sink->stop();
  } else if (message_type == GST_MESSAGE_EOS) {
    Logger::getInstance().log(Logger::INFO, "End-Of-Stream reached.");
    sink->stop();
  }

  return TRUE;
}
/// Thread entry point used by start(): runs the given GLib main loop and
/// returns once g_main_loop_run() returns.
void AutoVideoSink::run_main_loop_thread(GMainLoop *loop) {
  g_main_loop_run(loop);
}
/// Returns a copy of the pipeline type label stored in `type`
/// (the constructor sets it to "Autovideo Sink").
std::string AutoVideoSink::getType() const {
  return type;
}