Hi, I am new to gstreamer, and in parallel to implementing an RTSP server (as explained in my previous post), I am also implementing an RTP streamer that I would like to dynamically shut down and restart. Unlike with RTSP server however, I am encountering a serious issue with the RTP streamer that freezes the entire application:
During the shut down routine of the RTP streamer, the RTP pipeline has to be set to NULL via gst_element_set_state(rtpPipeline, GST_STATE_NULL) before the pipeline can be unreferencend (based on what I gathered from here). Yet this call can occasionally hang indefinitely (for what reason I have no idea). This bug that has already been discussed several times online:
(please excuse the formatting, as a new user I can only post two links)
https://forums.developer.nvidia.com/t/gst-element-set-state-is-hanging-while-setting-the-element-to-null/221460
https://forums.developer.nvidia.com/t/gst-element-set-state-is-hanging-while-setting-the-element-to-null/262517/11
https://forums.developer.nvidia.com/t/pipeline-set-state-meet-segmentation-fault-core-dumped-and-app-crashed/186665
https://gstreamer-devel.narkive.com/fMlsKCKO/gstreamer-hangs-on-gst-element-set-state-when-trying-to-stop-a-transcode-stream
https://discourse.gnome.org/t/can-gst-element-set-state-block/5895
https://bugzilla.gnome.org/show_bug.cgi?id=692145
https://community.nxp.com/t5/i-MX-Processors/Gstreamer-0-10-function-gst-element-set-state-hang/m-p/287650?profile.language=en
https://e2e.ti.com/support/processors-group/processors/f/processors-forum/560240/on-state-change-from-playing-to-null-hangs
Is there any way to stop this from occurring? As with the RTSP server issue, I have created an MWE to experiment with, however in this case, having extracted all the RTP functionality from my original application, I am unable to force the MWE to crash. Here it is anyways:
#include <StreamerTest.h>
/**
\brief Run the RTP main loop, called by an external thread
\return void
*/
void StreamerTest::RunRtpLoop() {
g_main_loop_run(rtpLoop);
std::cout << "Running RTP main loop." << std::endl;
}
static void on_rtp_pipeline_finalized(gpointer data, GObject* where_the_object_was) {
std::cout << "RTP pipeline finalized. Pointer was: " << where_the_object_was << std::endl;
}
/**
\brief Initialize the RTP server using a GStreamer pipeline
\param[in] launchString the string that contains the streaming parameters like format, encoding, etc.
\return true: success, false: error
*/
bool StreamerTest::InitializeRtpServer(std::string launchString) {
{
std::lock_guard<std::mutex> lock(rtpMutex);
rtpLoop = g_main_loop_new(nullptr, false);
std::cout << "RTP main loop created." << std::endl;
GError* error = nullptr;
rtpPipeline = gst_parse_launch(launchString.c_str(), &error);
g_object_weak_ref(G_OBJECT(rtpPipeline), on_rtp_pipeline_finalized, nullptr);
std::cout << "RTP pipeline created with the following launch string:\n\n" << launchString << "\n" << std::endl;
gst_element_set_state(rtpPipeline, GST_STATE_PLAYING);
std::cout << "RTP pipeline state set to PLAYING." << std::endl;
}
rtpThread = std::thread(&StreamerTest::RunRtpLoop, this);
std::cout << "RTP streamer started." << std::endl;
return true;
}
/**
\brief Stop the RTP server
\return true: success, false: error
*/
bool StreamerTest::StopRtpServer() {
bool success = true;
// std::atomic<bool> shutdownComplete{false};
// auto shutdownTask = std::async(std::launch::async, [&](){
{
std::lock_guard<std::mutex> lock(rtpMutex);
if (rtpLoop && g_main_loop_is_running(rtpLoop)) {
g_main_loop_quit(rtpLoop);
std::cout << "RTP main loop stopped."<< std::endl;
} else {
std::cout << "Failed to stop RTP main loop."<< std::endl;
success = false;
}
}
if (rtpThread.joinable()) {
rtpThread.join();
std::cout << "RTP thread joined."<< std::endl;
} else {
std::cout << "Failed to join RTP thread."<< std::endl;
success = false;
}
if (rtpLoop) {
g_main_loop_unref(rtpLoop);
rtpLoop = nullptr;
std::cout << "RTP main loop unreferenced."<< std::endl;
} else {
std::cout << "Failed to unreference RTP main loop."<< std::endl;
success = false;
}
if (rtpPipeline) {
// // 1. Send EOS
// gst_element_send_event(rtpPipeline, gst_event_new_eos());
// // 2. Wait for EOS to propagate
// std::this_thread::sleep_for(std::chrono::milliseconds(200));
// // 3. Transition to PAUSED
// gst_element_set_state(rtpPipeline, GST_STATE_PAUSED);
// gst_element_get_state(rtpPipeline, nullptr, nullptr, GST_SECOND);
// 4. Transition to NULL with timeout
gst_element_set_state(rtpPipeline, GST_STATE_NULL);
std::cout << "RTP pipeline state set to NULL." << std::endl;
// gst_element_get_state(rtpPipeline, nullptr, nullptr, GST_SECOND * 2);
g_clear_object(&rtpPipeline);
// rtpPipeline = nullptr;
std::cout << "RTP pipeline unreferenced." << std::endl;
} else {
std::cout << "Failed to stop and unreference RTP pipeline." << std::endl;
success = false;
}
// shutdownComplete = true;
// });
// // Watchdog timeout
// if (shutdownTask.wait_for(std::chrono::seconds(5)) != std::future_status::ready) {
// // First try graceful exit
// LOG_ERROR1(VDUHardware::logger, "Encoder %u: RTP streamer shutdown timed out. Attempting graceful exit.", enc_idx);
// std::exit(EXIT_FAILURE);
// // If that fails (e.g., stuck destructors), force kill
// LOG_ERROR1(VDUHardware::logger, "Encoder %u: Graceful exit failed. Forcing process termination.", enc_idx);
// std::raise(SIGKILL);
// }
// gst_deinit();
return success;
}
bool StreamerTest::ToggleRtpServer() {
if (!rtpServerRunning) {
std::string launchString = "videotestsrc ! x264enc tune=zerolatency ! rtph264pay config-interval=1 pt=96 ! udpsink host=127.0.0.1 port=5000";
std::cout << "Trying to start RTP server........." << std::endl;
if (InitializeRtpServer(launchString)) {
rtpServerRunning = true;
return true;
}
} else {
std::cout << "Trying to stop RTP server........." << std::endl;
StopRtpServer();
rtpServerRunning = false;
return true;
std::cout << "RTP server stopped........." << std::endl;
}
std::cout << "ToggleRtpServer returning........." << std::endl;
return false;
}
And there the corresponding StreamerTest.h:
#ifndef STREAMERTEST
#include <stdlib.h>
#include <string>
#include <thread>
#include <mutex>
#include <atomic>
#include <iostream>
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
#include <gst/gst.h>
#include <gst/rtsp-server/rtsp-server.h>
#include <gst/gstinfo.h>
#pragma GCC diagnostic pop
class StreamerTest {
public:
StreamerTest();
~StreamerTest();
bool ToggleRtpServer();
private:
std::string usedRtpLaunchString;
bool rtpServerRunning = false;
GstElement* rtpPipeline;
std::thread rtpThread;
std::mutex rtpMutex;
GMainLoop* rtpLoop;
void RunRtpLoop();
bool InitializeRtpServer(std::string launchString);
bool PauseRtpServer();
};
#endif
And the main.cpp:
#include "StreamerTest.h"
int main() {
StreamerTest streamer;
int toggleCount = 0;
// while (streamer.ToggleRTSPServer()) {
while (streamer.ToggleRtpServer()) {
++toggleCount;
std::cout << "\n--------------------- Successful Toggle Count: " << toggleCount << " ---------------------\n\n" << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
return 0;
}
As you can see, I have tried many different things to stop gst_element_set_state() from hanging, but to no avail.
Again, I am cross-compiling with gstreamer version 1.16.3 and glib version 2.64.5.