Issue
I’ve written a pipeline, which extracts a video frame at a given position and passes it to a callback function for further processing.
When I run the sample code below, it starts with roughly 40 MiB
reserved memory.
Once pad-added
is emitted (and before this pad gets linked), the memory allocation rises up to 206 MiB
and stays there even after the cleanup
.
For my test case the decoding pipeline chooses to use the vah264dec
decoder for a 15 seconds, 5 MB, 1920x1080, mp4/h264 video file.
Here’s a log with GST_DEBUG=4
, so that you don’t need to run the script yourself: openSUSE Paste
Expected Result
Once the pipeline finishes and everything got garbage collected, the memory allocation falls off to roughly the initial value before running the pipeline.
What might I be doing wrong?
Simplified sample code
with fakesink
, instead of video_convert
, video_scale
and appsink
and w/o reading the frame buffer and calling the callback function.
You can run it with python sample.py your.mp4
. It will print out the memory usage using the memory_profiler
package.
from __future__ import annotations
import sys
import typing
import logging
from collections.abc import Callable
from pathlib import Path
import gc
import gi
from gi.repository import GLib
from gi.repository import GObject
gi.require_version("Gst", "1.0")
from gi.repository import Gst
from memory_profiler import profile
QUERY_DURATION_MAX_RETRIES = 100
RELATIVE_POSITION = 5 / 100
log = logging.getLogger(__name__)
mainloop = GLib.MainLoop()
class VideoAnalyzer:
def __init__(self) -> None:
self._file_path = None
self._query_duration_retry_count = 0
self._is_frame_extracted = False
self._pipeline = Gst.Pipeline.new("pipeline")
self._query = Gst.Query.new_duration(Gst.Format.TIME)
self._bus = self._pipeline.get_bus()
self._bus.add_signal_watch()
self._bus_watch_id = self._bus.connect("message", self._on_bus_message)
self._uridecodebin = Gst.ElementFactory.make("uridecodebin3")
self._uridecodebin_connect_id = self._uridecodebin.connect("pad-added", self._on_pad_added)
self._uridecodebin.set_property("caps", Gst.Caps.from_string("video/x-h264,video/x-raw(ANY)"))
self._fakesink = Gst.ElementFactory.make("fakesink")
self._fakesink_pad = self._fakesink.get_static_pad("sink")
self._pipeline.add(self._uridecodebin)
self._pipeline.add(self._fakesink)
self._pipeline.set_state(Gst.State.NULL)
@profile
def run(self,
file_path: Path,
callback: Callable[bytes, None]):
self._file_path = file_path
self._uridecodebin.set_property("uri", file_path.as_uri())
self._pipeline.set_state(Gst.State.PAUSED)
@profile
def _cleanup(self) -> None:
log.debug("Cleaning up!")
self._pipeline.set_state(Gst.State.NULL)
self._bus.remove_signal_watch()
self._bus.disconnect(self._bus_watch_id)
self._pipeline.remove(self._uridecodebin)
self._uridecodebin.disconnect(self._uridecodebin_connect_id)
self._pipeline.remove(self._fakesink)
gc.collect()
@profile
def _extract_frame(self) -> None:
log.debug("Extract frame for %s. (Retry count = %s)",
self._file_path, self._query_duration_retry_count)
self._is_frame_extracted = True
self._cleanup()
@profile
def _on_pad_added(self, _element: Gst.Element, pad: Gst.Pad) -> None:
if not self._fakesink_pad.is_linked():
pad.link(self._fakesink_pad)
log.debug("Add pad!")
else:
log.debug("Pad already added!")
def _on_bus_message(self, _bus: Gst.Bus, message: Gst.Message) -> None:
if (message.type == Gst.MessageType.STATE_CHANGED or
message.type == Gst.MessageType.DURATION_CHANGED):
if self._is_frame_extracted:
log.debug("Frame already extracted!")
return
if self._pipeline.query(self._query):
_, duration = self._query.parse_duration()
position = int(duration * RELATIVE_POSITION)
self._pipeline.seek_simple(Gst.Format.TIME, Gst.SeekFlags.FLUSH,
position)
self._extract_frame()
else:
self._query_duration_retry_count += 1
if (self._query_duration_retry_count
> QUERY_DURATION_MAX_RETRIES):
log.debug(
"Could not extract frame for %s:"
" Querying duration failed!",
self._file_path
)
self._cleanup()
return
if message.type == Gst.MessageType.EOS:
self._cleanup()
return
if message.type == Gst.MessageType.ERROR:
structure = message.get_structure()
if structure is None:
log.error("structure is None!")
return
gerror = structure.get_value("gerror")
if gerror is None:
log.error("gerror is None!")
return
log.error("Error: %s (%s)", gerror.domain, gerror.code)
self._cleanup()
return
#-----------------------------------------------------------------------------------
@profile
def extract_frame():
video_frame_extractor = VideoAnalyzer()
video_frame_extractor.run(Path(sys.argv[1]), callback=None)
def main():
Gst.init(None)
extract_frame()
mainloop.run()
if __name__ == '__main__':
logging.basicConfig(level=logging.NOTSET)
main()
System info
OS: Tumbleweed
Gstreamer: 1.24.10
Python: 3.13