I am receiving an RTSP stream via gstreamer pipeline in python. The stream has NTP timestamps and for synchronization purposes, I would like to pull e.g. single video frames from the stream and their associated timestamps.
I have a probe function that gives me the current frame buffer - and I can indeed grab single frames from this - but where would I access the NTP timestamps, which are presumably somewhere in the buffer metadata?
I have some sample code below:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Mon May 20 12:20:15 2024
@author: felix
"""
import gi
import sys
gi.require_version('GstRtspServer', '1.0')
gi.require_version("Gst", "1.0")
from gi.repository import GLib, Gst, GstRtspServer
import numpy as np
import matplotlib.pyplot as plt
import os
# Frame counter, incremented once per buffer by appsink_pad_buffer_probe;
# every 100th frame is saved to disk.
COUNTER = 0
# Output directory for saved frames.
# NOTE(review): hard-coded, machine-specific path — adjust per deployment.
WORK_FOLDER = "/mnt/e/data/teknoir/gstreamer"
def bus_call(bus, message, loop):
    """GStreamer bus watch: log warnings/errors, quit the loop on EOS or error.

    Returning True keeps the signal watch installed.
    """
    msg_type = message.type
    if msg_type == Gst.MessageType.EOS:
        sys.stdout.write("End-of-stream\n")
        loop.quit()
    elif msg_type == Gst.MessageType.WARNING:
        warn, dbg = message.parse_warning()
        sys.stderr.write("Warning: %s: %s\n" % (warn, dbg))
    elif msg_type == Gst.MessageType.ERROR:
        err, dbg = message.parse_error()
        sys.stderr.write("Error: %s: %s\n" % (err, dbg))
        loop.quit()
    return True
def decodebin_child_added(child_proxy, Object, name, user_data):
    """child-added handler for uridecodebin: configure decoder/source children.

    Recurses into nested decodebins, switches the NVIDIA decoder to CUDA
    unified memory, and enables drop-on-latency on the RTSP source.
    """
    if name.find("decodebin") != -1:
        # Recurse so grandchildren created inside nested decodebins are
        # configured too.
        Object.connect("child-added", decodebin_child_added, user_data)
    if name.find("nvv4l2decoder") != -1:
        # Use CUDA unified memory in the pipeline so frames
        # can be easily accessed on CPU in Python.
        Object.set_property("cudadec-memtype", 2)
    if "source" in name:
        source_element = child_proxy.get_by_name("source")
        # Guard against a missing child (get_by_name may return None) and use
        # `is not None` per PEP 8 instead of `!= None`.
        if source_element is not None and \
                source_element.find_property("drop-on-latency") is not None:
            Object.set_property("drop-on-latency", True)
def cb_newpad(decodebin, decoder_src_pad, data):
    """pad-added handler: point the source bin's ghost pad at the new video pad.

    `data` is the enclosing source GstBin created by create_source_bin.
    """
    caps = decoder_src_pad.get_current_caps()
    structure = caps.get_structure(0)
    media_type = structure.get_name()
    features = caps.get_features(0)
    source_bin = data
    # decodebin also emits audio pads; only video is of interest here.
    if media_type.find("video") == -1:
        return
    # Link only if decodebin picked an NVIDIA decoder, indicated by the
    # pad caps carrying NVMM memory features.
    if features.contains("memory:NVMM"):
        ghost_pad = source_bin.get_static_pad("src")
        if not ghost_pad.set_target(decoder_src_pad):
            print("Failed to link decoder src pad to source bin ghost pad\n")
    else:
        print(" Error: Decodebin did not pick nvidia decoder plugin.\n")
def create_source_bin(index, uri):
    """Build a source GstBin wrapping a uridecodebin for the given URI.

    The bin exposes one "src" ghost pad created without a target; cb_newpad
    sets the target once uridecodebin has produced the decoded video pad.
    Returns the bin, or None if the ghost pad could not be added.
    """
    bin_name = "source-bin-%02d" % index
    print("bin {:} at uri {:}".format(bin_name, uri))
    nbin = Gst.Bin.new(bin_name)
    if not nbin:
        print(" Unable to create source bin \n")
    # uridecodebin determines the container format and codec by itself and
    # plugs the appropriate demux and decode elements.
    uri_decode_bin = Gst.ElementFactory.make("uridecodebin", "uri-decode-bin")
    if not uri_decode_bin:
        print(" Unable to create uri decode bin \n")
    uri_decode_bin.set_property("uri", uri)
    # "pad-added" fires once a raw-data pad exists; "child-added" lets us
    # configure decoder/source children as they are instantiated.
    uri_decode_bin.connect("pad-added", cb_newpad, nbin)
    uri_decode_bin.connect("child-added", decodebin_child_added, nbin)
    Gst.Bin.add(nbin, uri_decode_bin)
    # Targetless ghost pad acting as a proxy for the decoder src pad;
    # cb_newpad retargets it when the decoder pad appears.
    ghost_pad = Gst.GhostPad.new_no_target("src", Gst.PadDirection.SRC)
    if not nbin.add_pad(ghost_pad):
        print(" Failed to add ghost pad in source bin \n")
        return None
    return nbin
def appsink_pad_buffer_probe(pad, info, u_data):
    """Buffer probe on the appsink sink pad.

    Prints frame geometry, reports the NTP reference timestamp when present,
    saves every 100th frame as a PNG, and returns Gst.PadProbeReturn.OK so
    buffers continue flowing downstream.
    """
    global COUNTER, WORK_FOLDER
    buffer = info.get_buffer()
    if buffer is None:
        print("Unable to get GstBuffer ")
        # Fix: a probe must return a GstPadProbeReturn; the original
        # returned bare None here.
        return Gst.PadProbeReturn.OK
    caps = pad.get_current_caps()
    success, map_info = buffer.map(Gst.MapFlags.READ)
    if not success:
        return Gst.PadProbeReturn.DROP
    try:
        structure = caps.get_structure(0)
        h = structure.get_value("height")
        w = structure.get_value("width")
        print(w, h, structure.get_value("format"), map_info.size, COUNTER)
        # Absolute NTP time travels as a GstReferenceTimestampMeta with
        # "timestamp/x-ntp" reference caps (rtspsrc must have its
        # add-reference-timestamp-meta property enabled, GStreamer >= 1.22).
        # NOTE(review): buffer.pts is only the pipeline-relative time.
        ntp_meta = buffer.get_reference_timestamp_meta(
            Gst.Caps.from_string("timestamp/x-ntp")
        )
        if ntp_meta is not None:
            print("NTP timestamp (ns):", ntp_meta.timestamp)
        # Save every 100th frame; assumes caps forced RGB (3 bytes/pixel)
        # by the capsfilter upstream.
        if COUNTER % 100 == 0:
            numpy_array = np.frombuffer(
                map_info.data, dtype=np.uint8
            ).reshape(h, w, 3)
            fname = os.path.join(WORK_FOLDER, "test_{:}.png".format(COUNTER))
            plt.imsave(fname, numpy_array)
        COUNTER += 1
    finally:
        # Fix: always unmap, even if reshape/imsave raises; the original
        # would leak the mapping on any exception above.
        buffer.unmap(map_info)
    return Gst.PadProbeReturn.OK
# gst-launch-1.0 uridecodebin uri=rtsp://192.168.1.3:8554/stream1 ! autovideoconvert ! video/x-raw ! autovideosink
# Build and run the pipeline:
#   source bin (uridecodebin) -> autovideoconvert -> capsfilter(RGB) -> appsink
Gst.init(None)
pipeline = Gst.Pipeline()
stream = "rtsp://192.168.1.3:8554/stream1"
source = create_source_bin(0, stream)
pipeline.add(source)
# appsink terminates the pipeline; frames are inspected in the pad probe.
sink = Gst.ElementFactory.make('appsink', 'appsink')
pipeline.add(sink)
videoconvert = Gst.ElementFactory.make('autovideoconvert', 'autovideoconvert')
pipeline.add(videoconvert)
# Force RGB so the probe can reshape the raw buffer to (height, width, 3).
caps = Gst.Caps.from_string("video/x-raw,format=RGB")
cfilter = Gst.ElementFactory.make("capsfilter", "caps")
pipeline.add(cfilter)
cfilter.set_property("caps", caps)
# Link the elements together.
source.link(videoconvert)
videoconvert.link(cfilter)
cfilter.link(sink)
sink_pad = sink.get_static_pad("sink")
if sink_pad is not None:
    sink_pad.add_probe(Gst.PadProbeType.BUFFER, appsink_pad_buffer_probe, 0)
    print("probe added")
else:
    # Fix: this is the appsink's *sink* pad; the old message said "source pad".
    print("couldnt get a sink pad")
# Start the pipeline and run the GLib main loop.
loop = GLib.MainLoop()
bus = pipeline.get_bus()
bus.add_signal_watch()
bus.connect("message", bus_call, loop)
pipeline.set_state(Gst.State.PLAYING)
try:
    loop.run()
except KeyboardInterrupt:
    # Ctrl-C is the normal way to stop; the old bare BaseException handler
    # hid the real cause behind "something went wrong".
    pass
finally:
    # Always return the pipeline to NULL so GStreamer releases resources.
    pipeline.set_state(Gst.State.NULL)