How to close gracefully a Sink based pipeline?

Hi, I have implemented an abstraction called StreamCapture in the kornia-rs crate which takes a gstreamer pipeline string with assuming a final sink element with the purpose to do some camera image processing.

I have followed the sink example that you provide in Gitlab and modified a bit as follows:

pub struct StreamCapture {
    pipeline: gst::Pipeline,
    // TODO: pass Image<u8, 3> as a generic type
    receiver: tokio::sync::mpsc::Receiver<Image<u8, 3>>,
    handle: Option<tokio::task::JoinHandle<()>>,
}

impl StreamCapture {
    /// Creates a new StreamCapture object
    ///
    /// NOTE: The pipeline description should contain an appsink element with the name "sink".
    ///
    /// # Arguments
    ///
    /// * `pipeline_desc` - The GStreamer pipeline description.
    ///
    /// # Returns
    ///
    /// A StreamCapture object
    pub fn new(pipeline_desc: &str) -> Result<Self, StreamCaptureError> {
        // initialize GStreamer
        gst::init()?;

        let pipeline = gst::parse::launch(pipeline_desc)?
            .dynamic_cast::<gst::Pipeline>()
            .map_err(StreamCaptureError::DowncastPipelineError)?;

        let appsink = pipelineI'm looking for some advice wether the abstraction
            .by_name("sink")
            .ok_or_else(|| StreamCaptureError::DowncastAppSinkError)?
            .dynamic_cast::<gst_app::AppSink>()
            .map_err(StreamCaptureError::DowncastPipelineError)?;

        // the sender and receiver for the image frames
        let (tx, rx) = tokio::sync::mpsc::channel(10);

        appsink.set_callbacks(
            gst_app::AppSinkCallbacks::builder()
                .new_sample(move |sink| match Self::extract_image_frame(sink) {
                    Ok(frame) => {
                        if tx.blocking_send(frame).is_err() {
                            Err(gst::FlowError::Error)
                        } else {
                            Ok(gst::FlowSuccess::Ok)
                        }
                    }
                    Err(_) => Err(gst::FlowError::Error),
                })
                .build(),
        );

        Ok(Self {
            pipeline,
            receiver: rx,
            handle: None,
        })
    }

    /// Runs the capture object and grabs frames from the source
    ///
    /// # Arguments
    ///
    /// * `f` - A function that takes an image frame
    pub async fn run<F>(&mut self, mut f: F) -> Result<(), StreamCaptureError>
    where
        F: FnMut(Image<u8, 3>) -> Result<(), Box<dyn std::error::Error>>,
    {
        // start the pipeline
        let pipeline = &self.pipeline;
        pipeline.set_state(gst::State::Playing)?;

        let bus = pipeline.bus().ok_or_else(|| StreamCaptureError::BusError)?;

        //// start a thread to handle the messages from the bus
        let mut messages = bus.stream();

        let handle = tokio::spawn(async move {
            while let Some(msg) = messages.next().await {
                use gst::MessageView;
                match msg.view() {
                    MessageView::Eos(..) => {
                        println!("EOS");
                        break;
                    }
                    MessageView::Error(err) => {
                        eprintln!(
                            "Error from {:?}: {} ({:?})",
                            msg.src().map(|s| s.path_string()),
                            err.error(),
                            err.debug()
                        );
                        break;
                    }
                    _ => (),
                }
            }
        });

        self.handle = Some(handle);

        //// start grabbing frames from the source
        while let Some(img) = self.receiver.recv().await {
            f(img)?;
        }

        Ok(())
    }

    /// Closes the capture object
    pub fn close(&mut self) -> Result<(), StreamCaptureError> {
        let res = self.pipeline.send_event(gst::event::Eos::new());
        if !res {
            return Err(StreamCaptureError::SendEosError);
        }
        if let Some(handle) = self.handle.take() {
            handle.abort()
        }
        self.pipeline.set_state(gst::State::Null)?;
        Ok(())
    }

However, I’m facing always issues when the pipeline crashes (e.g the camera for some reason goes down, or there’s a bad config like in the rtsp user/pwd, or when a plugin is not well installed in the system.

I feel I’m missing some logic in the MessageView::Error to kill the whole thing when something bad happens.

Besides, I’m looking for some advise wether the pattern I implemented goes into the right direction, and what improvements would you suggest. For now this is functional and I’m getting really good results, so thanks for the amazing work you do for the Rust community.

[Side quest question]: Are you guys even considering to port the whole thing into Rust ? :slight_smile: