I’m interested in media streams. Imagine the following situation.
There’s some media stream, let’s say an audio stream,
and the only thing you know about it is its format and chunk size, which means that the stream is transmitted in chunks. I want to receive the stream, decode it, and then play it. Is there any way to do that?
You can feed it into a pipeline using the appsrc element (set caps according to the format and then create buffers and push them into the pipeline with appsrc).
// Plays the contents of `./output` by feeding them into a GStreamer pipeline
// (appsrc -> queue -> autoaudiosink) and waiting on the bus for EOS or an error.
let mut file = fs::File::open("./output").await?;
gst::init()?;
let pipeline = gst::Pipeline::new();

// Caps for the data pushed through `appsrc`: 48000 Hz, 1 channel.
// NOTE(review): the caps advertise `AudioFormat::Encoded`; a decoder (and possibly
// audioconvert/audioresample) is presumably needed before the sink — confirm
// against the actual stream format.
let audio_info =
    gst_audio::AudioInfo::builder(gst_audio::AudioFormat::Encoded, 48000, 1).build()?;
let appsrc = gst_app::AppSrc::builder()
    .caps(&audio_info.to_caps()?)
    .build();
let queue = gst::ElementFactory::make("queue").build()?;
let audiosink = gst::ElementFactory::make("autoaudiosink").build()?;
let elements = &[appsrc.as_ref(), &queue, &audiosink];
pipeline.add_many(elements)?;
gst::Element::link_many(elements)?;

// The whole file is pushed as a single buffer on the first `need-data`
// request; the next request signals end-of-stream.
let mut already_pushed = false;
appsrc.set_callbacks(
    gst_app::AppSrcCallbacks::builder()
        .need_data(move |appsrc, _| {
            use tokio::io::AsyncReadExt;
            if already_pushed {
                appsrc
                    .end_of_stream()
                    .expect("failed to signal end-of-stream on appsrc");
                return;
            }
            println!("==> Pushing data...");
            let mut buf = vec![];
            // Per the original author's note, this callback does not run inside
            // the caller's tokio context, so a dedicated runtime drives the read.
            let rt = tokio::runtime::Runtime::new()
                .expect("failed to create tokio runtime for appsrc callback");
            rt.block_on(file.read_to_end(&mut buf))
                .expect("failed to read audio data from file");
            // Wrap the bytes directly. `gst::Buffer::new()` has no backing memory,
            // so `gst_buffer_fill` on it would copy nothing and push an empty buffer.
            let buffer = gst::Buffer::from_slice(buf);
            appsrc
                .push_buffer(buffer)
                .expect("failed to push buffer into appsrc");
            already_pushed = true;
        })
        .build(),
);

pipeline.set_state(gst::State::Playing)?;
let bus = pipeline
    .bus()
    .ok_or_else(|| anyhow::anyhow!("Failed to get bus from pipeline"))?;
// Block until the pipeline reports end-of-stream or an error.
for msg in bus.iter_timed(gst::ClockTime::NONE) {
    use gst::MessageView::*;
    match msg.view() {
        Eos(_) => {
            println!("Reached end of the stream");
            break;
        }
        Error(err) => {
            println!("{}", err);
            // Shut the pipeline down before reporting the failure to the caller.
            pipeline.set_state(gst::State::Null)?;
            return Err(anyhow!("{}", err));
        }
        _ => (),
    }
}
pipeline.set_state(gst::State::Null)?;
Ok(())