Complete misunderstanding of SEEK events

I have been working on this code for two days and I am at loss. I think I may completely misunderstand the use of seeking.

The high level overview of what I am trying to achieve is a small CLI utility that can accept an input video file (mp4) and extract a specified segment of it (e.g. 10 to 20 sec) and save that to a new mp4 file.

Here is an example that WORKS using gst-launch CLI interface. However, this copies the entire input file to the output file. So it doesn’t include the seeking/segment extraction.

./gst-launch-1.0 filesrc name=src location=/home/edge/f-nvme/videoserver_data/test_sbs.mp4 ! qtdemux ! queue2 ! h265parse ! nvh265dec ! videoconvert ! nvh264enc bitrate=4000 ! h264parse ! qtmux ! filesink location=out.mp4

Here is the C++ (cppstd=14) implementation that attempts to using a seek event to extract the desired segment. I modeled the code off of the examples here: subprojects/gstreamer/tests/check/pipelines/seek.c · main · GStreamer / gstreamer · GitLab.
It is a little long but I wanted the example to be completely self contained.
The resulting binary can be executed with:

./app input.mp4 out.mp4 5.0 10.0 --gst-debug-no-color

#include <gst/gst.h>

#include <atomic>
#include <cstdlib>
#include <iostream>
#include <sstream>
#include <stdexcept>
#include <thread>

struct InputArgs
{
    std::string inputFilepath;
    std::string outputFilepath;
    double startTime;
    double stopTime;
};

std::atomic<bool> ctrl_c_pressed(false);
void signal_handler(int signal)
{
    if(signal == SIGINT) {
        ctrl_c_pressed.store(true);
    }
}

#define FAIL_UNLESS(expr)                                                                         \
    do {                                                                                          \
        if(!(expr)) {                                                                             \
            std::cerr << "FAIL_UNLESS failed: " << #expr << " at " << __FILE__ << ":" << __LINE__ \
                      << std::endl;                                                               \
            return 1;                                                                             \
        }                                                                                         \
    } while(0)

void print_bus_message(GstMessage* msg)
{
    switch(GST_MESSAGE_TYPE(msg)) {
        case GST_MESSAGE_ERROR: {
            GError* err;
            gchar* debug_info;
            gst_message_parse_error(msg, &err, &debug_info);
            std::cerr << "Error received from element " << GST_OBJECT_NAME(msg->src) << ": "
                      << err->message << std::endl;
            std::cerr << "Debugging information: " << (debug_info ? debug_info : "none")
                      << std::endl;
            g_clear_error(&err);
            g_free(debug_info);
            break;
        }
        case GST_MESSAGE_WARNING: {
            GError* err;
            gchar* debug_info;
            gst_message_parse_warning(msg, &err, &debug_info);
            std::cerr << "Warning received from element " << GST_OBJECT_NAME(msg->src) << ": "
                      << err->message << std::endl;
            std::cerr << "Debugging information: " << (debug_info ? debug_info : "none")
                      << std::endl;
            g_clear_error(&err);
            g_free(debug_info);
            break;
        }
        case GST_MESSAGE_INFO: {
            GError* err;
            gchar* debug_info;
            gst_message_parse_info(msg, &err, &debug_info);
            std::cout << "Info received from element " << GST_OBJECT_NAME(msg->src) << ": "
                      << err->message << std::endl;
            std::cout << "Debugging information: " << (debug_info ? debug_info : "none")
                      << std::endl;
            g_clear_error(&err);
            g_free(debug_info);
            break;
        }
        case GST_MESSAGE_EOS:
            std::cout << "End-Of-Stream reached." << std::endl;
            break;
        case GST_MESSAGE_STATE_CHANGED: {
            GstState old_state, new_state, pending_state;
            gst_message_parse_state_changed(msg, &old_state, &new_state, &pending_state);
            std::cout << "Element " << GST_OBJECT_NAME(msg->src) << " changed state from "
                      << gst_element_state_get_name(old_state) << " to "
                      << gst_element_state_get_name(new_state) << "." << std::endl;
            break;
        }
        case GST_MESSAGE_BUFFERING: {
            gint percent = 0;
            gst_message_parse_buffering(msg, &percent);
            std::cout << "Buffering: " << percent << "%" << std::endl;
            break;
        }
        case GST_MESSAGE_CLOCK_LOST:
            std::cout << "Clock lost, pausing pipeline." << std::endl;
            gst_element_set_state(GST_ELEMENT(GST_MESSAGE_SRC(msg)), GST_STATE_PAUSED);
            gst_element_set_state(GST_ELEMENT(GST_MESSAGE_SRC(msg)), GST_STATE_PLAYING);
            break;
        default:
            std::cout << "Received message of type: " << GST_MESSAGE_TYPE_NAME(msg) << std::endl;
            break;
    }
}

gboolean onBusMessage(GstBus*, GstMessage* message, void*)
{
    // Extract user data
    auto msgType = GST_MESSAGE_TYPE(message);
    print_bus_message(message);
    switch(msgType) {
        case GST_MESSAGE_STATE_CHANGED:
            break;
        case GST_MESSAGE_ERROR:
            break;
        case GST_MESSAGE_EOS:
            break;
        default:
            break;
    }
    // Return true so this callback stays in place
    return true;
}

int main(int argc, char* argv[])
{
    if(argc != 5) {
        std::cerr << "Usage: " << argv[0]
                  << " <inputFilepath> <outputFilepath> <startTime> <stopTime>" << std::endl;
        std::cerr << "Example: ./app test.mp4 testout.mp4 5.0 10.0" << std::endl;
        return 1;
    }

    InputArgs args;
    try {
        // Parse the first two arguments as strings
        args.inputFilepath = argv[1];
        args.outputFilepath = argv[2];

        // Parse the next two arguments as doubles
        char* end;
        args.startTime = std::strtod(argv[3], &end);
        if(*end != '\0') {
            throw std::invalid_argument("Invalid double value for argument 3");
        }

        args.stopTime = std::strtod(argv[4], &end);
        if(*end != '\0') {
            throw std::invalid_argument("Invalid double value for argument 4");
        }
    } catch(const std::invalid_argument& e) {
        std::cerr << "Error: " << e.what() << std::endl;
        return 1;
    }

    // Initialize gstreamer
    if(!gst_init_check(&argc, &argv, NULL)) {
        std::cerr << "GStreamer failed to initialize" << std::endl;
        return 1;
    }

    // Create a GLib Main Loop and set it to run
    auto mainLoop = g_main_loop_new(NULL, FALSE);
    std::thread mainLoopThread([&mainLoop]() { g_main_loop_run(mainLoop); });

    // Build up the GStreamer pipeline
    // clang-format off
    std::stringstream ss;
    ss << "filesrc location=" << args.inputFilepath << " ! "
        << "qtdemux" << " ! "
        << "queue2 name=dec_queue" << " ! "
        << "h265parse" << " ! " << "nvh265dec" << " ! "
        << "videoconvert ! "
        << "nvh264enc bitrate=4000" << " ! "
        << "h264parse" << " ! "
        << "qtmux" << " ! "
        << "filesink location=" << args.outputFilepath;
    // clang-format on

    std::cout << "Attempting to run pipeline: " << ss.str() << std::endl;

    GError* gstParseErr{nullptr};
    auto pipeline = gst_parse_launch(ss.str().c_str(), &gstParseErr);
    if(gstParseErr) {
        std::cerr << "gst_parse_launch failed: " << gstParseErr->message << std::endl;
        g_error_free(gstParseErr);
        return 1;
    }

    auto bus = gst_pipeline_get_bus(GST_PIPELINE(pipeline));
    if(!bus) {
        std::cerr << "gst_pipeline_get_bus: failed" << std::endl;
        return 1;
    }

    gst_bus_add_watch(bus, (GstBusFunc)onBusMessage, nullptr);

    // Prepare for playing
    auto res = gst_element_set_state(pipeline, GST_STATE_PAUSED);
    FAIL_UNLESS(res != GST_STATE_CHANGE_FAILURE);
    // Wait for state change completion
    res = gst_element_get_state(pipeline, NULL, NULL, GST_CLOCK_TIME_NONE);
    FAIL_UNLESS(res != GST_STATE_CHANGE_FAILURE);

    GST_INFO("Pause state transition was successful");

    auto eventRes =
        gst_element_send_event(pipeline,
                               gst_event_new_seek(1.0,
                                                  GST_FORMAT_TIME,
                                                  GST_SEEK_FLAG_FLUSH,
                                                  GST_SEEK_TYPE_SET,
                                                  (GstClockTime)args.startTime,
                                                  GST_SEEK_TYPE_SET,
                                                  (GstClockTime)args.stopTime * GST_SECOND));
    FAIL_UNLESS(eventRes == TRUE);

    GST_INFO("Seek was successful");

    // Run pipeline
    res = gst_element_set_state(pipeline, GST_STATE_PLAYING);
    FAIL_UNLESS(res != GST_STATE_CHANGE_FAILURE);

    auto msg = gst_bus_timed_pop_filtered(
        bus, GST_CLOCK_TIME_NONE, GstMessageType(GST_MESSAGE_EOS | GST_MESSAGE_ERROR));
    FAIL_UNLESS(strcmp(GST_MESSAGE_TYPE_NAME(msg), "eos") != 0);
    gst_message_unref(msg);

    res = gst_element_set_state(pipeline, GST_STATE_NULL);
    FAIL_UNLESS(res != GST_STATE_CHANGE_FAILURE);

    while(true) {
        if(ctrl_c_pressed.load()) {
            std::cout << "Ctrl-C was pressed. Exiting..." << std::endl;
            break;
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }

    // Cleanup
    g_main_loop_quit(mainLoop);
    mainLoopThread.join();
    gst_bus_remove_signal_watch(bus);
    gst_object_unref(bus);
    gst_object_unref(pipeline);
    g_main_loop_unref(mainLoop);

    return 0;
}

The logs generated are the following:

0:00:00.202538659 1020611 0x5af0e1d48290 WARN            GST_REGISTRY gstregistrybinary.c:551:gst_registry_binary_check_magic: Binary registry magic version is different : 1.23.90 != 1.12.0
Attempting to run pipeline: filesrc location=/home/edge/f-nvme/videoserver_data/test_sbs.mp4 ! qtdemux ! queue2 name=dec_queue ! h265parse ! nvh265dec ! videoconvert ! nvh264enc bitrate=4000 ! h264parse ! qtmux ! filesink location=/home/edge/f-nvme/videoserver_data/out.mp4
Element filesink0 changed state from NULL to READY.
Element qtmux0 changed state from NULL to READY.
Element h264parse0 changed state from NULL to READY.
Received message of type: need-context
Received message of type: have-context
Element nvh264enc0 changed state from NULL to READY.
Element videoconvert0 changed state from NULL to READY.
Element nvh265dec0 changed state from NULL to READY.
Element h265parse0 changed state from NULL to READY.
Element dec_queue changed state from NULL to READY.
Element qtdemux0 changed state from NULL to READY.
Element filesrc0 changed state from NULL to READY.
Element pipeline0 changed state from NULL to READY.
Received message of type: stream-status
Element qtmux0 changed state from READY to PAUSED.
Element h264parse0 changed state from READY to PAUSED.
Element nvh264enc0 changed state from READY to PAUSED.
Element videoconvert0 changed state from READY to PAUSED.
Received message of type: stream-status
Element nvh265dec0 changed state from READY to PAUSED.
Element h265parse0 changed state from READY to PAUSED.
Received message of type: stream-status
Element dec_queue changed state from READY to PAUSED.
Received message of type: stream-status
Element qtdemux0 changed state from READY to PAUSED.
Element filesrc0 changed state from READY to PAUSED.
Received message of type: stream-status
Received message of type: stream-status
Buffering: 100%
Received message of type: latency
Received message of type: latency
Received message of type: duration-changed
Received message of type: latency
Received message of type: latency
Received message of type: duration-changed
0:00:00.284804604 1020611 0x730820000b70 FIXME               basesink gstbasesink.c:3399:gst_base_sink_default_event:<filesink0> stream-start event without group-id. Consider implementing group-id handling in the upstream elements
Received message of type: stream-start
Element filesink0 changed state from READY to PAUSED.
Element pipeline0 changed state from READY to PAUSED.
Received message of type: async-done
Received message of type: latency

(eaivideo-gst-seek:1020611): GStreamer-CRITICAL **: 16:36:56.247: gst_segment_do_seek: assertion 'segment->format == format' failed
0:00:00.284942968 1020611 0x730820000b70 WARN                   qtmux gstqtmux.c:1921:gst_qt_mux_send_buffer:<qtmux0> Failed to send buffer (0x7308180024f0) size 20
0:00:00.284949595 1020611 0x730820000b70 FIXME             aggregator gstaggregator.c:1478:gst_aggregator_loop:<qtmux0> Subclass should call gst_aggregator_selected_samples() from its aggregate implementation.
Element filesink0 changed state from PAUSED to PAUSED.
Element pipeline0 changed state from PAUSED to PAUSED.
Received message of type: stream-status
Received message of type: reset-time
Received message of type: stream-status

I tried adding a pad probe to the queue2 element to see what buffer timestamps were going by and after setting to PAUSED, the entire video file was read (at least all the key frames). I think this is causing the buffers to fill up the rest of the pipeline but I’m not really sure.

My question is: Is this the correct way to do this? If not, could anyone provide tips as to what I’m doing wrong?

Thank you!

EDIT/UPDATE: 05/31/2024

I fixed some issues with the code and cleaned up a bit.

Something strange is going on because when I run the application with export GST_DEBUG=6,seek*:6, I do see the bus post an EOS message and the file does have data even though gst-discoverer-1.0 returns Stream contains no data.

When I run with: export GST_DEBUG=2,seek*:6 I do not see the EOS get posted to the bus.

@dabrain34, maybe this if familiar to you? I could be mistaken but I believe you wrote the seek examples here

Logs:
Verbose Logs
Slim Logs

Main.cpp

#include <gst/gst.h>

#include <atomic>
#include <cstdlib>
#include <iostream>
#include <sstream>
#include <stdexcept>
#include <thread>

// Launching the app:
//  ./app ./input.mp4 ./out.mp4 5.0 10 --gst-debug-no-color
// For logging:
//  export GST_DEBUG=6,seek*:6
// For pipeline graphs:
//  sudo apt-get install graphviz
//  export GST_DEBUG_DUMP_DOT_DIR=pipeline/
//  dot -T{format} input_file > output_file
// Redirect output to a log file
// ./app > log.txt 2>&1

GST_DEBUG_CATEGORY_STATIC(seek_example);
#define GST_CAT_DEFAULT seek_example

std::atomic<bool> ctrl_c_pressed(false);

struct InputArgs
{
    std::string inputFilepath;
    std::string outputFilepath;
    double startTime;
    double stopTime;
};

void signal_handler(int signal)
{
    if(signal == SIGINT) {
        ctrl_c_pressed.store(true);
    }
}

#define FAIL_UNLESS(expr)                                                                         \
    do {                                                                                          \
        if(!(expr)) {                                                                             \
            std::cerr << "FAIL_UNLESS failed: " << #expr << " at " << __FILE__ << ":" << __LINE__ \
                      << std::endl;                                                               \
            return 1;                                                                             \
        }                                                                                         \
    } while(0)

gboolean onBusMessage(GstBus*, GstMessage* message, gpointer user_data)
{
    // Extract user data
    auto msgType = GST_MESSAGE_TYPE(message);
    auto pipeline = static_cast<GstElement*>(user_data);
    switch(msgType) {
        case GST_MESSAGE_STATE_CHANGED:
            // Create a new
            if(strcmp(GST_OBJECT_NAME(message->src), gst_element_get_name(pipeline)) == 0) {
                GstState old_state, new_state, pending_state;
                gst_message_parse_state_changed(message, &old_state, &new_state, &pending_state);
                auto old_state_name = gst_element_state_get_name(old_state);
                auto new_state_name = gst_element_state_get_name(new_state);
                char buffer[200];
                snprintf(buffer,
                         200,
                         "%s.%s_%s",
                         gst_debug_category_get_name(GST_CAT_DEFAULT),
                         old_state_name,
                         new_state_name);
                GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS(
                    GST_BIN(pipeline), GST_DEBUG_GRAPH_SHOW_ALL, (const char*)buffer);
                GST_INFO("Element %s changed state from %s to %s.",
                         GST_OBJECT_NAME(message->src),
                         old_state_name,
                         new_state_name);
            }
            break;
        case GST_MESSAGE_ERROR: {
            GError* err;
            gchar* debug_info;
            gst_message_parse_error(message, &err, &debug_info);
            std::cerr << "Error received from element " << GST_OBJECT_NAME(message->src) << ": "
                      << err->message << std::endl;
            std::cerr << "Debugging information: " << (debug_info ? debug_info : "none")
                      << std::endl;
            g_clear_error(&err);
            g_free(debug_info);
            break;
        }
        case GST_MESSAGE_WARNING: {
            GError* err;
            gchar* debug_info;
            gst_message_parse_warning(message, &err, &debug_info);
            std::cerr << "Warning received from element " << GST_OBJECT_NAME(message->src) << ": "
                      << err->message << std::endl;
            std::cerr << "Debugging information: " << (debug_info ? debug_info : "none")
                      << std::endl;
            g_clear_error(&err);
            g_free(debug_info);
            break;
        }
        case GST_MESSAGE_EOS:
            std::cout << "GOT EOS" << std::endl;
            break;
        default:
            break;
    }
    // Return true so this callback stays in place
    return true;
}

static GstPadProbeReturn pad_probe_callback(GstPad*, GstPadProbeInfo* info, gpointer)
{
    // Check if the data contains a buffer
    if(GST_PAD_PROBE_INFO_TYPE(info) & GST_PAD_PROBE_TYPE_BUFFER) {
        GstBuffer* buffer = GST_PAD_PROBE_INFO_BUFFER(info);
        if(buffer) {
            GstClockTime timestamp = GST_BUFFER_PTS(buffer);
            GST_DEBUG("Buffer Timestamp: %" GST_TIME_FORMAT, GST_TIME_ARGS(timestamp));
        }
    }
    return GST_PAD_PROBE_OK;
}

int main(int argc, char* argv[])
{
    if(argc < 5) {
        std::cerr << "Usage: " << argv[0]
                  << " <inputFilepath> <outputFilepath> <startTime> <stopTime>" << std::endl;
        std::cerr << "Example: ./app test.mp4 testout.mp4 5.0 10.0" << std::endl;
        return 1;
    }

    InputArgs args;
    try {
        // Parse the first two arguments as strings
        args.inputFilepath = argv[1];
        args.outputFilepath = argv[2];

        // Parse the next two arguments as doubles
        char* end;
        args.startTime = std::strtod(argv[3], &end);
        if(*end != '\0') {
            throw std::invalid_argument("Invalid double value for argument 3");
        }

        args.stopTime = std::strtod(argv[4], &end);
        if(*end != '\0') {
            throw std::invalid_argument("Invalid double value for argument 4");
        }
    } catch(const std::invalid_argument& e) {
        std::cerr << "Error: " << e.what() << std::endl;
        return 1;
    }

    // Initialize gstreamer
    if(!gst_init_check(&argc, &argv, NULL)) {
        std::cerr << "GStreamer failed to initialize" << std::endl;
        return 1;
    }

    GST_DEBUG_CATEGORY_INIT(
        seek_example,
        "seek_example",
        0,
        "Example showcasing how to use seek to extract a subsection of mp4 video");

    // Create a GLib Main Loop and set it to run
    auto mainLoop = g_main_loop_new(NULL, FALSE);
    std::thread mainLoopThread([&mainLoop]() { g_main_loop_run(mainLoop); });

    // Build up the GStreamer pipeline
    // clang-format off
    std::stringstream ss;
    ss << "filesrc location=" << args.inputFilepath << " ! "
        << "parsebin" << " ! "
        // << "queue name=probe_element" << " ! "
        << "nvh265dec" << " ! "
        << "videoconvert ! "
        << "queue" << " ! "
        << "nvh264enc bitrate=4000" << " ! "
        << "h264parse" << " ! "
        << "queue name=probe_element" << " ! "
        << "matroskamux" << " ! "
        // << "qtmux" << " ! "
        // << "mp4mux" << " ! "
        << "filesink location=" << args.outputFilepath;
    // clang-format on

    std::cout << "Attempting to run pipeline: " << ss.str() << std::endl;

    GError* gstParseErr{nullptr};
    auto pipeline = gst_parse_launch(ss.str().c_str(), &gstParseErr);
    if(gstParseErr) {
        std::cerr << "gst_parse_launch failed: " << gstParseErr->message << std::endl;
        g_error_free(gstParseErr);
        return 1;
    }

    // Add bus watch
    auto bus = gst_pipeline_get_bus(GST_PIPELINE(pipeline));
    if(!bus) {
        std::cerr << "gst_pipeline_get_bus: failed" << std::endl;
        return 1;
    }
    gst_bus_add_watch(bus, (GstBusFunc)onBusMessage, pipeline);

    // Add pad probe to watch buffer timestamps going by elements
    auto element = gst_bin_get_by_name(GST_BIN(pipeline), "probe_element");
    auto element_sink_pad = gst_element_get_static_pad(element, "sink");
    if(!element_sink_pad) {
        GST_ERROR("Failed to get element sink pad.");
        return false;
    }
    gst_pad_add_probe(element_sink_pad,
                      GstPadProbeType(GST_PAD_PROBE_TYPE_BUFFER),
                      pad_probe_callback,
                      NULL,
                      NULL);
    gst_object_unref(element_sink_pad);

    // Prepare for playing
    auto res = gst_element_set_state(pipeline, GST_STATE_PAUSED);
    FAIL_UNLESS(res != GST_STATE_CHANGE_FAILURE);
    // Wait for state change completion
    res = gst_element_get_state(pipeline, NULL, NULL, GST_CLOCK_TIME_NONE);
    FAIL_UNLESS(res != GST_STATE_CHANGE_FAILURE);

    GST_INFO("Pause state transition was successful");

    auto seek_start = (GstClockTime)args.startTime * GST_SECOND;
    auto seek_stop = (GstClockTime)args.stopTime * GST_SECOND;
    GST_INFO("Seeking from %li to %li", seek_start, seek_stop);

    auto eventRes = gst_element_send_event(pipeline,
                                           gst_event_new_seek(1.0,
                                                              GST_FORMAT_TIME,
                                                              GST_SEEK_FLAG_FLUSH,
                                                              GST_SEEK_TYPE_SET,
                                                              seek_start,
                                                              GST_SEEK_TYPE_SET,
                                                              seek_stop));
    FAIL_UNLESS(eventRes == TRUE);

    GST_INFO("Seek was successful");

    // Run pipeline
    res = gst_element_set_state(pipeline, GST_STATE_PLAYING);
    FAIL_UNLESS(res != GST_STATE_CHANGE_FAILURE);

    GST_INFO("Pipeline is playing... CTRL-C to stop.");

    while(true) {
        if(ctrl_c_pressed.load()) {
            std::cout << "Ctrl-C was pressed. Exiting..." << std::endl;
            break;
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }

    res = gst_element_set_state(pipeline, GST_STATE_NULL);
    FAIL_UNLESS(res != GST_STATE_CHANGE_FAILURE);

    // Cleanup
    g_main_loop_quit(mainLoop);
    mainLoopThread.join();
    gst_bus_remove_signal_watch(bus);
    gst_object_unref(bus);
    gst_object_unref(pipeline);
    g_main_loop_unref(mainLoop);

    return 0;
}