gst::Bin failed to add many elements when trying to switch video files

Hey everyone, I was trying to create a simple GTK4 application that uses the gtk FileChooserDialog to pick a file and play it in the gtk4paintablesink. When I initially load any video file, it perfectly loads with sound and the ability pause and play. However, if I try to switch the video file or even play the same file then the program crashes citing the following error:

thread ‘main’ panicked at src/gstmodule/gstmanager.rs:92:14:
called Result::unwrap() on an Err value: BoolError { message: “Failed to add elements”, filename: “/home/vish/.cargo/git/checkouts/gstreamer-rs-79e52a2d27eb91a3/0b4c602/gstreamer/src/bin.rs”, function: “gstreamer::bin::GstBinExtManual::add_many”, line: 58 }

Heres the entire source code its just two files:

my gstmanager.rs file

use anyhow::Error;
use derive_more::{Display, Error};
use gst::{element_error, element_warning};
use gst::{prelude::*, Element, Pipeline};
use gtk::{gdk, glib};

use std::sync::{Arc, Mutex};
#[derive(Debug, Display, Error)]
#[display(fmt = "Received error from {src}: {error} (debug: {debug:?})")]
struct ErrorMessage {
    src: glib::GString,
    error: glib::Error,
    debug: Option<glib::GString>,
}

#[derive(Clone, Debug, glib::Boxed)]
#[boxed_type(name = "ErrorValue")]
struct ErrorValue(Arc<Mutex<Option<Error>>>);
#[derive(glib::Downgrade)]
pub struct GstManager {
    gtksink: Element,
    pipeline: Pipeline,
}

impl GstManager {
    pub fn new() -> Self {
        Self {
            gtksink: gst::ElementFactory::make("gtk4paintablesink")
                .build()
                .unwrap(),
            pipeline: gst::Pipeline::new(),
        }
    }

    pub fn get_pipeline(&self) -> &Pipeline {
        &self.pipeline
    }

    pub fn get_paintable_sink(&self) -> gdk::Paintable {
        self.gtksink.property::<gdk::Paintable>("paintable")
    }

    pub fn set_video_filename(&self, filename: Option<&str>) {
        self.build_pipeline(filename);
    }

    pub fn set_play_stream(&self) {
        self.pipeline
            .set_state(gst::State::Playing)
            .expect("Unable to set the pipeline to the `Playing` state");
    }
    pub fn set_pause_stream(&self) {
        self.pipeline
            .set_state(gst::State::Paused)
            .expect("Unable to set the pipeline to the `Paused` state");
    }

    pub fn set_stop_stream(&self) {
        self.pipeline
            .set_state(gst::State::Null)
            .expect("Unable to set the pipeline to the `Null` state");
    }

    fn build_pipeline(&self, filename: Option<&str>) {
        dbg!("{}", filename);
        let filesrc = gst::ElementFactory::make("filesrc")
            .name("filesrc")
            .property("location", filename)
            .build()
            .unwrap();
        let decodebin = gst::ElementFactory::make("decodebin")
            .name("decodebin")
            .build()
            .unwrap();

        let binsink = gst::Bin::with_name("binsink");
        let tee = gst::ElementFactory::make("tee")
            .name("tee")
            .build()
            .unwrap();

        let queue0 = gst::ElementFactory::make("queue")
            .name("queue0")
            .build()
            .unwrap();
        let videoconvert = gst::ElementFactory::make("videoconvert")
            .name("videoconvert")
            .build()
            .unwrap();
        binsink
            .add_many([&tee, &queue0, &videoconvert, &self.gtksink])
            .unwrap();

        gst::Element::link_many([&tee, &queue0, &videoconvert, &self.gtksink]).unwrap();
        binsink
            .add_pad(&gst::GhostPad::with_target(&tee.static_pad("sink").unwrap()).unwrap())
            .unwrap();

        let sink = binsink.upcast();

        self.pipeline
            .add_many([&filesrc, &decodebin, &sink])
            .unwrap();
        gst::Element::link_many([&filesrc, &decodebin]).unwrap();

        let pipeline_weak = self.pipeline.downgrade();
        let sink_weak = sink.downgrade();

        decodebin.connect_pad_added(move |dbin, src_pad| {
            let (is_audio, is_video) = {
                let media_type = src_pad.current_caps().and_then(|caps| {
                    caps.structure(0).map(|s| {
                        let name = s.name();
                        (name.starts_with("audio/"), name.starts_with("video/"))
                    })
                });

                match media_type {
                    None => {
                        element_warning!(
                            dbin,
                            gst::CoreError::Negotiation,
                            ("Failed to get media type from pad {}", src_pad.name())
                        );

                        return;
                    }
                    Some(media_type) => media_type,
                }
            };
            println!("new pad {:?}", src_pad);
            let pipeline = match pipeline_weak.upgrade() {
                Some(pipeline) => pipeline,
                None => return,
            };
            let sink = match sink_weak.upgrade() {
                Some(sink) => sink,
                None => return,
            };
            // let audiosink = match audiosink_weak.upgrade() {
            //     Some(audiosink) => audiosink,
            //     None => return,
            // };
            let insert_sink = |is_audio, is_video| -> Result<(), Error> {
                if is_audio {
                    let queue = gst::ElementFactory::make("queue")
                        .name("audioqueue")
                        .build()
                        .unwrap();
                    let convert = gst::ElementFactory::make("audioconvert")
                        .name("audioconvert")
                        .build()
                        .unwrap();
                    let resample = gst::ElementFactory::make("audioresample")
                        .name("audioresample")
                        .build()
                        .unwrap();
                    let sink = gst::ElementFactory::make("autoaudiosink")
                        .name("autoaudiosink")
                        .build()
                        .unwrap();

                    let elements = &[&queue, &convert, &resample, &sink];
                    pipeline.add_many(elements).unwrap();
                    gst::Element::link_many(elements).unwrap();

                    // !!ATTENTION!!:
                    // This is quite important and people forget it often. Without making sure that
                    // the new elements have the same state as the pipeline, things will fail later.
                    // They would still be in Null state and can't process data.
                    for e in elements {
                        e.sync_state_with_parent().unwrap();
                    }

                    // Get the queue element's sink pad and link the decodebin's newly created
                    // src pad for the audio stream to it.
                    let sink_pad = queue.static_pad("sink").expect("queue has no sinkpad");
                    src_pad.link(&sink_pad).expect("Audio Link Failed");
                } else if is_video {
                    pipeline.remove(&sink).unwrap();
                    pipeline.add(&sink).unwrap();
                    sink.sync_state_with_parent().unwrap();

                    let sink_pad = sink.static_pad("sink").unwrap();
                    src_pad.link(&sink_pad).unwrap();
                }
                Ok(())
            };

            if let Err(err) = insert_sink(is_audio, is_video) {
                // The following sends a message of type Error on the bus, containing our detailed
                // error information.
                element_error!(
                    dbin,
                    gst::LibraryError::Failed,
                    ("Failed to insert sink"),
                    details: gst::Structure::builder("error-details")
                                .field("error",
                                       &ErrorValue(Arc::new(Mutex::new(Some(err)))))
                                .build()
                );
            }
        });
    }
}

My main.rs file

mod gstmodule;
use gst::prelude::*;

use gstmodule::gstmanager::GstManager;
use gtk::{gio, glib};
use gtk::{prelude::*, Button};
use std::cell::RefCell;

fn create_ui(app: &gtk::Application) {
    let window = gtk::ApplicationWindow::new(app);
    let gstman = GstManager::new();

    window.set_default_size(640, 480);
    window.set_title(Some("GstPlayer - Gtk4 + Gstreamer"));

    let paintable = gstman.get_paintable_sink();

    let vbox = gtk::Box::new(gtk::Orientation::Vertical, 10);
    let hbox = gtk::Box::new(gtk::Orientation::Horizontal, 10);
    let hboxtop = gtk::Box::new(gtk::Orientation::Horizontal, 10);
    let picture = gtk::Picture::new();
    let label = gtk::Label::new(Some("Position: 00:00:00"));

    picture.set_paintable(Some(&paintable));

    let text_view = gtk::Label::new(Some("Please, select a mpegts video file."));
    let fchooser = Button::with_label("Open File");

    fchooser.connect_clicked(
        glib::clone!(@weak gstman,@weak text_view,@weak window => move |_| {
            println!("Done");
            //let videos_filter = gtk::FileFilter::new();
            // videos_filter.add_suffix(Some("*.mpegts"));
            //videos_filter.set_name(Some("MPEGTS"));

            let dialog = gtk::FileChooserDialog::builder()
                .title("Open File")
                .action(gtk::FileChooserAction::Open)
                .modal(true)
                // .filter(&videos_filter)
                .build();
            dialog.add_button("Cancel", gtk::ResponseType::Cancel);
            dialog.add_button("Accept", gtk::ResponseType::Accept);
            dialog.set_transient_for(Some(&window));
            dialog.run_async(glib::clone!(@weak gstman,@weak text_view => move |obj,res|{
                match res {
                    gtk::ResponseType::Accept => {
                        let file = obj.file().unwrap();
                        let from_str = gio::File::uri(&file).replace("file:///","/");
                        text_view.set_label(&from_str);
                        print!("{}",from_str);

                        gstman.set_video_filename(Some(&text_view.label().to_string()));
                    },
                    _ => {}
                }
                obj.destroy();
            }));
        }),
    );

    hboxtop.append(&fchooser);
    hboxtop.append(&text_view);
    hboxtop.set_halign(gtk::Align::Center);
    hboxtop.set_margin_top(20);

    vbox.append(&hboxtop);
    vbox.append(&picture);
    vbox.append(&label);
    vbox.append(&hbox);

    let pipeline_weak = gstman.get_pipeline().downgrade();
    let bus = gstman.get_pipeline().bus().unwrap();
    // let _pipeline = RefCell::new(Some(gstman.getPipeline()));

    let play_button = Button::with_label("Play");
    play_button.connect_clicked(glib::clone!(@weak gstman => move |_| {
        eprintln!("Play");
        gstman.set_play_stream();
    }));
    let pause_button = Button::with_label("Pause");
    pause_button.connect_clicked(glib::clone!(@weak gstman => move |_| {
        eprintln!("Pause");
        gstman.set_pause_stream();
    }));

    let stop_button = Button::with_label("Stop");
    stop_button.connect_clicked(move |_| {
        eprintln!("Stop");
        gstman.set_stop_stream();
    });

    hbox.append(&play_button);
    hbox.append(&pause_button);
    hbox.append(&stop_button);
    hbox.set_halign(gtk::Align::Center);
    hbox.set_margin_bottom(20);

    window.set_child(Some(&vbox));
    window.show();

    app.add_window(&window);

    let timeout_id = glib::timeout_add_local(std::time::Duration::from_millis(500), move || {
        let pipeline = match pipeline_weak.upgrade() {
            Some(pipeline) => pipeline,
            None => return glib::ControlFlow::Continue,
        };

        let position = pipeline.query_position::<gst::ClockTime>();
        label.set_text(&format!("Position: {:.0}", position.display()));
        glib::ControlFlow::Continue
    });

    let app_weak = app.downgrade();
    let bus_watch = bus
        .add_watch_local(move |_, msg| {
            use gst::MessageView;

            let app = match app_weak.upgrade() {
                Some(app) => app,
                None => return glib::ControlFlow::Break,
            };

            match msg.view() {
                MessageView::Eos(..) => app.quit(),
                MessageView::Error(err) => {
                    println!(
                        "Error from {:?}: {} ({:?})",
                        err.src().map(|s| s.path_string()),
                        err.error(),
                        err.debug()
                    );
                    app.quit();
                }
                _ => (),
            };

            glib::ControlFlow::Continue
        })
        .expect("Failed to add bus watch");

    let timeout_id = RefCell::new(Some(timeout_id));
    let bus_watch = RefCell::new(Some(bus_watch));
    app.connect_shutdown(move |_| {
        window.close();

        drop(bus_watch.borrow_mut().take());
        // if let Some(_pipeline) = _pipeline.borrow_mut().take() {
        // gstman.setStopStream();
        // }

        if let Some(timeout_id) = timeout_id.borrow_mut().take() {
            timeout_id.remove();
        }
    });
}

fn main() -> glib::ExitCode {
    std::env::set_var("RUST_BACKTRACE", "1");
    std::env::set_var("GST_DEBUG", "3");

    gst::init().unwrap();
    gtk::init().unwrap();

    gstgtk4::plugin_register_static().expect("Failed to register gstgtk4 plugin");

    let app = gtk::Application::new(None::<&str>, gio::ApplicationFlags::FLAGS_NONE);

    app.connect_activate(create_ui);
    let res = app.run();

    unsafe {
        gst::deinit();
    }

    res
}

Any help would be appreciated! Thanks

You’re adding the sink to a new bin every time a new filename is set. To make that work you need to ensure that the sink is not part of the previous bin anymore before adding it to the new one. Every element can only be in a single bin.

If you enable debug logs you should also see this as the cause of the problem.

Hey there thanks for the reply, really appreciate it, I am very new to using gstreamer with Rust so bear with me.

Is there a recommended approach to removing the gtksink? I was thinking of creating a Bin variable in the struct, removing gtksink then reinitialize the bin and pipeline every time the button is pressed. Is there a better approach you’d recommend?

That sounds like a valid option.