Hi,
I have a pipeline that I keep feeding with data from a &[u8] buffer, and each time I feed it I want to change the file location where that data is saved.
A simple example of my pipeline is appsrc ! tsdemux ! h264parse ! mpegtsmux ! filesink
I want to keep changing the location property of filesink so that it points to a different file.
However, filesink will not let me change the location on a running pipeline; it reports that the file is already open.
I changed my strategy to use fdsink and create a new fdsink every time appsrc.need_data() is called, but I can't get that to work cleanly either. When I probe the saved file, it throws SPS/PPS errors and “no frame!” errors.
Here is a simple implementation of my appsrc.need_data code. Can you please let me know what a good way to save the file would be? I tried appsink.new_sample() and copied the buffer into a file, but that saves a new file for every buffer rather than the entire file.
Here is a simple implementation of my appsrc.need_data in rust. This works but I would like to know what is the best approach for this. I appreciate your help!
use gst::prelude::*;
use anyhow::Error;
use derive_more::{Display, Error};
use gst::BufferFlags;
use std::io::{BufReader, Read};
use std::fs::File;
use std::{thread, time};
use std::os::fd::{AsRawFd, RawFd};
use std::io::prelude::*;
use std::sync::{Arc, Mutex};
use lazy_static::lazy_static;
/// Invokes `main` on the calling thread and hands back whatever it returns.
///
/// This definition is only compiled when the target OS is not macOS; no
/// macOS counterpart is visible in this file.
#[cfg(not(target_os = "macos"))]
pub fn run<T, F>(main: F) -> T
where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,
{
    let entry_point = main;
    entry_point()
}
// Process-wide slot for the output `File` whose raw descriptor is handed to
// the sink element named "filesink". It starts out as `None` and is written
// from the appsrc `need_data` path set up in `create_pipeline`.
// Storing the `File` here keeps it (and therefore its descriptor) open after
// that code returns; overwriting the slot drops the previously stored `File`.
lazy_static! {
    static ref SHARED_FILE: Arc<Mutex<Option<File>>> = Arc::new(Mutex::new(None));
}
/// Error value that `main_loop` builds from an error message received on the
/// pipeline bus.
///
/// Its `Display` output follows the format string below:
/// `Received error from <src>: <error> (debug: <debug>)`.
#[derive(Debug, Display, Error)]
#[display(fmt = "Received error from {}: {} (debug: {:?})", src, error, debug)]
struct ErrorMessage {
    /// Path string of the object that posted the message, or `"None"` when
    /// the message carries no source.
    src: String,
    /// Text rendering of the underlying `glib::Error`.
    error: String,
    /// Extra debug text for the error, if any.
    debug: Option<String>,
    /// The original `glib::Error`.
    /// NOTE(review): presumably picked up by the `Error` derive as the
    /// `source()` of this error because of the field name; confirm against
    /// the derive_more version in use.
    source: glib::Error,
}
/// Starts `pipeline` and blocks on its bus until end-of-stream or an error.
///
/// On EOS the pipeline is set back to `Null` and `Ok(())` is returned.
///
/// # Errors
/// * A failed state change (to `Playing` or to `Null`) is returned as-is.
/// * An error message on the bus is converted into an [`ErrorMessage`]
///   carrying the posting object's path, the error text and the debug text
///   attached to the message.
///
/// # Panics
/// Panics if the pipeline has no bus.
fn main_loop(pipeline: gst::Pipeline) -> Result<(), Error> {
    // A failed start must be reported: otherwise the loop below would wait
    // forever for messages from a pipeline that never ran.
    if let Err(err) = pipeline.set_state(gst::State::Playing) {
        // Best-effort cleanup; the start failure is the error worth reporting.
        let _ = pipeline.set_state(gst::State::Null);
        return Err(err.into());
    }

    let bus = pipeline.bus().expect("could not find bus");

    for msg in bus.iter_timed(gst::ClockTime::NONE) {
        use gst::MessageView;
        match msg.view() {
            MessageView::Eos(..) => {
                println!("got eos!!");
                break;
            }
            MessageView::Error(err) => {
                pipeline.set_state(gst::State::Null)?;
                return Err(ErrorMessage {
                    src: msg
                        .src()
                        .map(|s| String::from(s.path_string()))
                        .unwrap_or_else(|| String::from("None")),
                    error: err.error().to_string(),
                    // Keep the debug text from the message instead of an
                    // empty placeholder; it usually names the failing call.
                    debug: err.debug().map(|d| d.to_string()),
                    source: err.error(),
                }
                .into());
            }
            // All other message types are irrelevant here.
            _ => (),
        }
    }

    pipeline.set_state(gst::State::Null)?;
    Ok(())
}
fn create_pipeline() -> Result<gst::Pipeline, Error> {
    gst::init()?;
    let pipeline = gst::Pipeline::default();
    let video_info = &gst::Caps::builder("video/mpegts")
        .field("systemstream", true)
        .build();
    let appsrc = gst_app::AppSrc::builder()
        .caps(video_info)
        .format(gst::Format::Time)
        .is_live(false)
        .build();
    let tsdemux = gst::ElementFactory::make("tsdemux").build()?;
    let h264parse = gst::ElementFactory::make("h264parse").build()?;
    let mpegtsmux = gst::ElementFactory::make("mpegtsmux").build()?;
    let filesink = gst::ElementFactory::make("fdsink").build()?;
    filesink.set_property("name", "filesink");
    // moved out of lambda to avoid move closures
    let video_sink_pad = h264parse.static_pad("sink").expect("could not get sink pad from h264parse");
    pipeline.add_many(&[appsrc.upcast_ref(), &tsdemux, h264parse.upcast_ref(), &mpegtsmux, &filesink])?;
    gst::Element::link_many(&[appsrc.upcast_ref(), &tsdemux])?;
    tsdemux.connect_pad_added(move |_src, src_pad| {
        let is_video = if src_pad.name().starts_with("video") {
            true
        } else {
            false
        };
        let connect_demux = || -> Result<(), Error> {
            src_pad.link(&video_sink_pad).expect("failed to link tsdemux.video->h264parse.sink");
            println!("linked tsdemux->h264parse");
            Ok(())
        };
        if is_video {
            match connect_demux() {
                Ok(_) => println!("tsdemux->h264 connected"),
                Err(e) => println!("could not connect tsdemux->h264parse e:{}", e),
            }
        }
    });
    gst::Element::link_many(&[&h264parse.upcast_ref(), &mpegtsmux, &filesink])?;
    let pipeline_weak = pipeline.downgrade();
    let mut i = 1;
    appsrc.set_callbacks(
        gst_app::AppSrcCallbacks::builder()
            .need_data(move |appsrc, _| {
                println!("getting the file");
                i +=1;
                if let Some(pipeline) = pipeline_weak.upgrade() {
                    let mut file_lock = SHARED_FILE.lock().unwrap();
                    *file_lock = Some(File::create(format!("{}.ts", i)).unwrap());
                    if let Some(ref file) = *file_lock {
                        // Obtain the raw file descriptor
                        let raw_fd: RawFd = file.as_raw_fd();
                        println!("Raw file descriptor: {}", raw_fd);
                        let elem = pipeline.by_name("filesink").unwrap();
                        elem.set_property("fd", raw_fd);
                    } else {
                        // Handle the case where the file is not yet created or is None
                        println!("No file available.");
                    }
                }
               // In the actual application I would be opening different ts files
                let mut file = File::open("./sample/test_1620.ts").unwrap();
                let mut buffer = Vec::new();
                match file.read_to_end(&mut buffer) {
                    Ok(_) => {
                        println!("finished reading bytes from file len={}", buffer.len());
                    }
                    Err(e) => {
                        println!("error reading file: {}", e);
                    }
                };
                let gst_buffer = gst::Buffer::from_slice(buffer);
                println!("buffer size of teh generated gst_buffer={}", gst_buffer.size());
                let _ = appsrc.push_buffer(gst_buffer);
                println!("sleeping for 10 secs");
                thread::sleep(time::Duration::from_secs(1));
            }).build(),
    );
    Ok(pipeline)
}
/// Builds the pipeline, runs it until it finishes, and prints any error that
/// either step returns.
fn appsrc_main() {
    let outcome = create_pipeline().and_then(main_loop);
    if let Err(e) = outcome {
        println!("Error! {}", e);
    }
}
/// Program entry point: hands `appsrc_main` to the `run` wrapper.
fn main() {
    let entry = appsrc_main;
    run(entry);
}