Is there a proper way to remove a bin dynamically in GStreamer Python?

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
import time
import threading

# Initialise the GStreamer library once, at import time, before any pipeline
# or element is created below. None is passed in place of an argument list.
Gst.init(None)

class GStreamerBinController:
    """Cycle a self-contained RTSP playback bin in and out of a live pipeline.

    A single ``Gst.Pipeline`` is created once and set to PLAYING.  A worker
    thread then repeatedly builds a fresh bin
    (rtspsrc -> rtph265depay -> h265parse -> avdec_h265 -> autovideosink),
    adds it to the pipeline, lets it run for ``BIN_RUN_SECONDS``, removes it
    again and pauses for ``CYCLE_PAUSE_SECONDS`` before the next cycle.

    The pipeline bus is watched for the whole lifetime of the pipeline.  If
    nobody takes messages off the bus they pile up in its queue, and every
    queued message holds a reference to the element that posted it, so bins
    that were already removed could not be released.
    """

    # Seconds the bin is left in the pipeline during one cycle.
    BIN_RUN_SECONDS = 2
    # Seconds to pause after a cycle before the next one starts.
    CYCLE_PAUSE_SECONDS = 10
    # Upper bound on how long shutdown waits for the worker thread to finish.
    _JOIN_TIMEOUT_SECONDS = 5

    def __init__(self):
        """Create the controller and immediately build and start the pipeline."""
        self.pipeline = None
        # The bin currently inside the pipeline, or None when there is none.
        self.bin = None
        self.loop = GLib.MainLoop()
        # True while the add/remove worker thread is supposed to keep cycling.
        self.running = False
        # Serialises add/remove between the worker thread and the shutdown
        # path.  Re-entrant because add_bin_to_pipeline() may call
        # remove_bin_from_pipeline() while already holding it.
        self._bin_lock = threading.RLock()
        # Set on shutdown so the worker's waits return immediately.
        self._stop_event = threading.Event()
        self.init_pipeline()

    def init_pipeline(self):
        """Create the (initially empty) pipeline, watch its bus and start it."""
        self.pipeline = Gst.Pipeline()

        # Drain the bus from the GLib main loop so posted messages (and the
        # element references they carry) do not accumulate.
        bus = self.pipeline.get_bus()
        bus.add_signal_watch()
        bus.connect("message", self._on_bus_message)

        # Start the pipeline
        ret = self.pipeline.set_state(Gst.State.PLAYING)
        if ret == Gst.StateChangeReturn.FAILURE:
            print("Failed to start pipeline")
        else:
            print("Pipeline initialized and started")

    @staticmethod
    def _on_bus_message(bus, message):
        """Report errors and warnings; every other message is simply consumed.

        Kept static on purpose: the handler does not need the controller, and
        a bound method would make the bus hold a reference back to it.
        """
        if message.type == Gst.MessageType.ERROR:
            err, debug = message.parse_error()
            label = "error"
        elif message.type == Gst.MessageType.WARNING:
            err, debug = message.parse_warning()
            label = "warning"
        else:
            return

        origin = message.src.get_name() if message.src is not None else "unknown"
        print(f"GStreamer {label} from {origin}: {err.message}")
        if debug:
            print(f"Debug info: {debug}")

    @staticmethod
    def _make_element(factory_name, element_name):
        """Create one element or raise RuntimeError if its factory is missing."""
        element = Gst.ElementFactory.make(factory_name, element_name)
        if element is None:
            raise RuntimeError(
                f"Could not create element '{factory_name}' (is the plugin installed?)"
            )
        return element

    def create_bin(self):
        """Create a new GStreamer bin with RTSP source.

        Returns:
            A new, unparented ``Gst.Bin`` holding the complete
            source-to-sink chain.  No ghost pads are created because the bin
            is self-contained.

        Raises:
            RuntimeError: if an element cannot be created or two static
                elements cannot be linked.
        """
        rtsp_bin = Gst.Bin.new("rtsp_bin")

        # RTSP source - you can set the location property
        src = self._make_element("rtspsrc", "src")
        src.set_property("location", "rtsp://rtsp_ip_example/stream1")  # Set your RTSP URL
        src.set_property("latency", 0)
        src.set_property("drop-on-latency", True)

        depay = self._make_element("rtph265depay", "rtph265depay")
        parser = self._make_element("h265parse", "h265parse")
        decoder = self._make_element("avdec_h265", "avdec_h265")
        sink = self._make_element("autovideosink", "sink")

        # Everything downstream of rtspsrc has static pads and can be linked
        # right away: rtph265depay -> h265parse -> avdec_h265 -> sink
        chain = (depay, parser, decoder, sink)
        for element in (src, *chain):
            rtsp_bin.add(element)

        for upstream, downstream in zip(chain, chain[1:]):
            if not upstream.link(downstream):
                raise RuntimeError(
                    f"Could not link {upstream.get_name()} to {downstream.get_name()}"
                )

        # rtspsrc only exposes its source pads once the stream is set up, so
        # the link to the depayloader is made from the pad-added signal.
        src.connect("pad-added", self._on_pad_added, depay)

        return rtsp_bin

    @staticmethod
    def _on_pad_added(src, pad, depay):
        """Link a pad that rtspsrc just exposed to the depayloader's sink pad.

        Static so that the signal closure stored on the source element only
        references the depayloader, not the controller.
        """
        print(f"Pad added: {pad.get_name()}")

        sink_pad = depay.get_static_pad("sink")
        if sink_pad is None:
            print("Error: rtph265depay sink pad not found")
            return

        caps = pad.query_caps(None)
        if caps:
            print(f"Pad caps: {caps.to_string()}")

        if pad.is_linked():
            print("Pad is already linked")
            return

        # The source may expose more than one pad; only one of them can feed
        # the single depayloader.
        if sink_pad.is_linked():
            print(f"rtph265depay is already linked, ignoring pad {pad.get_name()}")
            return

        # rtspsrc typically provides application/x-rtp pads
        ret = pad.link(sink_pad)
        if ret == Gst.PadLinkReturn.OK:
            print("Successfully linked rtspsrc pad to rtph265depay")
        elif ret == Gst.PadLinkReturn.NOFORMAT:
            print("No format yet, waiting for caps...")

            def try_link_after_caps(pad, info):
                """Retry the link on each downstream event until it succeeds."""
                if pad.is_linked():
                    return Gst.PadProbeReturn.REMOVE
                current = pad.get_current_caps()
                if current and not current.is_empty():
                    if pad.link(sink_pad) == Gst.PadLinkReturn.OK:
                        print("Successfully linked pad after caps were set")
                        return Gst.PadProbeReturn.REMOVE
                return Gst.PadProbeReturn.OK

            pad.add_probe(Gst.PadProbeType.EVENT_DOWNSTREAM, try_link_after_caps)
        else:
            print(f"Failed to link pad: {ret}")

    def add_bin_to_pipeline(self):
        """Build a fresh bin, add it to the pipeline and bring it up to state.

        An existing bin is removed first.

        Returns:
            True if the bin was added and its state could be synced with the
            pipeline, False otherwise.  On failure no bin is left behind.
        """
        with self._bin_lock:
            if self.bin is not None:
                print("Bin already exists, removing first...")
                self.remove_bin_from_pipeline()

            try:
                new_bin = self.create_bin()
            except RuntimeError as exc:
                print(f"Failed to create bin: {exc}")
                return False

            if not self.pipeline.add(new_bin):
                print("Failed to add bin to pipeline")
                return False
            self.bin = new_bin

            # sync_state_with_parent() reports success as a boolean, not as a
            # Gst.StateChangeReturn.
            if not self.bin.sync_state_with_parent():
                print("Failed to add bin to pipeline")
                # Do not leave a half-started bin inside the pipeline.
                self.remove_bin_from_pipeline()
                return False

            print("Bin added to pipeline")
            return True

    def remove_bin_from_pipeline(self):
        """Shut the current bin down and take it out of the pipeline.

        Returns:
            True if a bin was removed, False if there was none.
        """
        with self._bin_lock:
            if self.bin is None:
                return False

            # Forget the bin first so no other caller can act on it again.
            old_bin, self.bin = self.bin, None

            # Set bin state to NULL before removing
            if old_bin.set_state(Gst.State.NULL) == Gst.StateChangeReturn.FAILURE:
                print("Warning: bin did not reach NULL state")

            self.pipeline.remove(old_bin)

            print("Bin removed from pipeline")
            return True

    def run_cycle(self):
        """Run one cycle: add bin, wait BIN_RUN_SECONDS, then remove it."""
        if not self.add_bin_to_pipeline():
            print("Failed to add bin in this cycle")
            return

        # Unlike time.sleep(), this returns at once when shutdown is requested.
        self._stop_event.wait(self.BIN_RUN_SECONDS)
        self.remove_bin_from_pipeline()

    def start_loop(self):
        """Start the add/remove worker thread and block in the GLib main loop.

        The worker runs one cycle, pauses ``CYCLE_PAUSE_SECONDS`` and repeats
        until ``running`` is cleared.  When the main loop ends (for example
        through Ctrl-C) the worker is stopped, the bin is removed, the bus
        watch is dropped and the pipeline is set to NULL.
        """
        self.running = True
        self._stop_event.clear()

        def cycle_thread():
            while self.running:
                self.run_cycle()
                if not self.running:
                    break
                print(f"Waiting {self.CYCLE_PAUSE_SECONDS} seconds before next cycle...")
                self._stop_event.wait(self.CYCLE_PAUSE_SECONDS)

        thread = threading.Thread(target=cycle_thread, daemon=True)
        thread.start()

        try:
            self.loop.run()
        except KeyboardInterrupt:
            print("\nStopping...")
        finally:
            self.running = False
            self._stop_event.set()
            # Give the worker a chance to finish its current add/remove so
            # it does not race with the teardown below.
            thread.join(timeout=self._JOIN_TIMEOUT_SECONDS)
            self.remove_bin_from_pipeline()
            if self.pipeline:
                self.pipeline.get_bus().remove_signal_watch()
                self.pipeline.set_state(Gst.State.NULL)
            self.loop.quit()

# Script entry point: constructing the controller also creates and starts the
# pipeline (via __init__), and start_loop() then blocks in the GLib main loop.
if __name__ == "__main__":
    controller = GStreamerBinController()
    controller.start_loop()


Sorry for the inconvenience.
Left image: right after start

Middle image: after about an hour

Right image: after about 17 hours

working on:

  • gstreamer 1.26.2
  • raspberry pi 5

Hello, I’m quite new to GStreamer.

I’m trying to dynamically add and remove a bin at runtime, but the available memory seems to keep getting lower and lower.

As the images above show, it does not change dramatically, but the available memory decreases as I keep adding and removing the bin.

Is this a memory leak, or did I do something wrong that is losing memory?

Also, if this is a memory leak, is there any existing report on it that I can follow? I tried to find a related article but couldn’t find one.

thank you.

This blog is a good starting point -

thank you for replying!

I have read the link you gave, but it seems that dynamic source removal is not covered. I tried modifying it and applying it to dynamic rtspsrc removal, but I can’t make it work: the available memory keeps decreasing. Is there no way to remove a source (rtspsrc) dynamically?

The general recipe to safely remove a source is:

# Look the element up by the name it was given when it was created.
element = pipeline.get_by_name("mysource")
# Detach it from the pipeline, then shut the detached element down.
pipeline.remove(element)
element.set_state(Gst.State.NULL)
# Drop the local reference so this scope no longer keeps the element alive.
del element

This should safely remove it.

1 Like

Thanks for this post! I didn’t know about pipeline.remove when I wrote my code a few years ago. I basically just set the state to Gst.State.NULL before I set self.pipeline=None. I’m not sure if NOT doing a remove would be causing any issues, but I will definitely look into adding this to better clean things up.

Thank you for answer.

I tried something similar to your recipe: I save the bin and its elements in variables and use those saved variables when removing them. Do I need to look the element up with get_by_name in order to remove it from the pipeline?