How to generate multiple thumbnails from one pipeline, or know when a seek is finished

I am trying to create multiple thumbnails (8-12) from a video source. I followed the thumbnail example on GitLab and am able to successfully produce a single thumbnail with a height of 180.

However, I am unable to create multiple thumbnails from the same pipeline, because I don't know of any reliable way to tell when a seek has finished. I checked GStreamer's message API and did not find anything. Without a way of knowing when a seek is done (and a thumbnail sample captured), the seeks flush each other and only the last one generates a thumbnail.

Currently I spawn 12 threads, each of which creates its own pipeline, seeks once, and then destroys it. This takes ~4 s for a 23-minute video. I ran a profiler, and my two thumbnail functions (thread launching and the example code) together account for < 1% of the total runtime, with most of the time spent inside GStreamer. Of particular note were the following:

  • 18.5% gst_pad_push (15.8% gst_base_parse_push_frame)
  • 11.6% gst_data_deque_pop
  • 24% libgstmatroska

The following is my thread-spawning code. The create_thumbnail function is essentially the code from the example I posted earlier.

fn thumbnail_thread(video_uri: String, video_duration: ClockTime) {
        let step = video_duration.mseconds() / (NUM_THUMBNAILS + 2); // + 2 so first and last frame not chosen
        let barrier = Arc::new(Barrier::new((NUM_THUMBNAILS + 1) as usize));

        for i in 0..NUM_THUMBNAILS {
            let uri = video_uri.clone();
            let barrier = barrier.clone();

            thread::spawn(move || {
                let save_path = std::path::PathBuf::from(format!("/{}/thumbnail_{}.jpg", THUMBNAIL_PATH, i));
                let timestamp = gst::GenericFormattedValue::from(ClockTime::from_mseconds(step + (step * i)));

                let pipeline = VideoPlayerModel::create_thumbnail(save_path, uri).expect("could not create thumbnail pipeline");
                pipeline.set_state(gst::State::Paused).unwrap();
                let bus = pipeline.bus().expect("Pipeline without a bus.");
                let mut seeked = false;

                for msg in bus.iter_timed(ClockTime::NONE) {
                    use gst::MessageView;

                    match msg.view() {
                        MessageView::AsyncDone(..) => {
                            if !seeked {
                                if pipeline.seek_simple(SeekFlags::FLUSH | SeekFlags::KEY_UNIT, timestamp).is_err()
                                {
                                    println!("Failed to seek");
                                }
                                pipeline.set_state(gst::State::Playing).unwrap();
                                seeked = true;
                            }
                        }
                        MessageView::Eos(..) => break,
                        _ => ()
                    }
                }
                pipeline.set_state(gst::State::Null).unwrap();
                barrier.wait();
            });
        }

        barrier.wait();
    }
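The seek targets above are spaced evenly across the duration. Pulled out into a plain function (the name `thumbnail_timestamps_ms` is mine; the `+ 2` divisor is from the code above), the spacing works out like this:

```rust
const NUM_THUMBNAILS: u64 = 12;

// Evenly space NUM_THUMBNAILS seek targets across the duration, dividing by
// NUM_THUMBNAILS + 2 so the very first and very last frames are never chosen.
fn thumbnail_timestamps_ms(duration_ms: u64) -> Vec<u64> {
    let step = duration_ms / (NUM_THUMBNAILS + 2);
    (0..NUM_THUMBNAILS).map(|i| step + step * i).collect()
}

fn main() {
    // A 23-minute-20-second video: step = 100_000 ms, targets from
    // 100_000 ms up to 1_200_000 ms, leaving a step of headroom at each end.
    let ts = thumbnail_timestamps_ms(1_400_000);
    println!("{:?}", ts);
}
```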

I think a solution to any of the following would help:

  • a way to know when a seek is done
  • a way to speed up pipeline creation/initialization
  • a way to get samples at given timestamps other than by seeking

I did manage to figure out how to use one pipeline for multiple thumbnails. Unfortunately it did not solve my speed issue (it is in fact slower, especially for larger thumbnail counts), but it did lead to much lower memory consumption and CPU usage. The previous method scaled very poorly as the number of thumbnails grew.

The code is a bit unclean (the sender doesn't carry any data; it only signals the calling thread).
The slightly modified example code:

fn create_thumbnail_pipeline(
        got_current_thumb: Arc<Mutex<bool>>,
        current_thumb_num: Arc<Mutex<u64>>,
        video_uri: String,
        senders: mpsc::Sender<u8>) -> Result<gst::Pipeline, Error>
    {
        let pipeline = gst::parse::launch(&format!(
            "uridecodebin uri={video_uri} ! videoconvert ! appsink name=sink"
        )).unwrap()
            .downcast::<gst::Pipeline>()
            .expect("Expected a gst::pipeline");

        let appsink = pipeline
            .by_name("sink")
            .expect("sink element not found")
            .downcast::<gst_app::AppSink>()
            .expect("Sink element is expected to be appsink!");

        appsink.set_property("sync", false);

        appsink.set_caps(Some(
            &gst_video::VideoCapsBuilder::new()
                .format(gst_video::VideoFormat::Rgbx)
                .build(),
        ));

        appsink.set_callbacks(
            gst_app::AppSinkCallbacks::builder()
                .new_sample(move |appsink| {
                    // propagate errors to GStreamer instead of panicking in the callback
                    let sample = appsink.pull_sample().map_err(|_| gst::FlowError::Error)?;
                    let buffer = sample.buffer().ok_or_else(|| {
                        element_error!(appsink, gst::ResourceError::Failed, ("Failed to get buffer from sample"));
                        gst::FlowError::Error
                    })?;

                    let mut got_current = got_current_thumb.lock().unwrap();

                    if *got_current {
                        return Err(gst::FlowError::Eos);
                    }

                    *got_current = true;

                    let caps = sample.caps().expect("sample without caps");
                    let info = gst_video::VideoInfo::from_caps(caps).expect("Failed to parse caps");

                    let frame = gst_video::VideoFrameRef::from_buffer_ref_readable(buffer, &info)
                        .map_err(|_| {
                            element_error!(appsink, gst::ResourceError::Failed, ("Failed to map buffer readable"));
                            gst::FlowError::Error
                        })?;

                    let aspect_ratio = (frame.width() as f64 * info.par().numer() as f64)
                        / (frame.height() as f64 * info.par().denom() as f64);
                    let target_height = THUMBNAIL_HEIGHT;
                    let target_width = target_height as f64 * aspect_ratio;

                    let img = image::FlatSamples::<&[u8]> {
                        samples: frame.plane_data(0).unwrap(),
                        layout: image::flat::SampleLayout {
                            channels: 3,
                            channel_stride: 1,
                            width: frame.width(),
                            width_stride: 4,
                            height: frame.height(),
                            height_stride: frame.plane_stride()[0] as usize,
                        },
                        color_hint: Some(image::ColorType::Rgb8),
                    };

                    let scaled_img = image::imageops::thumbnail(
                        &img.as_view::<image::Rgb<u8>>().expect("could not create image view"),
                        target_width as u32,
                        target_height,
                    );
                    let thumb_num = current_thumb_num.lock().unwrap();
                    let thumbnail_save_path = std::path::PathBuf::from(
                        format!("/{}/thumbnail_{}.jpg", THUMBNAIL_PATH, *thumb_num)
                    );
                    scaled_img.save(&thumbnail_save_path).map_err(|err| {
                        element_error!(appsink, gst::ResourceError::Write,
                        (
                            "Failed to write a preview file {}: {}",
                            &thumbnail_save_path.display(), err
                        ));
                        gst::FlowError::Error
                    })?;
                    senders.send(0).unwrap();
                    Err(gst::FlowError::Eos)
                })
                .build()
        );

        Ok(pipeline)
    }
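The aspect-ratio arithmetic inside the callback is independent of GStreamer. Extracted into a plain function (the name `thumbnail_size` is mine), it is just:

```rust
// Compute (width, height) for a thumbnail of fixed height, preserving the
// display aspect ratio; par_num/par_den is the pixel aspect ratio from caps.
// Truncation via `as u32` matches the original code.
fn thumbnail_size(w: u32, h: u32, par_num: u32, par_den: u32, target_h: u32) -> (u32, u32) {
    let aspect = (w as f64 * par_num as f64) / (h as f64 * par_den as f64);
    ((target_h as f64 * aspect) as u32, target_h)
}

fn main() {
    // Square pixels, 2:1 frame -> width is double the target height.
    assert_eq!(thumbnail_size(1280, 640, 1, 1, 180), (360, 180));
    // Anamorphic 2:1 pixels widen the result accordingly.
    assert_eq!(thumbnail_size(500, 250, 2, 1, 180), (720, 180));
    println!("ok");
}
```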

Here is my calling function. The barriers are only needed if you expect the function to return only once all thumbnails are generated.

fn thumbnail_thread(video_uri: String) {
        let uri = video_uri.clone();
        let barrier = Arc::new(Barrier::new(2));
        let barrier2 = barrier.clone();

        thread::spawn(move || {
            let got_current_thumb = Arc::new(Mutex::new(false));
            let current_thumb_num = Arc::new(Mutex::new(0));
            let (senders, receiver) = mpsc::channel();

            let pipeline = VideoPlayerModel::create_thumbnail_pipeline(Arc::clone(&got_current_thumb), Arc::clone(&current_thumb_num), uri, senders.clone())
                .expect("could not create thumbnail pipeline");
            pipeline.set_state(gst::State::Paused).unwrap();
            let bus = pipeline.bus().expect("Pipeline without a bus.");

            for msg in bus.iter_timed(ClockTime::NONE) {
                use gst::MessageView;

                match msg.view() {
                    MessageView::AsyncDone(..) => {
                        break;
                    }
                    _ => ()
                }
            }

            let duration = pipeline.query_duration::<ClockTime>().unwrap();
            let step = duration.mseconds() / (NUM_THUMBNAILS + 2); // + 2 so first and last frame not chosen

            for i in 0..NUM_THUMBNAILS {
                let timestamp = gst::GenericFormattedValue::from(ClockTime::from_mseconds(step + (step * i)));
                if pipeline.seek_simple(SeekFlags::FLUSH | SeekFlags::KEY_UNIT, timestamp).is_err()
                {
                    println!("Failed to seek");
                }
                pipeline.set_state(gst::State::Playing).unwrap();
                receiver.recv().unwrap();
                pipeline.set_state(gst::State::Paused).unwrap();

                let mut gen_new = got_current_thumb.lock().unwrap();
                let mut thumb_num = current_thumb_num.lock().unwrap();
                *thumb_num += 1;
                *gen_new = false;
            }
            barrier2.wait();
        });
        barrier.wait();
    }
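Stripped of GStreamer, the `receiver.recv()` hand-off above is the standard signal-only channel pattern: the payload carries no information, the send just unblocks the waiter. A minimal std-only sketch (the helper `run_and_wait` is hypothetical):

```rust
use std::sync::mpsc;
use std::thread;

// Run `work` on a worker thread and block the caller until it signals done.
fn run_and_wait(work: impl FnOnce() + Send + 'static) {
    let (tx, rx) = mpsc::channel::<()>();
    thread::spawn(move || {
        work();
        tx.send(()).unwrap(); // data-free signal, like `senders.send(0)` above
    });
    rx.recv().unwrap(); // blocks until the worker has finished
}

fn main() {
    run_and_wait(|| println!("thumbnail saved"));
    println!("done");
}
```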

If anyone knows of a way to get thumbnails faster, that would still be appreciated. It takes roughly 350 ms per thumbnail. It might be the pipeline, but it seems simple enough.

For anyone finding this in the future: I ended up settling on the final code below. I found that saving the image took the longest, so I made that part parallel. It now takes ~2.7 s to generate 12 thumbnails, of which ~2 s is waiting for the pipeline to initialize. As a further improvement I might find a way to reuse the pipeline when the video changes.
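This final version replaces the channel with a Condvar. The `wait_while` hand-off it relies on looks like this in plain std Rust (names are illustrative):

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

// Block until another thread flips the flag and notifies, then reset the flag.
// Mirrors the hand-off between the seek loop and the appsink callback.
fn wait_for_callback(pair: &Arc<(Mutex<bool>, Condvar)>) {
    let (lock, cvar) = &**pair;
    let mut flag = cvar
        .wait_while(lock.lock().unwrap(), |pending| !*pending)
        .unwrap();
    *flag = false; // ready for the next seek
}

fn main() {
    let started = Arc::new((Mutex::new(false), Condvar::new()));
    let started2 = Arc::clone(&started);

    // Stand-in for the appsink callback: set the flag and wake the waiter.
    thread::spawn(move || {
        let (lock, cvar) = &*started2;
        *lock.lock().unwrap() = true;
        cvar.notify_one();
    });

    wait_for_callback(&started);
    println!("callback observed");
}
```

Because `wait_while` re-checks the predicate before sleeping, the notify cannot be lost even if the callback fires before the seek loop starts waiting.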

fn launch_thumbnail_threads(video_uri: String, barrier: Arc<Barrier>) {
        let uri = video_uri.clone();

        // TODO: find a way to return the pipeline (or keep a static one) so it can be set to Null and disposed of
        thread::spawn(move || {
            let current_thumbnail_started: Arc<(Mutex<bool>, Condvar)> =
                Arc::new((Mutex::new(false), Condvar::new()));
            let num_started = Arc::new(Mutex::new(0));

            let pipeline = Self::create_thumbnail_pipeline(
                uri,
                barrier,
                Arc::clone(&current_thumbnail_started),
                Arc::clone(&num_started),
            )
                .expect("could not create thumbnail pipeline");

            pipeline.set_state(gst::State::Paused).unwrap();

            let pipe_clone = pipeline.clone();
            VideoPlayerModel::wait_for_playbin_done(&gst::Element::from(pipe_clone));

            let duration = pipeline.query_duration::<ClockTime>().unwrap();
            let step = duration.mseconds() / (NUM_THUMBNAILS + 2); // + 2 so first and last frame not chosen

            for i in 0..NUM_THUMBNAILS {
                let timestamp =
                    gst::GenericFormattedValue::from(ClockTime::from_mseconds(step + (step * i)));
                if pipeline
                    .seek_simple(SeekFlags::FLUSH | SeekFlags::KEY_UNIT, timestamp)
                    .is_err()
                {
                    println!("Failed to seek");
                }
                pipeline.set_state(gst::State::Playing).unwrap();
                let (lock, started_thumbnail) = &*current_thumbnail_started;
                let mut started = started_thumbnail
                    .wait_while(lock.lock().unwrap(), |pending| !*pending)
                    .unwrap();

                pipeline.set_state(gst::State::Paused).unwrap();
                *started = false;
            }
        });
    }
fn create_thumbnail_pipeline(
        video_uri: String,
        barrier: Arc<Barrier>,
        current_thumbnail_started: Arc<(Mutex<bool>, Condvar)>,
        num_started: Arc<Mutex<u64>>,
    ) -> Result<gst::Pipeline, Error> {
        let pipeline = gst::parse::launch(&format!(
            "uridecodebin uri={video_uri} ! videoconvert ! appsink name=sink"
        ))
            .unwrap()
            .downcast::<gst::Pipeline>()
            .expect("Expected a gst::pipeline");

        let appsink = pipeline
            .by_name("sink")
            .expect("sink element not found")
            .downcast::<gst_app::AppSink>()
            .expect("Sink element is expected to be appsink!");

        appsink.set_property("sync", false);

        appsink.set_caps(Some(
            &gst_video::VideoCapsBuilder::new()
                .format(gst_video::VideoFormat::Rgbx)
                .build(),
        ));

        appsink.set_callbacks(
            gst_app::AppSinkCallbacks::builder()
                .new_sample(move |appsink| {
                    Self::new_sample_callback(
                        appsink,
                        Arc::clone(&barrier),
                        Arc::clone(&current_thumbnail_started),
                        Arc::clone(&num_started),
                    )
                })
                .build(),
        );
        Ok(pipeline)
    }
fn new_sample_callback(
        appsink: &AppSink,
        barrier: Arc<Barrier>,
        current_thumbnail_started: Arc<(Mutex<bool>, Condvar)>,
        num_started: Arc<Mutex<u64>>,
    ) -> Result<gst::FlowSuccess, gst::FlowError> {
        let (lock, cvar) = &*current_thumbnail_started;
        let mut got_current = lock.lock().unwrap();

        if *got_current {
            return Err(gst::FlowError::Eos);
        }
        *got_current = true;

        let mut thumbnails_started = num_started.lock().unwrap();

        let curr_thumbnail = *thumbnails_started;
        let appsink = appsink.clone();

        thread::spawn(move || {
            let sample = appsink
                .pull_sample()
                .map_err(|_| gst::FlowError::Error)
                .unwrap();
            let buffer = sample
                .buffer()
                .ok_or_else(|| {
                    element_error!(appsink, gst::ResourceError::Failed, ("Failed"));
                    gst::FlowError::Error
                })
                .unwrap();

            let caps = sample.caps().expect("sample without caps");
            let info = gst_video::VideoInfo::from_caps(caps).expect("Failed to parse caps");

            let frame = gst_video::VideoFrameRef::from_buffer_ref_readable(buffer, &info)
                .map_err(|_| {
                    element_error!(
                        appsink,
                        gst::ResourceError::Failed,
                        ("Failed to map buff readable")
                    );
                    gst::FlowError::Error
                })
                .unwrap();

            let aspect_ratio = (frame.width() as f64 * info.par().numer() as f64)
                / (frame.height() as f64 * info.par().denom() as f64);
            let target_height = THUMBNAIL_HEIGHT;
            let target_width = target_height as f64 * aspect_ratio;

            let img = image::FlatSamples::<&[u8]> {
                samples: frame.plane_data(0).unwrap(),
                layout: image::flat::SampleLayout {
                    channels: 3,
                    channel_stride: 1,
                    width: frame.width(),
                    width_stride: 4,
                    height: frame.height(),
                    height_stride: frame.plane_stride()[0] as usize,
                },
                color_hint: Some(image::ColorType::Rgb8),
            };

            let scaled_img = image::imageops::thumbnail(
                &img.as_view::<image::Rgb<u8>>()
                    .expect("could not create image view"),
                target_width as u32,
                target_height,
            );
            let thumbnail_save_path = std::path::PathBuf::from(format!(
                "/{}/thumbnail_{}.jpg",
                THUMBNAIL_PATH, curr_thumbnail
            ));

            scaled_img
                .save(&thumbnail_save_path)
                .map_err(|err| {
                    element_error!(
                        appsink,
                        gst::ResourceError::Write,
                        (
                            "Failed to write a preview file {}: {}",
                            &thumbnail_save_path.display(),
                            err
                        )
                    );
                    gst::FlowError::Error
                })
                .unwrap();

            barrier.wait();
        });

        *thumbnails_started += 1;
        cvar.notify_one();
        Err(gst::FlowError::Eos)
    }
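One detail worth calling out in the FlatSamples layout above: RGBx frames carry 4 bytes per pixel, but only 3 of them are colour, which is why the layout uses channels = 3 together with width_stride = 4 and ColorType::Rgb8. Indexing by hand makes this explicit (the helper `rgbx_pixel` is illustrative, not part of the image crate):

```rust
// Read the RGB triple at (x, y) from an RGBx buffer with the given row stride.
// Each pixel occupies 4 bytes; the fourth byte is padding and is skipped.
fn rgbx_pixel(data: &[u8], row_stride: usize, x: usize, y: usize) -> (u8, u8, u8) {
    let base = y * row_stride + x * 4;
    (data[base], data[base + 1], data[base + 2])
}

fn main() {
    // Two RGBx pixels on one row: red, then green.
    let row = [255, 0, 0, 0, 0, 255, 0, 0];
    assert_eq!(rgbx_pixel(&row, 8, 0, 0), (255, 0, 0));
    assert_eq!(rgbx_pixel(&row, 8, 1, 0), (0, 255, 0));
    println!("ok");
}
```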