import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
import time
import threading
Gst.init(None)
class GStreamerBinController:
def __init__(self):
self.pipeline = None
self.bin = None
self.loop = GLib.MainLoop()
self.running = False
self.init_pipeline()
def init_pipeline(self):
"""Initialize the main pipeline (created once)"""
self.pipeline = Gst.Pipeline()
# Start the pipeline
ret = self.pipeline.set_state(Gst.State.PLAYING)
if ret == Gst.StateChangeReturn.FAILURE:
print("Failed to start pipeline")
else:
print("Pipeline initialized and started")
def create_bin(self):
"""Create a new GStreamer bin with RTSP source"""
bin = Gst.Bin.new("rtsp_bin")
# Create elements for the bin
# RTSP source - you can set the location property
src = Gst.ElementFactory.make("rtspsrc", "src")
src.set_property("location", "rtsp://rtsp_ip_example/stream1") # Set your RTSP URL
src.set_property("latency", 0)
src.set_property("drop-on-latency", True)
# RTP H265 depayloader
rtph265depay = Gst.ElementFactory.make("rtph265depay", "rtph265depay")
# H265 parser
h265parse = Gst.ElementFactory.make("h265parse", "h265parse")
# H265 decoder
avdec_h265 = Gst.ElementFactory.make("avdec_h265", "avdec_h265")
# Video sink
sink = Gst.ElementFactory.make("autovideosink", "sink")
# Add elements to bin
bin.add(src)
bin.add(rtph265depay)
bin.add(h265parse)
bin.add(avdec_h265)
bin.add(sink)
# Link elements within bin
# rtspsrc -> rtph265depay -> h265parse -> avdec_h265 -> sink
rtph265depay.link(h265parse)
h265parse.link(avdec_h265)
avdec_h265.link(sink)
# Connect rtspsrc pad-added signal to link to rtph265depay
def on_pad_added(src, pad):
print(f"Pad added: {pad.get_name()}")
# Get sink pad of rtph265depay
sink_pad = rtph265depay.get_static_pad("sink")
if sink_pad is None:
print("Error: rtph265depay sink pad not found")
return
# Query pad caps
caps = pad.query_caps()
if caps:
print(f"Pad caps: {caps.to_string()}")
# Check if pad is already linked
if pad.is_linked():
print("Pad is already linked")
return
# Try to link the pad
# rtspsrc typically provides application/x-rtp pads
ret = pad.link(sink_pad)
if ret == Gst.PadLinkReturn.OK:
print("Successfully linked rtspsrc pad to rtph265depay")
elif ret == Gst.PadLinkReturn.NOFORMAT:
print("No format yet, waiting for caps...")
# Wait for caps and try again
def try_link_after_caps(pad, info):
if pad.is_linked():
return Gst.PadProbeReturn.REMOVE
caps = pad.get_current_caps()
if caps and not caps.is_empty():
ret = pad.link(sink_pad)
if ret == Gst.PadLinkReturn.OK:
print("Successfully linked pad after caps were set")
return Gst.PadProbeReturn.REMOVE
return Gst.PadProbeReturn.OK
pad.add_probe(Gst.PadProbeType.EVENT_DOWNSTREAM, try_link_after_caps)
else:
print(f"Failed to link pad: {ret}")
src.connect("pad-added", on_pad_added)
# No ghost pad needed - bin is self-contained
return bin
def add_bin_to_pipeline(self):
"""Add bin to the pipeline"""
if self.bin is not None:
print("Bin already exists, removing first...")
self.remove_bin_from_pipeline()
self.bin = self.create_bin()
self.pipeline.add(self.bin)
# Sync bin state with pipeline
ret = self.bin.sync_state_with_parent()
if ret == Gst.StateChangeReturn.FAILURE:
print("Failed to add bin to pipeline")
return False
print("Bin added to pipeline")
return True
def remove_bin_from_pipeline(self):
"""Remove bin from the pipeline"""
if self.bin is None:
return False
# Set bin state to NULL before removing
self.bin.set_state(Gst.State.NULL)
# Remove bin from pipeline
self.pipeline.remove(self.bin)
self.bin = None
print("Bin removed from pipeline")
return True
def run_cycle(self):
"""Run one cycle: add bin, wait, then remove"""
if self.add_bin_to_pipeline():
# Let it run for a short time (e.g., 2 seconds)
time.sleep(2)
self.remove_bin_from_pipeline()
else:
print("Failed to add bin in this cycle")
def start_loop(self):
"""Start the main loop that adds/removes bin every 5 seconds"""
self.running = True
def cycle_thread():
while self.running:
self.run_cycle()
if self.running:
print("Waiting 5 seconds before next cycle...")
time.sleep(10)
thread = threading.Thread(target=cycle_thread, daemon=True)
thread.start()
try:
self.loop.run()
except KeyboardInterrupt:
print("\nStopping...")
self.running = False
self.remove_bin_from_pipeline()
if self.pipeline:
self.pipeline.set_state(Gst.State.NULL)
self.loop.quit()
if __name__ == "__main__":
controller = GStreamerBinController()
controller.start_loop()
sorry for inconvenience
image left: right after start
image middle: about a hour pass
image right: about 17 hours pass
working on:
- gstreamer 1.26.2
- raspberry pi 5
Hello I’m quiet new to gstreamer.
I’m trying to make dynamically add and remove bin while runtime. But it seems like available memory usage is getting lower and lower.
Like above image, did not change dramatically but there is a decrease when I keep adding and removing bin
Is this memory leak? or did I do something wrong and losing memory?
Also if this is memory leak problem there any report on memory leaks of this content to follow on? I tried to find related article but couldn’t find one.
thank you.
