Hi everyone!
TL;DR: The appsink ‘new-sample’ callback is never triggered and the element is stuck in the PAUSED state even though everything else is set to PLAYING. I cannot figure out why, please help!
I have been trying to explore how I should construct our GStreamer application, which is essentially a dynamic ML inference pipeline that can activate and deactivate different ML techniques. To carve out a suitable design, I have been trying to get a simpler pipeline with similar characteristics to work, but I cannot get the pipeline running; more specifically, I cannot get the dynamically created appsink to trigger its callback on ‘new-sample’ signals.
First a little bit about the pipeline I am trying to create to evaluate my design:
The ML inference techniques are replaced with a simple color augmenter that is supposed to convert the frame to a single-color scale (like grayscale, but in red, green, or blue instead).
Initially, the pipeline looks like this:
- videotestsrc → videoconvert → capsfilter → (My augmenter bin: ghost pad → tee)
- (not linked) autovideosink
When a colorscale src, e.g., red, is requested from the custom augmenter bin, the pipeline will look like this:
- videotestsrc → videoconvert → capsfilter → (My augmenter bin: ghost pad → tee → queue (red) → videoscale (just for simulating a preprocessing step for ML inference) → appsink (red))
- (My augmenter bin: appsrc (red) → ghost pad) → autovideosink
If another color src is requested from the bin, another branch inside the augmenter bin will be added from the initial tee element, and a corresponding ghost pad will be created.
My main concern is that I do not understand why data is not flowing through the appsink to the appsrc with my callback function. Are there some “gotchas” that I missed when using appsink and appsrc in the same pipeline?
In another, even simpler sample application I created I had similar issues, and only managed to get data flowing after forcing an initial manual push of a buffer to the appsrc. Should this really be necessary?
I have exported .dot graphs of the pipeline before and after requesting a pad from the custom augmenter bin. If they would be helpful for assisting me, I will gladly try to provide them somehow!
Here is a screenshot of the graph after a pad has been requested and the pipeline has been set to playing, note that the appsink has received a buffer (I believe it is the preroll buffer, as I am able to pull it from the element):
Here is the source code for my sample application (without the header files, since they probably are not necessary to understand the context and see what I am doing wrong):
#include "mainbin.h"
#include <gst/app/app.h>
#include <gst/gst.h>
#include "frameaugmenter.h"
// Elements that make up one colour branch inside the augmenter bin.
// Instances are zero-allocated in gst_augmenter_bin_init() and populated when the
// matching "src_<color>" pad is requested.
typedef struct _ColorBranch {
// Request pad obtained from the bin's tee for this branch.
GstPad* tee_src_pad;
// "queue" element fed by the tee pad.
GstElement* inputqueue;
// "videoscale" element between the queue and the appsink.
GstElement* videoscale;
// "appsink" whose "new-sample" signal is connected to on_new_sample().
GstElement* appsink;
// "appsrc" that on_new_sample() pushes samples into; its src pad backs the bin's ghost pad.
GstElement* appsrc;
// NOTE(review): never assigned in the code shown here — confirm whether it is still needed.
GstElement* outputqueue;
} ColorBranch;
// Instance structure of the augmenter bin (a GstBin subclass).
struct _GstAugmenterBin {
// Parent instance; must stay the first member.
GstBin parent;
// Frame augmenter created in instance init and deleted in finalize.
FrameAugmenter* augmenter;
// Ghost pad named "sink" that targets the tee's sink pad.
GstPad* ghostsink;
// Tee that every colour branch hangs off.
GstElement* tee;
// One zero-allocated branch record per supported colour.
ColorBranch *red, *blue, *green;
};
// Class structure of the augmenter bin; it adds no members of its own to GstBinClass.
struct _GstAugmenterBinClass {
// Parent class; must stay the first member.
GstBinClass parent;
};
// Application state for the test program in main().
typedef struct _CustomData {
// Main loop run by main() after the pipeline has been started.
GMainLoop* loop;
// Top-level pipeline that holds every element below.
GstElement* pipeline;
// Custom augmenter bin placed after the source caps filter.
GstAugmenterBin* augmenterbin;
// "videotestsrc" element producing the input frames.
GstElement* videosource;
// "capsfilter" element that main() configures with the source caps.
GstElement* sourcecapsfilter;
// "videoconvert" element between the source and the caps filter.
GstElement* sourcevideoconvert;
// "autovideosink" element linked to the bin's requested src pad.
GstElement* videosink;
} CustomData;
// File-local debug category; GST_CAT_DEFAULT makes the GST_*_OBJECT logging macros in this
// file log to it.
// NOTE(review): the category is registered under the name "inferencebin" although the type
// is the augmenter bin — confirm that name is intended.
GST_DEBUG_CATEGORY_STATIC(gst_augmenter_bin_debug_category);
#define GST_CAT_DEFAULT gst_augmenter_bin_debug_category
// Registers GstAugmenterBin as a GstBin subclass and initialises the debug category as part
// of type registration.
G_DEFINE_TYPE_WITH_CODE(GstAugmenterBin, gst_augmenter_bin, GST_TYPE_BIN,
GST_DEBUG_CATEGORY_INIT(gst_augmenter_bin_debug_category, "inferencebin", 0,
"debug category for inferencebin element"));
/*
 * "new-sample" handler shared by all colour appsinks.
 *
 * Pulls the pending sample from the emitting appsink, works out which colour branch the
 * appsink belongs to and forwards the sample to that branch's appsrc.
 *
 * sink: the appsink that emitted the signal.
 * data: the GstAugmenterBin that owns the branches.
 *
 * Returns GST_FLOW_ERROR when the appsink does not belong to any branch, GST_FLOW_OK
 * otherwise. A failed push into the appsrc is logged but not propagated, so one stalled
 * output does not stop the upstream branch.
 */
static GstFlowReturn on_new_sample(GstElement* sink, gpointer data) {
  g_print("NEW SAMPLE!\n");
  GstAugmenterBin* bin = GST_AUGMENTER_BIN(data);

  GstSample* sample = gst_app_sink_pull_sample(GST_APP_SINK(sink));
  if (!sample) {
    g_printerr("Could not pull sample\n");
    return GST_FLOW_OK;
  }

  // Resolve the destination first so that every later exit path only has the sample to release.
  GstElement* target_appsrc = NULL;
  if (sink == bin->red->appsink) {
    target_appsrc = bin->red->appsrc;
  } else if (sink == bin->green->appsink) {
    target_appsrc = bin->green->appsrc;
  } else if (sink == bin->blue->appsink) {
    target_appsrc = bin->blue->appsrc;
  } else {
    GST_ERROR_OBJECT(sink, "Unknown appsink");
    gst_sample_unref(sample);
    return GST_FLOW_ERROR;
  }

  GstBuffer* buffer = gst_sample_get_buffer(sample);
  GstMapInfo map;
  if (buffer && gst_buffer_map(buffer, &map, GST_MAP_READ)) {
    g_print("Successfully read buffer of size %" G_GSIZE_FORMAT "\n", map.size);
    // TODO: Process frame
    // Unmap before handing the sample on; the mapping is only needed for local inspection.
    gst_buffer_unmap(buffer, &map);
  } else {
    GST_WARNING_OBJECT(sink, "Sample has no readable buffer");
  }

  // push_sample takes its own references, so our sample reference is dropped afterwards.
  GstFlowReturn push_result = gst_app_src_push_sample(GST_APP_SRC(target_appsrc), sample);
  if (push_result != GST_FLOW_OK) {
    GST_WARNING_OBJECT(sink, "Pushing sample to appsrc failed: %s",
                       gst_flow_get_name(push_result));
  }

  gst_sample_unref(sample);
  return GST_FLOW_OK;
}
static GstPad* augmenter_bin_request_new_pad(GstElement* element, GstPadTemplate* templ,
const gchar* name, const GstCaps* caps) {
GstAugmenterBin* self = GST_AUGMENTER_BIN(element);
if (!g_str_has_prefix(name, "src_")) {
GST_ERROR_OBJECT(self, "Invalid pad name prefix: %s", name);
return NULL;
}
ColorScale color;
ColorBranch* color_branch_elements;
const char* color_name;
if (g_str_has_suffix(name, "red")) {
color = COLOR_SCALE_RED;
color_name = "red";
color_branch_elements = self->red;
} else if (g_str_has_suffix(name, "green")) {
color = COLOR_SCALE_GREEN;
color_name = "green";
color_branch_elements = self->green;
} else if (g_str_has_suffix(name, "blue")) {
color = COLOR_SCALE_BLUE;
color_name = "blue";
color_branch_elements = self->blue;
} else {
GST_ERROR_OBJECT(self, "Invalid pad name %s", name);
return NULL;
}
// Create elements
color_branch_elements->inputqueue =
gst_element_factory_make("queue", g_strdup_printf("queue_%s", color_name));
color_branch_elements->videoscale =
gst_element_factory_make("videoscale", g_strdup_printf("videoscale_%s", color_name));
color_branch_elements->appsink =
gst_element_factory_make("appsink", g_strdup_printf("appsink_%s", color_name));
color_branch_elements->appsrc =
gst_element_factory_make("appsrc", g_strdup_printf("appsrc_%s", color_name));
// Add to custom bin
gst_bin_add_many(GST_BIN(self), color_branch_elements->inputqueue,
color_branch_elements->videoscale, color_branch_elements->appsink,
color_branch_elements->appsrc, NULL);
// Link elements
if (!gst_element_link_many(color_branch_elements->inputqueue, color_branch_elements->videoscale,
color_branch_elements->appsink, NULL)) {
g_printerr("Could not link all elements when requesting new pad.\n");
}
// Request tee src and link with inputqueue
color_branch_elements->tee_src_pad = gst_element_request_pad_simple(self->tee, "src_%u");
if (!color_branch_elements->tee_src_pad) {
g_printerr("Unable to request pad from tee\n");
}
GstPad* queue_pad = gst_element_get_static_pad(color_branch_elements->inputqueue, "sink");
if (!queue_pad) {
g_printerr("Unable to get sinkpad from queue\n");
}
if (gst_pad_link(color_branch_elements->tee_src_pad, queue_pad) != GST_PAD_LINK_OK) {
g_printerr("Failed to link tee to queue\n");
} else {
g_print("Successfully linked tee to queue\n");
}
// Configure elements
g_signal_connect(color_branch_elements->appsink, "new-sample", G_CALLBACK(on_new_sample), self);
g_object_set(G_OBJECT(color_branch_elements->appsink), "emit-signals", TRUE, "sync", FALSE,
"drop", TRUE, "max-buffers", 1, NULL);
g_object_set(G_OBJECT(color_branch_elements->appsrc), "format", GST_FORMAT_TIME, "is-live", TRUE,
"stream-type", 0, "emit-signals", TRUE, "do-timestamp", TRUE, NULL);
g_object_set(G_OBJECT(color_branch_elements->inputqueue), "max-size-buffers", 2, "leaky",
2 /* Leak downstream */, NULL);
g_object_set(G_OBJECT(color_branch_elements->videoscale), "method", 1, NULL);
// Set up ghost pad
GstPad* src_pad = gst_element_get_static_pad(color_branch_elements->appsrc, "src");
GstPad* ghost_pad = gst_ghost_pad_new(name, src_pad);
gst_element_add_pad(GST_ELEMENT(self), ghost_pad);
gst_object_unref(src_pad);
return ghost_pad;
}
/*
 * GObject finalize for the augmenter bin.
 *
 * Frees the frame augmenter and the three colour branch records, drops the tee pad
 * references the branches still hold, and then chains up so the parent classes can release
 * their own resources.
 */
static void gst_augmenter_bin_finalize(GObject* object) {
  GstAugmenterBin* self = GST_AUGMENTER_BIN(object);

  delete self->augmenter;
  self->augmenter = nullptr;

  ColorBranch** branches[] = {&self->red, &self->green, &self->blue};
  for (gsize i = 0; i < G_N_ELEMENTS(branches); i++) {
    ColorBranch* branch = *branches[i];
    if (branch && branch->tee_src_pad) {
      // The reference returned by gst_element_request_pad_simple() belongs to the branch.
      gst_object_unref(branch->tee_src_pad);
      branch->tee_src_pad = NULL;
    }
    g_free(branch);
    *branches[i] = NULL;
  }

  // Without chaining up, the parent classes never get to free their instance data.
  G_OBJECT_CLASS(gst_augmenter_bin_parent_class)->finalize(object);
}
/*
 * Class initialiser for the augmenter bin.
 *
 * Installs the request_new_pad and finalize overrides, registers the always "sink" pad
 * template and the request "src_%s" pad template (both restricted to BGR video at 10 fps),
 * and sets the element metadata.
 */
static void gst_augmenter_bin_class_init(GstAugmenterBinClass* klass) {
  GstElementClass* element_class = GST_ELEMENT_CLASS(klass);
  GObjectClass* object_class = G_OBJECT_CLASS(klass);

  // Override the request_new_pad method
  element_class->request_new_pad = augmenter_bin_request_new_pad;
  object_class->finalize = gst_augmenter_bin_finalize;

  // gst_pad_template_new() takes its own reference on the caps, so the local reference is
  // shared by both templates and dropped afterwards instead of being leaked.
  GstCaps* template_caps = gst_caps_from_string("video/x-raw,format=BGR,framerate=10/1");
  gst_element_class_add_pad_template(
      element_class, gst_pad_template_new("sink", GST_PAD_SINK, GST_PAD_ALWAYS, template_caps));
  gst_element_class_add_pad_template(
      element_class,
      gst_pad_template_new("src_%s", GST_PAD_SRC, GST_PAD_REQUEST, template_caps));
  gst_caps_unref(template_caps);

  gst_element_class_set_static_metadata(
      element_class, "AugmenterBin", "Filter",
      "A bin with the augmenter element for testing experimental implementation",
      "Developer Name <developer.name@email.com>");

  g_print("In bin class init... \n");
}
/*
 * Instance initialiser for the augmenter bin.
 *
 * Allocates the frame augmenter and one zeroed ColorBranch record per colour, adds a tee to
 * the bin and exposes the tee's sink pad as the bin's "sink" ghost pad.
 */
static void gst_augmenter_bin_init(GstAugmenterBin* self) {
  self->augmenter = new FrameAugmenter();

  // Branch records start out zeroed; they are filled in when a src pad is requested.
  self->red = g_new0(ColorBranch, 1);
  self->green = g_new0(ColorBranch, 1);
  self->blue = g_new0(ColorBranch, 1);

  // The tee is the single entry point that every colour branch is attached to.
  GstElement* splitter = gst_element_factory_make("tee", "augmenterbintee");
  self->tee = splitter;
  gst_bin_add(GST_BIN(self), splitter);

  // Proxy the tee's sink pad to the outside of the bin.
  GstPad* tee_sink = gst_element_get_static_pad(splitter, "sink");
  GstPad* proxy = gst_ghost_pad_new("sink", tee_sink);
  self->ghostsink = proxy;
  gst_element_add_pad(GST_ELEMENT(self), proxy);
  gst_object_unref(tee_sink);

  g_print("In bin init..\n");
}
/*
 * Bus "message" handler.
 *
 * Prints every state change, and on an error or end-of-stream message reports it and quits
 * the main loop.
 *
 * bus:  the bus the message arrived on (unused).
 * msg:  the message to handle.
 * data: the GMainLoop to quit on error/EOS; may be NULL, in which case nothing is quit.
 *
 * Always returns TRUE.
 */
static gboolean bus_call(GstBus* bus, GstMessage* msg, gpointer data) {
  GMainLoop* loop = (GMainLoop*)data;

  switch (GST_MESSAGE_TYPE(msg)) {
    case GST_MESSAGE_STATE_CHANGED: {
      GstState old_state, new_state, pending_state;
      gst_message_parse_state_changed(msg, &old_state, &new_state, &pending_state);
      // Print out state changes for each element
      g_print("Element %s changed state from %s to %s (pending: %s)\n",
              GST_MESSAGE_SRC_NAME(msg), gst_element_state_get_name(old_state),
              gst_element_state_get_name(new_state),
              gst_element_state_get_name(pending_state));
      break;
    }
    case GST_MESSAGE_ERROR: {
      GError* err = NULL;
      gchar* debug = NULL;
      gst_message_parse_error(msg, &err, &debug);
      // Name the failing element and include the debug string; the bare message alone is
      // rarely enough to locate a negotiation or linking problem.
      g_printerr("Error from %s: %s\n", GST_MESSAGE_SRC_NAME(msg),
                 err ? err->message : "(no message)");
      if (debug) {
        g_printerr("Debug info: %s\n", debug);
      }
      g_clear_error(&err);
      g_free(debug);
      if (loop) {
        g_main_loop_quit(loop);
      }
      break;
    }
    case GST_MESSAGE_EOS:
      g_print("End of stream\n");
      if (loop) {
        g_main_loop_quit(loop);
      }
      break;
    default:
      break;
  }

  return TRUE;
}
int main(int argc, char* argv[]) {
CustomData data;
GstCaps* sourcecaps;
gst_init(&argc, &argv);
data.loop = g_main_loop_new(NULL, FALSE);
g_print("Creating elements.\n");
data.pipeline = gst_pipeline_new("TestColorAugmenterPipeline");
data.videosource = gst_element_factory_make("videotestsrc", "videosource");
data.sourcevideoconvert = gst_element_factory_make("videoconvert", "videoconvert");
data.sourcecapsfilter = gst_element_factory_make("capsfilter", "sourcecapsfilter");
data.augmenterbin =
GST_AUGMENTER_BIN(g_object_new(GST_TYPE_AUGMENTER_BIN, "name", "AugmenterBin", NULL));
data.videosink = gst_element_factory_make("autovideosink", "videosink");
if (!data.videosource || !data.sourcevideoconvert || !data.sourcecapsfilter ||
!data.augmenterbin || !data.videosink) {
g_printerr("Could not create all elements\n");
return -1;
}
g_object_set(G_OBJECT(data.videosource), "is-live", TRUE, NULL);
sourcecaps = gst_caps_from_string("video/x-raw,format=BGR,width=640,height=400,framerate=10/1");
// g_object_set(G_OBJECT(data.videosource), "is-live", TRUE, NULL);
g_object_set(G_OBJECT(data.sourcecapsfilter), "caps", sourcecaps, NULL);
gst_caps_unref(sourcecaps);
// gst_bin_add_many(GST_BIN(data.augmenterbin), data.bintee, NULL);
gst_bin_add_many(GST_BIN(data.pipeline), data.videosource, data.sourcevideoconvert,
data.sourcecapsfilter, GST_ELEMENT(data.augmenterbin), data.videosink, NULL);
if (!gst_element_link_many(data.videosource, data.sourcevideoconvert, data.sourcecapsfilter,
GST_ELEMENT(data.augmenterbin), NULL)) {
g_printerr("Could not link all elements\n");
return -1;
}
GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(data.pipeline), GST_DEBUG_GRAPH_SHOW_ALL,
"augmenter-pipeline-prereq");
// Request a red src pad from the bin, and link it with the videosink
g_print("Requesting red src pad from bin and linking with videosink\n");
GstPad* bin_src_pad = gst_element_request_pad_simple(GST_ELEMENT(data.augmenterbin), "src_red");
GstPad* video_sink_pad = gst_element_get_static_pad(data.videosink, "sink");
gst_pad_link(bin_src_pad, video_sink_pad);
gst_object_unref(bin_src_pad);
gst_object_unref(video_sink_pad);
g_print("Setting pipeline to playing.\n");
gst_element_set_state(data.pipeline, GST_STATE_PLAYING);
g_usleep(100000);
// Confirmed that appsink is emitting signals
gboolean b = gst_app_sink_get_emit_signals(GST_APP_SINK(data.augmenterbin->red->appsink));
if (b) {
g_print("Appsink Emitting signals\n");
} else {
g_print("Appsink NOT Emitting signals\n");
}
// Confirmed that appsrc is emitting signals
b = gst_app_src_get_emit_signals(GST_APP_SRC(data.augmenterbin->red->appsrc));
if (b) {
g_print("Appsrc Emitting signals\n");
} else {
g_print("Appsrc NOT Emitting signals\n");
}
// Attempt to clear the buffer by pulling preroll
GstSample* sample = gst_app_sink_pull_preroll(GST_APP_SINK(data.augmenterbin->red->appsink));
if (sample) {
g_print("Pulled preroll sample to clear the queue.\n");
gst_sample_unref(sample);
}
GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(data.pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "augmenter-pipeline");
// Bus message callback
GstBus* bus = gst_element_get_bus(data.pipeline);
gst_bus_add_signal_watch(bus);
g_signal_connect(bus, "message", G_CALLBACK(bus_call), NULL);
gst_object_unref(bus);
g_main_loop_run(data.loop);
g_print("g_main_loop returned, stopping playback.\n");
g_print("Stopping pipeline.\n");
gst_element_set_state(data.pipeline, GST_STATE_NULL);
g_print("Deleting pipeline.\n");
gst_object_unref(GST_OBJECT(data.augmenterbin));
gst_object_unref(GST_OBJECT(data.pipeline));
g_main_loop_unref(data.loop);
return 0;
}