Hi,
I have a pipeline where I want to keep feeding data from a &[u8]
buffer and each time I want to change the file location where I want to save this data.
A simple example of my pipeline is appsrc ! tsdemux ! h264parse ! mpegtsmux ! filesink
I want to keep changing the location property of filesink so that it points to a different file.
However the filesink would not let me change on a running pipeline as it says it is already open.
I changed my strategy to use fdsink and create a new fdsink everytime I get appsrc.need_data()
is called. But I cant get that to work cleanly either. When I probe the saved file, it throws SPS/PPS errors and “no frame!” errors.
Here is a simple implementation of my appsrc.need_data code. Can you please let me know what is a good way to save the file? I tried appsink.new_sample()
, and coy the buffer into a file. But that saves a new file for every buffer and not the entire file.
Here is a simple implementation of my appsrc.need_data in rust. This works but I would like to know what is the best approach for this. I appreciate your help!
use gst::prelude::*;
use anyhow::Error;
use derive_more::{Display, Error};
use gst::BufferFlags;
use std::io::{BufReader, Read};
use std::fs::File;
use std::{thread, time};
use std::os::fd::{AsRawFd, RawFd};
use std::io::prelude::*;
use std::sync::{Arc, Mutex};
use lazy_static::lazy_static;
#[cfg(not(target_os = "macos"))]
pub fn run<T, F: FnOnce() -> T + Send + 'static>(main: F) -> T
where
T: Send + 'static,
{
main()
}
lazy_static! {
static ref SHARED_FILE: Arc<Mutex<Option<File>>> = Arc::new(Mutex::new(None));
}
#[derive(Debug, Display, Error)]
#[display(fmt = "Received error from {}: {} (debug: {:?})", src, error, debug)]
struct ErrorMessage {
src: String,
error: String,
debug: Option<String>,
source: glib::Error,
}
fn main_loop(pipeline: gst::Pipeline) -> Result<(), Error> {
pipeline.set_state(gst::State::Playing);
let bus = pipeline
.bus()
.expect("could not find bus");
for msg in bus.iter_timed(gst::ClockTime::NONE) {
use gst::MessageView;
match msg.view() {
MessageView::Eos(..) => {
println!("got eos!!");
break
}
MessageView::Error(err) => {
pipeline.set_state(gst::State::Null)?;
return Err(ErrorMessage {
src: msg
.src()
.map(|s| String::from(s.path_string()))
.unwrap_or_else(|| String::from("None")),
error: err.error().to_string(),
debug: Some("".to_string()),
source: err.error(),
}
.into());
}
_ => (),
}
}
pipeline.set_state(gst::State::Null)?;
Ok(())
}
fn create_pipeline() -> Result<gst::Pipeline, Error> {
gst::init()?;
let pipeline = gst::Pipeline::default();
let video_info = &gst::Caps::builder("video/mpegts")
.field("systemstream", true)
.build();
let appsrc = gst_app::AppSrc::builder()
.caps(video_info)
.format(gst::Format::Time)
.is_live(false)
.build();
let tsdemux = gst::ElementFactory::make("tsdemux").build()?;
let h264parse = gst::ElementFactory::make("h264parse").build()?;
let mpegtsmux = gst::ElementFactory::make("mpegtsmux").build()?;
let filesink = gst::ElementFactory::make("fdsink").build()?;
filesink.set_property("name", "filesink");
// moved out of lambda to avoid move closures
let video_sink_pad = h264parse.static_pad("sink").expect("could not get sink pad from h264parse");
pipeline.add_many(&[appsrc.upcast_ref(), &tsdemux, h264parse.upcast_ref(), &mpegtsmux, &filesink])?;
gst::Element::link_many(&[appsrc.upcast_ref(), &tsdemux])?;
tsdemux.connect_pad_added(move |_src, src_pad| {
let is_video = if src_pad.name().starts_with("video") {
true
} else {
false
};
let connect_demux = || -> Result<(), Error> {
src_pad.link(&video_sink_pad).expect("failed to link tsdemux.video->h264parse.sink");
println!("linked tsdemux->h264parse");
Ok(())
};
if is_video {
match connect_demux() {
Ok(_) => println!("tsdemux->h264 connected"),
Err(e) => println!("could not connect tsdemux->h264parse e:{}", e),
}
}
});
gst::Element::link_many(&[&h264parse.upcast_ref(), &mpegtsmux, &filesink])?;
let pipeline_weak = pipeline.downgrade();
let mut i = 1;
appsrc.set_callbacks(
gst_app::AppSrcCallbacks::builder()
.need_data(move |appsrc, _| {
println!("getting the file");
i +=1;
if let Some(pipeline) = pipeline_weak.upgrade() {
let mut file_lock = SHARED_FILE.lock().unwrap();
*file_lock = Some(File::create(format!("{}.ts", i)).unwrap());
if let Some(ref file) = *file_lock {
// Obtain the raw file descriptor
let raw_fd: RawFd = file.as_raw_fd();
println!("Raw file descriptor: {}", raw_fd);
let elem = pipeline.by_name("filesink").unwrap();
elem.set_property("fd", raw_fd);
} else {
// Handle the case where the file is not yet created or is None
println!("No file available.");
}
}
// In the actual application I would be opening different ts files
let mut file = File::open("./sample/test_1620.ts").unwrap();
let mut buffer = Vec::new();
match file.read_to_end(&mut buffer) {
Ok(_) => {
println!("finished reading bytes from file len={}", buffer.len());
}
Err(e) => {
println!("error reading file: {}", e);
}
};
let gst_buffer = gst::Buffer::from_slice(buffer);
println!("buffer size of teh generated gst_buffer={}", gst_buffer.size());
let _ = appsrc.push_buffer(gst_buffer);
println!("sleeping for 10 secs");
thread::sleep(time::Duration::from_secs(1));
}).build(),
);
Ok(pipeline)
}
fn appsrc_main() {
match create_pipeline().and_then(main_loop) {
Ok(r) => r,
Err(e) => println!("Error! {}", e),
}
}
fn main() {
run(appsrc_main);
}