Appsink to appsrc to videosink

Hello

I want to receive data from appsink, push it to appsrc, and then output the video through videosink.

I pull each sample from appsink with the pull-sample signal and push its buffer into appsrc with the push-buffer signal.

When I run it, the video is displayed, but memory usage keeps growing until the process runs out of memory.

Can you figure out what’s causing it?

MY CODE:

#include <gst/gst.h>
#include <string.h>
#include <gst/app/gstappsrc.h>
#include <gst/app/gstappsink.h>

/* GLib main loop; the bus callbacks quit it on error or end-of-stream. */
GMainLoop *loop;

/* Producer side: the pipeline that ends in appsink. */
GstElement *src_pipeline;
GstBus *src_bus;
GstCaps *src_caps;
GstElement *testsrc;        /* not referenced by the code in this file */

/* Consumer side: the pipeline that starts at appsrc. */
GstElement *sink_pipeline;
GstBus *sink_bus;
GstCaps *sink_caps;
GstElement *testsink;       /* only mentioned in a commented-out line in main */

/* Bridge between the two pipelines. */
GstElement *app_sink;
GstElement *app_source;

/* Remaining elements. video_convert is assigned twice in main: first for
 * the producer pipeline, then again for the consumer pipeline. */
GstElement *video_source;
GstElement *video_caps;
GstElement *video_convert;
GstElement *video_sink;

/* Placeholder user-data type for the feed callbacks; it carries no fields. */
typedef struct _CustomData {
} CustomData;

/* Id of the idle source installed by start_feed; 0 means none is installed. */
guint sourceid;

/* Idle-source callback intended to feed appsrc from the main loop.
 * start_feed installs it with g_idle_add() and stop_feed removes it.
 *
 * data: unused (start_feed passes NULL).
 *
 * Returns TRUE to stay installed and be called again, FALSE to be removed.
 *
 * NOTE: the actual push is still disabled below, so this callback currently
 * does no work; re-enable the push before connecting need-data/enough-data,
 * otherwise the idle source will spin without feeding anything.
 */
static gboolean push_data (CustomData *data) {
  /* Start from "OK" so the check below never reads an indeterminate value
   * while the push call is disabled. */
  GstFlowReturn ret = GST_FLOW_OK;

  (void) data;

  /* Push the buffer into the appsrc */
  //g_signal_emit_by_name (app_source, "push-buffer", sink_buffer, &ret);

  if (ret != GST_FLOW_OK) {
    /* We got some error, stop sending data. Returning FALSE makes GLib
     * destroy the source, so forget its id; otherwise stop_feed would try
     * to remove a source that no longer exists. */
    sourceid = 0;
    return FALSE;
  }

  /* This is a GSourceFunc: the result is a gboolean "keep me installed"
   * flag, not a GstFlowReturn (GST_FLOW_OK is 0, which would mean FALSE). */
  return TRUE;
}

/* "need-data" handler for appsrc: installs push_data as an idle source on
 * the main loop, unless one is already installed (sourceid != 0). */
static void start_feed (GstElement *source, guint size, CustomData *data) {
  if (sourceid != 0) {
    return;
  }

  g_print ("Start feeding\n");
  sourceid = g_idle_add ((GSourceFunc) push_data, NULL);
}

/* "enough-data" handler for appsrc: removes the idle source installed by
 * start_feed, if any, and clears its id. */
static void stop_feed (GstElement *source, CustomData *data) {
  if (sourceid == 0) {
    return;
  }

  g_print ("Stop feeding\n");
  g_source_remove (sourceid);
  sourceid = 0;
}

/* "new-sample" handler: the appsink has received a buffer.
 * Pulls the sample from the emitting appsink and forwards its buffer to the
 * global app_source through the "push-buffer" action signal.
 *
 * sink: the appsink that emitted the signal.
 * data: unused.
 *
 * Always returns GST_FLOW_OK, so the producer keeps running even when a
 * push is rejected.
 */
static GstFlowReturn new_sample(GstElement *sink, gpointer data)
{
    /* NULL-initialised so the check below is well defined even if the
     * emission does not fill in the return location. */
    GstSample *sample = NULL;
    GstBuffer *buffer;
    GstFlowReturn ret = GST_FLOW_OK;

    (void) data;

    /* Pull from the element that signalled instead of a global. */
    g_signal_emit_by_name (sink, "pull-sample", &sample, NULL);

    if (!sample) {
        g_print ("could not make snapshot\n");
        return GST_FLOW_OK;
    }

    g_print("*\n");

    /* The buffer is borrowed from the sample; it is not unreffed separately. */
    buffer = gst_sample_get_buffer (sample);
    if (buffer) {
        /* The "push-buffer" action signal takes its own reference, so
         * dropping the sample right after the call is correct. */
        g_signal_emit_by_name (app_source, "push-buffer", buffer, &ret);
    }

    gst_sample_unref (sample);

    /* NOTE(review): ret is deliberately not propagated. appsrc may reject
     * buffers while the consumer pipeline is still starting up, and returning
     * that upstream would stop the producer - confirm before changing. */
    return GST_FLOW_OK;
}

/* Bus "message" handler for the producer pipeline.
 * On ERROR the error text is printed; on ERROR or EOS the producer pipeline
 * is set to READY and the main loop is asked to quit. Every other message
 * type is ignored. */
static void src_message_cb(GstBus *bus, GstMessage *msg, gpointer data)
{
    const GstMessageType kind = GST_MESSAGE_TYPE (msg);

    if (kind != GST_MESSAGE_ERROR && kind != GST_MESSAGE_EOS) {
        /* Unhandled message */
        return;
    }

    if (kind == GST_MESSAGE_ERROR) {
        GError *failure;
        gchar *details;

        gst_message_parse_error(msg, &failure, &details);
        g_print("Error: %s\n", failure->message);
        g_error_free(failure);
        g_free(details);
    }

    /* Shared shutdown path for both error and end-of-stream. */
    gst_element_set_state(src_pipeline, GST_STATE_READY);
    g_main_loop_quit(loop);
}


/* Bus "message" handler for the consumer pipeline.
 * On ERROR the error text is printed; on ERROR or EOS the consumer pipeline
 * is set to READY and the main loop is asked to quit. Every other message
 * type is ignored. */
static void sink_message_cb(GstBus *bus, GstMessage *msg, gpointer data)
{
    const GstMessageType kind = GST_MESSAGE_TYPE (msg);
    const gboolean is_error = (kind == GST_MESSAGE_ERROR);

    if (!is_error && kind != GST_MESSAGE_EOS) {
        /* Unhandled message */
        return;
    }

    if (is_error) {
        GError *failure;
        gchar *details;

        gst_message_parse_error(msg, &failure, &details);
        g_print("Error: %s\n", failure->message);
        g_error_free(failure);
        g_free(details);
    }

    /* Shared shutdown path for both error and end-of-stream. */
    gst_element_set_state(sink_pipeline, GST_STATE_READY);
    g_main_loop_quit(loop);
}

int
main (int argc, char *argv[])
{
  /* Initialize GStreamer */
    gst_init(NULL, NULL);

    loop = g_main_loop_new(NULL, FALSE);
        
    /* Create the elements */	
    video_source 		= gst_element_factory_make("videotestsrc", "video_source");
    video_caps = gst_element_factory_make ("capsfilter","video_caps");  
    video_convert 	= gst_element_factory_make("videoconvert", "video_convert");
    app_sink 	= gst_element_factory_make("appsink", "app_sink");
    
    /* Create the empty pipeline */
    src_pipeline 	= gst_pipeline_new("src_test_pipeline");
    
    if(!src_pipeline || !video_source || !video_convert || !app_sink) {
        g_printerr ("Not all elements could be created.\n");
        return -1;
    }
    
    /* Modify the caps properties */
    src_caps = gst_caps_new_simple("video/x-raw",\
        "format", G_TYPE_STRING, "UYVY",\
        "width", G_TYPE_INT, 640, \
        "height", G_TYPE_INT, 480, \
        "framerate", GST_TYPE_FRACTION, 30, 1,
        NULL);
     
    g_object_set (G_OBJECT (video_caps), "caps", src_caps, NULL);
    gst_caps_unref(src_caps);
    
    g_object_set(app_sink, "emit-signals", TRUE, "sync", FALSE, NULL);
    g_signal_connect(app_sink, "new-sample", G_CALLBACK(new_sample), NULL);
    
    /* Build the pipeline */
    gst_bin_add_many(GST_BIN(src_pipeline), video_source, video_caps, video_convert, app_sink, NULL);
    
    /* Link the elements together */
    if(gst_element_link_many(video_source, video_caps, video_convert, NULL) != TRUE || \
        gst_element_link(video_convert, app_sink) != TRUE)
    {
        g_printerr("Elements could not be linked.\n");
        gst_object_unref(src_pipeline);
        return -1;
    }

    /* Add function to watch bus */
    src_bus = gst_element_get_bus(src_pipeline);
    gst_bus_add_signal_watch(src_bus);
    g_signal_connect(G_OBJECT(src_bus), "message", (GCallback)src_message_cb, NULL);
    gst_object_unref(src_bus);

////////////////////////////////////////////////////////////////////////////////////////////////////

    app_source 		= gst_element_factory_make("appsrc", "app_source");
    video_convert	= gst_element_factory_make("videoconvert", "video_convert");
    video_sink 		= gst_element_factory_make("xvimagesink", "video_sink");

    /* Create the empty pipeline */
    sink_pipeline 	= gst_pipeline_new("sink_test_pipeline");
    
    if(!sink_pipeline || !app_source || !video_convert ||!video_sink)
    {
        g_printerr ("Not all elements could be created.\n");
        return -1;
    }

    /* Modify the source properties */
    g_object_set (app_source, "stream-type", 0, "is-live", 1, "do-timestamp", 0, "format", GST_FORMAT_TIME,NULL);
        
    /* Modify the caps properties */
    sink_caps = gst_caps_new_simple("video/x-raw", \
        "format", G_TYPE_STRING, "UYVY", \
        "width", G_TYPE_INT, 640, \
        "height", G_TYPE_INT, 480, \
        "framerate", GST_TYPE_FRACTION, 30, 1, \
        NULL);
	
    g_object_set(app_source, "caps", sink_caps, NULL);
    g_object_set(video_sink, "sync", FALSE, "async", FALSE, NULL);

    /* Build the pipeline */
    gst_bin_add_many(GST_BIN(sink_pipeline), app_source, video_convert, video_sink, NULL);
    
    /* Link the elements together */
    if(gst_element_link_many(app_source, video_convert, video_sink, NULL) != TRUE) {
        g_printerr("Elements could not be linked.\n");
        gst_object_unref(sink_pipeline);
        gst_caps_unref(sink_caps);
        return -1;
    }

    //g_signal_connect (app_source, "need-data", G_CALLBACK (start_feed), NULL);
    //g_signal_connect (app_source, "enough-data", G_CALLBACK (stop_feed), NULL);
    gst_caps_unref(sink_caps);
    
    //testsink = gst_bin_get_by_name(GST_BIN(sink_pipeline), "appsrc0");  
    
    /* Add function to watch bus */
    sink_bus = gst_element_get_bus(sink_pipeline);
    gst_bus_add_signal_watch(sink_bus);
    g_signal_connect(G_OBJECT(sink_bus), "message", (GCallback)sink_message_cb, NULL);
    gst_object_unref (sink_bus);
            
    gst_element_set_state(src_pipeline, GST_STATE_PLAYING);
    gst_element_set_state(sink_pipeline, GST_STATE_PLAYING);
    
    g_main_loop_run(loop);
    
    gst_element_set_state(src_pipeline, GST_STATE_NULL);
    gst_element_set_state(sink_pipeline, GST_STATE_NULL);
    
    gst_object_unref(src_pipeline);
    gst_object_unref(sink_pipeline);
    g_main_loop_unref (loop);
        
    return 0;
}

Regards
Rorosi

You set sync=false on the producer pipeline’s appsink.

Since there’s nothing else throttling data flow/production in the producer pipeline, the producer pipeline just creates raw video buffers as quickly as possible, CPU bound basically (and memory bound in the end, as you discovered).

You shove those buffers into the appsrc where they will be put into an internal unlimited queue.

So my guess is that the consumer pipeline outputting to xvimagesink will process buffers slower than the producer pipeline, so buffers pile up inside appsrc until you run out of memory.

@tpm

Thanks, I think the memory leak has been resolved.