How to debug possible memory leak

Hello,
I’m trying to make a simple pipeline where a playlist of video files runs one after the other. I use a uridecodebin3 element and pass its output through the pipeline stages until it reaches an RTMP sink. After the file finishes playing, I remove the uridecodebin3, create a new one and attach it to the pipeline.
After 7 hours of playtime, I noticed that the memory usage of the process had increased from an average of around 500 MB to 2.5 GB. I’m not sure if I’m doing something wrong or if there’s a leak somewhere.

Here’s the source code

#include <gst/gst.h>
#include <stdio.h>
#include <string.h>

#define MAX_FILES 2

// Everything queue_next() creates for one playlist entry, returned by value so
// the caller can sync the elements' state and later tear the branch down.
typedef struct
{
    // All six per-file elements, in the order queue_next() stores them:
    // videoqueue, videoconvert, audioqueue, audioconvert, audioresample, uridecodebin.
    GstElement *elements[6];
    // The uridecodebin3 for this file (same object as elements[5]).
    GstElement *decodebin;
    // queue2 at the head of the audio branch (same object as elements[2]).
    GstElement *audioqueue;
    // queue2 at the head of the video branch (same object as elements[0]).
    GstElement *videoqueue;
    // Sink pad the audio branch was linked to, exactly as passed to queue_next().
    GstPad *ac_sink_pad;
    // Sink pad the video branch was linked to, exactly as passed to queue_next().
    GstPad *vc_sink_pad;
} QueueFileResult;

// Application-level state. In the code shown here main() only assigns
// `pipeline` and `queue_result`, and nothing reads the struct back.
typedef struct
{
    // The top-level pipeline.
    GstElement *pipeline;
    // NOTE(review): never assigned in the visible code — confirm whether it is still needed.
    GstElement *source;
    // Points at the QueueFileResult of the file currently queued.
    QueueFileResult *queue_result;

    // NOTE(review): playlist URIs; never filled in by the visible code.
    const char *files[MAX_FILES];
    // NOTE(review): presumably an index into `files`; never assigned in the visible code.
    int current_file_index;
} StreamContext;

// User data for on_mq_pad_added().
typedef struct
{
    // Muxer whose "video"/"audio" request pads the multiqueue source pads are linked to.
    GstElement *mux;
} OnMQPadAddedData;

// User data for on_decoder_pad_added().
typedef struct
{
    // Queue whose "sink" pad receives the decoder's video/x-raw pad.
    GstElement *videoqueue;
    // Queue whose "sink" pad receives the decoder's audio/x-raw pad.
    GstElement *audioqueue;
    // Pipeline that gets dumped to a .dot file after each link attempt.
    GstPipeline *pipeline;
    // Label set by main(); on_decoder_pad_added() does not read it.
    char *id;
} DecodePadAddedData;

// User data for on_about_to_finish().
typedef struct
{
    // String handed to gst_structure_new_from_string() to build the
    // application message's structure.
    char *id;
    // Pipeline whose bus the application message is posted on.
    GstPipeline *pipeline;
    // Copy of the branch description for the file being played;
    // on_about_to_finish() does not read it.
    QueueFileResult queue_result;
} AboutToFinishData;

// Fill `dest` with `length` random ASCII letters (a-z, A-Z) followed by a NUL
// terminator.
//
// dest:   output buffer; it must have room for length + 1 bytes, because the
//         terminator is written after the last letter.
// length: number of letters to generate (0 yields an empty string).
//
// Letters are drawn with rand(), so the sequence depends on the srand() seed.
void rand_str(char *dest, size_t length)
{
    static const char charset[] =
        "abcdefghijklmnopqrstuvwxyz"
        "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
    const size_t letters = sizeof charset - 1; // exclude the trailing NUL

    while (length-- > 0)
    {
        // Scale by RAND_MAX + 1 so the ratio stays in [0, 1). Dividing by
        // RAND_MAX let rand() == RAND_MAX produce index == letters, which is
        // the charset's own NUL terminator and would cut the string short.
        size_t index = (size_t)((double)rand() / ((double)RAND_MAX + 1.0) * (double)letters);
        *dest++ = charset[index];
    }
    *dest = '\0';
}

// Forward declarations of the handlers main() connects to each per-file
// decodebin's "pad-added" and "about-to-finish" signals.
static void on_decoder_pad_added(GstElement *element, GstPad *pad, gpointer data);
static void on_about_to_finish(GstElement *element, gpointer data);

static QueueFileResult queue_next(char *id, char *url, GstPipeline *pipeline, GstPad *vc_sink_pad, GstPad *ac_sink_pad)
{
    QueueFileResult result;
    result.ac_sink_pad = ac_sink_pad;
    result.vc_sink_pad = vc_sink_pad;

    GstElement *uridecodebin = gst_element_factory_make("uridecodebin3", NULL);
    g_object_set(G_OBJECT(uridecodebin), "uri", url, NULL);
    gst_bin_add(GST_BIN(pipeline), uridecodebin);
    result.decodebin = uridecodebin;

    GstElement *videoqueue = gst_element_factory_make("queue2", NULL);
    GstElement *videoconvert = gst_element_factory_make("videoconvert", NULL);
    gst_bin_add_many(GST_BIN(pipeline), videoqueue, videoconvert, NULL);

    gst_element_link(videoqueue, videoconvert);

    GstElement *audioqueue = gst_element_factory_make("queue2", NULL);
    GstElement *audioconvert = gst_element_factory_make("audioconvert", NULL);
    GstElement *audioresample = gst_element_factory_make("audioresample", NULL);

    gst_bin_add_many(GST_BIN(pipeline), audioqueue, audioconvert, audioresample, NULL);
    gst_element_link_many(audioqueue, audioconvert, audioresample, NULL);

    GstPad *audioresample_src_pad = gst_element_get_static_pad(audioresample, "src");
    GstPad *videoconvert_src_pad = gst_element_get_static_pad(videoconvert, "src");

    gst_pad_link(videoconvert_src_pad, vc_sink_pad);
    gst_pad_link(audioresample_src_pad, ac_sink_pad);

    gst_object_unref(videoconvert_src_pad);
    gst_object_unref(audioresample_src_pad);

    result.audioqueue = audioqueue;
    result.videoqueue = videoqueue;
    GstElement *elements[6] = {
        videoqueue,
        videoconvert,
        audioqueue,
        audioconvert,
        audioresample,
        uridecodebin};
    // Copy each element individually
    for (int i = 0; i < 6; i++)
    {
        result.elements[i] = elements[i];
    }

    return result;
}

// "about-to-finish" handler: posts an application message on the pipeline's
// bus, with `element` as the message source and a structure parsed from the
// handler data's id string.
//
// element: object that emitted the signal.
// data:    AboutToFinishData supplying the pipeline and the id string.
//
// Only the bus reference is dropped here. GStreamer documents the structure
// argument of gst_message_new_application() and the message argument of
// gst_bus_post() as transfer-full, so neither needs an unref in this function.
static void on_about_to_finish(GstElement *element, gpointer data)
{
    AboutToFinishData *finish_data = data;

    GstStructure *payload = gst_structure_new_from_string(finish_data->id);
    GstMessage *notice = gst_message_new_application(GST_OBJECT(element), payload);

    GstBus *pipeline_bus = gst_pipeline_get_bus(finish_data->pipeline);
    gst_bus_post(pipeline_bus, notice);
    gst_object_unref(pipeline_bus);
}

// "pad-added" handler for the per-file decodebin: links a new raw video pad to
// the video queue's sink pad, or a new raw audio pad to the audio queue's sink
// pad, then dumps the pipeline graph to a .dot file.
//
// element: the element that added the pad (unused).
// src_pad: the newly added source pad.
// data:    DecodePadAddedData with the two target queues and the pipeline.
//
// Pads whose caps are neither video/x-raw nor audio/x-raw are ignored.
static void on_decoder_pad_added(GstElement *element, GstPad *src_pad, gpointer data)
{
    (void)element;
    DecodePadAddedData *context = (DecodePadAddedData *)data;

    // TODO: Do we get current caps or query?
    GstCaps *caps = gst_pad_query_caps(src_pad, NULL);
    if (!caps)
    {
        g_printerr("Failed to get caps\n");
        return;
    }

    // ANY and empty caps contain no structure, so asking for structure 0
    // would raise a GLib critical and hand back NULL.
    if (gst_caps_is_any(caps) || gst_caps_is_empty(caps))
    {
        g_printerr("Decoder pad has no usable caps, not linking\n");
        gst_caps_unref(caps);
        return;
    }

    GstStructure *str = gst_caps_get_structure(caps, 0);
    g_print("\n========================== Got caps: %s\n", gst_structure_get_name(str));

    // Pick the branch this pad belongs to; everything else is identical.
    GstElement *target_queue = NULL;
    const char *link_error = NULL;
    const char *dot_name = NULL;
    if (gst_structure_has_name(str, "video/x-raw"))
    {
        target_queue = context->videoqueue;
        link_error = "Failed to link video pad";
        dot_name = "example_video";
    }
    else if (gst_structure_has_name(str, "audio/x-raw"))
    {
        target_queue = context->audioqueue;
        link_error = "Failed to link audio pad";
        dot_name = "example_audio";
    }

    if (target_queue != NULL)
    {
        g_printerr("================= Before getting static pad\n");
        GstPad *queue_sink_pad = gst_element_get_static_pad(target_queue, "sink");
        g_printerr("================= Got static pad\n");
        if (gst_pad_link(src_pad, queue_sink_pad) != GST_PAD_LINK_OK)
        {
            g_printerr("%s", link_error);
        }
        // The reference from gst_element_get_static_pad() is ours to drop
        // whether or not the link succeeded.
        gst_object_unref(queue_sink_pad);
        gst_debug_bin_to_dot_file(GST_BIN(context->pipeline), GST_DEBUG_GRAPH_SHOW_ALL, dot_name);
    }

    gst_caps_unref(caps);
}

// "pad-added" handler for the multiqueue: links its "src_0" pad to a freshly
// requested "video" pad on the muxer and its "src_1" pad to a freshly
// requested "audio" pad. Pads with any other name are ignored.
//
// element: the element that added the pad (unused).
// pad:     the newly added source pad.
// data:    OnMQPadAddedData carrying the muxer.
static void on_mq_pad_added(GstElement *element, GstPad *pad, gpointer data)
{
    (void)element;
    OnMQPadAddedData *context = (OnMQPadAddedData *)data;
    // gst_object_get_name() returns a copy that must be released with g_free().
    gchar *pad_name = gst_object_get_name(GST_OBJECT(pad));

    // Video sink pad is requested first
    // So we get the video at src_0
    const char *mux_pad_template = NULL;
    if (g_strcmp0(pad_name, "src_0") == 0)
    {
        mux_pad_template = "video";
    }
    else if (g_strcmp0(pad_name, "src_1") == 0)
    {
        mux_pad_template = "audio";
    }

    if (mux_pad_template != NULL)
    {
        g_print("==== MQ: Pad added %s\n", pad_name);
        GstPad *mux_sink_pad = gst_element_request_pad_simple(context->mux, mux_pad_template);
        if (mux_sink_pad == NULL)
        {
            g_printerr("MQ: could not request a '%s' pad from the muxer\n", mux_pad_template);
        }
        else
        {
            // gst_pad_get_name() also returns an allocated copy.
            gchar *mux_pad_name = gst_pad_get_name(mux_sink_pad);
            g_print("Pad name is %s\n", mux_pad_name);
            g_free(mux_pad_name);

            if (gst_pad_link(pad, mux_sink_pad) != GST_PAD_LINK_OK)
            {
                g_printerr("MQ: Error linking %s", pad_name);
            }
            gst_object_unref(mux_sink_pad);
        }
    }

    g_free(pad_name);
}

int main(int argc, char *argv[])
{
    char *VIDEO_FILES[MAX_FILES] = {
        "file:///rs-app/vid2.webm",
        "file:///rs-app/vert.mp4",
    };
    // Initialize GStreamer
    printf("==================================== Beefore INIT");
    gst_init(&argc, &argv);
    printf("Initilized");

    // Hardcode RTMP destination
    const char *rtmp_url = "rtmp://localhost:1936/live/live";

    GstPipeline *pipeline = (GstPipeline *)gst_pipeline_new("custom-pipeline");

    GstElement *audioqueue = gst_element_factory_make("queue2", NULL);
    GstElement *audiomixer = gst_element_factory_make_full("audiomixer", "force-live", TRUE, "ignore-inactive-pads", TRUE, "latency", 100000000, "discont-wait", 0, NULL);

    GstElement *ac = gst_element_factory_make("concat", NULL);
    GstElement *vc = gst_element_factory_make("concat", NULL);

    GstElement *mux = gst_element_factory_make("flvmux", NULL);
    g_object_set(GST_OBJECT(mux), "streamable", TRUE, "enforce-increasing-timestamps", TRUE, NULL);

    GstElement *mq = gst_element_factory_make("multiqueue", NULL);
    g_object_set(GST_OBJECT(mq), "sync-by-running-time", TRUE, "use-interleave", TRUE, "max-size-time", 666666668, "min-interleave-time", 666666668, "max-size-buffers", 0, NULL);

    GstElement *compositor = gst_element_factory_make_full("compositor",
                                                           "force-live",
                                                           TRUE,
                                                           "ignore-inactive-pads",
                                                           TRUE,
                                                           "latency",
                                                           100000000, NULL);

    GstElement *compositor_queue = gst_element_factory_make("queue2", NULL);
    GstElement *compositor_videoconvert = gst_element_factory_make("videoconvert", NULL);
    GstElement *compositor_videoscale = gst_element_factory_make("videoscale", NULL);
    GstElement *compositor_videoscale_filter = gst_element_factory_make("capsfilter", NULL);
    GstCaps *compositor_videoscale_caps = gst_caps_from_string("video/x-raw, width=1920,height=1080");
    g_object_set(GST_OBJECT(compositor_videoscale_filter), "caps", compositor_videoscale_caps, NULL);
    gst_caps_unref(compositor_videoscale_caps);

    GstElement *x264enc = gst_element_factory_make("x264enc", NULL);
    g_object_set(GST_OBJECT(x264enc), "bitrate", 10000, "tune", 4, "speed-preset", 1, "key-int-max", 30, NULL); // TODO: Check tune and speed-preset enums

    GstElement *avenc_aac = gst_element_factory_make("avenc_aac", NULL);
    GstElement *aacparse = gst_element_factory_make("aacparse", NULL);
    GstElement *sink_queue = gst_element_factory_make("queue2", NULL);
    GstElement *sink = gst_element_factory_make("rtmp2sink", NULL);
    g_object_set(GST_OBJECT(sink), "location", rtmp_url, NULL);

    gst_bin_add_many(GST_BIN(pipeline), audioqueue, audiomixer, ac, vc, mux, mq, compositor, compositor_queue, compositor_videoconvert, compositor_videoscale, compositor_videoscale_filter, x264enc, avenc_aac, aacparse, sink_queue, sink, NULL);

    OnMQPadAddedData onMQPadAddedData;
    onMQPadAddedData.mux = mux;
    g_signal_connect(mq, "pad-added", G_CALLBACK(on_mq_pad_added), &onMQPadAddedData);

    // Video tail
    gst_element_link(vc, compositor_queue);
    gst_element_link_many(compositor, compositor_videoconvert, compositor_videoscale, compositor_videoscale_filter, x264enc, NULL);

    GstPad *compositor_queue_src_pad = gst_element_get_static_pad(compositor_queue, "src");
    GstPad *compositor_sink_pad = gst_element_request_pad_simple(compositor, "sink_%u");
    g_object_set(GST_OBJECT(compositor_sink_pad), "width", 1920, "height", 1080, NULL);
    gst_pad_link(compositor_queue_src_pad, compositor_sink_pad);

    gst_object_unref(compositor_queue_src_pad);
    gst_object_unref(compositor_sink_pad);

    GstPad *mq_video_sink_pad = gst_element_request_pad_simple(mq, "sink_%u");
    GstPad *x264enc_src_pad = gst_element_get_static_pad(x264enc, "src");
    gst_pad_link(x264enc_src_pad, mq_video_sink_pad);
    gst_object_unref(mq_video_sink_pad);
    gst_object_unref(x264enc_src_pad);

    // Audio tail
    GstPad *ac_src_pad = gst_element_get_static_pad(ac, "src");
    GstPad *audioqueue_sink_pad = gst_element_get_static_pad(audioqueue, "sink");
    gst_pad_link(ac_src_pad, audioqueue_sink_pad);

    gst_object_unref(ac_src_pad);
    gst_object_unref(audioqueue_sink_pad);

    GstPad *audioqueue_src_pad = gst_element_get_static_pad(audioqueue, "src");
    GstPad *audiomixer_sink_pad = gst_element_request_pad_simple(audiomixer, "sink_%u");
    gst_pad_link(audioqueue_src_pad, audiomixer_sink_pad);

    gst_object_unref(audioqueue_src_pad);
    gst_object_unref(audiomixer_sink_pad);

    gst_element_link_many(audiomixer, avenc_aac, aacparse, NULL);

    GstPad *mq_audio_sink_pad = gst_element_request_pad_simple(mq, "sink_%u");
    GstPad *aacparse_src_pad = gst_element_get_static_pad(aacparse, "src");
    gst_pad_link(aacparse_src_pad, mq_audio_sink_pad);

    gst_object_unref(mq_audio_sink_pad);
    gst_object_unref(aacparse_src_pad);

    gst_element_link_many(mux, sink_queue, sink, NULL);

    GstPad *ac_sink_pad = gst_element_request_pad_simple(ac, "sink_%u");
    GstPad *vc_sink_pad = gst_element_request_pad_simple(vc, "sink_%u");
    printf("Constructed pipeline");

    char id[] = {[8] = '\1'};
    rand_str(id, sizeof id);

    QueueFileResult result = queue_next(id, VIDEO_FILES[0], pipeline, vc_sink_pad, ac_sink_pad);

    DecodePadAddedData decodePadAddedData;
    decodePadAddedData.id = "adsf";
    decodePadAddedData.audioqueue = result.audioqueue;
    decodePadAddedData.videoqueue = result.videoqueue;
    decodePadAddedData.pipeline = pipeline;
    g_signal_connect(result.decodebin, "pad-added",
                     G_CALLBACK(on_decoder_pad_added), &decodePadAddedData);

    AboutToFinishData aboutToFinishData;
    aboutToFinishData.id = id;
    aboutToFinishData.pipeline = pipeline;
    aboutToFinishData.queue_result = result;
    g_signal_connect(result.decodebin, "about-to-finish",
                     G_CALLBACK(on_about_to_finish), &aboutToFinishData);

    for (int i = 0; i < 6; i++)
    {
        gst_element_sync_state_with_parent(result.elements[i]);
    }

    StreamContext context;
    context.queue_result = &result;
    context.pipeline = pipeline;
    // Get the bus and add bus watch
    // Start playing
    gst_element_set_state(GST_ELEMENT(pipeline), GST_STATE_PLAYING);

    gst_debug_bin_to_dot_file(GST_BIN(pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "example");
    GstBus *bus = gst_element_get_bus(GST_ELEMENT(pipeline));
    // gst_bus_add_watch(bus, bus_call, &context);
    gboolean terminate = FALSE;
    GstMessage *msg;
    int file_index = 0;
    do
    {
        msg = gst_bus_timed_pop_filtered(bus, GST_CLOCK_TIME_NONE, GST_MESSAGE_ERROR | GST_MESSAGE_EOS | GST_MESSAGE_STATE_CHANGED | GST_MESSAGE_APPLICATION);
        /* Parse message */
        if (msg != NULL)
        {
            GError *err;
            gchar *debug_info;

            switch (GST_MESSAGE_TYPE(msg))
            {
            case GST_MESSAGE_ERROR:
                gst_message_parse_error(msg, &err, &debug_info);
                g_printerr("Error received from element %s: %s\n", GST_OBJECT_NAME(msg->src), err->message);
                g_printerr("Debugging information: %s\n", debug_info ? debug_info : "none");
                g_clear_error(&err);
                g_free(debug_info);
                terminate = TRUE;
                break;
            case GST_MESSAGE_EOS:
                g_print("End-Of-Stream reached.\n");
                terminate = TRUE;
                break;
            case GST_MESSAGE_STATE_CHANGED:
                /* We are only interested in state-changed messages from the pipeline */
                if (GST_MESSAGE_SRC(msg) == GST_OBJECT(pipeline))
                {
                    GstState old_state, new_state, pending_state;
                    gst_message_parse_state_changed(msg, &old_state, &new_state, &pending_state);
                    g_print("\nPipeline state changed from %s to %s:\n",
                            gst_element_state_get_name(old_state), gst_element_state_get_name(new_state));
                    /* Print the current capabilities of the sink element */
                    // print_pad_capabilities(sink, "sink");
                }
                break;
            case GST_MESSAGE_APPLICATION:
                g_print("Got application message");
                file_index++;
                if (file_index > 1)
                {
                    file_index = 0;
                }

                QueueFileResult oldResult = result;

                GstPad *ac_sink_pad = gst_element_request_pad_simple(ac, "sink_%u");
                GstPad *vc_sink_pad = gst_element_request_pad_simple(vc, "sink_%u");
                result = queue_next("sdfa", VIDEO_FILES[file_index], pipeline, vc_sink_pad, ac_sink_pad);
                context.queue_result = &result;

                DecodePadAddedData decodePadAddedData;
                decodePadAddedData.id = "adsf";
                decodePadAddedData.audioqueue = result.audioqueue;
                decodePadAddedData.videoqueue = result.videoqueue;
                decodePadAddedData.pipeline = pipeline;
                g_signal_connect(result.decodebin, "pad-added",
                                 G_CALLBACK(on_decoder_pad_added), &decodePadAddedData);

                AboutToFinishData aboutToFinishData;
                aboutToFinishData.id = id;
                aboutToFinishData.pipeline = pipeline;
                aboutToFinishData.queue_result = result;
                g_signal_connect(result.decodebin, "about-to-finish",
                                 G_CALLBACK(on_about_to_finish), &aboutToFinishData);

                for (int i = 0; i < 6; i++)
                {
                    gst_element_sync_state_with_parent(result.elements[i]);
                }
                gst_element_release_request_pad(ac, oldResult.ac_sink_pad);
                gst_element_release_request_pad(vc, oldResult.vc_sink_pad);

                for (int i = 0; i < 6; i++)
                {
                    gst_element_set_state(oldResult.elements[i], GST_STATE_NULL);
                }
                for (int i = 0; i < 6; i++)
                {
                    gst_bin_remove(GST_BIN(pipeline), oldResult.elements[i]);
                }
                gst_object_unref(oldResult.ac_sink_pad);
                gst_object_unref(oldResult.vc_sink_pad);

                break;
            default:
                g_printerr("Unexpected message received.\n");
                break;
            }
            gst_message_unref(msg);
        }
    } while (!terminate);
    gst_object_unref(bus);


    // Cleanup
    gst_element_set_state(pipeline, GST_STATE_NULL);
    gst_object_unref(pipeline);

    return 0;
}

I tried to trace it with GST_DEBUG=:TRACE: and by sending a SIGUSR1 signal to the process. I don’t see any extra elements in the output, though there are a lot of GstBuffers and GstMemory objects.
Text is too large to include here, so here’s a github gist

You should call gst_deinit() after the pipeline has been unreffed, before returning from the main function; then you’ll get a list of the leaked objects in the logs.

For leaks not related to GStreamer refcounted objects, you can use valgrind or ASan.

For instance in the on_mq_pad_added function, the pad_name string is leaked.