I am reading frames from a posix shared memory using appsrc. Here’s the pipeline I use :
void POSIXShmPipelineStrategy::init_elements(StreamThread &streamThread) {
std::cout << "Initializing elements of POSIXShmPipelineStrategy" << std::endl;
// Open shared memory
streamThread.shm_fd =
shm_open(streamThread.getVideoSource().source.c_str(), O_RDONLY, 0666);
if (streamThread.shm_fd == -1) {
std::cerr << "Error: Failed to open shared memory." << std::endl;
throw std::runtime_error("Shared memory open failed");
}
std::cout << "Shared memory opened successfully" << std::endl;
// Map shared memory to process address space
streamThread.shm_ptr =
(unsigned char *)mmap(0,
(streamThread.getVideoSource().width *
streamThread.getVideoSource().height) *
3,
PROT_READ, MAP_SHARED, streamThread.shm_fd, 0);
if (streamThread.shm_ptr == MAP_FAILED) {
std::cerr << "Error: Failed to map shared memory." << std::endl;
close(streamThread.shm_fd);
throw std::runtime_error("Shared memory mapping failed");
}
std::cout << "Shared memory mapped successfully" << std::endl;
// Initialize GStreamer elements
streamThread.source = gst_element_factory_make("appsrc", "mysource");
streamThread.convert = gst_element_factory_make("videoconvert", "convert");
streamThread.sink = gst_element_factory_make("tee", "sink");
if (!streamThread.source || !streamThread.convert || !streamThread.sink) {
g_printerr("Not all elements could be created.\n");
throw std::runtime_error("Element creation failed");
}
// Create the GStreamer pipeline
streamThread.pipeline = gst_pipeline_new("shm-pipeline");
if (!streamThread.pipeline) {
g_printerr("Pipeline could not be created.\n");
throw std::runtime_error("Pipeline creation failed");
}
}
void POSIXShmPipelineStrategy::build_pipeline(StreamThread &streamThread) {
// Configure appsrc (shared memory source)
g_object_set(streamThread.source, "stream-type", GST_APP_STREAM_TYPE_STREAM,
"is-live", TRUE, "block", FALSE, NULL);
// Set the caps for appsrc
GstCaps *caps = gst_caps_new_simple(
"video/x-raw", "format", G_TYPE_STRING, "BGR", "width", G_TYPE_INT,
streamThread.getVideoSource().width, "height", G_TYPE_INT,
streamThread.getVideoSource().height, "framerate", GST_TYPE_FRACTION, 20,
1, NULL);
gst_app_src_set_caps(GST_APP_SRC(streamThread.source), caps);
gst_caps_unref(caps);
// Add elements to the pipeline (source, convert, tee)
gst_bin_add_many(GST_BIN(streamThread.pipeline), streamThread.source,
streamThread.convert, streamThread.sink, NULL);
// Link the elements
if (gst_element_link_many(streamThread.source, streamThread.convert,
streamThread.sink, NULL) != TRUE) {
g_printerr("Elements could not be linked.\n");
throw std::runtime_error("Pipeline linking failed");
}
std::cout << "Elements linked successfully" << std::endl;
// Set up the GStreamer bus for error handling
GstBus *bus = gst_pipeline_get_bus(GST_PIPELINE(streamThread.pipeline));
gst_bus_add_watch(bus, (GstBusFunc)StreamThread::message_cb, &streamThread);
gst_object_unref(bus);
// Attach additional branches using dataflow function
if (!streamThread.dataflow(streamThread.sink)) {
g_printerr("Dataflow connection failed.\n");
throw std::runtime_error("Failed to attach additional branches via tee.");
}
// Connect signals
g_signal_connect(streamThread.source, "need-data", G_CALLBACK(start_feed),
&streamThread);
g_signal_connect(streamThread.source, "enough-data", G_CALLBACK(stop_feed),
&streamThread);
}
gboolean POSIXShmPipelineStrategy::read_data(StreamThread *streamThread) {
GstFlowReturn ret;
gboolean ok = TRUE;
GstBuffer *buffer =
gst_buffer_new_allocate(nullptr,
((streamThread->getVideoSource().width *
streamThread->getVideoSource().height) *
3),
nullptr);
GstMapInfo map;
gst_buffer_map(buffer, &map, GST_MAP_WRITE);
// Copy frame data from shared memory to the GStreamer buffer
memcpy(map.data, streamThread->shm_ptr,
((streamThread->getVideoSource().width *
streamThread->getVideoSource().height) *
3));
gst_buffer_unmap(buffer, &map);
// Push buffer to appsrc
g_signal_emit_by_name(streamThread->source, "push-buffer", buffer, &ret);
gst_buffer_unref(buffer);
if (ret != GST_FLOW_OK) {
std::cerr << "Error occurred while pushing buffer to appsrc" << std::endl;
ok = FALSE;
}
return ok;
}
void POSIXShmPipelineStrategy::start_feed(GstElement *pipeline, guint size,
StreamThread *streamThread) {
if (streamThread->sourceid == 0) {
streamThread->sourceid = g_idle_add((GSourceFunc)read_data, streamThread);
}
}
void POSIXShmPipelineStrategy::stop_feed(GstElement *pipeline,
StreamThread *streamThread) {
if (streamThread->sourceid != 0) {
g_source_remove(streamThread->sourceid);
streamThread->sourceid = 0;
}
}
To get the stream flowing I use a tee with a fakesink, in the dataflow function, which might be the cause for this problem :
gboolean StreamThread::dataflow(GstElement *sink) {
// Create the necessary elements
GstElement *queue = gst_element_factory_make("queue", NULL);
GstElement *videoconvert = gst_element_factory_make("videoconvert", NULL);
GstElement *capsfilter = gst_element_factory_make("capsfilter", NULL);
GstElement *fakesink = gst_element_factory_make("fakesink", NULL);
if (!queue || !videoconvert || !capsfilter || !fakesink) {
g_printerr("Not all elements could be created.\n");
return FALSE;
}
// Set caps on capsfilter to convert the format from BGR to I420
GstCaps *caps =
gst_caps_new_simple("video/x-raw", "format", G_TYPE_STRING,
videoSource.format.c_str(), // Ensuring I420 format
"width", G_TYPE_INT, 1280, "height", G_TYPE_INT, 720,
"framerate", GST_TYPE_FRACTION, 20, 1, NULL);
g_object_set(capsfilter, "caps", caps, NULL);
gst_caps_unref(caps);
// Request a pad from the tee (sink) element
GstPadTemplate *templ =
gst_element_class_get_pad_template(GST_ELEMENT_GET_CLASS(sink), "src_%u");
if (!templ) {
g_printerr("Pad template not found on sink element.\n");
return FALSE;
}
GstPad *sinkpad = gst_element_request_pad(sink, templ, NULL, NULL);
if (!sinkpad) {
g_printerr("Failed to request pad from sink element.\n");
return FALSE;
}
// Add elements to the pipeline
gst_bin_add_many(GST_BIN(pipeline), queue, videoconvert, capsfilter, fakesink,
NULL);
// Link the elements (queue -> videoconvert -> capsfilter -> fakesink)
if (gst_element_link_many(queue, videoconvert, capsfilter, fakesink, NULL) !=
TRUE) {
g_printerr("Elements could not be linked together.\n");
gst_object_unref(pipeline);
return FALSE;
}
// Sync state with parent pipeline
gst_element_sync_state_with_parent(queue);
gst_element_sync_state_with_parent(videoconvert);
gst_element_sync_state_with_parent(capsfilter);
gst_element_sync_state_with_parent(fakesink);
// Get the sink pad from the queue element and link it to the tee's pad
GstPad *srcpad = gst_element_get_static_pad(queue, "sink");
if (!srcpad) {
g_printerr("Failed to get static pad from queue element.\n");
return FALSE;
}
// Link the tee's pad to the queue's sink pad
if (gst_pad_link(sinkpad, srcpad) != GST_PAD_LINK_OK) {
g_printerr("Failed to link pads between sink and queue.\n");
gst_object_unref(srcpad);
return FALSE;
}
// Unref the pads after linking
gst_object_unref(srcpad);
gst_object_unref(sinkpad);
return TRUE;
}
I tried removing the capsfilter but it doesn’t change anything.
Finally I crop and scale the stream with the following function :
bool StreamThread::create_transformed_queue() {
std::lock_guard<std::mutex> lock(pipeline_mutex);
GstElement *sink = gst_bin_get_by_name(GST_BIN(pipeline), "sink");
if (!sink) {
g_printerr("Error: sink (tee) element is NULL.\n");
return FALSE; // Handle the error appropriately
}
GstPadTemplate *templ =
gst_element_class_get_pad_template(GST_ELEMENT_GET_CLASS(sink), "src_%u");
std::cout << "there" << std::endl;
if (!templ) {
g_printerr("Pad template not found.\n");
return FALSE;
}
GstPad *sinkpad = gst_element_request_pad(sink, templ, NULL, NULL);
if (!sinkpad) {
g_printerr("Failed to request pad.\n");
return FALSE;
}
// Create the bin to encapsulate the elements
bin_transform = gst_bin_new("transformation_bin");
// Create the elements for queue, scaling, cropping, and caps filtering
queue_transform = gst_element_factory_make("queue", "transform_queue");
videoscale = gst_element_factory_make("videoscale", "transform_videoscale");
videocrop = gst_element_factory_make("videocrop", "transform_videocrop");
capsfilter_transform =
gst_element_factory_make("capsfilter", "capsfilter_transform");
intervideosink_transform =
gst_element_factory_make("intervideosink", "intervideosink_transform");
if (!queue_transform || !videoscale || !videocrop || !capsfilter_transform ||
!intervideosink_transform) {
g_printerr("Not all elements could be created.\n");
return false;
}
// Add the queue as the first element of the bin
gst_bin_add_many(GST_BIN(pipeline), queue_transform, videocrop, videoscale,
capsfilter_transform, intervideosink_transform, NULL);
// set the caps for videoscale
GstCaps *caps =
gst_caps_new_simple("video/x-raw", "width", G_TYPE_INT, videoSource.width,
"height", G_TYPE_INT, videoSource.height, "format",
G_TYPE_STRING, videoSource.format.c_str(), NULL);
g_object_set(capsfilter_transform, "caps", caps, NULL);
gst_caps_unref(caps);
// Set crop properties
g_object_set(videocrop, "left", videoSource.crop[0][0], "right",
videoSource.crop[0][1], "top", videoSource.crop[1][0], "bottom",
videoSource.crop[1][1], NULL);
// Set the channel for intervideosink
g_object_set(intervideosink_transform, "channel", "transformed", NULL);
if (gst_element_link_many(queue_transform, videoscale, capsfilter_transform,
videocrop, intervideosink_transform,
NULL) != TRUE) {
g_printerr("Elements could not be linked.\n");
gst_object_unref(pipeline);
return FALSE;
}
gst_element_sync_state_with_parent(queue_transform);
gst_element_sync_state_with_parent(videoscale);
gst_element_sync_state_with_parent(capsfilter_transform);
gst_element_sync_state_with_parent(videocrop);
gst_element_sync_state_with_parent(intervideosink_transform);
GstPad *srcpad = gst_element_get_static_pad(queue_transform, "sink");
if (!srcpad) {
g_printerr("Failed to get static pad from queue.\n");
return FALSE;
}
if (gst_pad_link(sinkpad, srcpad) != GST_PAD_LINK_OK) {
g_printerr("Failed to link pads\n");
gst_object_unref(srcpad);
return FALSE;
}
gst_object_unref(srcpad);
return true;
}
I keep getting the following message tens of time per second :
(crow:25502): GStreamer-CRITICAL **: 15:07:05.207: gst_segment_to_stream_time: assertion 'segment->format == format' failed
(crow:25502): GStreamer-CRITICAL **: 15:07:05.207: gst_segment_to_stream_time: assertion 'segment->format == format' failed
But what’s strange is that everything works well. Tried running the program with GST_DEBUG=GST_CAPS:5,GST_SEGMENT:5 but didn’t see anything strange.
I tried modifying the caps but since I receive BGR frames I don’t believe it is the correct approach and also it didn’t fix much. I don’t mind the stream to be slightly desynchronized. I managed to make a similar pipeline works without any problem by removing the tee and fakesink and just streaming to an autovideosink. But I need the tee to link dynamically my pipeline.