Hello,
I’m trying to build a simple pipeline where a playlist of video files plays one after the other. Each file goes through a uridecodebin3 element and then through the rest of the pipeline stages until it reaches an rtmp sink. After a file finishes playing, I remove its uridecodebin3, create a new one and attach it to the pipeline.
After about 7 hours of playtime, I noticed that the memory usage of the process had grown from an average of around 500 MB to 2.5 GB. I’m not sure if I’m doing something wrong or if there’s a leak somewhere.
Here’s the source code:
#include <gst/gst.h>
#include <stdio.h>
#include <string.h>
#define MAX_FILES 2
typedef struct
{
GstElement *elements[6];
GstElement *decodebin;
GstElement *audioqueue;
GstElement *videoqueue;
GstPad *ac_sink_pad;
GstPad *vc_sink_pad;
} QueueFileResult;
typedef struct
{
GstElement *pipeline;
GstElement *source;
QueueFileResult *queue_result;
const char *files[MAX_FILES];
int current_file_index;
} StreamContext;
typedef struct
{
GstElement *mux;
} OnMQPadAddedData;
typedef struct
{
GstElement *videoqueue;
GstElement *audioqueue;
GstPipeline *pipeline;
char *id;
} DecodePadAddedData;
typedef struct
{
char *id;
GstPipeline *pipeline;
QueueFileResult queue_result;
} AboutToFinishData;
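/* Fills dest with random letters and NUL-terminates it; dest must have room
 * for length + 1 bytes */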
void rand_str(char *dest, size_t length)
{
char charset[] =
"abcdefghijklmnopqrstuvwxyz"
"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
while (length-- > 0)
{
size_t index = (size_t)rand() % (sizeof charset - 1); // never picks the trailing NUL
*dest++ = charset[index];
}
*dest = '\0';
}
static void on_decoder_pad_added(GstElement *element, GstPad *pad, gpointer data);
static void on_about_to_finish(GstElement *element, gpointer data);
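/* Builds the per-file branch:
 * uridecodebin3 -> queue2 -> videoconvert -> vc_sink_pad (concat)
 *               -> queue2 -> audioconvert -> audioresample -> ac_sink_pad (concat)
 * The decodebin's dynamic pads are linked to the queues in
 * on_decoder_pad_added() once they appear. */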
static QueueFileResult queue_next(const char *id, const char *url, GstPipeline *pipeline, GstPad *vc_sink_pad, GstPad *ac_sink_pad)
{
QueueFileResult result;
result.ac_sink_pad = ac_sink_pad;
result.vc_sink_pad = vc_sink_pad;
GstElement *uridecodebin = gst_element_factory_make("uridecodebin3", NULL);
g_object_set(G_OBJECT(uridecodebin), "uri", url, NULL);
gst_bin_add(GST_BIN(pipeline), uridecodebin);
result.decodebin = uridecodebin;
GstElement *videoqueue = gst_element_factory_make("queue2", NULL);
GstElement *videoconvert = gst_element_factory_make("videoconvert", NULL);
gst_bin_add_many(GST_BIN(pipeline), videoqueue, videoconvert, NULL);
gst_element_link(videoqueue, videoconvert);
GstElement *audioqueue = gst_element_factory_make("queue2", NULL);
GstElement *audioconvert = gst_element_factory_make("audioconvert", NULL);
GstElement *audioresample = gst_element_factory_make("audioresample", NULL);
gst_bin_add_many(GST_BIN(pipeline), audioqueue, audioconvert, audioresample, NULL);
gst_element_link_many(audioqueue, audioconvert, audioresample, NULL);
GstPad *audioresample_src_pad = gst_element_get_static_pad(audioresample, "src");
GstPad *videoconvert_src_pad = gst_element_get_static_pad(videoconvert, "src");
gst_pad_link(videoconvert_src_pad, vc_sink_pad);
gst_pad_link(audioresample_src_pad, ac_sink_pad);
gst_object_unref(videoconvert_src_pad);
gst_object_unref(audioresample_src_pad);
result.audioqueue = audioqueue;
result.videoqueue = videoqueue;
GstElement *elements[6] = {
videoqueue,
videoconvert,
audioqueue,
audioconvert,
audioresample,
uridecodebin};
// Copy each element individually
for (int i = 0; i < 6; i++)
{
result.elements[i] = elements[i];
}
return result;
}
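/* Posts an application message on the bus shortly before the current file
 * ends, so the main loop can queue the next playlist entry. */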
static void on_about_to_finish(GstElement *element, gpointer data)
{
AboutToFinishData *context = (AboutToFinishData *)data;
GstBus *bus = gst_pipeline_get_bus(context->pipeline);
GstMessage *msg = gst_message_new_application(GST_OBJECT(element), gst_structure_new_from_string(context->id));
gst_bus_post(bus, msg);
// gst_bus_post() takes ownership of the message, and the message owns the
// structure, so no extra unref is needed here
gst_object_unref(bus);
}
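/* Called when uridecodebin3 exposes a new pad; links it to the video or
 * audio queue of the current branch based on the pad's caps. */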
static void on_decoder_pad_added(GstElement *element, GstPad *src_pad, gpointer data)
{
DecodePadAddedData *context = (DecodePadAddedData *)data;
GstCaps *caps;
GstStructure *str;
// The pad may not have negotiated caps yet when pad-added fires, so query
// the caps it can produce
caps = gst_pad_query_caps(src_pad, NULL);
if (!caps)
{
g_printerr("Failed to get caps\n");
return;
}
str = gst_caps_get_structure(caps, 0);
g_print("\n========================== Got caps: %s\n", gst_structure_get_name(str));
if (gst_structure_has_name(str, "video/x-raw"))
{
g_printerr("================= Before getting static pad\n");
GstPad *videoqueue_sink_pad = gst_element_get_static_pad(context->videoqueue, "sink");
g_printerr("================= Got static pad\n");
if (gst_pad_link(src_pad, videoqueue_sink_pad) != GST_PAD_LINK_OK)
{
g_printerr("Failed to link video pad\n");
}
gst_object_unref(videoqueue_sink_pad);
gst_debug_bin_to_dot_file(GST_BIN(context->pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "example_video");
}
else if (gst_structure_has_name(str, "audio/x-raw"))
{
g_printerr("================= Before getting static pad\n");
GstPad *audioqueue_sink_pad = gst_element_get_static_pad(context->audioqueue, "sink");
g_printerr("================= Got static pad\n");
if (gst_pad_link(src_pad, audioqueue_sink_pad) != GST_PAD_LINK_OK)
{
g_printerr("Failed to link audio pad\n");
}
gst_object_unref(audioqueue_sink_pad);
gst_debug_bin_to_dot_file(GST_BIN(context->pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "example_audio");
}
gst_caps_unref(caps);
}
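/* Links multiqueue src pads to flvmux request pads. The video sink pad is
 * requested first, so src_0 carries video and src_1 carries audio. */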
static void on_mq_pad_added(GstElement *element, GstPad *pad, gpointer data)
{
OnMQPadAddedData *context = (OnMQPadAddedData *)data;
gchar *pad_name = gst_object_get_name(GST_OBJECT(pad));
// Video sink pad is requested first
// So we get the video at src_0
if (g_strcmp0(pad_name, "src_0") == 0)
{
g_print("==== MQ: Pad added %s\n", pad_name);
GstPad *mux_video_sink_pad = gst_element_request_pad_simple(context->mux, "video");
g_print("Pad name is %s\n", gst_pad_get_name(mux_video_sink_pad));
if (gst_pad_link(pad, mux_video_sink_pad) != GST_PAD_LINK_OK)
{
g_printerr("MQ: Error linking src_0");
}
gst_object_unref(mux_video_sink_pad);
}
else if (g_strcmp0(pad_name, "src_1") == 0)
{
g_print("==== MQ: Pad added %s\n", pad_name);
// Audio pad
GstPad *mux_audio_sink_pad = gst_element_request_pad_simple(context->mux, "audio");
g_print("Pad name is %s\n", gst_pad_get_name(mux_audio_sink_pad));
if (gst_pad_link(pad, mux_audio_sink_pad) != GST_PAD_LINK_OK)
{
g_printerr("MQ: Error linking src_1");
}
gst_object_unref(mux_audio_sink_pad);
}
g_free(pad_name);
}
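/* Static tail of the pipeline, built once in main():
 * video: concat -> queue2 -> compositor -> videoconvert -> videoscale ->
 *        capsfilter (1920x1080) -> x264enc -> multiqueue -> flvmux
 * audio: concat -> queue2 -> audiomixer -> avenc_aac -> aacparse ->
 *        multiqueue -> flvmux
 * and flvmux -> queue2 -> rtmp2sink */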
int main(int argc, char *argv[])
{
const char *VIDEO_FILES[MAX_FILES] = {
"file:///rs-app/vid2.webm",
"file:///rs-app/vert.mp4",
};
// Initialize GStreamer
printf("==================================== Beefore INIT");
gst_init(&argc, &argv);
printf("Initilized");
// Hardcode RTMP destination
const char *rtmp_url = "rtmp://localhost:1936/live/live";
GstPipeline *pipeline = GST_PIPELINE(gst_pipeline_new("custom-pipeline"));
GstElement *audioqueue = gst_element_factory_make("queue2", NULL);
GstElement *audiomixer = gst_element_factory_make_full("audiomixer", "force-live", TRUE, "ignore-inactive-pads", TRUE, "latency", (guint64)100000000, "discont-wait", (guint64)0, NULL); // latency and discont-wait are guint64 properties, so cast for the varargs
GstElement *ac = gst_element_factory_make("concat", NULL);
GstElement *vc = gst_element_factory_make("concat", NULL);
GstElement *mux = gst_element_factory_make("flvmux", NULL);
g_object_set(GST_OBJECT(mux), "streamable", TRUE, "enforce-increasing-timestamps", TRUE, NULL);
GstElement *mq = gst_element_factory_make("multiqueue", NULL);
g_object_set(GST_OBJECT(mq), "sync-by-running-time", TRUE, "use-interleave", TRUE, "max-size-time", (guint64)666666668, "min-interleave-time", (guint64)666666668, "max-size-buffers", 0, NULL);
GstElement *compositor = gst_element_factory_make_full("compositor",
"force-live", TRUE,
"ignore-inactive-pads", TRUE,
"latency", (guint64)100000000, NULL);
GstElement *compositor_queue = gst_element_factory_make("queue2", NULL);
GstElement *compositor_videoconvert = gst_element_factory_make("videoconvert", NULL);
GstElement *compositor_videoscale = gst_element_factory_make("videoscale", NULL);
GstElement *compositor_videoscale_filter = gst_element_factory_make("capsfilter", NULL);
GstCaps *compositor_videoscale_caps = gst_caps_from_string("video/x-raw, width=1920,height=1080");
g_object_set(GST_OBJECT(compositor_videoscale_filter), "caps", compositor_videoscale_caps, NULL);
gst_caps_unref(compositor_videoscale_caps);
GstElement *x264enc = gst_element_factory_make("x264enc", NULL);
g_object_set(GST_OBJECT(x264enc), "bitrate", 10000, "tune", 4, "speed-preset", 1, "key-int-max", 30, NULL); // tune 4 = zerolatency, speed-preset 1 = ultrafast
GstElement *avenc_aac = gst_element_factory_make("avenc_aac", NULL);
GstElement *aacparse = gst_element_factory_make("aacparse", NULL);
GstElement *sink_queue = gst_element_factory_make("queue2", NULL);
GstElement *sink = gst_element_factory_make("rtmp2sink", NULL);
g_object_set(GST_OBJECT(sink), "location", rtmp_url, NULL);
gst_bin_add_many(GST_BIN(pipeline), audioqueue, audiomixer, ac, vc, mux, mq, compositor, compositor_queue, compositor_videoconvert, compositor_videoscale, compositor_videoscale_filter, x264enc, avenc_aac, aacparse, sink_queue, sink, NULL);
OnMQPadAddedData onMQPadAddedData;
onMQPadAddedData.mux = mux;
g_signal_connect(mq, "pad-added", G_CALLBACK(on_mq_pad_added), &onMQPadAddedData);
// Video tail
gst_element_link(vc, compositor_queue);
gst_element_link_many(compositor, compositor_videoconvert, compositor_videoscale, compositor_videoscale_filter, x264enc, NULL);
GstPad *compositor_queue_src_pad = gst_element_get_static_pad(compositor_queue, "src");
GstPad *compositor_sink_pad = gst_element_request_pad_simple(compositor, "sink_%u");
g_object_set(GST_OBJECT(compositor_sink_pad), "width", 1920, "height", 1080, NULL);
gst_pad_link(compositor_queue_src_pad, compositor_sink_pad);
gst_object_unref(compositor_queue_src_pad);
gst_object_unref(compositor_sink_pad);
GstPad *mq_video_sink_pad = gst_element_request_pad_simple(mq, "sink_%u");
GstPad *x264enc_src_pad = gst_element_get_static_pad(x264enc, "src");
gst_pad_link(x264enc_src_pad, mq_video_sink_pad);
gst_object_unref(mq_video_sink_pad);
gst_object_unref(x264enc_src_pad);
// Audio tail
GstPad *ac_src_pad = gst_element_get_static_pad(ac, "src");
GstPad *audioqueue_sink_pad = gst_element_get_static_pad(audioqueue, "sink");
gst_pad_link(ac_src_pad, audioqueue_sink_pad);
gst_object_unref(ac_src_pad);
gst_object_unref(audioqueue_sink_pad);
GstPad *audioqueue_src_pad = gst_element_get_static_pad(audioqueue, "src");
GstPad *audiomixer_sink_pad = gst_element_request_pad_simple(audiomixer, "sink_%u");
gst_pad_link(audioqueue_src_pad, audiomixer_sink_pad);
gst_object_unref(audioqueue_src_pad);
gst_object_unref(audiomixer_sink_pad);
gst_element_link_many(audiomixer, avenc_aac, aacparse, NULL);
GstPad *mq_audio_sink_pad = gst_element_request_pad_simple(mq, "sink_%u");
GstPad *aacparse_src_pad = gst_element_get_static_pad(aacparse, "src");
gst_pad_link(aacparse_src_pad, mq_audio_sink_pad);
gst_object_unref(mq_audio_sink_pad);
gst_object_unref(aacparse_src_pad);
gst_element_link_many(mux, sink_queue, sink, NULL);
GstPad *ac_sink_pad = gst_element_request_pad_simple(ac, "sink_%u");
GstPad *vc_sink_pad = gst_element_request_pad_simple(vc, "sink_%u");
printf("Constructed pipeline");
char id[9];
rand_str(id, sizeof id - 1); // leave room for the terminating NUL
QueueFileResult result = queue_next(id, VIDEO_FILES[0], pipeline, vc_sink_pad, ac_sink_pad);
DecodePadAddedData decodePadAddedData;
decodePadAddedData.id = "adsf";
decodePadAddedData.audioqueue = result.audioqueue;
decodePadAddedData.videoqueue = result.videoqueue;
decodePadAddedData.pipeline = pipeline;
g_signal_connect(result.decodebin, "pad-added",
G_CALLBACK(on_decoder_pad_added), &decodePadAddedData);
AboutToFinishData aboutToFinishData;
aboutToFinishData.id = id;
aboutToFinishData.pipeline = pipeline;
aboutToFinishData.queue_result = result;
g_signal_connect(result.decodebin, "about-to-finish",
G_CALLBACK(on_about_to_finish), &aboutToFinishData);
for (int i = 0; i < 6; i++)
{
gst_element_sync_state_with_parent(result.elements[i]);
}
StreamContext context;
context.queue_result = &result;
context.pipeline = GST_ELEMENT(pipeline);
// Get the bus and add bus watch
// Start playing
gst_element_set_state(GST_ELEMENT(pipeline), GST_STATE_PLAYING);
gst_debug_bin_to_dot_file(GST_BIN(pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "example");
GstBus *bus = gst_element_get_bus(GST_ELEMENT(pipeline));
// gst_bus_add_watch(bus, bus_call, &context);
gboolean terminate = FALSE;
GstMessage *msg;
int file_index = 0;
do
{
msg = gst_bus_timed_pop_filtered(bus, GST_CLOCK_TIME_NONE, GST_MESSAGE_ERROR | GST_MESSAGE_EOS | GST_MESSAGE_STATE_CHANGED | GST_MESSAGE_APPLICATION);
/* Parse message */
if (msg != NULL)
{
GError *err;
gchar *debug_info;
switch (GST_MESSAGE_TYPE(msg))
{
case GST_MESSAGE_ERROR:
gst_message_parse_error(msg, &err, &debug_info);
g_printerr("Error received from element %s: %s\n", GST_OBJECT_NAME(msg->src), err->message);
g_printerr("Debugging information: %s\n", debug_info ? debug_info : "none");
g_clear_error(&err);
g_free(debug_info);
terminate = TRUE;
break;
case GST_MESSAGE_EOS:
g_print("End-Of-Stream reached.\n");
terminate = TRUE;
break;
case GST_MESSAGE_STATE_CHANGED:
/* We are only interested in state-changed messages from the pipeline */
if (GST_MESSAGE_SRC(msg) == GST_OBJECT(pipeline))
{
GstState old_state, new_state, pending_state;
gst_message_parse_state_changed(msg, &old_state, &new_state, &pending_state);
g_print("\nPipeline state changed from %s to %s:\n",
gst_element_state_get_name(old_state), gst_element_state_get_name(new_state));
/* Print the current capabilities of the sink element */
// print_pad_capabilities(sink, "sink");
}
break;
case GST_MESSAGE_APPLICATION:
g_print("Got application message");
file_index = (file_index + 1) % MAX_FILES; // wrap around the playlist
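/* Build and attach the next file's branch first, then tear down the old
 * one: release its concat request pads, drop its elements to NULL and
 * remove them from the pipeline. */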
QueueFileResult oldResult = result;
GstPad *ac_sink_pad = gst_element_request_pad_simple(ac, "sink_%u");
GstPad *vc_sink_pad = gst_element_request_pad_simple(vc, "sink_%u");
result = queue_next("sdfa", VIDEO_FILES[file_index], pipeline, vc_sink_pad, ac_sink_pad);
context.queue_result = &result;
// Reuse the callback-data structs declared earlier in main so the pointers
// handed to g_signal_connect() stay valid after this case block ends
decodePadAddedData.audioqueue = result.audioqueue;
decodePadAddedData.videoqueue = result.videoqueue;
g_signal_connect(result.decodebin, "pad-added",
G_CALLBACK(on_decoder_pad_added), &decodePadAddedData);
aboutToFinishData.queue_result = result;
g_signal_connect(result.decodebin, "about-to-finish",
G_CALLBACK(on_about_to_finish), &aboutToFinishData);
for (int i = 0; i < 6; i++)
{
gst_element_sync_state_with_parent(result.elements[i]);
}
gst_element_release_request_pad(ac, oldResult.ac_sink_pad);
gst_element_release_request_pad(vc, oldResult.vc_sink_pad);
for (int i = 0; i < 6; i++)
{
gst_element_set_state(oldResult.elements[i], GST_STATE_NULL);
}
for (int i = 0; i < 6; i++)
{
gst_bin_remove(GST_BIN(pipeline), oldResult.elements[i]);
}
gst_object_unref(oldResult.ac_sink_pad);
gst_object_unref(oldResult.vc_sink_pad);
break;
default:
g_printerr("Unexpected message received.\n");
break;
}
gst_message_unref(msg);
}
} while (!terminate);
gst_object_unref(bus);
// Cleanup
gst_element_set_state(GST_ELEMENT(pipeline), GST_STATE_NULL);
gst_object_unref(pipeline);
return 0;
}