RTSP stream frame extraction problem

#include <gst/gst.h>
#include <gst/app/gstappsink.h>
#include <gst/video/video.h>
#include <jni.h>
#include <string.h>
#include <android/log.h>
#include <glib.h>

/* Tag under which every message logged by this file is emitted. */
#define LOG_TAG "GStreamerNative"

/*
 * printf-style logging helpers that forward to __android_log_print() at
 * info, error and warning priority respectively.
 */
#define LOGI(...) __android_log_print(ANDROID_LOG_INFO, LOG_TAG, __VA_ARGS__)
#define LOGE(...) __android_log_print(ANDROID_LOG_ERROR, LOG_TAG, __VA_ARGS__)
#define LOGW(...) __android_log_print(ANDROID_LOG_WARN, LOG_TAG, __VA_ARGS__)

/*
 * Logs an error and returns from the enclosing function when `elem` is NULL.
 *
 * Only usable inside functions that return void, because it expands to a
 * bare `return;`. Wrapped in do/while(0) so that `CHECK_ELEMENT(x, "n");`
 * behaves as a single statement, including inside an unbraced if/else.
 */
#define CHECK_ELEMENT(elem, name)                              \
    do {                                                       \
        if (!(elem)) {                                         \
            LOGE("Failed to create element: %s", (name));      \
            return;                                            \
        }                                                      \
    } while (0)

/* JNI state, filled in by nativeInit and cleared by nativeStopPipeline. */
static jobject g_callback_instance = NULL; /* global ref to the Java object receiving frames */
static jmethodID g_onFrameMethod = NULL;   /* onFrameReady([BII)V looked up on that object's class */
static JavaVM *g_jvm = NULL;               /* used by on_new_sample to attach its thread to the VM */

/* GStreamer / GLib state, created by start_pipeline_thread and torn down by nativeStopPipeline. */
static GstElement *g_pipeline = NULL;
static GMainLoop *g_main_loop = NULL;
static GThread *g_main_loop_thread = NULL; /* thread running g_main_loop */
/* Taken by on_new_sample and nativeStopPipeline. */
static GMutex pipeline_lock;

/*
 * Bus watch callback: logs EOS, error, warning and pipeline state-change
 * messages.
 *
 * bus  - unused.
 * msg  - message taken from the pipeline bus.
 * data - unused.
 *
 * Always returns TRUE so that the watch stays installed.
 */
static gboolean bus_call(GstBus *bus, GstMessage *msg, gpointer data) {
    (void)bus;
    (void)data;

    switch (GST_MESSAGE_TYPE(msg)) {
        case GST_MESSAGE_EOS:
            LOGI("End of stream");
            break;

        case GST_MESSAGE_ERROR: {
            GError *err = NULL;
            gchar *debug = NULL;

            gst_message_parse_error(msg, &err, &debug);
            LOGE("Error: %s", err ? err->message : "unknown");
            // The debug string usually names the failing element; log it too.
            if (debug) {
                LOGE("Error details: %s", debug);
            }
            g_clear_error(&err);
            g_free(debug);
            break;
        }

        case GST_MESSAGE_WARNING: {
            GError *err = NULL;
            gchar *debug = NULL;

            gst_message_parse_warning(msg, &err, &debug);
            LOGW("Warning: %s", err ? err->message : "unknown");
            if (debug) {
                LOGW("Warning details: %s", debug);
            }
            g_clear_error(&err);
            g_free(debug);
            break;
        }

        case GST_MESSAGE_STATE_CHANGED: {
            // Every element posts state changes; only report the pipeline's own.
            if (GST_MESSAGE_SRC(msg) == GST_OBJECT(g_pipeline)) {
                GstState old_state, new_state, pending_state;
                gst_message_parse_state_changed(msg, &old_state, &new_state, &pending_state);
                LOGI("Pipeline state changed from %s to %s",
                     gst_element_state_get_name(old_state),
                     gst_element_state_get_name(new_state));
            }
            break;
        }

        default:
            break;
    }

    return TRUE;
}

/*
 * Copies one frame into a freshly allocated Java byte[] and passes it to the
 * registered callback object through g_onFrameMethod.
 *
 * env    - JNI environment of the calling (already attached) thread.
 * data   - start of the frame bytes.
 * width  - frame width, forwarded to Java unchanged.
 * height - frame height, forwarded to Java unchanged.
 * size   - number of bytes to copy from `data`.
 *
 * Returns without calling Java when no callback is registered, when the
 * arguments are unusable, or when the byte[] cannot be allocated. Any Java
 * exception raised along the way is logged and cleared so that it does not
 * stay pending on a native thread.
 */
void send_frame_to_java(JNIEnv *env, const guint8 *data, int width, int height, int size) {
    if (!g_callback_instance || !g_onFrameMethod) {
        LOGE("Callback instance or method not found");
        return;
    }
    if (!env || !data || size < 0) {
        LOGE("Invalid frame arguments");
        return;
    }

    jbyteArray byteArray = (*env)->NewByteArray(env, size);
    if (!byteArray) {
        LOGE("Failed to allocate byte array");
        // A failed allocation leaves an exception pending; drop it.
        if ((*env)->ExceptionCheck(env)) {
            (*env)->ExceptionClear(env);
        }
        return;
    }

    (*env)->SetByteArrayRegion(env, byteArray, 0, size, (const jbyte *)data);
    (*env)->CallVoidMethod(env, g_callback_instance, g_onFrameMethod, byteArray, width, height);

    // An exception thrown by the Java callback must not leak into later JNI calls.
    if ((*env)->ExceptionCheck(env)) {
        LOGE("Java frame callback threw an exception");
        (*env)->ExceptionDescribe(env);
        (*env)->ExceptionClear(env);
    }

    (*env)->DeleteLocalRef(env, byteArray);
}

/*
 * appsink "new-sample" handler: pulls the sample, maps its buffer read-only
 * and forwards the bytes together with the frame dimensions to Java.
 *
 * appsink   - the sink that emitted the signal.
 * user_data - unused.
 *
 * Always returns GST_FLOW_OK; a sample that cannot be used is logged and
 * skipped. pipeline_lock is held for the whole call.
 *
 * NOTE(review): the calling thread is attached to the JVM here and never
 * detached — confirm that this is acceptable for the thread's lifetime.
 */
static GstFlowReturn on_new_sample(GstAppSink *appsink, gpointer user_data) {
    static int sample_counter = 0;
    GstSample *sample = NULL;

    (void)user_data;

    LOGI("new-sample signal triggered");
    LOGI("new-sample signal #%d", ++sample_counter);

    g_mutex_lock(&pipeline_lock);

    LOGI("Attempting to pull sample...");
    if (!appsink) {
        LOGE("App Sink is NULL");
        goto done;
    }

    sample = gst_app_sink_pull_sample(appsink);
    if (!sample) {
        LOGE("Sample is NULL");
        goto done;
    }
    LOGI("Sample Pulled ...");

    GstBuffer *buffer = gst_sample_get_buffer(sample);
    LOGI("Generating Buffer ...");
    GstCaps *caps = gst_sample_get_caps(sample);
    LOGI("Caps Filter Applying");

    GstVideoInfo info;
    if (!buffer || !caps || !gst_video_info_from_caps(&info, caps)) {
        LOGE("Invalid buffer or caps");
        goto done;
    }

    GstMapInfo map;
    if (gst_buffer_map(buffer, &map, GST_MAP_READ)) {
        if (!g_jvm) {
            // nativeInit has not run, or nativeStopPipeline already cleared it.
            LOGE("JavaVM is not available; dropping frame");
        } else if (map.size > (gsize)G_MAXINT) {
            // send_frame_to_java takes an int size; refuse to truncate.
            LOGE("Frame too large to pass to Java; dropping frame");
        } else {
            JNIEnv *env = NULL;
            if ((*g_jvm)->AttachCurrentThread(g_jvm, (void **)&env, NULL) == 0) {
                send_frame_to_java(env, map.data, info.width, info.height, (int)map.size);
            } else {
                LOGE("Failed to attach thread to the JavaVM");
            }
        }
        gst_buffer_unmap(buffer, &map);
    } else {
        LOGE("Failed to map buffer");
    }

done:
    if (sample) {
        gst_sample_unref(sample);
    }
    g_mutex_unlock(&pipeline_lock);
    return GST_FLOW_OK;
}

/*
 * "pad-added" handler for rtspsrc: links the newly exposed source pad to the
 * sink pad of the depayloader, unless that sink pad is already linked.
 *
 * src       - element that added the pad (unused).
 * new_pad   - the pad that was just added.
 * user_data - the depayloader element to link to.
 */
static void on_rtsp_pad_added(GstElement *src, GstPad *new_pad, gpointer user_data) {
    (void)src;

    LOGE("PAD ADDED");
    GstElement *depay = GST_ELEMENT(user_data);
    GstPad *sink_pad = gst_element_get_static_pad(depay, "sink");
    if (!sink_pad) {
        LOGE("Depayloader has no sink pad");
        return;
    }

    if (!gst_pad_is_linked(sink_pad)) {
        // The new pad may not have caps set yet, in which case this is NULL.
        GstCaps *new_pad_caps = gst_pad_get_current_caps(new_pad);
        gchar *desc = new_pad_caps ? gst_caps_to_string(new_pad_caps)
                                   : g_strdup("(no caps)");
        LOGW("Dynamic pad created: %s\n", desc);

        GstPadLinkReturn ret = gst_pad_link(new_pad, sink_pad);
        if (GST_PAD_LINK_FAILED(ret)) {
            LOGE("Type is '%s' but link failed.\n", desc);
        } else {
            LOGI("Link succeeded (type: %s)\n", desc);
        }

        // Free only after the last log line that prints it.
        g_free(desc);
        if (new_pad_caps) {
            gst_caps_unref(new_pad_caps);
        }
    }
    gst_object_unref(sink_pad);
}

/*
 * Creates and configures every element of the RTSP pipeline:
 * rtspsrc, rtph264depay, h264parse, openh264dec, videoconvert, capsfilter
 * and appsink.
 *
 * url       - RTSP location set on the source.
 * src..appsink - out parameters receiving the created elements.
 * user_data - passed to the appsink "new-sample" callback.
 *
 * All-or-nothing: if any element cannot be created, every failure is
 * logged, the elements that were created are released, and ALL out
 * parameters are set to NULL. Callers detect failure by checking for NULL.
 */
static void create_and_configure_elements(
    const char *url,
    GstElement **src,
    GstElement **depay,
    GstElement **parse,
    GstElement **decode,
    GstElement **convert,
    GstElement **capsfilter,
    GstElement **appsink,
    gpointer user_data // for appsink callback
) {
    const struct {
        GstElement **slot;
        const char *factory;
        const char *name;
    } specs[] = {
        { src,        "rtspsrc",      "source"     },
        { depay,      "rtph264depay", "depay"      },
        { parse,      "h264parse",    "parse"      },
        { decode,     "openh264dec",  "decoder"    },
        { convert,    "videoconvert", "convert"    },
        { capsfilter, "capsfilter",   "capsfilter" },
        { appsink,    "appsink",      "sink"       },
    };
    gboolean all_created = TRUE;

    // Create elements
    for (gsize i = 0; i < G_N_ELEMENTS(specs); i++) {
        *specs[i].slot = gst_element_factory_make(specs[i].factory, specs[i].name);
        if (!*specs[i].slot) {
            LOGE("Failed to create element: %s", specs[i].factory);
            all_created = FALSE;
        }
    }

    if (!all_created) {
        // Release what was created so nothing leaks and the caller sees only NULLs.
        for (gsize i = 0; i < G_N_ELEMENTS(specs); i++) {
            if (*specs[i].slot) {
                gst_object_unref(*specs[i].slot);
                *specs[i].slot = NULL;
            }
        }
        return;
    }

    // Configure rtspsrc
    g_object_set(*src,
                 "location", url,
                 "latency", 100,
                 NULL);

    // Connect pad-added signal for dynamic pad from rtspsrc
    g_signal_connect(*src, "pad-added", G_CALLBACK(on_rtsp_pad_added), *depay);

    // Configure capsfilter
    GstCaps *caps = gst_caps_from_string("video/x-raw,format=RGB");
    if (caps != NULL) {
        g_object_set(*capsfilter, "caps", caps, NULL);
        gst_caps_unref(caps);
        LOGI("GstCaps Created from string\n");
    } else {
        LOGE("Failed to create GstCaps from string\n");
    }

    // Configure appsink
    g_object_set(*appsink,
                 "emit-signals", TRUE,
                 "sync", FALSE,
                 "max-buffers", 1,
                 "drop", TRUE,
                 NULL);

    // Connect appsink callback
    g_signal_connect(*appsink, "new-sample", G_CALLBACK(on_new_sample), user_data);
}

/*
 * Thread entry point: runs the global GLib main loop until it is quit.
 *
 * data - unused.
 *
 * Returns NULL.
 */
static gpointer main_loop_thread_func(gpointer data)
{
    (void)data;

    g_main_loop_run(g_main_loop);

    return NULL;
}

/*
 * Argument bundle handed to start_pipeline_thread.
 * The thread frees both `url` and the struct itself with g_free().
 */
typedef struct PipelineStartData {
    char *url; /* RTSP location to open */
} PipelineStartData;

/*
 * Thread entry point that builds the RTSP pipeline, sets it to PLAYING and
 * starts the GLib main loop thread that services the bus watch.
 *
 * user_data - a heap-allocated PipelineStartData; this function takes
 *             ownership and frees it (and its url) before returning.
 *
 * On any failure everything created so far is released and g_pipeline is
 * left (or reset to) NULL. If a pipeline is already registered in
 * g_pipeline, the new one is discarded rather than overwriting it.
 *
 * Returns NULL.
 */
static gpointer start_pipeline_thread(gpointer user_data) {
    PipelineStartData *data = (PipelineStartData *)user_data;
    GstElement *src = NULL, *depay = NULL, *parse = NULL;
    GstElement *decode = NULL, *convert = NULL, *capsfilter = NULL, *appsink = NULL;
    GstElement *pipeline = NULL;
    gboolean published = FALSE;
    gboolean started = FALSE;

    if (!data || !data->url) {
        LOGE("Pipeline thread started without a URL");
        goto cleanup;
    }
    LOGI("Pipeline thread starting with URL: %s", data->url);

    // Create and configure all elements (this also connects rtspsrc's
    // pad-added handler, so it is not connected a second time here).
    create_and_configure_elements(data->url, &src, &depay, &parse, &decode, &convert, &capsfilter, &appsink, NULL);
    if (!src || !depay || !parse || !decode || !convert || !capsfilter || !appsink) {
        LOGE("Failed to create one or more pipeline elements");
        goto cleanup;
    }

    // Create the pipeline
    pipeline = gst_pipeline_new("rtsp-pipeline");
    if (!pipeline) {
        LOGE("Failed to create pipeline");
        goto cleanup;
    }

    // Add elements to pipeline; from here on the pipeline owns them.
    gst_bin_add_many(GST_BIN(pipeline), src, depay, parse, decode, convert, capsfilter, appsink, NULL);

    // Link static elements (skip rtspsrc which links dynamically)
    if (!gst_element_link_many(depay, parse, decode, convert, capsfilter, appsink, NULL)) {
        LOGE("Failed to link static elements");
        goto cleanup;
    }

    // Publish the pipeline, refusing to overwrite one that is still registered.
    g_mutex_lock(&pipeline_lock);
    if (!g_pipeline) {
        g_pipeline = pipeline;
        published = TRUE;
    }
    g_mutex_unlock(&pipeline_lock);
    if (!published) {
        LOGE("A pipeline is already running; stop it before starting another");
        goto cleanup;
    }

    // Start playback
    if (gst_element_set_state(pipeline, GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) {
        LOGE("Failed to set pipeline to PLAYING");
        goto cleanup;
    }
    LOGI("Pipeline set to PLAYING");

    // Set up the bus to watch for messages. Installed only once start-up
    // has succeeded so that the failure path has no watch to remove.
    {
        GstBus *bus = gst_pipeline_get_bus(GST_PIPELINE(pipeline));
        gst_bus_add_watch(bus, bus_call, NULL);
        gst_object_unref(bus);
    }

    // Run the GLib main loop in a separate thread
    g_main_loop = g_main_loop_new(NULL, FALSE);
    g_main_loop_thread = g_thread_new("gstreamer-main-loop", main_loop_thread_func, NULL);
    started = TRUE;

cleanup:
    if (!started) {
        gboolean owned = TRUE;

        if (published) {
            // nativeStopPipeline may have taken (and released) the pipeline meanwhile.
            g_mutex_lock(&pipeline_lock);
            owned = (g_pipeline == pipeline);
            if (owned) {
                g_pipeline = NULL;
            }
            g_mutex_unlock(&pipeline_lock);
        }

        if (pipeline) {
            if (owned) {
                gst_element_set_state(pipeline, GST_STATE_NULL);
                gst_object_unref(pipeline);
            }
        } else {
            // No pipeline took ownership: release each element individually.
            GstElement *orphans[] = { src, depay, parse, decode, convert, capsfilter, appsink };
            for (gsize i = 0; i < G_N_ELEMENTS(orphans); i++) {
                if (orphans[i]) {
                    gst_object_unref(orphans[i]);
                }
            }
        }
    }

    if (data) {
        g_free(data->url);
        g_free(data);
    }

    return NULL;
}

/*
 * JNI entry point: loads a single GStreamer plugin from a file.
 *
 * env        - JNI environment.
 * thiz       - calling Java object (unused).
 * pluginPath - path of the plugin file to load.
 *
 * Success and failure are only reported through the log.
 */
JNIEXPORT void JNICALL
Java_com_gst_1sdk_1tutorials_tutorial_15_MainActivity_loadNativePlugin(JNIEnv *env, jobject thiz, jstring pluginPath) {
    (void)thiz;

    if (!env || !pluginPath) {
        LOGE("Invalid parameters to loadNativePlugin");
        return;
    }

    const char *path = (*env)->GetStringUTFChars(env, pluginPath, 0);
    if (!path) {
        LOGE("Failed to get UTF path from jstring");
        return;
    }

    GError *error = NULL;
    GstPlugin *plugin = gst_plugin_load_file(path, &error);
    if (!plugin) {
        LOGE("Failed to load plugin from %s: %s", path, error ? error->message : "Unknown error");
        g_clear_error(&error);
    } else {
        // Drop the reference returned by gst_plugin_load_file.
        gst_object_unref(plugin);
        LOGI("Successfully loaded plugin from: %s", path);
    }

    (*env)->ReleaseStringUTFChars(env, pluginPath, path);
}

/*
 * JNI entry point: records the JavaVM, registers `instance` as the receiver
 * of frames (via its onFrameReady(byte[], int, int) method) and initializes
 * GStreamer.
 *
 * env      - JNI environment.
 * instance - Java object whose onFrameReady method will be called.
 *
 * May be called more than once; a previously registered callback object is
 * released when it is replaced.
 */
JNIEXPORT void JNICALL Java_com_gst_1sdk_1tutorials_tutorial_15_MainActivity_nativeInit(JNIEnv *env, jobject instance) {
    // pipeline_lock is statically allocated and therefore needs no
    // g_mutex_init(); re-initializing it on a repeated call would be unsafe.

    if ((*env)->GetJavaVM(env, &g_jvm) != 0) {
        LOGE("Failed to obtain the JavaVM");
    }

    jclass cls = (*env)->GetObjectClass(env, instance);
    jmethodID on_frame = (*env)->GetMethodID(env, cls, "onFrameReady", "([BII)V");
    if (!on_frame) {
        LOGE("Method onFrameReady([BII)V not found on callback object");
        // A failed lookup leaves an exception pending; clear it so later JNI calls are valid.
        if ((*env)->ExceptionCheck(env)) {
            (*env)->ExceptionClear(env);
        }
    }
    (*env)->DeleteLocalRef(env, cls);

    jobject new_instance = (*env)->NewGlobalRef(env, instance);

    // Swap the callback under the lock that on_new_sample holds while using it.
    g_mutex_lock(&pipeline_lock);
    jobject old_instance = g_callback_instance;
    g_callback_instance = new_instance;
    g_onFrameMethod = on_frame;
    g_mutex_unlock(&pipeline_lock);

    if (old_instance) {
        (*env)->DeleteGlobalRef(env, old_instance);
    }

    gst_debug_set_default_threshold(GST_LEVEL_DEBUG);
    //setenv("GST_PLUGIN_PATH", "/data/data/com.gst_sdk_tutorials.tutorial_5/files/gstreamer-plugins", 1);
    // NOTE(review): the dynamic linker may only read LD_LIBRARY_PATH at process
    // start, in which case this has no effect — confirm on the target platform.
    setenv("LD_LIBRARY_PATH", "/data/data/com.gst_sdk_tutorials.tutorial_5/files/gstreamer-plugins", 1);
    gst_init(NULL, NULL);
    LOGI("GStreamer initialized");
}

/*
 * JNI entry point: starts the RTSP pipeline for `jurl` on a new thread.
 *
 * env      - JNI environment.
 * instance - calling Java object (unused).
 * jurl     - RTSP URL to open.
 *
 * The URL is copied into a PipelineStartData that start_pipeline_thread
 * owns and frees. Returns immediately; pipeline start-up results are only
 * reported through the log.
 */
JNIEXPORT void JNICALL Java_com_gst_1sdk_1tutorials_tutorial_15_MainActivity_nativeStartPipeline(JNIEnv *env, jobject instance, jstring jurl) {
    (void)instance;

    if (!jurl) {
        LOGE("nativeStartPipeline called with a null URL");
        return;
    }

    const char *url = (*env)->GetStringUTFChars(env, jurl, 0);
    if (!url) {
        LOGE("Failed to get UTF chars for the URL");
        return;
    }

    PipelineStartData *data = g_new0(PipelineStartData, 1);
    data->url = g_strdup(url);
    (*env)->ReleaseStringUTFChars(env, jurl, url);

    // The thread is never joined, so drop our handle right away instead of leaking it.
    GThread *thread = g_thread_new("start-pipeline-thread", start_pipeline_thread, data);
    g_thread_unref(thread);
}

/*
 * JNI entry point: stops and releases the pipeline, stops the main loop
 * thread, and clears the JNI callback state set up by nativeInit.
 *
 * env      - JNI environment.
 * instance - calling Java object (unused).
 *
 * Safe to call when nothing is running.
 */
JNIEXPORT void JNICALL Java_com_gst_1sdk_1tutorials_tutorial_15_MainActivity_nativeStopPipeline(JNIEnv *env, jobject instance) {
    (void)instance;

    // Detach the pipeline from the global under the lock, but shut it down
    // with the lock released: on_new_sample takes pipeline_lock, and setting
    // the state to NULL can wait for that callback's thread to finish, so
    // holding the lock across the state change could deadlock.
    g_mutex_lock(&pipeline_lock);
    GstElement *pipeline = g_pipeline;
    g_pipeline = NULL;
    g_mutex_unlock(&pipeline_lock);

    if (pipeline) {
        gst_element_set_state(pipeline, GST_STATE_NULL);
        gst_object_unref(pipeline);
    }

    if (g_main_loop) {
        g_main_loop_quit(g_main_loop);
        if (g_main_loop_thread) {
            g_thread_join(g_main_loop_thread);
            g_main_loop_thread = NULL;
        }
        g_main_loop_unref(g_main_loop);
        g_main_loop = NULL;
    }

    // Clear the JNI state only after the pipeline has been shut down.
    g_mutex_lock(&pipeline_lock);
    if (g_callback_instance) {
        (*env)->DeleteGlobalRef(env, g_callback_instance);
        g_callback_instance = NULL;
    }
    g_onFrameMethod = NULL;
    g_jvm = NULL;
    g_mutex_unlock(&pipeline_lock);

    LOGI("Pipeline stopped and cleaned up");
}

The code provided above returns NULL when the sample is pulled. Can anyone help me out with this? I have been stuck on it for a long time now.