Appsink showing only the first frame

I’m only trying to receive video so I can’t test with videotestsrc. But you are right! The way in which I’m doing timestamps is incorrect. With do-timestamp=true, I’m seeing some progress…

I’m getting live data (frames are sliding), but it’s coming like this and the signal need-data keeps getting triggered. Does this mean I’m pushing data slowly?

let on_track = |track_event: libwebrtc::peer_connection::TrackEvent| {
    let mut pipeline_str = String::from(
        "appsrc name=src is-live=true format=time do-timestamp=true ! queue !",
    );
    pipeline_str.push_str(
        "rawvideoparse name=rawvideoparse format=i420 framerate=30/1 ! ",
    );
    pipeline_str.push_str("videoconvert ! autovideosink");
    println!("The pipeline is:\n{}", pipeline_str);
    let pipeline = gstreamer::parse::launch(&pipeline_str).unwrap();
    let pipeline = pipeline.dynamic_cast::<gstreamer::Pipeline>().unwrap();
    let pipeline_clone = pipeline.clone();
    let appsrc = pipeline.by_name("src").unwrap();
    let appsrc = appsrc.downcast::<gstreamer_app::AppSrc>().unwrap();
    appsrc.connect("need-data", false, |args| {
        println!("Needs data");
        None
    });
    if let libwebrtc::prelude::MediaStreamTrack::Video(track) =
        track_event.track
    {
        println!("Its a video track");
        thread::spawn(move || {
            let mut stream = NativeVideoStream::new(track);
            println!("Came here");
            let mut done = false;
            while let Some(frame) = block_on(stream.next()) {
                let i420_buffer = frame.buffer.to_i420();
                //
                let width = i420_buffer.width() as u32;
                let height = i420_buffer.height() as 
                if !done {
                    let rawvideoparse =
                        pipeline.by_name("rawvideoparse").unwrap();
                    rawvideoparse.set_property("width", width as i32);
                    rawvideoparse.set_property("height", height as i32);
                    pipeline.set_state(gstreamer::State::Playing).unwrap();
                    println!("Set to playing");
                }
                done = t
                let data_yuv = i420_buffer.data();
                let strides_yuv = i420_buffer.strides()
                let chroma_width = i420_buffer.chroma_width();
                let chroma_height = i420_buffer.chroma_height();
                let mut raw_data =
                    Vec::with_capacity((width * height * 3 / 2) as usize
                for row in 0..height {
                    let start = (row * strides_yuv.0) as usize;
                    let end = start + width as usize;
                    raw_data.extend_from_slice(&data_yuv.0[start..end]);
                
                // Copy U plane
                for row in 0..chroma_height {
                    let start = (row * strides_yuv.1) as usize;
                    let end = start + chroma_width as usize;
                    raw_data.extend_from_slice(&data_yuv.1[start..end]);
                
                // Copy V plane
                for row in 0..chroma_height {
                    let start = (row * strides_yuv.2) as usize;
                    let end = start + chroma_width as usize;
                    raw_data.extend_from_slice(&data_yuv.2[start..end]);
                }
                let gst_buffer = gstreamer::Buffer::from_mut_slice(raw_data);
                //
                if let Err(e) = appsrc.push_buffer(gst_buffer) {
                    println!("Error {:?}", e);
                }
                println!("Pushed");
            }
        });
        thread::spawn(move || {
            let bus = pipeline_clone.bus().unwrap();
            for msg in bus.iter_timed(gstreamer::ClockTime::NONE) {
                match msg.view() {
                    MessageView::Eos(_) => {
                        println!("End of stream");
                    }
                    MessageView::Error(e) => {
                        println!("stream error {}", e);
                    }
                    _ => (),
                }
            }
        });
    }
}

I followed your advice and used gst::Buffer::from_mut_slice(). I’m not sure if it’s possible to directly pass the input data as I’m not really knowledgeable about this.

I was using webrtcbin before this. However, because it doesn’t support stopping transceivers in SDP negotiation, and because of the artifacting I saw with webrtcsrc, I have to go down this route, which I wish I didn’t have to :frowning: