I am trying to add some RTP header extension values to the outgoing RTP packets on a udp port. So I added a add_probe() method to the rtph265pay
element. However I dont see the probes being called at all.
Here is a simplified version of my pipeline
"rtspsrc location=<myrtspurl> name=rtspsrc latency=100 \
rtspsrc. ! rtph265depay ! tee name=t \
t. ! queue ! h265parse ! rtph265pay name=rtph265pay config-interval=-1 \
! udpsink name=udpsink host=127.0.0.1 port=56001";
I verified that I am able to play the packets on a receiving pipeline. So the data is getting passed through. But I dont understand why I am not getting callbacks on add_probe().
I tried the exact same code on a rtsp stream sending H264 with rtph264pay
. So it seems to be a behavior only with H265.
Another observation is that when I add a add_probe() to the udpsink
sink element which is after rtph265pay. I dont get a callback there as well.
I have implemented this in rust and here is the way I have added the callback.
use gst::prelude::*;
use gst_rtp::RTPBuffer;
use anyhow::Error;
use gst::ClockTime;
// Inside main
fn main() {
gst::init().unwrap();
let pipeline_str = "rtspsrc location=rtsp://admin:mypassword@10.0.0.249:554/ name=rtspsrc latency=100 \
rtspsrc. ! rtph265depay ! tee name=t \
t. ! queue ! h265parse name=h265parse ! rtph265pay name=rtph265pay config-interval=0 ! udpsink name=udpsink host=127.0.0.1 port=56001";
// This pipeline with h264 with the same code below will get add_probe callbacks.
// let pipeline_str = "rtspsrc location=rtsp://admin:mypassword@10.0.0.251:554/ name=rtspsrc latency=100 \
// rtspsrc. ! rtph264depay ! tee name=t \
// t. ! queue ! h264parse ! rtph264pay name=rtph265pay config-interval=-1 ! udpsink name=udpsink host=127.0.0.1 port=56001";
let pipe_elem = gst::parse_launch(pipeline_str).unwrap();
let pipeline = pipe_elem.clone().downcast::<gst::Pipeline>().unwrap();
let rtph265pay = pipeline.by_name("rtph265pay").unwrap();
let rtppay_src_pad = rtph265pay.static_pad("src").unwrap();
let udpsink = pipeline.by_name("udpsink").unwrap();
let udpsink_sink_pad = udpsink.static_pad("sink").unwrap();
let rtspsrc = pipeline.by_name("rtspsrc").unwrap();
let bin_ref = pipeline.clone();
let h265parse = pipeline.by_name("h265parse").unwrap();
let h265parse_src_pad = h265parse.static_pad("src").unwrap();
h265parse_src_pad.add_probe(gst::PadProbeType::BUFFER, |_, _| {
println!("Buffer flowing through h265parse");
gst::PadProbeReturn::Ok
});
rtspsrc.connect_pad_added(move |_, src_pad| {
match src_pad.current_caps() {
Some(caps) => {
let new_pad_struct = caps.structure(0).expect("Failed to get first structure of caps for audio");
for i in 0..new_pad_struct.n_fields() {
match new_pad_struct.nth_field_name(i).unwrap().as_str() {
"media" => {
let media_type = new_pad_struct.value("media").unwrap();
let field_value = media_type.get::<&str>().unwrap();
println!("field_value={}", field_value);
if field_value == "video" {
bin_ref.debug_to_dot_file(gst::DebugGraphDetails::all(), "PLAYING");
rtppay_src_pad.add_probe(gst::PadProbeType::BUFFER, |pad, probe_info| {
println!("probe for rtppay");
if let Some(probe_data) = probe_info.data.as_mut() {
if let gst::PadProbeData::Buffer(ref mut buffer) = probe_data {
let size = buffer.size();
match buffer.pts() {
Some(pts) => {
println!("ptstime={}", pts.seconds())
},
None => {
println!("No PTS, cannot get bandwidth")
}
}
let b = buffer.get_mut().unwrap();
let mut rtp_buffer = RTPBuffer::from_buffer_writable(b).unwrap();
let pts = rtp_buffer.buffer().pts().unwrap();
// Convert the PTS to bytes
let pts_bytes = pts.to_be_bytes();
let extension_data = &pts_bytes[..];
let appbits = 5; // Custom application-specific bits
let id = 1; // Custom extension ID
let result = rtp_buffer.add_extension_twobytes_header(appbits, id, extension_data);
if let Err(e) = result {
eprintln!("Failed to add RTP header extension: {:?}", e);
}
}
}
gst::PadProbeReturn::Ok
});
udpsink_sink_pad.add_probe(gst::PadProbeType::BUFFER, |pad, probe_info| {
println!("pad probe to udpsink");
if let Some(probe_data) = probe_info.data.as_ref() {
if let gst::PadProbeData::Buffer(buffer) = probe_data {
let rtp_buffer = RTPBuffer::from_buffer_readable(buffer).unwrap();
// Check for RTP extension header
if let Some((appbits, extension_data)) = rtp_buffer.extension_twobytes_header(1, 0) { //extension_twobytes_header(1, 0) {
println!("RTP Extension present:");
println!("App bits: {}", appbits);
println!("Extension data: {:?}", extension_data);
// Convert the extension data back to PTS
if extension_data.len() != 0 {
let mut pts_bytes = [0u8; 8];
pts_bytes[..4].copy_from_slice(&extension_data[..4]); // Copy the first 4 bytes
let pts = u64::from_be_bytes(pts_bytes);
//let pts = u64::from_be_bytes(extension_data.try_into().unwrap());
println!("Extracted PTS from RTP extension: {}", pts);
}
} else {
println!("No RTP Extension found");
}
match rtp_buffer.buffer().pts() {
Some(pts) => {
println!("udpsink buffer.pts={}", pts.seconds());
},
None => {
println!("No PTS, cannot get bandwidth");
}
}
}
}
gst::PadProbeReturn::Ok
}).unwrap();
}
}
_ => {}
}
}
}
_ => {}
}
});
let pipeline = pipe_elem.clone().downcast::<gst::Pipeline>().unwrap();
// Start the pipeline and run the main loop
let bus = pipeline.bus().unwrap();
let pipeline_weak = pipeline.downgrade();
let _watch = bus.add_watch_local(move |_, msg| {
if let Some(pipeline) = pipeline_weak.upgrade() {
match msg.view() {
gst::MessageView::Element(msg) => {
println!("msg.name()={}", msg.src().unwrap().name());
},
_ => (),
}
}
glib::ControlFlow::Continue
}).unwrap();
pipeline.set_state(gst::State::Playing).unwrap();
let main_loop = glib::MainLoop::new(None, false);
main_loop.run();
}
Please let me know what I have missed. I appreciate your help!