Cannot get appsink and appsrc to work in a dynamic pipeline for live sources in push mode

Hi everyone!

TL;DR: The appsink ‘new-sample’ callback is never triggered and the element is stuck in the PAUSED state even though everything else is set to PLAYING. I cannot figure out why, please help!

I have been exploring how to structure our GStreamer application, which is essentially a dynamic ML inference pipeline that can activate and deactivate different ML techniques. To carve out a suitable design, I have been trying to get a simpler pipeline with similar characteristics to work, but I cannot get it running. More specifically, the dynamically created appsink never triggers its callback on ‘new-sample’ signals.

First, a little about the pipeline I am building to evaluate my design:
The ML inference techniques are replaced with a simple color augmenter that is supposed to convert each frame to a color scale (like grayscale, but in shades of red, green, or blue instead).

Initially, the pipeline looks like this:

  • videotestsrc → videoconvert → capsfilter → (My augmenter bin: ghost pad → tee)
  • (not linked) autovideosink

When a color-scale src pad, e.g., red, is requested from the custom augmenter bin, the pipeline will look like this:

  • videotestsrc → videoconvert → capsfilter → (My augmenter bin: ghost pad → tee → queue (red) → videoscale (just for simulating a preprocessing step for ML inference) → appsink (red))
  • (My augmenter bin: appsrc (red) → ghost pad) → autovideosink
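
The requested pad name selects the branch: the listing further down asks the bin for `src_red`. As a minimal sketch of that naming scheme, the name can be parsed strictly so that only `src_red`, `src_green`, and `src_blue` are accepted. `BranchColor` and `parse_branch_pad_name` are hypothetical names used only for illustration (a stand-in for the `ColorScale` enum from the header that is not shown). The request handler in the listing uses separate prefix and suffix checks instead, which would also accept a name such as `src_darkred`.

```cpp
#include <optional>
#include <string_view>

// Hypothetical stand-in for the ColorScale enum from the header that is not shown.
enum class BranchColor { Red, Green, Blue };

// Map a requested pad name such as "src_red" to a branch color.
// Returns std::nullopt for anything that is not exactly "src_<color>".
std::optional<BranchColor> parse_branch_pad_name(std::string_view name) {
  constexpr std::string_view prefix = "src_";
  if (name.substr(0, prefix.size()) != prefix) {
    return std::nullopt;
  }
  // Safe: the prefix matched, so the name has at least prefix.size() characters.
  const std::string_view color = name.substr(prefix.size());
  if (color == "red") return BranchColor::Red;
  if (color == "green") return BranchColor::Green;
  if (color == "blue") return BranchColor::Blue;
  return std::nullopt;
}
```

With a helper like this, the request handler could reject unknown names in one place before creating any elements.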

If another color src is requested from the bin, another branch inside the augmenter bin will be added from the initial tee element, and a corresponding ghost pad will be created.

My main concern is that I do not understand why data is not flowing through the appsink to the appsrc with my callback function. Are there some “gotchas” that I missed when using appsink and appsrc in the same pipeline?

In another, even simpler sample application I created I had similar issues, and only managed to get data flowing after forcing an initial manual push of a buffer to the appsrc. Should this really be necessary?

I have exported .dot graphs of the pipeline before and after requesting a pad from the custom augmenter bin. If they would be helpful, I will gladly try to provide them somehow!

Here is a screenshot of the graph after a pad has been requested and the pipeline has been set to PLAYING. Note that the appsink has received a buffer (I believe it is the preroll buffer, as I am able to pull it from the element):

Here is the source code for my sample application (without the header files, since they probably are not necessary to understand the context and see what I am doing wrong):

#include "mainbin.h"
#include <gst/app/app.h>
#include <gst/gst.h>
#include "frameaugmenter.h"

typedef struct _ColorBranch {
  GstPad* tee_src_pad;
  GstElement* inputqueue;
  GstElement* videoscale;
  GstElement* appsink;
  GstElement* appsrc;
  GstElement* outputqueue;
} ColorBranch;

struct _GstAugmenterBin {
  GstBin parent;
  FrameAugmenter* augmenter;

  GstPad* ghostsink;
  GstElement* tee;
  ColorBranch *red, *blue, *green;
};

struct _GstAugmenterBinClass {
  GstBinClass parent;
};

typedef struct _CustomData {
  GMainLoop* loop;
  GstElement* pipeline;

  GstAugmenterBin* augmenterbin;

  GstElement* videosource;
  GstElement* sourcecapsfilter;
  GstElement* sourcevideoconvert;
  GstElement* videosink;
} CustomData;

GST_DEBUG_CATEGORY_STATIC(gst_augmenter_bin_debug_category);
#define GST_CAT_DEFAULT gst_augmenter_bin_debug_category

G_DEFINE_TYPE_WITH_CODE(GstAugmenterBin, gst_augmenter_bin, GST_TYPE_BIN,
                        GST_DEBUG_CATEGORY_INIT(gst_augmenter_bin_debug_category, "inferencebin", 0,
                                                "debug category for inferencebin element"));

static GstFlowReturn on_new_sample(GstElement* sink, gpointer data) {
  g_print("NEW SAMPLE!\n");  // Not printing ever
  GstAugmenterBin* bin = GST_AUGMENTER_BIN(data);
  GstSample* sample = gst_app_sink_pull_sample(GST_APP_SINK(sink));
  if (!sample) {
    g_printerr("Could not pull sample\n");
    return GST_FLOW_OK;
  }
  GstBuffer* buffer = gst_sample_get_buffer(sample);
  GstMapInfo map;
  if (!gst_buffer_map(buffer, &map, GST_MAP_READ)) {
    g_printerr("Could not map buffer\n");
    gst_sample_unref(sample);
    return GST_FLOW_ERROR;
  }

  // map.size is a gsize, so use the matching GLib format macro
  g_print("Successfully read buffer of size %" G_GSIZE_FORMAT "\n", map.size);

  GstElement* target_appsrc = NULL;
  ColorScale color;

  if (sink == bin->red->appsink) {
    color = COLOR_SCALE_RED;
    target_appsrc = bin->red->appsrc;
  } else if (sink == bin->green->appsink) {
    color = COLOR_SCALE_GREEN;
    target_appsrc = bin->green->appsrc;
  } else if (sink == bin->blue->appsink) {
    color = COLOR_SCALE_BLUE;
    target_appsrc = bin->blue->appsrc;
  } else {
    GST_ERROR_OBJECT(sink, "Unknown appsink");
    // Release the mapping and the sample on the error path too
    gst_buffer_unmap(buffer, &map);
    gst_sample_unref(sample);
    return GST_FLOW_ERROR;
  }

  // TODO: Process frame (will use `color` and `map`)

  // Unmap before handing the sample over to the appsrc
  gst_buffer_unmap(buffer, &map);
  if (gst_app_src_push_sample(GST_APP_SRC(target_appsrc), sample) != GST_FLOW_OK) {
    GST_WARNING_OBJECT(sink, "Could not push sample to appsrc");
  }
  gst_sample_unref(sample);

  return GST_FLOW_OK;
}

static GstPad* augmenter_bin_request_new_pad(GstElement* element, GstPadTemplate* templ,
                                             const gchar* name, const GstCaps* caps) {
  GstAugmenterBin* self = GST_AUGMENTER_BIN(element);
  if (!g_str_has_prefix(name, "src_")) {
    GST_ERROR_OBJECT(self, "Invalid pad name prefix: %s", name);
    return NULL;
  }

  ColorScale color;
  ColorBranch* color_branch_elements;
  const char* color_name;
  if (g_str_has_suffix(name, "red")) {
    color = COLOR_SCALE_RED;
    color_name = "red";
    color_branch_elements = self->red;
  } else if (g_str_has_suffix(name, "green")) {
    color = COLOR_SCALE_GREEN;
    color_name = "green";
    color_branch_elements = self->green;
  } else if (g_str_has_suffix(name, "blue")) {
    color = COLOR_SCALE_BLUE;
    color_name = "blue";
    color_branch_elements = self->blue;
  } else {
    GST_ERROR_OBJECT(self, "Invalid pad name %s", name);
    return NULL;
  }

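  // Create elements. GStreamer copies the element name, so the temporary strings are freed.
  gchar* queue_name = g_strdup_printf("queue_%s", color_name);
  gchar* videoscale_name = g_strdup_printf("videoscale_%s", color_name);
  gchar* appsink_name = g_strdup_printf("appsink_%s", color_name);
  gchar* appsrc_name = g_strdup_printf("appsrc_%s", color_name);
  color_branch_elements->inputqueue = gst_element_factory_make("queue", queue_name);
  color_branch_elements->videoscale = gst_element_factory_make("videoscale", videoscale_name);
  color_branch_elements->appsink = gst_element_factory_make("appsink", appsink_name);
  color_branch_elements->appsrc = gst_element_factory_make("appsrc", appsrc_name);
  g_free(queue_name);
  g_free(videoscale_name);
  g_free(appsink_name);
  g_free(appsrc_name);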

  // Add to custom bin
  gst_bin_add_many(GST_BIN(self), color_branch_elements->inputqueue,
                   color_branch_elements->videoscale, color_branch_elements->appsink,
                   color_branch_elements->appsrc, NULL);

  // Link elements
  if (!gst_element_link_many(color_branch_elements->inputqueue, color_branch_elements->videoscale,
                             color_branch_elements->appsink, NULL)) {
    g_printerr("Could not link all elements when requesting new pad.\n");
  }

  // Request tee src and link with inputqueue
  color_branch_elements->tee_src_pad = gst_element_request_pad_simple(self->tee, "src_%u");
  if (!color_branch_elements->tee_src_pad) {
    g_printerr("Unable to request pad from tee\n");
  }
  GstPad* queue_pad = gst_element_get_static_pad(color_branch_elements->inputqueue, "sink");
  if (!queue_pad) {
    g_printerr("Unable to get sinkpad from queue\n");
  }
  if (gst_pad_link(color_branch_elements->tee_src_pad, queue_pad) != GST_PAD_LINK_OK) {
    g_printerr("Failed to link tee to queue\n");
  } else {
    g_print("Successfully linked tee to queue\n");
  }

  // Configure elements
  g_signal_connect(color_branch_elements->appsink, "new-sample", G_CALLBACK(on_new_sample), self);
  g_object_set(G_OBJECT(color_branch_elements->appsink), "emit-signals", TRUE, "sync", FALSE,
               "drop", TRUE, "max-buffers", 1, NULL);
  g_object_set(G_OBJECT(color_branch_elements->appsrc), "format", GST_FORMAT_TIME, "is-live", TRUE,
               "stream-type", 0, "emit-signals", TRUE, "do-timestamp", TRUE, NULL);
  g_object_set(G_OBJECT(color_branch_elements->inputqueue), "max-size-buffers", 2, "leaky",
               2 /* Leak downstream */, NULL);
  g_object_set(G_OBJECT(color_branch_elements->videoscale), "method", 1, NULL);

  // Set up ghost pad
  GstPad* src_pad = gst_element_get_static_pad(color_branch_elements->appsrc, "src");
  GstPad* ghost_pad = gst_ghost_pad_new(name, src_pad);
  gst_element_add_pad(GST_ELEMENT(self), ghost_pad);
  gst_object_unref(src_pad);

  return ghost_pad;
}

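static void gst_augmenter_bin_finalize(GObject* object) {
  GstAugmenterBin* self = GST_AUGMENTER_BIN(object);
  delete self->augmenter;
  self->augmenter = nullptr;

  g_free(self->red);
  g_free(self->green);
  g_free(self->blue);

  // Chain up so the parent class can release its own resources
  G_OBJECT_CLASS(gst_augmenter_bin_parent_class)->finalize(object);
}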

static void gst_augmenter_bin_class_init(GstAugmenterBinClass* klass) {
  GstElementClass* element_class = GST_ELEMENT_CLASS(klass);
  GObjectClass* object_class = G_OBJECT_CLASS(klass);

  // Override the request_new_pad method
  element_class->request_new_pad = augmenter_bin_request_new_pad;

  gst_element_class_add_pad_template(
      element_class,
      gst_pad_template_new("sink", GST_PAD_SINK, GST_PAD_ALWAYS,
                           gst_caps_from_string("video/x-raw,format=BGR,framerate=10/1")));
  gst_element_class_add_pad_template(
      element_class,
      gst_pad_template_new("src_%s", GST_PAD_SRC, GST_PAD_REQUEST,
                           gst_caps_from_string("video/x-raw,format=BGR,framerate=10/1")));

  gst_element_class_set_static_metadata(
      GST_ELEMENT_CLASS(klass), "AugmenterBin", "Filter",
      "A bin with the augmenter element for testing experimental implementation",
      "Developer Name <developer.name@email.com>");

  object_class->finalize = gst_augmenter_bin_finalize;
  g_print("In bin class init... \n");
}

static void gst_augmenter_bin_init(GstAugmenterBin* self) {
  self->augmenter = new FrameAugmenter();
  self->red = (ColorBranch*)g_malloc0(sizeof(ColorBranch));
  self->green = (ColorBranch*)g_malloc0(sizeof(ColorBranch));
  self->blue = (ColorBranch*)g_malloc0(sizeof(ColorBranch));

  // Create tee and add to bin
  self->tee = gst_element_factory_make("tee", "augmenterbintee");
  gst_bin_add(GST_BIN(self), self->tee);

  // Link tee sink with bin's ghost sink
  GstPad* sink_pad = gst_element_get_static_pad(self->tee, "sink");
  self->ghostsink = gst_ghost_pad_new("sink", sink_pad);
  gst_element_add_pad(GST_ELEMENT(self), self->ghostsink);
  gst_object_unref(sink_pad);
  g_print("In bin init..\n");
}

static gboolean bus_call(GstBus* bus, GstMessage* msg, gpointer data) {
  GMainLoop* loop = (GMainLoop*)data;
  switch (GST_MESSAGE_TYPE(msg)) {
    case GST_MESSAGE_STATE_CHANGED: {
      GstState old_state, new_state, pending_state;
      gst_message_parse_state_changed(msg, &old_state, &new_state, &pending_state);
      // Print out state changes for each element
      g_print("Element %s changed state from %s to %s (pending: %s)\n", GST_OBJECT_NAME(msg->src),
              gst_element_state_get_name(old_state), gst_element_state_get_name(new_state),
              gst_element_state_get_name(pending_state));
      break;
    }
    case GST_MESSAGE_ERROR: {
      GError* err;
      gchar* debug;
      gst_message_parse_error(msg, &err, &debug);
      g_print("Error: %s\n", err->message);
      g_error_free(err);
      g_free(debug);
      g_main_loop_quit(loop);
      break;
    }
    case GST_MESSAGE_EOS:
      g_print("End of stream\n");
      g_main_loop_quit(loop);
      break;
    default:
      break;
  }
  return TRUE;
}

int main(int argc, char* argv[]) {
  CustomData data;
  GstCaps* sourcecaps;

  gst_init(&argc, &argv);
  data.loop = g_main_loop_new(NULL, FALSE);

  g_print("Creating elements.\n");
  data.pipeline = gst_pipeline_new("TestColorAugmenterPipeline");
  data.videosource = gst_element_factory_make("videotestsrc", "videosource");
  data.sourcevideoconvert = gst_element_factory_make("videoconvert", "videoconvert");
  data.sourcecapsfilter = gst_element_factory_make("capsfilter", "sourcecapsfilter");
  data.augmenterbin =
      GST_AUGMENTER_BIN(g_object_new(GST_TYPE_AUGMENTER_BIN, "name", "AugmenterBin", NULL));
  data.videosink = gst_element_factory_make("autovideosink", "videosink");
  if (!data.videosource || !data.sourcevideoconvert || !data.sourcecapsfilter ||
      !data.augmenterbin || !data.videosink) {
    g_printerr("Could not create all elements\n");
    return -1;
  }

  g_object_set(G_OBJECT(data.videosource), "is-live", TRUE, NULL);
  sourcecaps = gst_caps_from_string("video/x-raw,format=BGR,width=640,height=400,framerate=10/1");
  // g_object_set(G_OBJECT(data.videosource), "is-live", TRUE, NULL);
  g_object_set(G_OBJECT(data.sourcecapsfilter), "caps", sourcecaps, NULL);
  gst_caps_unref(sourcecaps);

  // gst_bin_add_many(GST_BIN(data.augmenterbin), data.bintee, NULL);
  gst_bin_add_many(GST_BIN(data.pipeline), data.videosource, data.sourcevideoconvert,
                   data.sourcecapsfilter, GST_ELEMENT(data.augmenterbin), data.videosink, NULL);

  if (!gst_element_link_many(data.videosource, data.sourcevideoconvert, data.sourcecapsfilter,
                             GST_ELEMENT(data.augmenterbin), NULL)) {
    g_printerr("Could not link all elements\n");
    return -1;
  }

  GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(data.pipeline), GST_DEBUG_GRAPH_SHOW_ALL,
                            "augmenter-pipeline-prereq");
  // Request a red src pad from the bin, and link it with the videosink
  g_print("Requesting red src pad from bin and linking with videosink\n");
  GstPad* bin_src_pad = gst_element_request_pad_simple(GST_ELEMENT(data.augmenterbin), "src_red");
  GstPad* video_sink_pad = gst_element_get_static_pad(data.videosink, "sink");
  if (!bin_src_pad || !video_sink_pad ||
      gst_pad_link(bin_src_pad, video_sink_pad) != GST_PAD_LINK_OK) {
    g_printerr("Could not link bin src pad to videosink\n");
    return -1;
  }
  gst_object_unref(bin_src_pad);
  gst_object_unref(video_sink_pad);

  g_print("Setting pipeline to playing.\n");
  gst_element_set_state(data.pipeline, GST_STATE_PLAYING);
  g_usleep(100000);

  // Confirmed that appsink is emitting signals
  gboolean b = gst_app_sink_get_emit_signals(GST_APP_SINK(data.augmenterbin->red->appsink));
  if (b) {
    g_print("Appsink Emitting signals\n");
  } else {
    g_print("Appsink NOT Emitting signals\n");
  }
  // Confirmed that appsrc is emitting signals
  b = gst_app_src_get_emit_signals(GST_APP_SRC(data.augmenterbin->red->appsrc));
  if (b) {
    g_print("Appsrc Emitting signals\n");
  } else {
    g_print("Appsrc NOT Emitting signals\n");
  }

  // Attempt to clear the buffer by pulling preroll
  GstSample* sample = gst_app_sink_pull_preroll(GST_APP_SINK(data.augmenterbin->red->appsink));
  if (sample) {
    g_print("Pulled preroll sample to clear the queue.\n");
    gst_sample_unref(sample);
  }

  GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(data.pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "augmenter-pipeline");

  // Bus message callback
  GstBus* bus = gst_element_get_bus(data.pipeline);
  gst_bus_add_signal_watch(bus);
  // Pass the main loop as user data; bus_call() quits it on error or EOS
  g_signal_connect(bus, "message", G_CALLBACK(bus_call), data.loop);
  gst_object_unref(bus);

  g_main_loop_run(data.loop);
  g_print("g_main_loop returned, stopping playback.\n");

  g_print("Stopping pipeline.\n");
  gst_element_set_state(data.pipeline, GST_STATE_NULL);

  g_print("Deleting pipeline.\n");
  // The pipeline took ownership of the augmenter bin in gst_bin_add_many(), so it is not unreffed separately
  gst_object_unref(GST_OBJECT(data.pipeline));
  g_main_loop_unref(data.loop);

  return 0;
}

For anyone that may have a similar problem, here is how I solved it:

After adding a callback on the new-preroll signal on the AppSink that just forwards the sample to the AppSrc, it is now working without manually pushing a buffer to the AppSrc!

So it seems that although the AppSrc was in the PLAYING state without receiving any data, it still needed a preroll buffer before data could flow through the pipeline. I am unsure whether this is the correct solution, but it feels like a decent one at least.

Side note: I also added caps to the AppSrc.

Here is the new-preroll callback that I used:

// Connected signal like this:
g_signal_connect(color_branch_elements->appsink, "new-preroll", G_CALLBACK(on_new_preroll), self);

// AppSink pre-roll callback
static GstFlowReturn on_new_preroll(GstElement* sink, gpointer data) {
  GstAugmenterBin* bin = GST_AUGMENTER_BIN(data);
  GstSample* sample = gst_app_sink_pull_preroll(GST_APP_SINK(sink));

  if (!sample) {
    g_printerr("Could not pull preroll sample\n");
    return GST_FLOW_OK;
  }

  GstElement* target_appsrc = NULL;
  if (sink == bin->red->appsink) {
    target_appsrc = bin->red->appsrc;
  } else if (sink == bin->green->appsink) {
    target_appsrc = bin->green->appsrc;
  } else if (sink == bin->blue->appsink) {
    target_appsrc = bin->blue->appsrc;
  } else {
    GST_ERROR_OBJECT(sink, "Unknown appsink");
    gst_sample_unref(sample);  // Do not leak the sample on the error path
    return GST_FLOW_ERROR;
  }

  // Forward the preroll sample unchanged to the matching appsrc
  gst_app_src_push_sample(GST_APP_SRC(target_appsrc), sample);
  gst_sample_unref(sample);

  return GST_FLOW_OK;
}
