Issue
I’ve written a pipeline, which extracts a video frame at a given position and passes it to a callback function for further processing.
When I run the sample code below, it starts with roughly 40 MiB reserved memory.
Once pad-added is emitted (and before this pad gets linked), the memory allocation rises up to 206 MiB and stays there even after the cleanup.
For my test case the decoding pipeline chooses to use the vah264dec decoder for a 15 seconds, 5 MB, 1920x1080, mp4/h264 video file.
Here’s a log with GST_DEBUG=4, so that you don’t need to run the script yourself: openSUSE Paste
Expected Result
Once the pipeline finishes and everything got garbage collected, the memory allocation falls off to roughly the initial value before running the pipeline.
What might I be doing wrong? ![]()
Simplified sample code
with fakesink, instead of video_convert, video_scale and appsink and w/o reading the frame buffer and calling the callback function.
You can run it with python sample.py your.mp4. It will print out the memory usage using the memory_profiler package.
from __future__ import annotations
import sys
import typing
import logging
from collections.abc import Callable
from pathlib import Path
import gc
import gi
from gi.repository import GLib
from gi.repository import GObject
gi.require_version("Gst", "1.0")
from gi.repository import Gst
from memory_profiler import profile
QUERY_DURATION_MAX_RETRIES = 100
RELATIVE_POSITION = 5 / 100
log = logging.getLogger(__name__)
mainloop = GLib.MainLoop()
class VideoAnalyzer:
def __init__(self) -> None:
self._file_path = None
self._query_duration_retry_count = 0
self._is_frame_extracted = False
self._pipeline = Gst.Pipeline.new("pipeline")
self._query = Gst.Query.new_duration(Gst.Format.TIME)
self._bus = self._pipeline.get_bus()
self._bus.add_signal_watch()
self._bus_watch_id = self._bus.connect("message", self._on_bus_message)
self._uridecodebin = Gst.ElementFactory.make("uridecodebin3")
self._uridecodebin_connect_id = self._uridecodebin.connect("pad-added", self._on_pad_added)
self._uridecodebin.set_property("caps", Gst.Caps.from_string("video/x-h264,video/x-raw(ANY)"))
self._fakesink = Gst.ElementFactory.make("fakesink")
self._fakesink_pad = self._fakesink.get_static_pad("sink")
self._pipeline.add(self._uridecodebin)
self._pipeline.add(self._fakesink)
self._pipeline.set_state(Gst.State.NULL)
@profile
def run(self,
file_path: Path,
callback: Callable[bytes, None]):
self._file_path = file_path
self._uridecodebin.set_property("uri", file_path.as_uri())
self._pipeline.set_state(Gst.State.PAUSED)
@profile
def _cleanup(self) -> None:
log.debug("Cleaning up!")
self._pipeline.set_state(Gst.State.NULL)
self._bus.remove_signal_watch()
self._bus.disconnect(self._bus_watch_id)
self._pipeline.remove(self._uridecodebin)
self._uridecodebin.disconnect(self._uridecodebin_connect_id)
self._pipeline.remove(self._fakesink)
gc.collect()
@profile
def _extract_frame(self) -> None:
log.debug("Extract frame for %s. (Retry count = %s)",
self._file_path, self._query_duration_retry_count)
self._is_frame_extracted = True
self._cleanup()
@profile
def _on_pad_added(self, _element: Gst.Element, pad: Gst.Pad) -> None:
if not self._fakesink_pad.is_linked():
pad.link(self._fakesink_pad)
log.debug("Add pad!")
else:
log.debug("Pad already added!")
def _on_bus_message(self, _bus: Gst.Bus, message: Gst.Message) -> None:
if (message.type == Gst.MessageType.STATE_CHANGED or
message.type == Gst.MessageType.DURATION_CHANGED):
if self._is_frame_extracted:
log.debug("Frame already extracted!")
return
if self._pipeline.query(self._query):
_, duration = self._query.parse_duration()
position = int(duration * RELATIVE_POSITION)
self._pipeline.seek_simple(Gst.Format.TIME, Gst.SeekFlags.FLUSH,
position)
self._extract_frame()
else:
self._query_duration_retry_count += 1
if (self._query_duration_retry_count
> QUERY_DURATION_MAX_RETRIES):
log.debug(
"Could not extract frame for %s:"
" Querying duration failed!",
self._file_path
)
self._cleanup()
return
if message.type == Gst.MessageType.EOS:
self._cleanup()
return
if message.type == Gst.MessageType.ERROR:
structure = message.get_structure()
if structure is None:
log.error("structure is None!")
return
gerror = structure.get_value("gerror")
if gerror is None:
log.error("gerror is None!")
return
log.error("Error: %s (%s)", gerror.domain, gerror.code)
self._cleanup()
return
#-----------------------------------------------------------------------------------
@profile
def extract_frame():
video_frame_extractor = VideoAnalyzer()
video_frame_extractor.run(Path(sys.argv[1]), callback=None)
def main():
Gst.init(None)
extract_frame()
mainloop.run()
if __name__ == '__main__':
logging.basicConfig(level=logging.NOTSET)
main()
System info
OS: Tumbleweed
Gstreamer: 1.24.10
Python: 3.13