Manually feeding a media stream

Hey there! I’m new to GStreamer.

I’m interested in media streaming. Imagine the following situation:
there’s some media stream, say an audio stream,
and the only things you know about it are its format and its chunk size, meaning the stream is transmitted in chunks. I want to receive the stream, decode it, and then play it. Is there any way to do that?

You can feed it into a pipeline using the appsrc element: set the caps according to the format, then create buffers and push them into the pipeline via appsrc.

Yes, I did, but it doesn’t work.

I tried it in both C and Rust, and both give the error “Internal stream error”, which is thrown by the appsrc element.

To reproduce, you can read a file chunk by chunk, feed the chunks one by one to appsrc, and try to decode them.

Oh, is something wrong with my code?

  let mut file = fs::File::open("./output").await?;
  // const CHUNK_SIZE: u64 = 1024;

  gst::init()?;

  let pipeline = gst::Pipeline::new();

  let audio_info =
    gst_audio::AudioInfo::builder(gst_audio::AudioFormat::Encoded, 48000, 1).build()?;

  let appsrc = gst_app::AppSrc::builder()
    .caps(&audio_info.to_caps()?)
    .build();

  // let audioconvert = gst::ElementFactory::make("audioconvert").build()?;
  // let audioresample = gst::ElementFactory::make("audioresample").build()?;
  // let audiosink = gst::ElementFactory::make("autoaudiosink").build()?;
  //
  // let elements = &[appsrc.as_ref(), &audioconvert, &audioresample, &audiosink];
  let queue = gst::ElementFactory::make("queue").build()?;
  let audiosink = gst::ElementFactory::make("autoaudiosink").build()?;
  let elements = &[appsrc.as_ref(), &queue, &audiosink];

  pipeline.add_many(elements)?;
  gst::Element::link_many(elements)?;

  let mut already_pushed = false;
  appsrc.set_callbacks(
    gst_app::AppSrcCallbacks::builder()
      .need_data(move |appsrc, _| {
        use tokio::io::AsyncReadExt;

        if already_pushed {
          appsrc.end_of_stream().unwrap();
          return;
        }

        println!("==> Pushing data...");

        // read the remaining bytes from the file
        // NOTE: appsrc callbacks are invoked from a GStreamer streaming
        // thread, outside any tokio context, so we must create our own
        // runtime to drive the async read

        // let mut buf = [0; CHUNK_SIZE as usize];
        let mut buf = vec![];
        // create a brand-new tokio runtime for this blocking read
        let rt = tokio::runtime::Runtime::new().unwrap();
        rt.block_on(file.read_to_end(&mut buf)).unwrap();
        // println!("Read buffer : {:?}", buf);

        // wrap the chunk in a gst::Buffer and push it to `appsrc`
        // NOTE: gst::Buffer::new() creates an *empty* buffer, so filling it
        // with gst_buffer_fill copies zero bytes; from_slice takes ownership
        // of the Vec and wraps it without copying
        let buffer = gst::Buffer::from_slice(buf);

        appsrc.push_buffer(buffer).unwrap();
        already_pushed = true;
      })
      .build(),
  );

  pipeline.set_state(gst::State::Playing)?;

  let bus = pipeline
    .bus()
    .ok_or_else(|| anyhow::anyhow!("Failed to get bus from pipeline"))?;
  for msg in bus.iter_timed(gst::ClockTime::NONE) {
    use gst::MessageView::*;

    match msg.view() {
      Eos(_) => {
        println!("Reached end of the stream");

        break;
      }
      Error(err) => {
        eprintln!(
          "Error from {:?}: {} ({:?})",
          err.src().map(|s| s.path_string()),
          err.error(),
          err.debug()
        );

        pipeline.set_state(gst::State::Null)?;

        return Err(anyhow!("{}", err.error()));
      }
      _ => (),
    }
  }

  pipeline.set_state(gst::State::Null)?;

  Ok(())

  gst_audio::AudioInfo::builder(gst_audio::AudioFormat::Encoded, 48000, 1).build()?;

That (Encoded) isn’t really a format you can push into the pipeline.

Encoded audio has its own format-specific caps, e.g. audio/mpeg, mpegversion=1, layer=3, ... for MP3, audio/x-opus, ... for Opus, etc.