Hello everyone!
I have a pipeline with an appsrc that is taking images encodes them and then streaming with an RTSP server. I am attaching 2 scripts. One with a toy example that gets the images in a fixed interval and the second that takes them from another callback. I am using Gstreamer 1.16
In both of them I face the same problem. The RTSP client connects ( for example VLC or RTSPtoWebRTC server) and everything runs smoothly up until the time that the server gets in a very bad state and hangs. This might happen during the client disconnection reconnection or something else. I notice the hang behavior more on the server class I attached rather than the toy example script. Does any of you have any idea how to debug or possible causes?
Thank you in advance!
Toy example code:
import time
import numpy as np
import cv2
import gi
gi.require_version('Gst', '1.0')
gi.require_version('GstRtspServer', '1.0')
from gi.repository import Gst, GLib, GstRtspServer
Gst.init(None)
global_appsrc = None
frame_counter = 0
timestamp = 0
frame_duration = Gst.SECOND // 30
def media_configure_callback(factory, media):
global global_appsrc
element = media.get_element()
global_appsrc = element.get_by_name("mysrc")
if global_appsrc:
print("Appsrc element obtained from media pipeline")
else:
print("Error: Could not retrieve appsrc element from media pipeline")
def push_frame():
global global_appsrc, frame_counter, timestamp
if global_appsrc is None:
print(f"Appsrc not ready. Skipping frame {frame_counter}")
return True
width, height = 1920, 1080
# Create test frame
frame_bgr = np.full((height, width, 3), (125, 255, 0), dtype=np.uint8)
center_x = (frame_counter * 5) % width
center_y = height // 2
cv2.circle(frame_bgr, (center_x, center_y), 30, (0, 0, 255), -1)
frame_counter += 1
frame_i420 = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2YUV_I420)
data = frame_i420.tobytes()
buf = Gst.Buffer.new_allocate(None, len(data), None)
buf.fill(0, data)
timestamp += frame_duration
buf.pts = timestamp
buf.duration = frame_duration
retval = global_appsrc.emit("push-buffer", buf)
if retval != Gst.FlowReturn.OK:
print("Error pushing buffer:", retval)
return True
def on_client_connected(server, client):
print("New client connected:", client)
def main():
server = GstRtspServer.RTSPServer()
server.connect("client-connected", on_client_connected)
mounts = server.get_mount_points()
factory = GstRtspServer.RTSPMediaFactory()
factory.set_launch(
"( appsrc name=mysrc is-live=true block=true do-timestamp=true format=time "
"caps=video/x-raw,format=I420,width=1920,height=1080,framerate=30/1 ! "
"queue leaky=downstream max-size-buffers=10 ! "
"nvvidconv ! nvv4l2h264enc insert-sps-pps=true idrinterval=15 ! "
"h264parse ! rtph264pay name=pay0 pt=96 )"
)
factory.set_shared(True)
factory.connect("media-configure", media_configure_callback)
mounts.add_factory("/stream", factory)
server.props.service = "8554"
server.attach(None)
print("RTSP Server is live at rtsp://127.0.0.1:8554/stream")
GLib.timeout_add(33, push_frame)
loop = GLib.MainLoop()
try:
loop.run()
except KeyboardInterrupt:
print("Shutting down...")
if __name__ == '__main__':
main()
RTSP out for real app
import time
import cv2
import numpy as np
import threading
from pathlib import Path
import gi
from hydra_interface.hydra_ros_logger import get_ros_hydra_logger
import __main__
gi.require_version('Gst', '1.0')
gi.require_version('GstRtspServer', '1.0')
from gi.repository import Gst, GLib, GstRtspServer
Gst.init(None)
logger = get_ros_hydra_logger(Path(__main__.__file__).name)
class RTSPStreamer:
def __init__(self, width=640, height=480, framerate=30):
self.width = width
self.height = height
self.framerate = framerate
self.appsrc = None
self.timestamp = 0
self.frame_duration_ns = Gst.SECOND // self.framerate
self.last_frame = self._generate_black_frame()
self._build_rtsp_server()
self.running = True
threading.Thread(target=self._frame_loop, daemon=True).start()
def _generate_black_frame(self):
return np.zeros((self.height, self.width, 3), dtype=np.uint8)
def _build_rtsp_server(self):
self.server = GstRtspServer.RTSPServer()
self.server.set_address("0.0.0.0")
mounts = self.server.get_mount_points()
launch_str = (
"( appsrc name=mysrc is-live=true block=true format=time "
"caps=video/x-raw,format=I420,width={width},height={height},framerate={fps}/1 ! "
"queue leaky=downstream max-size-buffers=10 ! "
"nvvidconv ! nvv4l2h264enc insert-sps-pps=true idrinterval=15 ! "
"h264parse ! rtph264pay name=pay0 pt=96 )"
).format(width=self.width, height=self.height, fps=self.framerate)
self.factory = GstRtspServer.RTSPMediaFactory()
self.factory.set_launch(launch_str)
self.factory.set_shared(True)
self.factory.connect("media-configure", self.media_configure_callback)
mounts.add_factory("/stream", self.factory)
self.server.attach(None)
logger.info("RTSP Server is live at rtsp://<device_ip>:8554/stream")
threading.Thread(target=self._start_mainloop, daemon=True).start()
def _start_mainloop(self):
loop = GLib.MainLoop()
loop.run()
def media_configure_callback(self, factory, media):
element = media.get_element()
self.appsrc = element.get_child_by_name("mysrc")
if self.appsrc:
media.connect("unprepared", self._on_media_unprepared)
logger.info("appsrc successfully obtained from the RTSP pipeline")
else:
logger.warning("Error: Unable to retrieve appsrc from RTSP pipeline")
def _on_media_unprepared(self, media):
logger.warning("Client disconnected. Clearing appsrc.")
self.appsrc = None
def _frame_loop(self):
while self.running:
try:
self.push_frame(self.last_frame)
except Exception as e:
logger.error(f"Frame loop error: {e}")
time.sleep(1 / self.framerate)
def push_frame(self, image):
if image is None:
logger.warning("push_frame: got None image. Skipping.")
return
self.last_frame = image # Keeo a copy of the last image?
if self.appsrc is None:
logger.debug("push_frame: appsrc is None. No client connected.")
return
state = self.appsrc.get_state(0).state
if state == Gst.State.NULL:
logger.debug("push_frame: appsrc is in NULL state.")
return
if image.shape[1] != self.width or image.shape[0] != self.height:
image = cv2.resize(image, (self.width, self.height))
frame_i420 = cv2.cvtColor(image, cv2.COLOR_BGR2YUV_I420)
data = frame_i420.tobytes()
buf = Gst.Buffer.new_allocate(None, len(data), None)
buf.fill(0, data)
now = time.time_ns()
if now > self.timestamp:
self.timestamp = now
buf.pts = self.timestamp
buf.duration = self.frame_duration_ns
retval = self.appsrc.emit("push-buffer", buf)
if retval == Gst.FlowReturn.FLUSHING:
logger.warning("Pipeline flushing — client disconnected.")
self.appsrc = None
elif retval != Gst.FlowReturn.OK:
logger.error(f"Unexpected error pushing frame: {retval}")