Appsink's new-sample callback is never triggered because the data flow is stuck

Hi there. I’m quite new to GStreamer and am currently developing my own app in Python.

My scenario is to read multiple consecutive .mkv files, pass them frame by frame through my OpenCV-based functions, and then produce HLS output.

When I designed the pipeline the following way, it worked well:
filesrc → matroskademux, which splits into:
  • video: queue (video queue) → decodebin → x264enc → mpegtsmux
  • audio: queue (audio queue) → decodebin → audioconvert → audioresample → avenc_aac → mpegtsmux
and finally mpegtsmux → hlssink.
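
For reference, here is a rough Gst.parse_launch() equivalent of that working pipeline (an untested sketch; the demuxer pad names and the "mux" label are my own, and depending on the negotiated caps you may need h264parse/aacparse in front of the muxer):

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib

Gst.init(None)

# "mux" is an arbitrary name so that both branches feed the same mpegtsmux.
pipeline = Gst.parse_launch(
    'filesrc location=input.mkv ! matroskademux name=demux '
    'demux.video_0 ! queue ! decodebin ! x264enc ! mux. '
    'demux.audio_0 ! queue ! decodebin ! audioconvert ! audioresample ! avenc_aac ! mux. '
    'mpegtsmux name=mux ! hlssink location=tmp/segment%05d.ts playlist-location=tmp/playlist.m3u8'
)
pipeline.set_state(Gst.State.PLAYING)
GLib.MainLoop().run()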

As I need to handle multiple files at the frame level with OpenCV, I decided to introduce appsink and appsrc. However, after many different attempts over many hours, I cannot get the appsink to emit new-sample properly. I’ve tried almost all the code available online, but I’m really frustrated that I cannot make it work.

I removed the appsrc part because my first goal for now is simply to get the new-sample callbacks for both the video and audio branches to fire.

Could someone please help?
My local environment is macOS (M1), and the input .mkv file is a normal, standard file containing a single video and a single audio stream: 3072x1728 at 30 fps, H.264 codec, yuvj420p pixel format.

[...]
import logging
import os
import time
from queue import Queue

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib

Gst.init(None)

class FrameProcessor:
    def __init__(self):
        self.input_file_queue = Queue()
        self._input_file_downloading = True
        self._input_file = None
        self.input_dir = None
        self.input_filename = None

        self._segment_length = None
        self.video_meta = None

        self.segment_count = 0
        self.segment_prefix = 'playlist'

        # GStreamer initialisation
        self.pipeline = None
        self.loop = None
        self.is_live = False  # checked in on_bus_message; set on NO_PREROLL
        self.video_src = None
        self.demuxer = None
        self.video_queue = None
        self.video_decoder = None
        self.video_converter = None
        self.video_caps_filter = None
        self.video_encoder = None

        self.audio_queue = None
        self.audio_parser = None
        self.audio_decoder = None
        self.audio_convert = None
        self.audio_resampler = None
        self.audio_encoder = None

        self.muxer = None
        self.sink = None

    @property
    def segment_length(self):
        return self._segment_length

    @segment_length.setter
    def segment_length(self, value):
        self._segment_length = value

    def enqueue_input_file(self, new_file):
        self.input_file_queue.put(new_file)

    def dequeue_input_file(self):
        # queue.Queue is always truthy; check emptiness explicitly, otherwise
        # get() blocks on an empty queue.
        if not self.input_file_queue.empty():
            return self.input_file_queue.get()
        else:
            return None

    @property
    def input_file(self):
        return self._input_file

    @input_file.setter
    def input_file(self, new_file):
        logging.info(f'[Frame Processor] New input file detected {new_file}')
        self._input_file = new_file
        self.input_dir = os.path.dirname(new_file)
        self.input_filename = os.path.basename(new_file)
        self.video_meta = VideoMeta(new_file)

    @property
    def input_file_downloading(self):
        return self._input_file_downloading

    @input_file_downloading.setter
    def input_file_downloading(self, status:bool):
        self._input_file_downloading = status

    def initialize_output_container(self):
        first_file = self.dequeue_input_file()
        logging.info(f'[Video] Initialising the first file {first_file}')
        self.input_file = first_file

        self.pipeline = Gst.Pipeline.new('pipeline')
        self.create_elements()
        self.link_elements()

    def create_elements(self):
        self.video_src = Gst.ElementFactory.make("filesrc", "file-source")
        self.video_src.set_property('location', self.input_file)
        logging.info(f'[Video] Creating elements based on {self.input_file}')

        self.demuxer = Gst.ElementFactory.make("matroskademux", "demuxer")

        self.video_queue = Gst.ElementFactory.make('queue', 'video_queue')
        self.video_queue.set_property("max-size-time", 5000000000)  # 5 seconds
        self.audio_queue = Gst.ElementFactory.make('queue', 'audio_queue')
        self.audio_queue.set_property("max-size-time", 5000000000)  # 5 seconds

        self.video_decoder = Gst.ElementFactory.make('decodebin', 'decodebin_video')
        self.video_converter = Gst.ElementFactory.make('videoconvert', 'video_converter')  # use nvvidconv in an NVIDIA environment
        self.video_caps_filter = Gst.ElementFactory.make("capsfilter", "caps_filter")
        caps = Gst.caps_from_string(f"video/x-raw, format=(string)NV12, width=(int){self.video_meta.resolution['width']}, height=(int){self.video_meta.resolution['height']}")
        self.video_caps_filter.set_property("caps", caps)
        self.video_post_dec_queue = Gst.ElementFactory.make('queue', 'video_post_dec_queue')
        self.video_post_dec_queue.set_property("max-size-time", 5000000000)  # 5 seconds

        self.audio_parser = Gst.ElementFactory.make("aacparse", "aac_parser")
        self.audio_decoder = Gst.ElementFactory.make('decodebin', 'decodebin_audio')
        self.audio_post_dec_queue = Gst.ElementFactory.make('queue', 'audio_post_dec_queue')
        self.audio_post_dec_queue.set_property("max-size-buffers", 500)  # 5 seconds
        self.audio_post_dec_queue.set_property("max-size-time", 8000000000)  # 5 seconds

        self.video_appsink = Gst.ElementFactory.make('appsink', 'video_appsink')
        self.video_appsink.set_property("emit-signals", True)
        self.audio_appsink = Gst.ElementFactory.make('appsink', 'audio_appsink')
        self.audio_appsink.set_property("emit-signals", True)

        self.video_encoder = Gst.ElementFactory.make('x264enc', 'video_encoder')
        self.muxer = Gst.ElementFactory.make('mpegtsmux', 'muxer')

        self.sink = Gst.ElementFactory.make('hlssink', 'sink')
        self.sink.set_property("playlist-root", self.input_dir)
        self.sink.set_property("location", f"tmp/{self.segment_prefix}%05d.ts")
        self.sink.set_property("playlist-location", f"tmp/{self.segment_prefix}.m3u8")
        self.sink.set_property("target-duration", self.segment_length)
        self.sink.set_property("max-files", 0)
        self.sink.set_property("playlist-length", 0)

        self.audio_converter = Gst.ElementFactory.make("audioconvert", "audio_converter")
        self.audio_resampler = Gst.ElementFactory.make("audioresample", "audio_resampler")
        self.audio_encoder = Gst.ElementFactory.make("avenc_aac", "audio_encoder")

        elements = [
            self.video_src,
            self.demuxer,
            self.video_queue,
            self.audio_queue,
            self.video_decoder,
            self.video_converter,
            self.video_caps_filter,
            self.video_post_dec_queue,
            self.video_appsink,
            self.audio_parser,
            self.audio_decoder,
            self.audio_post_dec_queue,
            self.audio_appsink,
            self.video_encoder,
            self.muxer,
            self.sink,
            self.audio_converter,
            self.audio_resampler,
            self.audio_encoder
        ]
        if not all(elements):
            logging.error('[Video] Failed to create all elements')
            raise ValueError('Failed to create all elements')
        for element in elements:
            self.pipeline.add(element)

        logging.info(f'[Video] All elements are created and added to the pipeline')

    def link_elements(self):
        self.video_src.link(self.demuxer)
        self.demuxer.connect('pad-added', self.on_pad_added, self.video_queue)

        self.video_queue.link(self.video_decoder)
        self.video_decoder.connect('pad-added', self.on_pad_added, self.video_converter)
        self.video_converter.link(self.video_caps_filter)
        self.video_caps_filter.link(self.video_post_dec_queue)
        self.video_post_dec_queue.link(self.video_appsink)
        self.video_appsink.connect("new-sample", self.on_new_video_sample)

        self.video_encoder.link(self.muxer)
        self.muxer.link(self.sink)

        self.demuxer.connect('pad-added', self.on_pad_added, self.audio_queue)
        self.audio_queue.link(self.audio_parser)
        self.audio_parser.link(self.audio_decoder)
        self.audio_decoder.connect('pad-added', self.on_pad_added, self.audio_post_dec_queue)
        self.audio_post_dec_queue.link(self.audio_appsink)
        self.audio_appsink.connect("new-sample", self.on_new_audio_sample)

        self.audio_converter.link(self.audio_resampler)
        self.audio_resampler.link(self.audio_encoder)
        self.audio_encoder.link(self.muxer)
        logging.info(f'[Video] All elements are linked')

    def on_pad_added(self, src_element, new_pad, target_element):
        # get_current_caps() can return None before negotiation; fall back to
        # query_caps() in that case.
        new_pad_caps = new_pad.get_current_caps() or new_pad.query_caps(None)
        new_pad_struct = new_pad_caps.get_structure(0)
        new_pad_name = new_pad_struct.get_name()
        src_element_name = src_element.get_name()
        target_element_name = target_element.get_name()

        if src_element_name == 'demuxer':
            if target_element_name == 'video_queue' and new_pad_name.startswith('video'):
                sink_pad = target_element.get_static_pad('sink')
            elif target_element_name == 'audio_queue' and new_pad_name.startswith('audio'):
                sink_pad = target_element.get_static_pad('sink')
            else:
                logging.info(f'[Dynamic Pad] Skipping element: {src_element_name}, pad name: {new_pad_name}, target_element: {target_element_name}')
                return
        elif src_element_name == 'decodebin_video':
            if target_element_name == 'video_converter':
                sink_pad = target_element.get_static_pad('sink')
            else:
                logging.info(f'[Dynamic Pad] Skipping element: {src_element_name}, pad name: {new_pad_name}, target_element: {target_element_name}')
                return
        elif src_element_name == 'decodebin_audio':
            if target_element_name == 'audio_post_dec_queue':
                sink_pad = target_element.get_static_pad('sink')
            else:
                logging.info(f'[Dynamic Pad] Skipping element: {src_element_name}, pad name: {new_pad_name}, target_element: {target_element_name}')
                return
        else:
            # Guard against an unexpected source element; without this,
            # sink_pad below would be unbound.
            logging.info(f'[Dynamic Pad] Unhandled source element: {src_element_name}')
            return

        if sink_pad.is_linked():
            logging.info(f'[Dynamic Pad] Already linked: {new_pad_name}, element: {src_element_name}, target_element: {target_element_name}')
            return

        link_result = new_pad.link(sink_pad)
        if link_result == Gst.PadLinkReturn.OK:
            logging.info(f'[Dynamic Pad] Adding pad: {new_pad_name}, element: {src_element_name}, target_element: {target_element_name}')
        else:
            logging.error(f'[Dynamic Pad] Failed to add pad: {new_pad_name}, element: {src_element_name}, target_element: {target_element_name}')

    def on_new_video_sample(self, appsink):
        print('======== called new video sample ==========')
        sample = appsink.emit("pull-sample")
        if sample:
            buffer = sample.get_buffer()
            print(f"New video sample: Size={buffer.get_size()} bytes")
            # No manual unref: PyGObject manages the sample's lifetime.
            return Gst.FlowReturn.OK
        else:
            print("No new video sample.")
            return Gst.FlowReturn.ERROR

    def on_new_audio_sample(self, appsink):
        print('======== called new audio sample ==========')
        sample = appsink.emit("pull-sample")
        if sample:
            buffer = sample.get_buffer()
            print(f"New audio sample: Size={buffer.get_size()} bytes")
            # No manual unref: PyGObject manages the sample's lifetime.
            return Gst.FlowReturn.OK
        else:
            print("No new audio sample.")
            return Gst.FlowReturn.ERROR

    def on_bus_message(self, bus, msg):
        message_type = msg.type

        if message_type == Gst.MessageType.ERROR:
            err, debug_info = msg.parse_error()
            logging.error(f"[GstStatus] Error received from element {msg.src.get_name()}: {err.message}")
            logging.error(f"[GstStatus] Debugging information: {debug_info or 'none'}")
            self.pipeline.set_state(Gst.State.READY)
            self.loop.quit()

        elif message_type == Gst.MessageType.WARNING:
            err, debug_info = msg.parse_warning()
            logging.warning(f'[GstStatus] Warning from {msg.src.get_name()}: {err.message} ({debug_info or "none"})')

        elif message_type == Gst.MessageType.INFO:
            err, debug_info = msg.parse_info()
            logging.info(f'[GstStatus] Info from {msg.src.get_name()}: {err.message} ({debug_info or "none"})')

        elif message_type == Gst.MessageType.EOS:
            logging.info("[Video] End-Of-Stream reached.")
            self.pipeline.set_state(Gst.State.READY)
            self.input_file_queue.task_done()
            while True:
                if self.input_file_queue.qsize():
                    next_file = self.dequeue_input_file()
                    self.input_file = next_file
                    logging.info(f'[Video] Processing the next file {next_file}')
                    self.video_src.set_property('location', next_file)
                    self.pipeline.set_state(Gst.State.PLAYING)
                    break
                else:
                    if self.input_file_downloading:
                        file_download_waiting = 30 # seconds
                        logging.warning(f'Files are still downloading but no new file in the queue yet. Try again after {file_download_waiting} seconds. Waiting...')
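                        # NB: time.sleep() here runs inside the bus callback and
                        # blocks the GLib main loop, stalling all further message
                        # handling; a GLib timeout source would avoid this.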
                        time.sleep(file_download_waiting)
                    else:
                        logging.info('[Video] All files in the queue have been processed')
                        self.pipeline.set_state(Gst.State.READY)
                        self.loop.quit()
                        break

        elif message_type == Gst.MessageType.BUFFERING:
            percent = msg.parse_buffering()
            print(f'Buffering ({percent}%)')
            if not self.is_live:
                if percent < 100:
                    self.pipeline.set_state(Gst.State.PAUSED)
                else:
                    self.pipeline.set_state(Gst.State.PLAYING)

        elif message_type == Gst.MessageType.CLOCK_LOST:
            self.pipeline.set_state(Gst.State.PAUSED)
            self.pipeline.set_state(Gst.State.PLAYING)

    def exec_pipeline(self):
        logging.info(f'[Video] Executing GStreamer')
        bus = self.pipeline.get_bus()

        self.pipeline.set_state(Gst.State.PAUSED)
        time.sleep(1)

        ret = self.pipeline.set_state(Gst.State.PLAYING)
        if ret == Gst.StateChangeReturn.FAILURE:
            logging.error('[Video] Unable to set the pipeline to the playing state.')
        elif ret == Gst.StateChangeReturn.NO_PREROLL:
            self.is_live = True

        self.loop = GLib.MainLoop()
        bus.add_signal_watch()
        bus.connect('message', self.on_bus_message)

        self.loop.run()

        # Dump the graph before tearing the pipeline down; note that this only
        # writes a file if GST_DEBUG_DUMP_DOT_DIR is set in the environment.
        Gst.debug_bin_to_dot_file(self.pipeline, Gst.DebugGraphDetails.ALL, "pipeline_graph")
        self.pipeline.set_state(Gst.State.NULL)
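
For reference, once the new-sample callbacks fire, my plan for the video branch is roughly the following sample-to-frame mapping (a sketch assuming the NV12 caps set by the capsfilter above, and that the buffer has no row padding):

import cv2
import numpy as np
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst

def sample_to_bgr(sample):
    # Map an NV12 sample into a numpy array and convert it to BGR for OpenCV.
    caps = sample.get_caps().get_structure(0)
    width = caps.get_value('width')
    height = caps.get_value('height')
    buffer = sample.get_buffer()
    ok, mapinfo = buffer.map(Gst.MapFlags.READ)
    if not ok:
        return None
    try:
        # NV12 is a full-resolution Y plane followed by a half-height
        # interleaved UV plane, hence height * 3 / 2 rows in total.
        nv12 = np.frombuffer(mapinfo.data, dtype=np.uint8).reshape(height * 3 // 2, width)
        return cv2.cvtColor(nv12, cv2.COLOR_YUV2BGR_NV12)
    finally:
        buffer.unmap(mapinfo)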

Here’s the last part of the debug logs:

0:00:09.027687000 55609 0x600003682350 LOG           queue_dataflow gstqueue.c:1207:gst_queue_chain_buffer_or_list:<audio_post_dec_queue> received buffer 0x7faea9f6fa30 of size 4096, time 0:00:05.151000000, duration 0:00:00.021333333
0:00:09.027692000 55609 0x600003682350 LOG                    queue gstqueue.c:639:apply_buffer:<audio_post_dec_queue> sink position updated to 0:00:05.172333333
0:00:09.027695000 55609 0x600003682350 LOG                    queue gstqueue.c:534:update_time_level:<audio_post_dec_queue> update sink time
0:00:09.027700000 55609 0x600003682350 LOG                    queue gstqueue.c:552:update_time_level:<audio_post_dec_queue> sink +0:00:05.172333333, src +0:00:00.035333333
0:00:09.027904000 55609 0x600003682350 DEBUG         GST_SCHEDULING gstpad.c:4467:gst_pad_chain_data_unchecked:<audio_post_dec_queue:sink> called chainfunction &gst_queue_chain with buffer 0x7faea9f6fa30, returned ok
0:00:09.027908000 55609 0x600003682350 LOG           GST_SCHEDULING gstpad.c:3846:do_probe_callbacks:<decodebin_audio:src_0> do probes
0:00:09.027912000 55609 0x600003682350 LOG           GST_SCHEDULING gstpad.c:3730:probe_hook_marshal:<decodebin_audio:src_0> hook 1 with flags 0x00003040 does not match 00001001
0:00:09.027916000 55609 0x600003682350 DEBUG         GST_SCHEDULING gstpad.c:4467:gst_pad_chain_data_unchecked:<src_0:proxypad3> called chainfunction &gst_proxy_pad_chain_default with buffer 0x7faea9f6fa30, returned ok
0:00:09.028092000 55609 0x600003682350 LOG             audiodecoder gstaudiodecoder.c:1121:gst_audio_decoder_output:<avdec_aac0> buffer pushed: ok
0:00:09.028099000 55609 0x600003682350 DEBUG                  libav gstavauddec.c:548:gst_ffmpegauddec_audio_frame:<avdec_aac0> Need more data
0:00:09.028103000 55609 0x600003682350 DEBUG                  libav gstavauddec.c:562:gst_ffmpegauddec_audio_frame:<avdec_aac0> return flow ok, out 0x0, got_frame 0
0:00:09.028106000 55609 0x600003682350 DEBUG                  libav gstavauddec.c:595:gst_ffmpegauddec_frame:<avdec_aac0> We didn't get a decoded buffer
0:00:09.028110000 55609 0x600003682350 LOG             audiodecoder gstaudiodecoder.c:1376:gst_audio_decoder_finish_frame_or_subframe:<avdec_aac0> accepting 0 bytes == 0 samples for 1 frames
0:00:09.028114000 55609 0x600003682350 DEBUG           audiodecoder gstaudiodecoder.c:1432:gst_audio_decoder_finish_frame_or_subframe:<avdec_aac0> leading frame ts 0:00:05.151000000
0:00:09.028295000 55609 0x600003682350 LOG               GST_BUFFER gstbuffer.c:790:_gst_buffer_free: finalize 0x7fae69f272a0
0:00:09.028299000 55609 0x600003682350 DEBUG             GST_MEMORY gstmemory.c:89:_gst_memory_free: free memory 0x600002787cc0
0:00:09.028307000 55609 0x600003682350 LOG             audiodecoder gstaudiodecoder.c:1751:gst_audio_decoder_push_buffers:<avdec_aac0> done pushing to subclass
0:00:09.028309000 55609 0x600003682350 LOG             audiodecoder gstaudiodecoder.c:1856:gst_audio_decoder_chain_forward:<avdec_aac0> chain-done
0:00:09.028314000 55609 0x600003682350 DEBUG         GST_SCHEDULING gstpad.c:4467:gst_pad_chain_data_unchecked:<avdec_aac0:sink> called chainfunction &gst_audio_decoder_chain with buffer 0x7fae69f272a0, returned ok
0:00:09.028524000 55609 0x600003682350 LOG                baseparse gstbaseparse.c:2608:gst_base_parse_push_frame:<aacparse0> frame pushed, flow ok
0:00:09.028530000 55609 0x600003682350 DEBUG         GST_SCHEDULING gstpad.c:4467:gst_pad_chain_data_unchecked:<aacparse0:sink> called chainfunction &gst_base_parse_chain with buffer 0x7fae69f272a0, returned ok
0:00:09.028534000 55609 0x600003682350 DEBUG         GST_SCHEDULING gstpad.c:4467:gst_pad_chain_data_unchecked:<typefind:sink> called chainfunction &gst_type_find_element_chain with buffer 0x7fae69f272a0, returned ok
0:00:09.028538000 55609 0x600003682350 DEBUG         GST_SCHEDULING gstpad.c:4467:gst_pad_chain_data_unchecked:<decodebin_audio:sink> called chainfunction &gst_proxy_pad_chain_default with buffer 0x7fae69f272a0, returned ok
0:00:09.028755000 55609 0x600003682350 LOG                baseparse gstbaseparse.c:2608:gst_base_parse_push_frame:<aac_parser> frame pushed, flow ok
0:00:09.028760000 55609 0x600003682350 DEBUG         GST_SCHEDULING gstpad.c:4467:gst_pad_chain_data_unchecked:<aac_parser:sink> called chainfunction &gst_base_parse_chain with buffer 0x7fae69f272a0, returned ok
0:00:09.028764000 55609 0x600003682350 DEBUG         queue_dataflow gstqueue.c:1520:gst_queue_loop:<audio_queue> queue is empty
0:00:09.028774000 55609 0x600003682350 LOG           queue_dataflow gstqueue.c:1529:gst_queue_loop:<audio_queue> (audio_queue:src) wait for ADD: 0 of 0-200 buffers, 0 of 0-10485760 bytes, 0 of 0-5000000000 ns, 0 items

It might be helpful to create and post a pipeline dot file dump / graph to see what the pipeline looks like and what the element states are.

See Basic tutorial 11: Debugging tools (you could do that whenever you get state-change messages for the top-level pipeline, or in a timeout or some such). You’d have to find the Python function that triggers the dumping; there’s API on GstBin IIRC.
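
In Python that function is Gst.debug_bin_to_dot_file(); here is a minimal, untested sketch that dumps the graph on every state change of the top-level pipeline (note that it only writes files if GST_DEBUG_DUMP_DOT_DIR is set before Gst.init()):

import os
os.environ.setdefault('GST_DEBUG_DUMP_DOT_DIR', '/tmp')  # must be set before Gst.init()

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst

def on_message(bus, msg, pipeline):
    # Dump the pipeline graph whenever the top-level pipeline changes state.
    if msg.type == Gst.MessageType.STATE_CHANGED and msg.src == pipeline:
        old, new, pending = msg.parse_state_changed()
        Gst.debug_bin_to_dot_file(pipeline, Gst.DebugGraphDetails.ALL,
                                  f'pipeline_{Gst.Element.state_get_name(new)}')

Connect it with bus.connect('message', on_message, pipeline) after bus.add_signal_watch(), and render the resulting .dot files with Graphviz, e.g. dot -Tpng pipeline_PAUSED.dot -o pipeline.png.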

To simplify things, I would propose you try something like this (a minimal sketch follows the list):

  • create a playbin3 or playbin element
  • set the "uri" property to the file you want to play, e.g. file:///path/to/foo.mkv (slightly different on Windows)
  • create an appsink for the audio:
    • set some kind of caps on the caps property, audio/x-raw should be enough I think.
    • set the emit-signals property to True
    • connect to the new-sample signal
    • set that appsink element on playbin via the audio-sink property.
  • create an appsink for the video
    • same as above, just with video/x-raw caps (can also have a format=RGB or so if that’s what you need) and set as video-sink on playbin.
  • set playbin3/playbin to PLAYING state
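
An untested sketch of the above:

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib

Gst.init(None)

def on_sample(appsink):
    # Shared new-sample handler for both appsinks.
    sample = appsink.emit('pull-sample')
    if sample is None:
        return Gst.FlowReturn.ERROR
    print(f'{appsink.get_name()}: buffer of {sample.get_buffer().get_size()} bytes')
    return Gst.FlowReturn.OK

playbin = Gst.ElementFactory.make('playbin', None)
playbin.set_property('uri', 'file:///path/to/foo.mkv')

audio_sink = Gst.ElementFactory.make('appsink', 'audio_appsink')
audio_sink.set_property('caps', Gst.Caps.from_string('audio/x-raw'))
audio_sink.set_property('emit-signals', True)
audio_sink.connect('new-sample', on_sample)
playbin.set_property('audio-sink', audio_sink)

video_sink = Gst.ElementFactory.make('appsink', 'video_appsink')
video_sink.set_property('caps', Gst.Caps.from_string('video/x-raw,format=RGB'))
video_sink.set_property('emit-signals', True)
video_sink.connect('new-sample', on_sample)
playbin.set_property('video-sink', video_sink)

playbin.set_state(Gst.State.PLAYING)
GLib.MainLoop().run()

If the new-sample callbacks fire with this setup, the problem is in the manual pipeline wiring rather than in appsink itself.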