Extracting NTP timestamps from an RTSP stream

I am receiving an RTSP stream via a GStreamer pipeline in Python. The stream carries NTP timestamps, and for synchronization purposes I would like to pull, e.g., single video frames from the stream together with their associated timestamps.

I have a probe function that gives me the current frame buffer, and I can indeed grab single frames from it, but where would I access the NTP timestamps, which are presumably somewhere in the buffer metadata?

I have some sample code below:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Mon May 20 12:20:15 2024

@author: felix
"""

import gi
import sys
gi.require_version("Gst", "1.0")
from gi.repository import GLib, Gst
import numpy as np
import matplotlib.pyplot as plt
import os

COUNTER = 0

WORK_FOLDER = "/mnt/e/data/teknoir/gstreamer"

def bus_call(bus, message, loop):
    t = message.type
    if t == Gst.MessageType.EOS:
        sys.stdout.write("End-of-stream\n")
        loop.quit()
    elif t == Gst.MessageType.WARNING:
        err, debug = message.parse_warning()
        sys.stderr.write("Warning: %s: %s\n" % (err, debug))
    elif t == Gst.MessageType.ERROR:
        err, debug = message.parse_error()
        sys.stderr.write("Error: %s: %s\n" % (err, debug))
        loop.quit()
    return True


def decodebin_child_added(child_proxy, Object, name, user_data):
    if "decodebin" in name:
        Object.connect("child-added", decodebin_child_added, user_data)

    if "nvv4l2decoder" in name:
        # Use CUDA unified memory in the pipeline so frames
        # can be easily accessed on CPU in Python.
        Object.set_property("cudadec-memtype", 2)

    if "source" in name:
        source_element = child_proxy.get_by_name("source")
        if source_element.find_property("drop-on-latency") is not None:
            Object.set_property("drop-on-latency", True)
            
            
def cb_newpad(decodebin, decoder_src_pad, data):
    caps = decoder_src_pad.get_current_caps()
    gststruct = caps.get_structure(0)
    gstname = gststruct.get_name()
    source_bin = data
    features = caps.get_features(0)

    # Need to check if the pad created by the decodebin is for video and not
    # audio.
    if "video" in gstname:
        # Link the decodebin pad only if decodebin has picked nvidia
        # decoder plugin nvdec_*. We do this by checking if the pad caps contain
        # NVMM memory features.
        if features.contains("memory:NVMM"):
            # Get the source bin ghost pad
            bin_ghost_pad = source_bin.get_static_pad("src")
            if not bin_ghost_pad.set_target(decoder_src_pad):
                print("Failed to link decoder src pad to source bin ghost pad\n")
        else:
            print(" Error: Decodebin did not pick nvidia decoder plugin.\n")
            
def create_source_bin(index, uri):
    # Create a source GstBin to abstract this bin's content from the rest of the
    # pipeline
    bin_name = "source-bin-%02d" % index

    print("bin {:} at uri {:}".format(bin_name, uri))
    nbin = Gst.Bin.new(bin_name)
    if not nbin:
        print(" Unable to create source bin \n")

    # Source element for reading from the uri.
    # We will use decodebin and let it figure out the container format of the
    # stream and the codec and plug the appropriate demux and decode plugins.
    uri_decode_bin = Gst.ElementFactory.make("uridecodebin", "uri-decode-bin")
    if not uri_decode_bin:
        print(" Unable to create uri decode bin \n")
    # We set the input uri to the source element
    uri_decode_bin.set_property("uri", uri)

    # Connect to the "pad-added" signal of the decodebin which generates a
    # callback once a new pad for raw data has been created by the decodebin
    uri_decode_bin.connect("pad-added", cb_newpad, nbin)
    uri_decode_bin.connect("child-added", decodebin_child_added, nbin)

    # We need to create a ghost pad for the source bin which will act as a proxy
    # for the video decoder src pad. The ghost pad will not have a target right
    # now. Once the decode bin creates the video decoder and generates the
    # cb_newpad callback, we will set the ghost pad target to the video decoder
    # src pad.
    Gst.Bin.add(nbin, uri_decode_bin)
    bin_pad = nbin.add_pad(
        Gst.GhostPad.new_no_target("src", Gst.PadDirection.SRC)
    )
    if not bin_pad:
        print(" Failed to add ghost pad in source bin \n")
        return None
    return nbin


def appsink_pad_buffer_probe(pad, info, u_data):
    global COUNTER
    buffer = info.get_buffer()
    if not buffer:
        print("Unable to get GstBuffer")
        return Gst.PadProbeReturn.OK
    caps = pad.get_current_caps()
    
    success, map_info = buffer.map(Gst.MapFlags.READ)
    if not success:
        return Gst.PadProbeReturn.DROP
  
    # Here we grab a frame from the stream; the NTP time would have to
    # come from the buffer metadata.
    h = caps.get_structure(0).get_value("height")
    w = caps.get_structure(0).get_value("width")
    print(w, h, caps.get_structure(0).get_value("format"), map_info.size, COUNTER)
    # Save every 100th frame; the capsfilter below forces RGB, so the
    # buffer holds h * w * 3 bytes (assuming no row padding).
    if COUNTER % 100 == 0:
        numpy_array = np.frombuffer(map_info.data, dtype=np.uint8).reshape(h, w, 3)
        fname = os.path.join(WORK_FOLDER, "test_{:}.png".format(COUNTER))
        plt.imsave(fname, numpy_array)

    COUNTER += 1
    # Unmap the buffer
    buffer.unmap(map_info)
    return Gst.PadProbeReturn.OK

    
# gst-launch-1.0 uridecodebin uri=rtsp://192.168.1.3:8554/stream1 ! autovideoconvert ! video/x-raw ! autovideosink
# Create an RTSP pipeline
Gst.init(None)
pipeline = Gst.Pipeline()

# Create an RTSP source element
#source = Gst.ElementFactory.make("rtspsrc", "source")
#source.set_property("location", "rtsp://192.168.1.3:8554/stream1")

stream = "rtsp://192.168.1.3:8554/stream1"
source = create_source_bin(0, stream)
pipeline.add(source)


# Add the appsink element
sink = Gst.ElementFactory.make('appsink', 'appsink')
pipeline.add(sink)


# Add the videoconvert element
videoconvert = Gst.ElementFactory.make('autovideoconvert', 'autovideoconvert')
pipeline.add(videoconvert)
# Force raw RGB so the probe can interpret the buffer as h x w x 3
caps = Gst.Caps.from_string("video/x-raw,format=RGB")
cfilter = Gst.ElementFactory.make("capsfilter", "caps")
pipeline.add(cfilter)
cfilter.set_property("caps", caps)

# Link the elements together
source.link(videoconvert)
videoconvert.link(cfilter)
cfilter.link(sink)

sink_pad = sink.get_static_pad("sink")

if sink_pad is not None:
    sink_pad.add_probe(Gst.PadProbeType.BUFFER, appsink_pad_buffer_probe, 0)
    print("probe added")
else:
    print("couldnt get a source pad")

# Start the pipeline
loop = GLib.MainLoop()
bus = pipeline.get_bus()
bus.add_signal_watch()
bus.connect("message", bus_call, loop)
pipeline.set_state(Gst.State.PLAYING)

try:
    loop.run()
except BaseException:
    print("something went wrong")
# cleanup
pipeline.set_state(Gst.State.NULL)

Enable the add-reference-timestamp-meta property on rtspsrc. Then you get the NTP timestamps as GstReferenceTimestampMeta on the buffers coming out of rtspsrc, and they're forwarded (unless there are bugs in elements in between) all the way to the sink.
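In the pipeline above, rtspsrc is created internally by uridecodebin, so one place to enable the property is the existing decodebin_child_added callback. A minimal sketch, guarded with find_property since the property only exists on recent rtspsrc versions and the picked source element is not guaranteed to be rtspsrc:

    # Inside the existing decodebin_child_added callback:
    if "source" in name:
        # Enable NTP reference timestamp metas on rtspsrc; skip silently
        # if this source element does not have the property.
        if Object.find_property("add-reference-timestamp-meta") is not None:
            Object.set_property("add-reference-timestamp-meta", True)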

Note that the meta will only be added once the NTP timestamps are actually known, e.g. once an RTP packet with the NTP-64 header extension or an RTCP SR has been received.
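In Python the meta can then be looked up per buffer via Gst.Buffer.get_reference_timestamp_meta(). A minimal sketch for the probe above, assuming rtspsrc attaches the meta with "timestamp/x-ntp" reference caps (passing None instead returns the first reference timestamp meta regardless of caps); the timestamp is then nanoseconds since the NTP epoch, 1900-01-01:

NTP_CAPS = Gst.Caps.from_string("timestamp/x-ntp")
NTP_TO_UNIX = 2208988800  # seconds between 1900-01-01 and 1970-01-01

def get_ntp_time(buffer):
    # Returns the NTP timestamp in nanoseconds, or None while no
    # RTCP SR / NTP-64 header extension has arrived yet.
    meta = buffer.get_reference_timestamp_meta(NTP_CAPS)
    if meta is None:
        return None
    return meta.timestamp

# e.g. inside appsink_pad_buffer_probe:
#     ntp_ns = get_ntp_time(buffer)
#     if ntp_ns is not None:
#         unix_time = ntp_ns / Gst.SECOND - NTP_TO_UNIX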

Yes, I ran across GstReferenceTimestampMeta, which is very likely what I want, since I know the timestamps are in the stream. But is there documentation on the Python bindings somewhere? All I could find was this PyGObject API Reference, which doesn't even show this particular meta in a search.
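For what it's worth, the method does appear to be exposed through GObject introspection even when the online reference doesn't index it; it can be checked from a Python shell (an introspection probe, not official documentation):

import gi
gi.require_version("Gst", "1.0")
from gi.repository import Gst

# List the introspected Gst.Buffer methods mentioning "reference"; on
# GStreamer >= 1.14 this should include add_reference_timestamp_meta
# and get_reference_timestamp_meta.
print([m for m in dir(Gst.Buffer) if "reference" in m])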