Sorry in advance if this the wrong place!
I have a gstreamer pipeline that currently works if I invoke Gst.parse_launch
:
rtspsrc tcp-timeout=<timeout> location=<location> is-live=true protocols=tcp name=mysrc "
! rtph264depay wait-for-keyframe=true request-keyframe=true "
! mpegtsmux name=mpegtsmux "
! multifilesink name=filesink next-file=max-duration max-file-duration=<duration> aggregate-gops=true post-messages=true location=<out_location>
I am trying to convert it to a dynamic pipeline like so:
def build_pipeline(self) -> str:
video_pipeline = Gst.Pipeline.new("video_pipeline")
all_data["video_pipeline"] = video_pipeline
rtsp_source = Gst.ElementFactory.make('rtspsrc', 'mysrc')
rtsp_source.set_property(...
...
all_data["mysrc"] = rtsp_source
rtph264_depay = Gst.ElementFactory.make('rtph264depay', 'rtp_depay')
rtph264_depay.set_property(....
...
all_data["rtp_depay"] = rtph264_depay
mpeg_ts_mux = Gst.ElementFactory.make('mpegtsmux', 'mpeg_mux')
all_data[mpeg_mux] = mpeg_ts_mux
multi_file_sink = Gst.ElementFactory.make('multifilesink', 'filesink')
multi_file_sink.set_property(...
...
all_data["filesink"] = multi_file_sink
video_pipeline.add(rtsp_source)
video_pipeline.add(rtph264_depay)
video_pipeline.add(mpeg_ts_mux)
video_pipeline.add(multi_file_sink)
if not rtph264_depay.link(mpeg_ts_mux):
print("Failed to link depay to mux")
else:
print("Linked depay to mux")
if not mpeg_ts_mux.link(multi_file_sink):
print("Failed to link mux to filesink")
else:
print("Linked mux to filesink")
rtsp_source.connect("pad-added", VideoStreamer._on_pad_added_callback, all_pipeline_data)
return video_pipeline
I define my pad-added callback like so:
@staticmethod
def _on_pad_added_callback(rtsp_source: Gst.Element, new_pad: Gst.Pad, *user_data) -> None:
def _check_if_video_pad(pad: Gst.Pad):
current_caps = pad.get_current_caps()
for cap_index in range(current_caps.get_size()):
current_structure = current_caps.get_structure(cap_index)
media_type = current_structure.get_string("media")
if media_type == "video":
return True
return False
if not new_pad.get_name().startswith("recv_rtp_src"):
logger.info(f"Ignoring pad with name {new_pad.get_name()}")
return
if new_pad.is_linked():
logger.info(f"Pad with name {new_pad.get_name()} is already linked")
return
# Right now I only care about grabbing video, in the future I want to differentiate video and audio pipelines
if not _check_if_video_pad(new_pad):
logger.info(f"Ignoring pad with name {new_pad.get_name()} as its not video")
return
rtp_depay_element: Gst.Element = user_data[0]["rtp_depay"]
depay_sink_pad: Gst.Pad = rtp_depay_element.get_static_pad("sink")
pad_link = new_pad.link(depay_sink_pad) # Returns <enum GST_PAD_LINK_OK of type Gst.PadLinkReturn>
Outside of this I do:
class VideoStreamer(ABC, threading.Thread):
def __init__(...):
...
self._lock: Final = threading.Lock()
self._loop: GLib.MainLoop | None = None
...
def run(self) -> None:
pipeline = self.build_pipeline()
bus.add_signal_watch()
bus.connect("message", self.handle_message)
with self._lock:
pipeline.set_state(Gst.State.PLAYING)
self._loop = GLib.MainLoop()
self._loop.run()
def handle_message(self, message: Gst.Message) -> None:
if message.src.get_name() != "filesink":
return
...
The problem is that when I use parse_launch
my code works fine. Messages from the file sink element make it handle_message
. With the new dynamic construction I handle messages for state changes and I can verify that the pipeline is started state changes from ready to paused to playing, however I never get any messages from the file sink. Am I missing a link or incorrectly linking the pads?