/* Code sample: GStreamer pipeline demonstrating a leaky queue with simulated
 * downstream latency (identity handoff sleep). */
#include <gst/gst.h>
#include <chrono>
#include <thread>
#ifdef __APPLE__
#include <TargetConditionals.h>
#endif
/* Bundle of pipeline objects shared between main() and the signal/bus
 * callbacks via the gpointer user_data arguments. */
typedef struct _CustomData {
GstElement *pipeline;     /* top-level GstPipeline (owns the elements below) */
GMainLoop *loop;          /* main loop to quit on error/EOS */
GstElement *source;       /* uridecodebin; pads linked in on_pad_added() */
GstElement *videoconvert; /* raw-video format converter */
GstElement *queue;        /* leaky queue whose fill level is monitored */
GstElement *identity;     /* identity element used to inject latency */
GstElement *sink;         /* autovideosink display sink */
} CustomData;
/* "overrun" signal handler for the queue element: fires when the queue hits
 * one of its max-size-* limits. With leaky=downstream the queue starts
 * dropping old buffers at this point; we only log the event here. */
static void on_queue_overrun(GstElement *queue, gpointer user_data) {
g_print("queue reached its max, dropping some frames!\n");
}
static bool print_queue_status(gpointer user_data) {
auto custom_data = (CustomData *)user_data;
int current_level_buffers, current_level_time, current_level_bytes;
g_object_get(G_OBJECT(custom_data->queue), "current-level-buffers", ¤t_level_buffers, NULL);
g_object_get(G_OBJECT(custom_data->queue), "current-level-time", ¤t_level_time, NULL);
g_object_get(G_OBJECT(custom_data->queue), "current-level-bytes", ¤t_level_bytes, NULL);
g_print("current-level-buffers: %d\n current-level-time: %d\n current-level-bytes: %d\n", current_level_buffers,
current_level_time, current_level_bytes);
return 1;
}
static void cb_message(GstBus *bus, GstMessage *msg, CustomData *data) {
switch (GST_MESSAGE_TYPE(msg)) {
case GST_MESSAGE_ERROR: {
GError *err;
gchar *debug;
gst_message_parse_error(msg, &err, &debug);
g_print("Error: %s\n", err->message);
g_error_free(err);
g_free(debug);
gst_element_set_state(data->pipeline, GST_STATE_READY);
g_main_loop_quit(data->loop);
break;
}
case GST_MESSAGE_EOS:
/* end-of-stream */
gst_element_set_state(data->pipeline, GST_STATE_READY);
g_main_loop_quit(data->loop);
break;
case GST_MESSAGE_CLOCK_LOST:
/* Get a new clock */
gst_element_set_state(data->pipeline, GST_STATE_PAUSED);
gst_element_set_state(data->pipeline, GST_STATE_PLAYING);
break;
default:
/* Unhandled message */
break;
}
}
// Callback function for the "pad-added" signal
static void on_pad_added(GstElement *element, GstPad *pad, gpointer data) {
// Check if the pad is an audio or video pad and link it to decodebin accordingly
auto customData = (CustomData *)data;
GstCaps *caps = gst_pad_query_caps(pad, NULL);
const gchar *pad_type = gst_structure_get_name(gst_caps_get_structure(caps, 0));
g_print("pad type %s\n", pad_type);
gst_caps_unref(caps);
if (g_str_has_prefix(pad_type, "video/x-raw")) {
GstPad *sinkpad = gst_element_get_static_pad(customData->videoconvert, "sink");
if (gst_pad_link(pad, sinkpad) != GST_PAD_LINK_OK) {
g_print("cannot link src to converter!\n");
} else {
g_print("pad type %s linked succefully\n", pad_type);
}
gst_object_unref(sinkpad);
}
}
/* "handoff" signal handler for the identity element: invoked once per buffer
 * on the streaming thread. Sleeping here simulates a slow downstream
 * consumer, which is what eventually makes the leaky queue overrun. */
static void handoff_callback(GstElement *identity, GstBuffer *buffer, gpointer udata) {
const auto artificial_delay = std::chrono::milliseconds(250);
std::this_thread::sleep_for(artificial_delay);
}
int tutorial_main(int argc, char *argv[]) {
CustomData customData;
GstBus *bus;
/* Initialize GStreamer */
gst_init(&argc, &argv);
// Create pipeline and elements
customData.pipeline = gst_pipeline_new("my-pipeline");
customData.source = gst_element_factory_make("uridecodebin", "source");
customData.videoconvert = gst_element_factory_make("videoconvert", "videoconvert");
customData.queue = gst_element_factory_make("queue", "queue");
customData.identity = gst_element_factory_make("identity", "identity");
customData.sink = gst_element_factory_make("autovideosink", "autovideosink");
// set props
g_object_set(customData.source, "uri", "file:////home/user/testFolder/test.mkv", NULL);
g_object_set(customData.queue, "leaky", 2, "max-size-buffers", 50, "max-size-bytes", 0, "max-size-time", 0, NULL);
g_object_set(customData.sink, "sync", FALSE, NULL);
// Check if elements were created successfully
if (!customData.pipeline || !customData.source || !customData.videoconvert || !customData.identity ||
!customData.queue || !customData.sink) {
g_print("Elements could not be created. Exiting.\n");
return -1;
}
// Add elements to the pipeline
gst_bin_add_many(GST_BIN(customData.pipeline), customData.source, customData.videoconvert, customData.identity,
customData.queue, customData.sink, NULL);
// Link the elements
if (!gst_element_link_many(customData.videoconvert, customData.identity, customData.queue, customData.sink, NULL)) {
g_print("Elements could not be linked. Exiting.\n");
return -1;
}
// Set up the "pad-added" signal handler for the source element
g_signal_connect(customData.source, "pad-added", G_CALLBACK(on_pad_added), &customData);
// connect signal with callback
g_signal_connect(G_OBJECT(customData.queue), "overrun", G_CALLBACK(on_queue_overrun), NULL);
g_signal_connect(G_OBJECT(customData.identity), "handoff", G_CALLBACK(handoff_callback), NULL);
// Start playing
gst_element_set_state(customData.pipeline, GST_STATE_PLAYING);
/* Wait until error or EOS */
bus = gst_element_get_bus(customData.pipeline);
GMainLoop *main_loop = g_main_loop_new(NULL, FALSE);
customData.loop = main_loop;
gst_bus_add_watch(GST_ELEMENT_BUS(customData.pipeline), (GstBusFunc)cb_message, &customData);
g_timeout_add(250, (GSourceFunc)print_queue_status, &customData);
g_main_loop_run(main_loop);
/* Free resources */
g_main_loop_unref(main_loop);
/* Free resources */
gst_object_unref(bus);
gst_element_set_state(customData.pipeline, GST_STATE_NULL);
gst_object_unref(customData.pipeline);
return 0;
}
/* Entry point. On desktop macOS, GStreamer requires the main thread to run
 * an NSApplication loop, so the real work is delegated through
 * gst_macos_main(); everywhere else tutorial_main() is called directly. */
int main(int argc, char *argv[]) {
#if defined(__APPLE__) && TARGET_OS_MAC && !TARGET_OS_IPHONE
return gst_macos_main(tutorial_main, argc, argv, NULL);
#else
return tutorial_main(argc, argv);
#endif
}