Rtsp server hangs

Hello everyone!

I have a pipeline with an appsrc that is taking images encodes them and then streaming with an RTSP server. I am attaching 2 scripts. One with a toy example that gets the images in a fixed interval and the second that takes them from another callback. I am using Gstreamer 1.16

In both of them I face the same problem. The RTSP client connects ( for example VLC or RTSPtoWebRTC server) and everything runs smoothly up until the time that the server gets in a very bad state and hangs. This might happen during the client disconnection reconnection or something else. I notice the hang behavior more on the server class I attached rather than the toy example script. Does any of you have any idea how to debug or possible causes?

Thank you in advance!

Toy example code:

import time
import numpy as np
import cv2
import gi

gi.require_version('Gst', '1.0')
gi.require_version('GstRtspServer', '1.0')
from gi.repository import Gst, GLib, GstRtspServer

Gst.init(None)

global_appsrc = None
frame_counter = 0
timestamp = 0  
frame_duration = Gst.SECOND // 30  

def media_configure_callback(factory, media):
    global global_appsrc
    element = media.get_element()
    global_appsrc = element.get_by_name("mysrc")

    if global_appsrc:
        print("Appsrc element obtained from media pipeline")
    else:
        print("Error: Could not retrieve appsrc element from media pipeline")

def push_frame():
    global global_appsrc, frame_counter, timestamp
    if global_appsrc is None:
        print(f"Appsrc not ready. Skipping frame {frame_counter}")
        return True

    width, height = 1920, 1080

    # Create test frame
    frame_bgr = np.full((height, width, 3), (125, 255, 0), dtype=np.uint8)
    center_x = (frame_counter * 5) % width
    center_y = height // 2
    cv2.circle(frame_bgr, (center_x, center_y), 30, (0, 0, 255), -1)
    frame_counter += 1

    frame_i420 = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2YUV_I420)
    data = frame_i420.tobytes()

    buf = Gst.Buffer.new_allocate(None, len(data), None)
    buf.fill(0, data)

    timestamp += frame_duration
    buf.pts = timestamp
    buf.duration = frame_duration

    retval = global_appsrc.emit("push-buffer", buf)
    if retval != Gst.FlowReturn.OK:
        print("Error pushing buffer:", retval)

    return True  

def on_client_connected(server, client):
    print("New client connected:", client)

def main():
    server = GstRtspServer.RTSPServer()
    server.connect("client-connected", on_client_connected)
    mounts = server.get_mount_points()

    factory = GstRtspServer.RTSPMediaFactory()
    factory.set_launch(
        "( appsrc name=mysrc is-live=true block=true do-timestamp=true format=time "
        "caps=video/x-raw,format=I420,width=1920,height=1080,framerate=30/1 ! "
        "queue leaky=downstream max-size-buffers=10 ! "  
        "nvvidconv ! nvv4l2h264enc insert-sps-pps=true idrinterval=15 ! "
        "h264parse ! rtph264pay name=pay0 pt=96 )"
    )

    factory.set_shared(True)
    factory.connect("media-configure", media_configure_callback)
    mounts.add_factory("/stream", factory)
    server.props.service = "8554"
    server.attach(None)

    print("RTSP Server is live at rtsp://127.0.0.1:8554/stream")

    GLib.timeout_add(33, push_frame)  
    loop = GLib.MainLoop()
    try:
        loop.run()
    except KeyboardInterrupt:
        print("Shutting down...")

if __name__ == '__main__':
    main()

RTSP out for real app

import time
import cv2
import numpy as np
import threading
from pathlib import Path
import gi
from hydra_interface.hydra_ros_logger import get_ros_hydra_logger
import __main__

gi.require_version('Gst', '1.0')
gi.require_version('GstRtspServer', '1.0')
from gi.repository import Gst, GLib, GstRtspServer

Gst.init(None)
logger = get_ros_hydra_logger(Path(__main__.__file__).name)

class RTSPStreamer:
    def __init__(self, width=640, height=480, framerate=30):
        self.width = width
        self.height = height
        self.framerate = framerate
        self.appsrc = None
        self.timestamp = 0
        self.frame_duration_ns = Gst.SECOND // self.framerate

        self.last_frame = self._generate_black_frame()

        self._build_rtsp_server()

        self.running = True
        threading.Thread(target=self._frame_loop, daemon=True).start()

    def _generate_black_frame(self):
        return np.zeros((self.height, self.width, 3), dtype=np.uint8)

    def _build_rtsp_server(self):
        self.server = GstRtspServer.RTSPServer()
        self.server.set_address("0.0.0.0")
        mounts = self.server.get_mount_points()

        launch_str = (
            "( appsrc name=mysrc is-live=true block=true format=time "
            "caps=video/x-raw,format=I420,width={width},height={height},framerate={fps}/1 ! "
            "queue leaky=downstream max-size-buffers=10 ! "
            "nvvidconv ! nvv4l2h264enc insert-sps-pps=true idrinterval=15 ! "
            "h264parse ! rtph264pay name=pay0 pt=96 )"
        ).format(width=self.width, height=self.height, fps=self.framerate)

        self.factory = GstRtspServer.RTSPMediaFactory()
        self.factory.set_launch(launch_str)
        self.factory.set_shared(True)
        self.factory.connect("media-configure", self.media_configure_callback)
        mounts.add_factory("/stream", self.factory)

        self.server.attach(None)
        logger.info("RTSP Server is live at rtsp://<device_ip>:8554/stream")

        threading.Thread(target=self._start_mainloop, daemon=True).start()

    def _start_mainloop(self):
        loop = GLib.MainLoop()
        loop.run()

    def media_configure_callback(self, factory, media):
        element = media.get_element()
        self.appsrc = element.get_child_by_name("mysrc")
        if self.appsrc:
            media.connect("unprepared", self._on_media_unprepared)
            logger.info("appsrc successfully obtained from the RTSP pipeline")
        else:
            logger.warning("Error: Unable to retrieve appsrc from RTSP pipeline")

    def _on_media_unprepared(self, media):
        logger.warning("Client disconnected. Clearing appsrc.")
        self.appsrc = None

    def _frame_loop(self):
        while self.running:
            try:
                self.push_frame(self.last_frame)
            except Exception as e:
                logger.error(f"Frame loop error: {e}")
            time.sleep(1 / self.framerate)

    def push_frame(self, image):
        if image is None:
            logger.warning("push_frame: got None image. Skipping.")
            return

        self.last_frame = image  # Keeo a copy of the last image?

        if self.appsrc is None:
            logger.debug("push_frame: appsrc is None. No client connected.")
            return

        state = self.appsrc.get_state(0).state
        if state == Gst.State.NULL:
            logger.debug("push_frame: appsrc is in NULL state.")
            return

        if image.shape[1] != self.width or image.shape[0] != self.height:
            image = cv2.resize(image, (self.width, self.height))

        frame_i420 = cv2.cvtColor(image, cv2.COLOR_BGR2YUV_I420)
        data = frame_i420.tobytes()

        buf = Gst.Buffer.new_allocate(None, len(data), None)
        buf.fill(0, data)

        now = time.time_ns()
        if now > self.timestamp:
            self.timestamp = now
        buf.pts = self.timestamp
        buf.duration = self.frame_duration_ns

        retval = self.appsrc.emit("push-buffer", buf)
        if retval == Gst.FlowReturn.FLUSHING:
            logger.warning("Pipeline flushing — client disconnected.")
            self.appsrc = None
        elif retval != Gst.FlowReturn.OK:
            logger.error(f"Unexpected error pushing frame: {retval}")