Using a different fdsink in a running pipeline in Rust

Hi,
I have a pipeline that I keep feeding data from a &[u8] buffer, and each time I want to change the file location where this data is saved.

A simple example of my pipeline is appsrc ! tsdemux ! h264parse ! mpegtsmux ! filesink

I want to keep changing the location property of filesink so that it points to a different file.
However, filesink will not let me change it on a running pipeline; it reports that the file is already open.
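
Roughly what I was trying, simplified (the next file name here is just an example):

if let Some(filesink) = pipeline.by_name("filesink") {
    // Rejected while the pipeline is PLAYING because the current file is already open.
    filesink.set_property("location", "segment_2.ts");
}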

I changed my strategy to use fdsink and to swap in a new file descriptor every time appsrc.need_data() is called, but I can't get that to work cleanly either. When I probe the saved file, it throws SPS/PPS errors and “no frame!” errors.

Here is a simple implementation of my appsrc.need_data callback in Rust. It works, but can you please let me know what a good way to save the file would be? I tried appsink.new_sample() and copying each buffer into a file, but that saves a new file for every buffer rather than the entire stream. I appreciate your help!

use gst::prelude::*;
use anyhow::Error;
use derive_more::{Display, Error};
use std::io::Read;
use std::fs::File;
use std::{thread, time};
use std::os::fd::{AsRawFd, RawFd};
use std::sync::{Arc, Mutex};
use lazy_static::lazy_static;

#[cfg(not(target_os = "macos"))]
pub fn run<T, F: FnOnce() -> T + Send + 'static>(main: F) -> T
    where
        T: Send + 'static,
{
    main()
}


lazy_static! {
    static ref SHARED_FILE: Arc<Mutex<Option<File>>> = Arc::new(Mutex::new(None));
}

#[derive(Debug, Display, Error)]
#[display(fmt = "Received error from {}: {} (debug: {:?})", src, error, debug)]
struct ErrorMessage {
    src: String,
    error: String,
    debug: Option<String>,
    source: glib::Error,
}

fn main_loop(pipeline: gst::Pipeline) -> Result<(), Error> {
    pipeline.set_state(gst::State::Playing)?;

    let bus = pipeline
        .bus()
        .expect("could not find bus");

    for msg in bus.iter_timed(gst::ClockTime::NONE) {
        use gst::MessageView;

        match msg.view() {
            MessageView::Eos(..) => {
                println!("got eos!!");
                break
            }
            MessageView::Error(err) => {
                pipeline.set_state(gst::State::Null)?;
                return Err(ErrorMessage {
                    src: msg
                        .src()
                        .map(|s| String::from(s.path_string()))
                        .unwrap_or_else(|| String::from("None")),
                    error: err.error().to_string(),
                    debug: err.debug().map(|d| d.to_string()),
                    source: err.error(),
                }
                    .into());
            }
            _ => (),
        }
    }
    pipeline.set_state(gst::State::Null)?;
    Ok(())
}


fn create_pipeline() -> Result<gst::Pipeline, Error> {
    gst::init()?;

    let pipeline = gst::Pipeline::default();

    let video_info = &gst::Caps::builder("video/mpegts")
        .field("systemstream", true)
        .build();

    let appsrc = gst_app::AppSrc::builder()
        .caps(video_info)
        .format(gst::Format::Time)
        .is_live(false)
        .build();

    let tsdemux = gst::ElementFactory::make("tsdemux").build()?;
    let h264parse = gst::ElementFactory::make("h264parse").build()?;
    let mpegtsmux = gst::ElementFactory::make("mpegtsmux").build()?;
    let filesink = gst::ElementFactory::make("fdsink").name("filesink").build()?;

    // moved out of lambda to avoid move closures
    let video_sink_pad = h264parse.static_pad("sink").expect("could not get sink pad from h264parse");

    pipeline.add_many(&[appsrc.upcast_ref(), &tsdemux, h264parse.upcast_ref(), &mpegtsmux, &filesink])?;
    gst::Element::link_many(&[appsrc.upcast_ref(), &tsdemux])?;

    tsdemux.connect_pad_added(move |_src, src_pad| {
        let is_video = src_pad.name().starts_with("video");


        let connect_demux = || -> Result<(), Error> {
            src_pad.link(&video_sink_pad).expect("failed to link tsdemux.video->h264parse.sink");
            println!("linked tsdemux->h264parse");
            Ok(())
        };

        if is_video {
            match connect_demux() {
                Ok(_) => println!("tsdemux->h264 connected"),
                Err(e) => println!("could not connect tsdemux->h264parse e:{}", e),
            }
        }
    });


    gst::Element::link_many(&[&h264parse, &mpegtsmux, &filesink])?;
    let pipeline_weak = pipeline.downgrade();

    let mut i = 1;
    appsrc.set_callbacks(
        gst_app::AppSrcCallbacks::builder()
            .need_data(move |appsrc, _| {
                println!("getting the file");
                i +=1;

                if let Some(pipeline) = pipeline_weak.upgrade() {
                    let mut file_lock = SHARED_FILE.lock().unwrap();
                    *file_lock = Some(File::create(format!("{}.ts", i)).unwrap());
                    if let Some(ref file) = *file_lock {
                        // Obtain the raw file descriptor
                        let raw_fd: RawFd = file.as_raw_fd();
                        println!("Raw file descriptor: {}", raw_fd);
                        let elem = pipeline.by_name("filesink").unwrap();
                        elem.set_property("fd", raw_fd);

                    } else {
                        // Handle the case where the file is not yet created or is None
                        println!("No file available.");
                    }
                }

               // In the actual application I would be opening different ts files
                let mut file = File::open("./sample/test_1620.ts").unwrap();
                let mut buffer = Vec::new();
                match file.read_to_end(&mut buffer) {
                    Ok(_) => {
                        println!("finished reading bytes from file len={}", buffer.len());
                    }
                    Err(e) => {
                        println!("error reading file: {}", e);
                    }
                };
                let gst_buffer = gst::Buffer::from_slice(buffer);
                println!("buffer size of the generated gst_buffer={}", gst_buffer.size());
                let _ = appsrc.push_buffer(gst_buffer);
                println!("sleeping for 1 sec");
                thread::sleep(time::Duration::from_secs(1));
            }).build(),
    );

    Ok(pipeline)

}

fn appsrc_main() {
    match create_pipeline().and_then(main_loop) {
        Ok(r) => r,
        Err(e) => println!("Error! {}", e),
    }
}

fn main() {
    run(appsrc_main);
}

An easier approach would probably be to use an appsink instead and then handle the buffers yourself in whatever way you want.

There’s an appsink example that might be a good starting point for what you’re trying to do.

You probably also only want to change the file as part of the appsink callbacks, and not in the appsrc callbacks. You probably want to wait until any queued data is written out before switching, for example.
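
Untested sketch of what I mean (the shared Option<File> is just a placeholder for however you track the current output file):

use std::fs::File;
use std::io::Write;
use std::sync::{Arc, Mutex};

fn attach_file_writer(appsink: &gst_app::AppSink, output: Arc<Mutex<Option<File>>>) {
    appsink.set_callbacks(
        gst_app::AppSinkCallbacks::builder()
            .new_sample(move |appsink| {
                // Pull each sample and append its bytes to the currently open file.
                let sample = appsink.pull_sample().map_err(|_| gst::FlowError::Eos)?;
                let buffer = sample.buffer().ok_or(gst::FlowError::Error)?;
                let map = buffer.map_readable().map_err(|_| gst::FlowError::Error)?;

                // Swapping the File behind the mutex is what starts a new output segment.
                if let Some(file) = output.lock().unwrap().as_mut() {
                    file.write_all(map.as_slice()).map_err(|_| gst::FlowError::Error)?;
                }
                Ok(gst::FlowSuccess::Ok)
            })
            .build(),
    );
}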

Thanks a lot @slomo! Since appsink gets a new_sample call for every buffer, should I generate a custom event in the pipeline once the next appsrc.need_data() is called, so that when it arrives at the appsink's pad I know that all the queued data has been pushed and has reached the appsink?
Would that be a good way to know, on the appsink side, that all the data has been received?

Yes that would be one option.

Hi @slomo, adding an event to the pipeline when appsrc is done doesn't seem to work correctly. I added a pad probe to the appsink and change the output file based on that.

I noticed that the events are not received on the pad sequentially; instead, a couple of events arrive together.

Is there an alternative approach to doing this?

I really appreciate your help on this.

Here is a simple pipeline that does it.

use gst::prelude::*;
use anyhow::Error;
use derive_more::{Display, Error};
use std::io::{Read, Write};
use std::fs::{self, File};
use std::thread;
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use lazy_static::lazy_static;

use tracing::info;
use tracing_subscriber::fmt::format::FmtSpan;
use tracing_subscriber::prelude::*;

#[cfg(not(target_os = "macos"))]
pub fn run<T, F: FnOnce() -> T + Send + 'static>(main: F) -> T
    where
        T: Send + 'static,
{
    main()
}


lazy_static! {
    static ref SHARED_FILE: Arc<Mutex<Option<File>>> = Arc::new(Mutex::new(None));
}

#[derive(Debug, Display, Error)]
#[display(fmt = "Received error from {}: {} (debug: {:?})", src, error, debug)]
struct ErrorMessage {
    src: String,
    error: String,
    debug: Option<String>,
    source: glib::Error,
}

fn main_loop(pipeline: gst::Pipeline) -> Result<(), Error> {
    pipeline.set_state(gst::State::Playing)?;

    let bus = pipeline
        .bus()
        .expect("could not find bus");

    for msg in bus.iter_timed(gst::ClockTime::NONE) {
        use gst::MessageView;

        match msg.view() {
            MessageView::Eos(..) => {
                println!("got eos!!");
                break
            }
            MessageView::Error(err) => {
                pipeline.set_state(gst::State::Null)?;
                return Err(ErrorMessage {
                    src: msg
                        .src()
                        .map(|s| String::from(s.path_string()))
                        .unwrap_or_else(|| String::from("None")),
                    error: err.error().to_string(),
                    debug: err.debug().map(|d| d.to_string()),
                    source: err.error(),
                }
                    .into());
            }
            _ => (),
        }
    }
    pipeline.set_state(gst::State::Null)?;
    Ok(())
}


#[derive(Debug)]
pub struct CustomEOSEvent {
    pub send_eos: bool,
}

impl CustomEOSEvent {
    const EVENT_NAME: &'static str = "custom-eos-event";

    #[allow(clippy::new_ret_no_self)]
    pub fn new(send_eos: bool) -> gst::Event {
        let s = gst::Structure::builder(Self::EVENT_NAME)
            .field("send_eos", send_eos)
            .build();
        gst::event::CustomDownstream::new(s)
    }

    pub fn parse(ev: &gst::EventRef) -> Option<CustomEOSEvent> {
        match ev.view() {
            gst::EventView::CustomDownstream(e) => {
                let s = match e.structure() {
                    Some(s) if s.name() == Self::EVENT_NAME => s,
                    _ => return None,
                };

                let send_eos = s.get::<bool>("send_eos").unwrap();
                Some(CustomEOSEvent { send_eos })
            }
            _ => None,
        }
    }
}



fn create_pipeline() -> Result<gst::Pipeline, Error> {
    gst::init()?;

    let pipeline = gst::parse_launch(
        "appsrc name=appsrc ! video/mpegts,systemstream=true ! tsdemux name=demux \
         demux. ! h264parse ! queue ! avdec_h264 ! videoconvert ! avenc_h264_videotoolbox ! mpegtsmux name=mux ! appsink name=appsink \
         demux. ! queue ! aacparse ! mux.",
    )
    .unwrap()
    .downcast::<gst::Pipeline>()
    .unwrap();

    let pipeline_weak = pipeline.downgrade();

    let appsrc = pipeline.by_name("appsrc")
        .unwrap()
        .downcast::<gst_app::AppSrc>()
        .unwrap();

    // Start with an initial output file; the appsink callback below appends to
    // whatever File is currently stored in SHARED_FILE.
    {
        let myfile = fs::OpenOptions::new()
            .write(true)
            .append(true)
            .create(true)
            .open("output.ts")
            .unwrap();
        *SHARED_FILE.lock().unwrap() = Some(myfile);
    }


    appsrc.set_callbacks(
        gst_app::AppSrcCallbacks::builder()
            .need_data(move |appsrc, _| {
                info!("getting the file");


                let mut file = File::open("./sample/test_1620_h264.ts").unwrap();
                let mut buffer = Vec::new();
                match file.read_to_end(&mut buffer) {
                    Ok(_) => {
                        info!(cam_id = "snx-c2ahme", "finished reading bytes from file len={}", buffer.len());
                    }
                    Err(e) => {
                        println!("error reading file: {}", e);
                    }
                };

                let gst_buffer = gst::Buffer::from_slice(buffer);
                println!("buffer size of teh generated gst_buffer={}", gst_buffer.size());
                let _ = appsrc.push_buffer(gst_buffer);

                println!("--------sleeping for 1 sec-----------");
                thread::sleep(Duration::from_secs(1));
                let mut buffer_new = gst::Buffer::new();
                {
                    let buffer = buffer_new.get_mut().unwrap();
                    buffer.set_size(0);  // Set size to 0 for an empty buffer
                }

                // Push the buffer into the pipeline
                appsrc.push_buffer(buffer_new).unwrap();

                if let Some(pipeline) = pipeline_weak.upgrade() {
                    let ev = CustomEOSEvent::new(true);
                    if !pipeline.send_event(ev) {
                        println!("warning! failed to send eos event")
                    } else {
                        println!("appsrc sending event on pipeline!")
                    }
                }
                //let ev = CustomEOSEvent::new(true);
                //appsrc.send_event(ev);


                //appsrc.end_of_stream();

            }).build(),
    );

    let appsink = pipeline.by_name("appsink").unwrap()
        .downcast::<gst_app::AppSink>()
        .unwrap();
    appsink.connect_last_sample_notify(|_appsink| {
        println!("received the last sample!");
    });


    let pipeline_weak = pipeline.downgrade();

    // first set a pad probe
    let sinkpad = appsink.static_pad("sink").unwrap();


    let event_counter = AtomicUsize::new(0);


    sinkpad.add_probe(gst::PadProbeType::EVENT_DOWNSTREAM, move |_pad, probe_info| {
        match probe_info.data {
            Some(gst::PadProbeData::Event(ref ev))
                if ev.type_() == gst::EventType::CustomDownstream =>
            {
                if CustomEOSEvent::parse(ev).is_some() && pipeline_weak.upgrade().is_some() {
                    println!(
                        "closing file!! received custom event on appsink i={}",
                        event_counter.load(Ordering::SeqCst)
                    );
                    event_counter.fetch_add(1, Ordering::SeqCst);
                    // We have received the event: switch to the next output file.
                    let myfile = fs::OpenOptions::new()
                        .write(true)
                        .append(true)
                        .create(true)
                        .open(format!("{}.ts", event_counter.load(Ordering::SeqCst)))
                        .unwrap();
                    *SHARED_FILE.lock().unwrap() = Some(myfile);
                }
            }
            _ => (),
        }
        gst::PadProbeReturn::Ok
    });


    appsink.set_callbacks(
        gst_app::AppSinkCallbacks::builder()
            .new_sample(move |appsink| {
                let sample = appsink.pull_sample().map_err(|_| gst::FlowError::Eos)?;
                let buffer = sample.buffer().ok_or_else(|| {
                    println!("Failed to get buffer from appsink");
                    gst::FlowError::Error
                })?;

                let map = buffer.map_readable().map_err(|_| {
                    println!("Failed to map buffer readable");
                    gst::FlowError::Error
                })?;

                let mut file_lock = SHARED_FILE.lock().unwrap();

                if let Some(file) = file_lock.as_mut() {
                    file.write_all(map.as_slice()).expect("unable to write the file")
                }


                Ok(gst::FlowSuccess::Ok)
            })
            .build()
    );

    Ok(pipeline)

}

fn appsrc_main() {
    tracing_subscriber::registry()
        .with(tracing_subscriber::fmt::layer()
            // Enable JSON formatter
            .json()
            // Include file name, line number, target, etc.
            .with_span_list(false)
            .with_span_events(FmtSpan::CLOSE)
            .with_file(true)
            .with_line_number(true)
            .with_target(true)
        )
        .init();
    match create_pipeline().and_then(main_loop) {
        Ok(r) => r,
        Err(e) => println!("Error! {}", e),
    }
}

fn main() {
    run(appsrc_main);
}

I tried a few other things, like sending an empty buffer or a buffer containing a string that I could detect at the appsink end.
But I think it gets discarded because the decoder is involved?

                let mut buffer_new = gst::Buffer::new();
                {
                    let buffer = buffer_new.get_mut().unwrap();
                    buffer.set_size(0);  // Set size to 0 for an empty buffer
                }

We also cannot compare sizes in bytes to tell whether everything has been received, because the size will be different after re-encoding.

That should theoretically work, but you'll have to debug where the custom serialized downstream event stops being serialized (in order) with the data flow.
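
For example (untested, just a sketch), you could drop a probe like this on the src pads of the elements in between (h264parse, the decoder, the encoder, the muxer) and compare where the custom event shows up relative to the buffers:

fn trace_pad(pad: &gst::Pad, tag: &'static str) {
    let _ = pad.add_probe(
        gst::PadProbeType::EVENT_DOWNSTREAM | gst::PadProbeType::BUFFER,
        move |_pad, probe_info| {
            match probe_info.data {
                Some(gst::PadProbeData::Event(ref ev))
                    if ev.type_() == gst::EventType::CustomDownstream =>
                {
                    println!("[{tag}] custom downstream event");
                }
                Some(gst::PadProbeData::Buffer(ref buffer)) => {
                    println!("[{tag}] buffer pts={:?}", buffer.pts());
                }
                _ => (),
            }
            gst::PadProbeReturn::Ok
        },
    );
}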

What’s the bigger picture here btw? Why do you want to pass one buffer to the appsrc and then know when that input file was fully consumed inside the sink?

Hi @slomo,
I basically want to keep transcoding a number of H.264 TS segments into H.265. I found that when I start a new pipeline, especially with NVIDIA hardware, the decoder/encoder setup takes a lot of time.
So I thought of running a continuous pipeline, “appsrc ! decoder ! encoder ! appsink”, where I can keep feeding H.264 TS segments and keep saving the output files to disk.

Right, so working around inefficiencies in the codecs. In theory what you do should work then but there are likely bugs in elements in between where they don’t forward the event correctly. That will need debugging.

You’ll also have to make sure that each new file starts with a keyframe though, for which you’ll have to e.g. make use of the force-keyunit events to request one on the encoder at the start of each file.
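
Something like this would request the keyframe (a sketch, assuming the gstreamer-video bindings (gst_video) and that you have a handle to the encoder element):

use gst::prelude::*;

fn request_keyframe(encoder: &gst::Element) {
    // all_headers(true) asks the encoder to also re-send SPS/PPS with the
    // forced keyframe, which you want at the start of each new file.
    let ev = gst_video::UpstreamForceKeyUnitEvent::builder()
        .all_headers(true)
        .build();

    // For an upstream event, send_event() hands it to the element via its
    // source pad, so the encoder gets to handle it and force a keyframe.
    if !encoder.send_event(ev) {
        println!("force-keyunit event was not handled");
    }
}

You would call this right before you switch to the next output file.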

Hi @slomo,
Thanks a lot for your help so far! I have one more question on the same topic. One of the main issues I face is that we use the PTS values stored at video ingestion to detect when a particular object was found by some custom AI elements.
When I transcode the file with GStreamer, I lose the original PTS values from the source file; the newly transcoded file gets a new PTS counter starting from 0.

Is there a way to preserve or pass through the original PTS values from the source file into the newly transcoded file? ffmpeg supports this with the -copyts flag.

-copyts
Do not process input timestamps, but keep their values without trying to sanitize them. In particular, do not remove the initial start time offset value.

Is there any way we can do that in gstreamer?

E.g. the original TS segment had PTS values starting from:

frame,1,6208200000,68980.000000,I,0
frame,0,6208202999,68980.033322,P,0
frame,0,6208206000,68980.066667,P,0

After going through the GStreamer transcode, they get rewritten to start from 0.

Thanks a lot!
Best Regards,
Guru

@slomo I don't know if this is the right way to do it… but do I need to get all the I-frame PTS values by probing with ffprobe, and then use a pad probe like this?

But I am not sure how to get the buffer, update it, and let the pipeline know that it has changed.

I appreciate your help in this!

// decoder_pad would be e.g. the video decoder's src pad
let my_iframe_pts_from_ffmpeg_ns = 100 * gst::ClockTime::MSECOND;
decoder_pad.add_probe(gst::PadProbeType::BUFFER, move |_src_pad, probe_info| {
    // Borrow the buffer in place instead of take()-ing it out of the probe,
    // otherwise it gets dropped from the data flow.
    if let Some(gst::PadProbeData::Buffer(ref mut buffer)) = probe_info.data {
        // make the buffer writable (this copies it if it is shared)
        let buffer = buffer.make_mut();
        // set the PTS on the buffer
        buffer.set_pts(my_iframe_pts_from_ffmpeg_ns);
        // the modified buffer left in probe_info.data is what continues
        // downstream once the probe returns Ok
    }
    gst::PadProbeReturn::Ok
});