Uridecodebin3: Memory not freed in the end after decoding

Issue

I’ve written a pipeline, which extracts a video frame at a given position and passes it to a callback function for further processing.

When I run the sample code below, it starts with roughly 40 MiB reserved memory.
Once pad-added is emitted (and before this pad gets linked), the memory allocation rises up to 206 MiB and stays there even after the cleanup.

For my test case the decoding pipeline chooses to use the vah264dec decoder for a 15 seconds, 5 MB, 1920x1080, mp4/h264 video file.

Here’s a log with GST_DEBUG=4, so that you don’t need to run the script yourself: openSUSE Paste

Expected Result

Once the pipeline finishes and everything got garbage collected, the memory allocation falls off to roughly the initial value before running the pipeline.

What might I be doing wrong? :slight_smile:

Simplified sample code

with fakesink, instead of video_convert, video_scale and appsink and w/o reading the frame buffer and calling the callback function.

You can run it with python sample.py your.mp4. It will print out the memory usage using the memory_profiler package.

from __future__ import annotations

import sys
import typing

import logging
from collections.abc import Callable
from pathlib import Path
import gc

import gi
from gi.repository import GLib
from gi.repository import GObject
gi.require_version("Gst", "1.0")
from gi.repository import Gst

from memory_profiler import profile

QUERY_DURATION_MAX_RETRIES = 100
RELATIVE_POSITION = 5 / 100

log = logging.getLogger(__name__)
mainloop = GLib.MainLoop()


class VideoAnalyzer:
    def __init__(self) -> None:

        self._file_path = None
        self._query_duration_retry_count = 0
        self._is_frame_extracted = False

        self._pipeline = Gst.Pipeline.new("pipeline")
        self._query = Gst.Query.new_duration(Gst.Format.TIME)

        self._bus = self._pipeline.get_bus()
        self._bus.add_signal_watch()
        self._bus_watch_id = self._bus.connect("message", self._on_bus_message)

        self._uridecodebin = Gst.ElementFactory.make("uridecodebin3")
        self._uridecodebin_connect_id = self._uridecodebin.connect("pad-added", self._on_pad_added)
        self._uridecodebin.set_property("caps", Gst.Caps.from_string("video/x-h264,video/x-raw(ANY)"))

        self._fakesink = Gst.ElementFactory.make("fakesink")
        self._fakesink_pad = self._fakesink.get_static_pad("sink")

        self._pipeline.add(self._uridecodebin)
        self._pipeline.add(self._fakesink)

        self._pipeline.set_state(Gst.State.NULL)


    @profile
    def run(self,
            file_path: Path,
            callback: Callable[bytes, None]):

        self._file_path = file_path
        self._uridecodebin.set_property("uri", file_path.as_uri())

        self._pipeline.set_state(Gst.State.PAUSED)


    @profile
    def _cleanup(self) -> None:
        log.debug("Cleaning up!")

        self._pipeline.set_state(Gst.State.NULL)
        self._bus.remove_signal_watch()
        self._bus.disconnect(self._bus_watch_id)

        self._pipeline.remove(self._uridecodebin)
        self._uridecodebin.disconnect(self._uridecodebin_connect_id)
        self._pipeline.remove(self._fakesink)

        gc.collect()    

    @profile
    def _extract_frame(self) -> None:
        log.debug("Extract frame for %s. (Retry count = %s)",
                   self._file_path, self._query_duration_retry_count)

        self._is_frame_extracted = True
        self._cleanup()

    @profile
    def _on_pad_added(self, _element: Gst.Element, pad: Gst.Pad) -> None:
        if not self._fakesink_pad.is_linked():
            pad.link(self._fakesink_pad)
            log.debug("Add pad!")
        else:
            log.debug("Pad already added!")

    def _on_bus_message(self, _bus: Gst.Bus, message: Gst.Message) -> None:
        if (message.type == Gst.MessageType.STATE_CHANGED or
            message.type == Gst.MessageType.DURATION_CHANGED):

            if self._is_frame_extracted:
                log.debug("Frame already extracted!")
                return

            if self._pipeline.query(self._query):
                _, duration = self._query.parse_duration()
                position = int(duration * RELATIVE_POSITION)
                self._pipeline.seek_simple(Gst.Format.TIME, Gst.SeekFlags.FLUSH,
                                           position)
                self._extract_frame()
            else:
                self._query_duration_retry_count += 1
                if (self._query_duration_retry_count
                        > QUERY_DURATION_MAX_RETRIES):
                    log.debug(
                        "Could not extract frame for %s:"
                        " Querying duration failed!",
                        self._file_path
                    )
                    self._cleanup()
            return
  
        if message.type == Gst.MessageType.EOS:
            self._cleanup()
            return

        if message.type == Gst.MessageType.ERROR:
            structure = message.get_structure()
            if structure is None:
                log.error("structure is None!")
                return

            gerror = structure.get_value("gerror")
            if gerror is None:
                log.error("gerror is None!")
                return

            log.error("Error: %s (%s)", gerror.domain, gerror.code)
            self._cleanup()
            return

#-----------------------------------------------------------------------------------

@profile
def extract_frame():
    video_frame_extractor = VideoAnalyzer()
    video_frame_extractor.run(Path(sys.argv[1]), callback=None)


def main():
    Gst.init(None)
    extract_frame()
    mainloop.run()


if __name__ == '__main__':
    logging.basicConfig(level=logging.NOTSET)
    main()

System info

OS: Tumbleweed
Gstreamer: 1.24.10
Python: 3.13

I can no longer edit my first post unfortunately, but I have slimmed down the example code to get rid of redundant things:

from __future__ import annotations

import gc
import logging
import sys
from collections.abc import Callable
from pathlib import Path

import gi
from gi.repository import GLib

gi.require_version("Gst", "1.0")
from gi.repository import Gst
from memory_profiler import profile

log = logging.getLogger(__name__)
mainloop = GLib.MainLoop()

class VideoAnalyzer:
    def __init__(self) -> None:

        self._file_path = None

        self._pipeline = Gst.Pipeline.new("pipeline")
        self._query = Gst.Query.new_duration(Gst.Format.TIME)

        self._bus = self._pipeline.get_bus()
        self._bus.add_signal_watch()
        self._bus_watch_id = self._bus.connect("message", self._on_bus_message)

        self._uridecodebin = Gst.ElementFactory.make("uridecodebin3")
        self._uridecodebin_connect_id = self._uridecodebin.connect("pad-added",
                                                                   self._on_pad_added)
        self._uridecodebin.set_property("caps", Gst.Caps.from_string(
            "video/x-h264,video/x-raw(ANY)"))

        self._fakesink = Gst.ElementFactory.make("fakesink")
        self._fakesink_pad = self._fakesink.get_static_pad("sink")

        self._pipeline.add(self._uridecodebin)
        self._pipeline.add(self._fakesink)


    @profile
    def run(self, file_path: Path) -> None:
        self._file_path = file_path
        self._uridecodebin.set_property("uri", file_path.as_uri())
        self._pipeline.set_state(Gst.State.PAUSED)


    @profile
    def _cleanup(self) -> None:
        log.debug("Cleaning up!")

        self._pipeline.set_state(Gst.State.NULL)
        self._pipeline.run_dispose()

        self._bus.remove_signal_watch()
        self._bus.disconnect(self._bus_watch_id)
        self._bus.run_dispose()

        self._uridecodebin.disconnect(self._uridecodebin_connect_id)

        gc.collect()


    @profile
    def _on_pad_added(self, element: Gst.Element, pad: Gst.Pad) -> None:
        if not self._fakesink_pad.is_linked():
            pad.link(self._fakesink_pad)
            log.debug("Add pad %s to %s!", pad, element)


    def _on_bus_message(self, _bus: Gst.Bus, message: Gst.Message) -> None:
        if message.type == Gst.MessageType.STATE_CHANGED:
            state = self._pipeline.get_state(40)[1]
            if state == Gst.State.PAUSED:
                self._cleanup()

# -----------------------------------------------------------------------------------

@profile
def extract_frame():
    video_frame_extractor = VideoAnalyzer()
    video_frame_extractor.run(Path(sys.argv[1]))


def main():
    Gst.init(None)
    extract_frame()
    mainloop.run()


if __name__ == '__main__':
    logging.basicConfig(level=logging.NOTSET)
    main()

How do you measure memory usage? Keep in mind the RES memory is not released directly back from your process to the kernel when it’s freed, so looking at that in ps / top doesn’t tell you much.

Hi! That’s exactly how I’ve measured it, besides using this memory_profile python package, which essentially shows the same numbers.

Is there a way to tell, when the RES memory will be released? What would be a suitable measurement? Should I be concerned at all? :slight_smile:

I plan to integrate a video thumbnail generator into an instant messaging and I fear some may complain that the application starts to require more memory than before.

I can’t remember the details but it was not simple. If you search for this on the Internet you’ll find many articles about exactly this “problem”.

You could use something like heaptrack for example. That shows you memory usage over time, where it comes from, leaks, etc… It works by actually tracking allocations and deallocations.

I don’t know if you should be concerned :slight_smile: It could very well be a leak, memory fragmentation, or completely normal. You’ll have to check first where the memory usage comes from and if it’s indeed not freed.