Hi, I am new to GStreamer and am currently trying to implement an RTSP server pipeline that I can initialize and stop/restart using the GStreamer API in C++. Here is an MWE of my current implementation:
#include <StreamerTest.h>
/**
 * Periodic timeout callback: asks the server's session pool to clean itself up.
 *
 * @param RTSPServer  Server passed as the timeout's user data; ignored when it
 *                    is null or fails the GST_IS_RTSP_SERVER type check.
 * @return Always true, so the timeout source keeps firing.
 */
static gboolean RTSPServerTimeout(GstRTSPServer *RTSPServer) {
    // Guard clause: nothing to clean when the pointer is not a usable server.
    if (!RTSPServer || !GST_IS_RTSP_SERVER(RTSPServer)) {
        return true;
    }

    GstRTSPSessionPool *sessions = gst_rtsp_server_get_session_pool(RTSPServer);
    gst_rtsp_session_pool_cleanup(sessions);
    // Release the pool reference obtained above (no-op when it is null).
    g_clear_object(&sessions);
    return true;
}
/**
 * Client filter passed to gst_rtsp_server_client_filter(): requests removal of
 * every client it is called for, regardless of the arguments.
 */
GstRTSPFilterResult clientFilterFunc(GstRTSPServer *server, GstRTSPClient *client, gpointer user_data) {
    // None of the arguments influence the verdict.
    (void)server;
    (void)client;
    (void)user_data;
    const GstRTSPFilterResult verdict = GST_RTSP_FILTER_REMOVE;
    return verdict;
}
/**
 * Prints a start-up banner and initializes GStreamer (no command-line
 * arguments are forwarded to gst_init).
 */
StreamerTest::StreamerTest() {
    static const char banner[] = "--------------------- Initializing StreamerTest ---------------------\n\n";
    std::cout << banner;
    std::cout << std::endl;

    gst_init(nullptr, nullptr);

    static const char ready[] = "GSreamer initialized.\n";
    std::cout << ready;
    std::cout << std::endl;
}
/**
 * Shuts the RTSP server down if it is still running, then prints the exit
 * banner.
 *
 * While the server is running, RTSPThread is joinable; destroying a joinable
 * std::thread calls std::terminate, so the server must be stopped (which joins
 * the thread) before the members are destroyed.
 */
StreamerTest::~StreamerTest() {
    if (RTSPServerRunning) {
        StopRTSPServer();
        RTSPServerRunning = false;
    }
    std::cout << "--------------------- Exiting StreamerTest ---------------------\n\n" << std::endl;
}
/**
 * "client-connected" handler: bumps the live-client counter and arranges for
 * ClientClosed to run when this client emits "closed".
 *
 * @param server     Emitting server (unused).
 * @param client     Newly connected client.
 * @param user_data  Pointer to the std::atomic<uint32_t> client counter.
 */
void StreamerTest::ClientConnected(GstRTSPServer* server, GstRTSPClient* client, gpointer user_data) {
    auto* const liveClients = static_cast<std::atomic<uint32_t>*>(user_data);
    ++(*liveClients);
    // Hand the same counter to the close handler so it can undo the increment.
    g_signal_connect(client, "closed", G_CALLBACK(StreamerTest::ClientClosed), liveClients);
}
/**
 * "closed" handler for a client: decrements the live-client counter.
 *
 * @param client     Client that closed (unused).
 * @param user_data  Pointer to the std::atomic<uint32_t> client counter.
 */
void StreamerTest::ClientClosed(GstRTSPClient* client, gpointer user_data) {
    auto* const liveClients = static_cast<std::atomic<uint32_t>*>(user_data);
    --(*liveClients);
}
/**
 * Thread entry point: runs the RTSP GLib main loop until it is quit.
 *
 * The status message is printed before entering the loop; g_main_loop_run()
 * only returns once the loop has been stopped, so logging afterwards would
 * report "running" at the moment the loop has just ended.
 */
void StreamerTest::RunRTSPLoop() {
    std::cout << "Running RTSP main loop." << std::endl;
    g_main_loop_run(RTSPLoop);
}
/**
 * Weak-reference notification for the RTSP server: logs the address the
 * object used to live at.
 *
 * @param data                  User data given to g_object_weak_ref (unused).
 * @param where_the_object_was  Former address of the object; printed only.
 */
static void on_rtsp_server_finalized(gpointer data, GObject* where_the_object_was) {
    (void)data;
    std::cout << "RTSPServer finalized. Pointer was: ";
    std::cout << where_the_object_was;
    std::cout << std::endl;
}
/**
 * Weak-reference notification for the RTSP media factory: logs the address the
 * object used to live at.
 *
 * @param data                  User data given to g_object_weak_ref (unused).
 * @param where_the_object_was  Former address of the object; printed only.
 */
static void on_rtsp_factory_finalized(gpointer data, GObject* where_the_object_was) {
    (void)data;
    std::cout << "RTSPFactory finalized. Pointer was: ";
    std::cout << where_the_object_was;
    std::cout << std::endl;
}
bool StreamerTest::InitializeRTSPServer(const char* ip, const char* port, const char* path, std::string launchString) {
RTSPClientCount = 0;
{
std::lock_guard<std::mutex> lock(RTSPMutex);
RTSPLoop = g_main_loop_new(nullptr, true);
std::cout << "RTSP main loop created." << std::endl;
RTSPServer = gst_rtsp_server_new();
gst_rtsp_server_set_address(RTSPServer, ip);
gst_rtsp_server_set_service(RTSPServer, port);
std::cout << "RTSP server created with address: " << ip << ":" << port << std::endl;
g_object_weak_ref(G_OBJECT(RTSPServer), on_rtsp_server_finalized, nullptr);
RTSPFactory = gst_rtsp_media_factory_new();
gst_rtsp_media_factory_set_launch(RTSPFactory, launchString.c_str());
gst_rtsp_media_factory_set_shared(RTSPFactory, true);
std::cout << "RTSP media factory created and set to be shared by all clients. Launch string set to:\n\n" << launchString << "\n" << std::endl;
g_object_weak_ref(G_OBJECT(RTSPFactory), on_rtsp_factory_finalized, nullptr);
RTSPMount = gst_rtsp_server_get_mount_points(RTSPServer);
gst_rtsp_mount_points_add_factory(RTSPMount, path, RTSPFactory);
g_clear_object(&RTSPMount);
// RTSPMount = nullptr;
std::cout << "RTSP server mount point obtained and media factory attached to path: " << path << std::endl;
RTSPSourceID = gst_rtsp_server_attach(RTSPServer, nullptr);
g_signal_connect(RTSPServer, "client-connected", reinterpret_cast<GCallback>(StreamerTest::ClientConnected), reinterpret_cast<gpointer>(&RTSPClientCount));
std::cout << "RTSP server attached to main context." << std::endl;
g_timeout_add_seconds(2, (GSourceFunc)RTSPServerTimeout, RTSPServer);
std::cout << "RTSP timeout function added for session cleanup every 2 seconds." << std::endl;
}
RTSPThread = std::thread(&StreamerTest::RunRTSPLoop, this);
std::cout << "RTSP server ready at rtsp://" << ip << ":" << port << path << std::endl;
return true;
}
bool StreamerTest::StopRTSPServer() {
bool success = true;
{
std::lock_guard<std::mutex> lock(RTSPMutex);
if (RTSPLoop && g_main_loop_is_running(RTSPLoop)) {
g_main_loop_quit(RTSPLoop);
std::cout << "RTSP main loop stopped." << std::endl;
} else {
std::cout << "Failed to stop RTSP main loop." << std::endl;
success = false;
}
std::cout << "RTSPserver: " << RTSPServer << std::endl;
std::cout << "RTSPFactory: " << RTSPFactory << std::endl;
std::cout << "RTSPLoop: " << RTSPLoop << std::endl;
std::cout << "RTSPSourceID: " << RTSPSourceID << std::endl;
if (RTSPServer && GST_IS_RTSP_SERVER(RTSPServer)) {
if (0 < RTSPClientCount.load()) {
gst_rtsp_server_client_filter(RTSPServer, clientFilterFunc, nullptr);
std::cout << "All RTSP clients disconnected. Client count: " << RTSPClientCount << std::endl;
}
GstRTSPSessionPool *pool = gst_rtsp_server_get_session_pool(RTSPServer);
if (pool && GST_IS_RTSP_SESSION_POOL(pool)) {
gst_rtsp_session_pool_cleanup(pool);
g_clear_object(&pool);
std::cout << "RTSP session pool cleaned." << std::endl;
} else {
std::cout << "Failed to clean up RTSP session." << std::endl;
success = false;
}
}
std::cout << "RTSPserver: " << RTSPServer << std::endl;
std::cout << "RTSPFactory: " << RTSPFactory << std::endl;
std::cout << "RTSPLoop: " << RTSPLoop << std::endl;
std::cout << "RTSPSourceID: " << RTSPSourceID << std::endl;
if (RTSPSourceID) {
g_source_remove(RTSPSourceID);
RTSPSourceID = 0;
std::cout << "RTSP source ID removed." << std::endl;
} else {
std::cout << "Failed to remove RTSP source ID." << std::endl;
success = false;
}
std::cout << "RTSPserver: " << RTSPServer << std::endl;
std::cout << "RTSPFactory: " << RTSPFactory << std::endl;
std::cout << "RTSPLoop: " << RTSPLoop << std::endl;
std::cout << "RTSPSourceID: " << RTSPSourceID << std::endl;
if (RTSPFactory && G_IS_OBJECT(RTSPFactory)) {
g_clear_object(&RTSPFactory);
// RTSPFactory = nullptr;
std::cout << "RTSP media factory unreferenced." << std::endl;
// } else if (g_object_is_floating(RTSPFactory)) {
// g_object_ref_sink(RTSPFactory);
// g_clear_object(&RTSPFactory);
// std::cout << "RTSP media factory was floating." << std::endl;
} else {
std::cout << "Failed to unreference RTSP media factory." << std::endl;
success = false;
}
std::cout << "RTSPserver: " << RTSPServer << std::endl;
std::cout << "RTSPFactory: " << RTSPFactory << std::endl;
std::cout << "RTSPLoop: " << RTSPLoop << std::endl;
std::cout << "RTSPSourceID: " << RTSPSourceID << std::endl;
if (RTSPServer && G_IS_OBJECT(RTSPServer) && GST_IS_RTSP_SERVER(RTSPServer)) {
g_clear_object(&RTSPServer);
// RTSPServer = nullptr;
std::cout << "RTSP server unreferenced." << std::endl;
} else {
std::cout << "Failed to unreference RTSP server. It may have already been implicitly unreferenced by previous calls." << std::endl;
success = false;
}
std::cout << "RTSPserver: " << RTSPServer << std::endl;
std::cout << "RTSPFactory: " << RTSPFactory << std::endl;
std::cout << "RTSPLoop: " << RTSPLoop << std::endl;
std::cout << "RTSPSourceID: " << RTSPSourceID << std::endl;
if (RTSPLoop) {
g_main_loop_unref(RTSPLoop);
RTSPLoop = nullptr;
std::cout << "RTSP main loop unreferenced." << std::endl;
} else {
std::cout << "Failed to unreference RTSP main loop." << std::endl;
success = false;
}
}
if (RTSPThread.joinable()) {
RTSPThread.join();
std::cout << "RTSP thread joined." << std::endl;
} else {
std::cout << "Failed to join RTSP thread." << std::endl;
success = false;
}
// gst_deinit();
return success;
}
/**
 * Starts the RTSP test server when it is stopped, or stops it when it is
 * running.
 *
 * @return true when the toggle was carried out (server started, or stop was
 *         attempted); false when starting the server failed.
 */
bool StreamerTest::ToggleRTSPServer() {
    if (RTSPServerRunning) {
        std::cout << "Trying to stop RTSP server........." << std::endl;
        const bool cleanStop = StopRTSPServer();
        RTSPServerRunning = false;
        // Log before returning; a statement after the return would never run.
        std::cout << (cleanStop ? "RTSP server stopped........."
                                : "RTSP server stopped with errors.........")
                  << std::endl;
        return true;
    }

    // Fixed endpoint and pipeline used by this test server.
    static constexpr char ip[] = "127.0.0.1";
    static constexpr char port[] = "8554";
    static constexpr char path[] = "/test";
    const std::string launchString = "( videotestsrc is-live=1 ! x264enc tune=zerolatency ! rtph264pay name=pay0 pt=96 )";

    std::cout << "Trying to start RTSP server........." << std::endl;
    if (InitializeRTSPServer(ip, port, path, launchString)) {
        RTSPServerRunning = true;
        return true;
    }

    std::cout << "ToggleRTSPServer returning........." << std::endl;
    return false;
}
And here is the corresponding StreamerTest.h:
#ifndef STREAMERTEST
#define STREAMERTEST  // completes the include guard; without it the #ifndef never becomes false

#include <stdlib.h>

#include <atomic>
#include <chrono>
#include <cstdint>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>

#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
#include <gst/gst.h>
#include <gst/rtsp-server/rtsp-server.h>
#include <gst/gstinfo.h>
#pragma GCC diagnostic pop

/**
 * Minimal RTSP server wrapper that can be started and stopped repeatedly via
 * ToggleRTSPServer().
 */
class StreamerTest {
public:
    /// Initializes GStreamer.
    StreamerTest();
    ~StreamerTest();

    // The class holds a mutex, a thread and raw GLib/GStreamer handles; it is
    // not copyable.
    StreamerTest(const StreamerTest&) = delete;
    StreamerTest& operator=(const StreamerTest&) = delete;

    /// Starts the server if it is stopped, stops it if it is running.
    bool ToggleRTSPServer();

private:
    bool RTSPServerRunning = false;

    std::mutex RTSPMutex;    ///< Held while the handles below are set up or torn down.
    std::thread RTSPThread;  ///< Runs RunRTSPLoop().

    // All handles start out empty so a stop before the first start sees
    // well-defined values instead of indeterminate pointers.
    GMainLoop* RTSPLoop = nullptr;
    GstRTSPServer* RTSPServer = nullptr;
    guint RTSPSourceID = 0;  ///< Source ID returned by gst_rtsp_server_attach(); 0 = none.
    GstRTSPMountPoints* RTSPMount = nullptr;
    GstRTSPMediaFactory* RTSPFactory = nullptr;

    std::atomic<uint32_t> RTSPClientCount{0};  ///< Incremented on "client-connected", decremented on "closed".

    static void ClientClosed(GstRTSPClient* client, gpointer user);
    static void ClientConnected(GstRTSPServer* server, GstRTSPClient* client, gpointer user);

    void RunRTSPLoop();
    bool InitializeRTSPServer(const char* ip, const char* port, const char* path, std::string launchString);
    bool StopRTSPServer();
};

#endif
And the main.cpp:
#include "StreamerTest.h"

/**
 * Stress driver: toggles the RTSP server on and off for as long as each toggle
 * reports success, pausing 200 ms between toggles.
 */
int main() {
    StreamerTest streamer;
    const auto pauseBetweenToggles = std::chrono::milliseconds(200);
    int toggleCount = 0;

    while (streamer.ToggleRTSPServer()) {
        toggleCount += 1;
        std::cout << "\n--------------------- Successful Toggle Count: " << toggleCount
                  << " ---------------------\n\n" << std::endl;
        std::this_thread::sleep_for(pauseBetweenToggles);
    }

    return 0;
}
In general, everything seems to work fine, except that when calling StopRTSPServer(), I get:
GLib-GObject-CRITICAL **: 12:53:20.742: g_object_unref: assertion 'G_IS_OBJECT (object)' failed
This error is somehow linked to the fact that I am removing RTSPSourceID and clearing both the RTSPServer and RTSPFactory. Note that I added the removal of the RTSPSourceID because without it, relaunching the RTSP server would fail in my original, much more complex application that this MWE is based on. Specifically, it would fail when trying to attach the RTSP server to the main context (i.e. when calling gst_rtsp_server_attach()). For some reason, in this MWE, restarting the server works fine without removal of the RTSPSourceID, even though the methods are exactly the same. I have no idea why.
In any case, as you can see, I have added two helper functions (on_rtsp_server_finalized and on_rtsp_factory_finalized) to see when the objects are finalized, i.e. when their reference count drops to zero. Interestingly, when not removing the RTSPSourceID and clearing the RTSPServer before the RTSPFactory, both the RTSPServer and the RTSPFactory are finalized when calling g_clear_object(&RTSPServer). So apparently, the RTSPFactory is also implicitly unreferenced by this call, although it is not set to NULL. This is somewhat unexpected, since this behavior is unfortunately not stated in the documentation of the function call. I guess it makes sense, however, although I would love to check the function’s source code to make sure that the RTSPFactory is actually properly unreferenced by clearing the RTSPServer.
Notably, it does not work the other way around: clearing the RTSPFactory does not unreference the RTSPServer for whatever reason, so when clearing the RTSPServer after the RTSPFactory, I still get the above-mentioned error, as clearing the RTSPServer still tries to unreference the RTSPFactory. This would all be fine, though, if I knew that both objects have been properly cleared at this point.
Even more notable, however, is the fact that in the above-mentioned scenario (no removal of the RTSPSourceID and clearing the RTSPServer before the RTSPFactory), neither the RTSPServer nor the RTSPFactory is actually finalized when shutting down the server and clearing the RTSPServer for the first time. Any following server shutdowns and restarts will, however, finalize both the RTSPServer and the RTSPFactory when clearing the RTSPServer. I suspect that this is related to the fact that I need to remove the RTSPSourceID in my original application, since doing that before clearing the RTSPServer will actually cause both the RTSPServer and the RTSPFactory to be finalized already during the first server shutdown. So I guess that is why I need to include it in my original project, but I don’t know why it has this effect and what else it does.
In any case, this is certainly unexpected behavior and I wonder if I am handling this properly.
I am cross-compiling with GStreamer version 1.16.3 and GLib version 2.64.5.