rtph265pay: add_probe() callback not called when buffers are available

I am trying to add some RTP header extension values to the RTP packets going out on a UDP port, so I added a buffer probe with add_probe() on the src pad of the rtph265pay element. However, I don't see the probe being called at all.

Here is a simplified version of my pipeline:

"rtspsrc location=<myrtspurl> name=rtspsrc latency=100 \
      rtspsrc. ! rtph265depay ! tee name=t \
         t. ! queue ! h265parse ! rtph265pay name=rtph265pay config-interval=-1 \
 ! udpsink name=udpsink host=127.0.0.1 port=56001";

I verified that I am able to play the packets on a receiving pipeline, so the data is getting through. But I don't understand why I am not getting callbacks from add_probe().

I tried the exact same code on an RTSP stream sending H264 with rtph264pay, and there the callbacks do fire. So it seems to be a behavior only with H265.

Another observation: when I add a probe with add_probe() to the sink pad of udpsink, which comes after rtph265pay, I don't get a callback there either.

I have implemented this in Rust, and here is how I added the callback:

use gst::prelude::*;
use gst_rtp::RTPBuffer;
use anyhow::Error;
use gst::ClockTime;


// Inside main
fn main() {
    gst::init().unwrap();

    let pipeline_str = "rtspsrc location=rtsp://admin:mypassword@10.0.0.249:554/ name=rtspsrc latency=100 \
     rtspsrc. ! rtph265depay ! tee name=t \
        t. ! queue ! h265parse name=h265parse ! rtph265pay name=rtph265pay config-interval=0 ! udpsink name=udpsink host=127.0.0.1 port=56001";

    // With this H264 pipeline, the same code below does get add_probe callbacks.
    // let pipeline_str = "rtspsrc location=rtsp://admin:mypassword@10.0.0.251:554/ name=rtspsrc latency=100 \
    //  rtspsrc. ! rtph264depay ! tee name=t \
    //     t. ! queue ! h264parse ! rtph264pay name=rtph265pay config-interval=-1 ! udpsink name=udpsink host=127.0.0.1 port=56001";


    let pipe_elem = gst::parse_launch(pipeline_str).unwrap();

    let pipeline = pipe_elem.clone().downcast::<gst::Pipeline>().unwrap();

    let rtph265pay = pipeline.by_name("rtph265pay").unwrap();

    let rtppay_src_pad = rtph265pay.static_pad("src").unwrap();


    let udpsink = pipeline.by_name("udpsink").unwrap();

    let udpsink_sink_pad = udpsink.static_pad("sink").unwrap();



    let rtspsrc = pipeline.by_name("rtspsrc").unwrap();

    let bin_ref = pipeline.clone();

    let h265parse = pipeline.by_name("h265parse").unwrap();
    let h265parse_src_pad = h265parse.static_pad("src").unwrap();
    h265parse_src_pad.add_probe(gst::PadProbeType::BUFFER, |_, _| {
        println!("Buffer flowing through h265parse");
        gst::PadProbeReturn::Ok
    });


    rtspsrc.connect_pad_added(move |_, src_pad| {
        match src_pad.current_caps() {
            Some(caps) => {
                let new_pad_struct = caps.structure(0).expect("Failed to get first structure of caps for audio");
                for i in 0..new_pad_struct.n_fields() {
                    match new_pad_struct.nth_field_name(i).unwrap().as_str() {
                        "media" => {
                            let media_type = new_pad_struct.value("media").unwrap();
                            let field_value = media_type.get::<&str>().unwrap();
                            println!("field_value={}", field_value);
                            if field_value == "video" {
                                bin_ref.debug_to_dot_file(gst::DebugGraphDetails::all(), "PLAYING");
                                rtppay_src_pad.add_probe(gst::PadProbeType::BUFFER, |pad, probe_info| {
                                    println!("probe for rtppay");
                                    if let Some(probe_data) = probe_info.data.as_mut() {
                                        if let gst::PadProbeData::Buffer(ref mut buffer) = probe_data {
                                            let size = buffer.size();
                                            match buffer.pts() {
                                                Some(pts) => {
                                                    println!("ptstime={}", pts.seconds())
                                                },
                                                None => {
                                                    println!("No PTS, cannot get bandwidth")
                                                }
                                            }

                                            let b = buffer.get_mut().unwrap();
                                            let mut rtp_buffer = RTPBuffer::from_buffer_writable(b).unwrap();

                                            let pts = rtp_buffer.buffer().pts().unwrap();
                                            // Convert the PTS to bytes
                                            let pts_bytes = pts.to_be_bytes();
                                            let extension_data = &pts_bytes[..];

                                            let appbits = 5; // Custom application-specific bits
                                            let id = 1; // Custom extension ID
                                            let result = rtp_buffer.add_extension_twobytes_header(appbits, id, extension_data);

                                            if let Err(e) = result {
                                                eprintln!("Failed to add RTP header extension: {:?}", e);
                                            }
                                        }
                                    }
                                    gst::PadProbeReturn::Ok
                                });

                                udpsink_sink_pad.add_probe(gst::PadProbeType::BUFFER, |pad, probe_info| {
                                    println!("pad probe to udpsink");
                                    if let Some(probe_data) = probe_info.data.as_ref() {
                                        if let gst::PadProbeData::Buffer(buffer) = probe_data {
                                            let rtp_buffer = RTPBuffer::from_buffer_readable(buffer).unwrap();
                                            // Check for RTP extension header
                                            if let Some((appbits, extension_data)) = rtp_buffer.extension_twobytes_header(1, 0) {
                                                println!("RTP Extension present:");
                                                println!("App bits: {}", appbits);
                                                println!("Extension data: {:?}", extension_data);

                                                // Convert the extension data back to PTS (written above as 8 big-endian bytes)
                                                if extension_data.len() >= 8 {
                                                    let mut pts_bytes = [0u8; 8];
                                                    pts_bytes.copy_from_slice(&extension_data[..8]);  // Copy all 8 bytes of the u64
                                                    let pts = u64::from_be_bytes(pts_bytes);
                                                    println!("Extracted PTS from RTP extension: {}", pts);
                                                }
                                            } else {
                                                println!("No RTP Extension found");
                                            }
                                            match rtp_buffer.buffer().pts() {
                                                Some(pts) => {
                                                    println!("udpsink buffer.pts={}", pts.seconds());
                                                },
                                                None => {
                                                    println!("No PTS, cannot get bandwidth");
                                                }
                                            }
                                        }
                                    }
                                    gst::PadProbeReturn::Ok
                                }).unwrap();

                            }
                        }
                        _ => {}
                    }
                }
            }
            _ => {}
        }
    });


    let pipeline = pipe_elem.clone().downcast::<gst::Pipeline>().unwrap();

    // Start the pipeline and run the main loop
    let bus = pipeline.bus().unwrap();
    let pipeline_weak = pipeline.downgrade();
    let _watch = bus.add_watch_local(move |_, msg| {
        if let Some(pipeline) = pipeline_weak.upgrade() {
            match msg.view() {
                gst::MessageView::Element(msg) => {
                    println!("msg.name()={}", msg.src().unwrap().name());
                },
                _ => (),
            }
        }
        glib::ControlFlow::Continue
    }).unwrap();


    pipeline.set_state(gst::State::Playing).unwrap();

    let main_loop = glib::MainLoop::new(None, false);
    main_loop.run();
}

Please let me know what I have missed. I appreciate your help!

This is a bug and I have submitted an issue for it.

Copying comment from the issue:

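rtph265pay and various other RTP payloaders output buffer lists, not just single buffers. A probe installed with only the BUFFER type does not fire for buffer lists, so you need to handle those separately from buffers in your pad probe and use the corresponding probe type (gst::PadProbeType::BUFFER_LIST in the Rust bindings).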

Thanks a lot for your response, @slomo. I was seeing the callbacks come for rtph264pay, so I thought it might be a bug. As a workaround I was adding an identity element after the payloader. I didn’t try buffer lists.

rtph264pay can also output buffer lists