rtph265pay: pad probe added with add_probe() is never called even though buffers are flowing

I am trying to add some RTP header extension values to the outgoing RTP packets that are sent to a UDP port. To do that, I added a buffer probe with add_probe() to the src pad of the rtph265pay element. However, I don't see the probe being called at all.

Here is a simplified version of my pipeline

"rtspsrc location=<myrtspurl> name=rtspsrc latency=100 \
      rtspsrc. ! rtph265depay ! tee name=t \
         t. ! queue ! h265parse ! rtph265pay name=rtph265pay config-interval=-1 \
 ! udpsink name=udpsink host=127.0.0.1 port=56001";

I verified that I am able to play the stream with a receiving pipeline, so the data is getting passed through. But I don't understand why I am not getting any callbacks from the probe added with add_probe().

I tried the exact same code on an RTSP stream sending H264 with rtph264pay, and there the probes are called. So this seems to be a behavior that only occurs with H265.

Another observation is that when I add a probe with add_probe() to the sink pad of the udpsink element, which comes after rtph265pay, I don't get a callback there either.

I have implemented this in Rust, and here is how I have added the callbacks.

use gst::prelude::*;
use gst_rtp::RTPBuffer;
use anyhow::Error;
use gst::ClockTime;


// Inside main
fn main() {
    gst::init().unwrap();

    let pipeline_str = "rtspsrc location=rtsp://admin:mypassword@10.0.0.249:554/ name=rtspsrc latency=100 \
     rtspsrc. ! rtph265depay ! tee name=t \
        t. ! queue ! h265parse name=h265parse ! rtph265pay name=rtph265pay config-interval=0 ! udpsink name=udpsink host=127.0.0.1 port=56001";

// This pipeline with h264 with the same code below will get add_probe callbacks.
    // let pipeline_str = "rtspsrc location=rtsp://admin:mypassword@10.0.0.251:554/ name=rtspsrc latency=100 \
    //  rtspsrc. ! rtph264depay ! tee name=t \
    //     t. ! queue ! h264parse ! rtph264pay name=rtph265pay config-interval=-1 ! udpsink name=udpsink host=127.0.0.1 port=56001";


    let pipe_elem = gst::parse_launch(pipeline_str).unwrap();

    let pipeline = pipe_elem.clone().downcast::<gst::Pipeline>().unwrap();

    let rtph265pay = pipeline.by_name("rtph265pay").unwrap();

    let rtppay_src_pad = rtph265pay.static_pad("src").unwrap();


    let udpsink = pipeline.by_name("udpsink").unwrap();

    let udpsink_sink_pad = udpsink.static_pad("sink").unwrap();



    let rtspsrc = pipeline.by_name("rtspsrc").unwrap();

    let bin_ref = pipeline.clone();

    let h265parse = pipeline.by_name("h265parse").unwrap();
    let h265parse_src_pad = h265parse.static_pad("src").unwrap();
    h265parse_src_pad.add_probe(gst::PadProbeType::BUFFER, |_, _| {
        println!("Buffer flowing through h265parse");
        gst::PadProbeReturn::Ok
    });


    rtspsrc.connect_pad_added(move |_, src_pad| {
        match src_pad.current_caps() {
            Some(caps) => {
                let new_pad_struct = caps.structure(0).expect("Failed to get first structure of caps for audio");
                for i in 0..new_pad_struct.n_fields() {
                    match new_pad_struct.nth_field_name(i).unwrap().as_str() {
                        "media" => {
                            let media_type = new_pad_struct.value("media").unwrap();
                            let field_value = media_type.get::<&str>().unwrap();
                            println!("field_value={}", field_value);
                            if field_value == "video" {
                                bin_ref.debug_to_dot_file(gst::DebugGraphDetails::all(), "PLAYING");
                                rtppay_src_pad.add_probe(gst::PadProbeType::BUFFER, |pad, probe_info| {
                                    println!("probe for rtppay");
                                    if let Some(probe_data) = probe_info.data.as_mut() {
                                        if let gst::PadProbeData::Buffer(ref mut buffer) = probe_data {
                                            let size = buffer.size();
                                            match buffer.pts() {
                                                Some(pts) => {
                                                    println!("ptstime={}", pts.seconds())
                                                },
                                                None => {
                                                    println!("No PTS, cannot get bandwidth")
                                                }
                                            }

                                            let b = buffer.get_mut().unwrap();
                                            let mut rtp_buffer = RTPBuffer::from_buffer_writable(b).unwrap();

                                            let pts = rtp_buffer.buffer().pts().unwrap();
                                            // Convert the PTS to bytes
                                            let pts_bytes = pts.to_be_bytes();
                                            let extension_data = &pts_bytes[..];

                                            let appbits = 5; // Custom application-specific bits
                                            let id = 1; // Custom extension ID
                                            let result = rtp_buffer.add_extension_twobytes_header(appbits, id, extension_data);

                                            if let Err(e) = result {
                                                eprintln!("Failed to add RTP header extension: {:?}", e);
                                            }
                                        }
                                    }
                                    gst::PadProbeReturn::Ok
                                });

                                udpsink_sink_pad.add_probe(gst::PadProbeType::BUFFER, |pad, probe_info| {
                                    println!("pad probe to udpsink");
                                    if let Some(probe_data) = probe_info.data.as_ref() {
                                        if let gst::PadProbeData::Buffer(buffer) = probe_data {
                                            let rtp_buffer = RTPBuffer::from_buffer_readable(buffer).unwrap();
                                            // Check for RTP extension header
                                            if let Some((appbits, extension_data)) = rtp_buffer.extension_twobytes_header(1, 0) { //extension_twobytes_header(1, 0) {
                                                println!("RTP Extension present:");
                                                println!("App bits: {}", appbits);
                                                println!("Extension data: {:?}", extension_data);

                                                // Convert the extension data back to PTS
                                                if extension_data.len() != 0 {
                                                    let mut pts_bytes = [0u8; 8];
                                                    pts_bytes[..4].copy_from_slice(&extension_data[..4]);  // Copy the first 4 bytes
                                                    let pts = u64::from_be_bytes(pts_bytes);
                                                    //let pts = u64::from_be_bytes(extension_data.try_into().unwrap());
                                                    println!("Extracted PTS from RTP extension: {}", pts);
                                                }
                                            } else {
                                                println!("No RTP Extension found");
                                            }
                                            match rtp_buffer.buffer().pts() {
                                                Some(pts) => {
                                                    println!("udpsink buffer.pts={}", pts.seconds());
                                                },
                                                None => {
                                                    println!("No PTS, cannot get bandwidth");
                                                }
                                            }
                                        }
                                    }
                                    gst::PadProbeReturn::Ok
                                }).unwrap();

                            }
                        }
                        _ => {}
                    }
                }
            }
            _ => {}
        }
    });


    let pipeline = pipe_elem.clone().downcast::<gst::Pipeline>().unwrap();

    // Start the pipeline and run the main loop
    let bus = pipeline.bus().unwrap();
    let pipeline_weak = pipeline.downgrade();
    let _watch = bus.add_watch_local(move |_, msg| {
        if let Some(pipeline) = pipeline_weak.upgrade() {
            match msg.view() {
                gst::MessageView::Element(msg) => {
                    println!("msg.name()={}", msg.src().unwrap().name());
                },
                _ => (),
            }
        }
        glib::ControlFlow::Continue
    }).unwrap();


    pipeline.set_state(gst::State::Playing).unwrap();

    let main_loop = glib::MainLoop::new(None, false);
    main_loop.run();
}

Please let me know what I have missed. I appreciate your help!