Hi there. I’m quite new to GStreamer and currently develop my own app based on Python.
My scenario is to read multiple consecutive .mkv files, put them through my OpenCV-based functions frame by frame, and then produce HLS output.
When I designed a pipeline like the following way, it was working well:
filesrc → matroskademux → queue (video queue) → decodebin → x264enc → mpegtsmux → queue (audio queue) → decodebin → audioconvert → audioresample → avenc_aac → mpegtsmux → hlssink.
As I need to handle multiple files at the frame level with OpenCV, I decided to introduce appsink and appsrc. However, after many different attempts over tons of hours, I cannot make the appsink emit the new-sample signal properly. I've tried almost all the code samples available online, but I'm really frustrated that I cannot make it work.
I removed the appsrc part because my first goal for now is to get the new-sample callbacks for both the video queue and the audio queue to trigger.
Could someone please help?
My local environment is macOS (M1), and the input mkv file is a very normal and standard file containing only a video stream and an audio stream, with 3072x1728 resolution at 30 fps, h264 codec, and yuv420p pixel format.
[...]
import logging
import os
import time
from queue import Empty, Queue

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GObject
class FrameProcessor:
    """Feeds queued .mkv files through a GStreamer pipeline with appsink taps.

    Files are enqueued via enqueue_input_file(); the pipeline is built by
    initialize_output_container() and run by exec_pipeline().
    """

    def __init__(self):
        # --- input-file bookkeeping ---
        self.input_file_queue = Queue()
        self._input_file_downloading = True  # upstream download still in progress
        self._input_file = None
        self.input_dir = None
        self.input_filename = None
        self._segment_length = None          # HLS target-duration, set via property
        self.video_meta = None               # VideoMeta of the current file
        self.segment_count = 0
        self.segment_prefix = 'playlist'
        # --- GStreamer handles (populated by create_elements / exec_pipeline) ---
        self.pipeline = None
        self.loop = None
        self.video_src = None
        self.demuxer = None
        self.video_queue = None
        self.video_decoder = None
        self.video_converter = None
        self.video_caps_filter = None
        # Fix: these were previously created only inside create_elements(),
        # leaving the attributes undefined until that method ran.
        self.video_post_dec_queue = None
        self.video_appsink = None
        self.video_encoder = None
        self.audio_queue = None
        self.audio_parser = None
        self.audio_decoder = None
        self.audio_convert = None            # kept for backward compatibility (unused)
        self.audio_converter = None
        self.audio_resampler = None
        self.audio_post_dec_queue = None
        self.audio_appsink = None
        self.audio_encoder = None
        self.muxer = None
        self.sink = None
        # Fix: is_live was only assigned when exec_pipeline() saw NO_PREROLL,
        # so the BUFFERING bus handler could hit an AttributeError.
        self.is_live = False
@property
def segment_length(self):
    """Target HLS segment duration in seconds (None until configured)."""
    return self._segment_length

@segment_length.setter
def segment_length(self, value):
    """Store the desired segment duration; consumed by hlssink setup."""
    self._segment_length = value
def enqueue_input_file(self, new_file):
self.input_file_queue.put(new_file)
def dequeue_input_file(self):
if self.input_file_queue:
return self.input_file_queue.get()
else:
return None
@property
def input_file(self):
    """Path of the file currently being processed."""
    return self._input_file

@input_file.setter
def input_file(self, new_file):
    """Record *new_file* and derive its directory, basename and metadata."""
    logging.info(f'[Frame Processor] New input file detected {new_file}')
    self._input_file = new_file
    directory, filename = os.path.split(new_file)
    self.input_dir = directory
    self.input_filename = filename
    self.video_meta = VideoMeta(new_file)
@property
def input_file_downloading(self):
    """True while upstream files are still being downloaded."""
    return self._input_file_downloading

@input_file_downloading.setter
def input_file_downloading(self, status: bool):
    """Flip the download-in-progress flag (checked by the EOS handler)."""
    self._input_file_downloading = status
def initialize_output_container(self):
    """Pull the first queued file and build the GStreamer pipeline for it."""
    first_file = self.dequeue_input_file()
    logging.info(f'[Video] Initialising the first file {first_file}')
    # Property setter derives input_dir / input_filename / video_meta.
    self.input_file = first_file
    self.pipeline = Gst.Pipeline.new('pipeline')
    self.create_elements()
    self.link_elements()
def create_elements(self):
    """Instantiate and configure every pipeline element, then add them all.

    The video branch ends at video_appsink and the audio branch at
    audio_appsink; the encoder/muxer/hlssink elements are created here but
    fed later (the appsrc side was removed by the author for debugging).
    """
    make = Gst.ElementFactory.make  # local alias: every element goes through it

    # Source + demuxer.
    self.video_src = make("filesrc", "file-source")
    self.video_src.set_property('location', self.input_file)
    logging.info(f'[Video] Creating elements based on {self.input_file}')
    self.demuxer = make("matroskademux", "demuxer")

    # Pre-decode queues decouple demuxing from the decoders.
    self.video_queue = make('queue', 'video_queue')
    self.video_queue.set_property("max-size-time", 5000000000)  # 5 s
    self.audio_queue = make('queue', 'audio_queue')
    self.audio_queue.set_property("max-size-time", 5000000000)  # 5 s

    # Video decode branch: decodebin -> videoconvert -> capsfilter -> queue.
    self.video_decoder = make('decodebin', 'decodebin_video')
    self.video_converter = make('videoconvert', 'video_converter')  # nvvidconv on NVIDIA
    self.video_caps_filter = make("capsfilter", "caps_filter")
    caps = Gst.caps_from_string(f"video/x-raw, format=(string)NV12, width=(int){self.video_meta.resolution['width']}, height=(int){self.video_meta.resolution['height']}")
    self.video_caps_filter.set_property("caps", caps)
    self.video_post_dec_queue = make('queue', 'video_post_dec_queue')
    self.video_post_dec_queue.set_property("max-size-time", 5000000000)  # 5 s

    # Audio decode branch: aacparse -> decodebin -> queue.
    self.audio_parser = make("aacparse", "aac_parser")
    self.audio_decoder = make('decodebin', 'decodebin_audio')
    self.audio_post_dec_queue = make('queue', 'audio_post_dec_queue')
    self.audio_post_dec_queue.set_property("max-size-buffers", 500)
    self.audio_post_dec_queue.set_property("max-size-time", 8000000000)  # 8 s

    # App sinks: emit-signals=True makes them fire "new-sample".
    self.video_appsink = make('appsink', 'video_appsink')
    self.video_appsink.set_property("emit-signals", True)
    self.audio_appsink = make('appsink', 'audio_appsink')
    self.audio_appsink.set_property("emit-signals", True)

    # Encode / mux / HLS output side.
    self.video_encoder = make('x264enc', 'video_encoder')
    self.muxer = make('mpegtsmux', 'muxer')
    self.sink = make('hlssink', 'sink')
    self.sink.set_property("playlist-root", self.input_dir)
    self.sink.set_property("location", f"tmp/{self.segment_prefix}%05d.ts")
    self.sink.set_property("playlist-location", f"tmp/{self.segment_prefix}.m3u8")
    self.sink.set_property("target-duration", self.segment_length)
    self.sink.set_property("max-files", 0)
    self.sink.set_property("playlist-length", 0)
    self.audio_converter = make("audioconvert", "audio_converter")
    self.audio_resampler = make("audioresample", "audio_resampler")
    self.audio_encoder = make("avenc_aac", "audio_encoder")

    elements = [
        self.video_src,
        self.demuxer,
        self.video_queue,
        self.audio_queue,
        self.video_decoder,
        self.video_converter,
        self.video_caps_filter,
        self.video_post_dec_queue,
        self.video_appsink,
        self.audio_parser,
        self.audio_decoder,
        self.audio_post_dec_queue,
        self.audio_appsink,
        self.video_encoder,
        self.muxer,
        self.sink,
        self.audio_converter,
        self.audio_resampler,
        self.audio_encoder,
    ]
    # ElementFactory.make returns None for an unknown factory name.
    if any(element is None for element in elements):
        logging.error('[Video] Failed to create all elements')
        raise ValueError('Failed to create all elements')
    for element in elements:
        self.pipeline.add(element)
    logging.info(f'[Video] All elements are created and added to the pipeline')
def link_elements(self):
    """Wire the static links and register the dynamic-pad / appsink callbacks.

    matroskademux and decodebin only expose pads at runtime, so those hops go
    through on_pad_added; everything else is linked statically here.
    """
    # Dynamic pads: both demuxer handlers fire per pad, each filters by target.
    self.demuxer.connect('pad-added', self.on_pad_added, self.video_queue)
    self.demuxer.connect('pad-added', self.on_pad_added, self.audio_queue)
    self.video_decoder.connect('pad-added', self.on_pad_added, self.video_converter)
    self.audio_decoder.connect('pad-added', self.on_pad_added, self.audio_post_dec_queue)
    # App-sink signal handlers.
    self.video_appsink.connect("new-sample", self.on_new_video_sample)
    self.audio_appsink.connect("new-sample", self.on_new_audio_sample)
    # Static element-to-element links.
    for upstream, downstream in (
        (self.video_src, self.demuxer),
        (self.video_queue, self.video_decoder),
        (self.video_converter, self.video_caps_filter),
        (self.video_caps_filter, self.video_post_dec_queue),
        (self.video_post_dec_queue, self.video_appsink),
        (self.video_encoder, self.muxer),
        (self.muxer, self.sink),
        (self.audio_queue, self.audio_parser),
        (self.audio_parser, self.audio_decoder),
        (self.audio_post_dec_queue, self.audio_appsink),
        (self.audio_converter, self.audio_resampler),
        (self.audio_resampler, self.audio_encoder),
        (self.audio_encoder, self.muxer),
    ):
        upstream.link(downstream)
    logging.info(f'[Video] All elements are linked')
def on_pad_added(self, src_element, new_pad, target_element):
    """'pad-added' handler: link *new_pad* to *target_element* when it matches.

    Every connected (element, target) pair receives every new pad; pairs that
    don't match the pad's media type simply log and return.

    Fixes vs. original:
    - get_current_caps() can return None on a pad whose caps are not yet
      negotiated; fall back to query_caps() instead of crashing.
    - An unrecognized src element previously fell through with ``sink_pad``
      unbound, raising NameError at the is_linked() check; now it is skipped.
    """
    caps = new_pad.get_current_caps() or new_pad.query_caps(None)
    new_pad_name = caps.get_structure(0).get_name()
    src_element_name = src_element.get_name()
    target_element_name = target_element.get_name()
    # Routing table: which (source, target) pairs this pad may bridge.
    wanted = (
        (src_element_name == 'demuxer' and target_element_name == 'video_queue'
         and new_pad_name.startswith('video'))
        or (src_element_name == 'demuxer' and target_element_name == 'audio_queue'
            and new_pad_name.startswith('audio'))
        or (src_element_name == 'decodebin_video' and target_element_name == 'video_converter')
        or (src_element_name == 'decodebin_audio' and target_element_name == 'audio_post_dec_queue')
    )
    if not wanted:
        logging.info(f'[Dynamic Pad] Skipping element: {src_element_name}, pad name: {new_pad_name}, target_element: {target_element_name}')
        return
    sink_pad = target_element.get_static_pad('sink')
    if sink_pad.is_linked():
        logging.info(f'[Dynamic Pad] Already linked: {new_pad_name}, element: {src_element_name}, target_element: {target_element_name}')
        return
    link_result = new_pad.link(sink_pad)
    if link_result == Gst.PadLinkReturn.OK:
        logging.info(f'[Dynamic Pad] Adding pad: {new_pad_name}, element: {src_element_name}, target_element: {target_element_name}')
    else:
        logging.error(f'[Dynamic Pad] Failed to add pad: {new_pad_name}, element: {src_element_name}, target_element: {target_element_name}')
def on_new_video_sample(self, appsink):
    """'new-sample' callback for the video appsink.

    Pulls the prerolled sample and reports its size. Fix: the original called
    ``sample.unref()`` — in PyGObject the binding owns the Gst.Sample
    reference, so an extra manual unref can corrupt the refcount and crash.

    Returns Gst.FlowReturn.OK on success, ERROR when no sample is available.
    """
    print('======== called new video sample ==========')
    sample = appsink.emit("pull-sample")
    if sample is None:
        print("No new video sample.")
        return Gst.FlowReturn.ERROR
    buffer = sample.get_buffer()
    print(f"New video sample: Size={buffer.get_size()} bytes")
    # No sample.unref(): PyGObject releases the reference automatically.
    return Gst.FlowReturn.OK
def on_new_audio_sample(self, appsink):
    """'new-sample' callback for the audio appsink.

    Mirrors on_new_video_sample. Fix: removed the manual ``sample.unref()`` —
    PyGObject owns the Gst.Sample reference, and unreffing it again risks a
    double-free.

    Returns Gst.FlowReturn.OK on success, ERROR when no sample is available.
    """
    print('======== called new audio sample ==========')
    sample = appsink.emit("pull-sample")
    if sample is None:
        print("No new audio sample.")
        return Gst.FlowReturn.ERROR
    buffer = sample.get_buffer()
    print(f"New audio sample: Size={buffer.get_size()} bytes")
    # No sample.unref(): PyGObject releases the reference automatically.
    return Gst.FlowReturn.OK
def on_bus_message(self, bus, msg):
    """Bus watch: handle errors, warnings, EOS rollover to the next file,
    buffering, and clock loss.

    Fix: the BUFFERING branch read ``self.is_live``, which was only ever
    assigned when exec_pipeline() saw NO_PREROLL — reading it for a normal
    (non-live) pipeline raised AttributeError. Default to False via getattr.
    """
    message_type = msg.type
    if message_type == Gst.MessageType.ERROR:
        err, debug_info = msg.parse_error()
        logging.error(f"[GstStatus] Error received from element {msg.src.get_name()}: {err.message}")
        logging.error(f"[GstStatus] Debugging information: {debug_info or 'none'}")
        self.pipeline.set_state(Gst.State.READY)
        self.loop.quit()
    elif message_type == Gst.MessageType.WARNING:
        m = msg.parse_warning()
        logging.warning(f'[GstStatus] Warning {m}')
    elif message_type == Gst.MessageType.INFO:
        m = msg.parse_info()
        logging.info(f'[GstStatus] Info {m}')
    elif message_type == Gst.MessageType.EOS:
        logging.info("[Video] End-Of-Stream reached.")
        self.pipeline.set_state(Gst.State.READY)
        self.input_file_queue.task_done()
        while True:
            if self.input_file_queue.qsize():
                next_file = self.dequeue_input_file()
                self.input_file = next_file
                logging.info(f'[Video] Processing the next file {next_file}')
                # filesrc 'location' is writable only in READY/NULL — we are
                # in READY here, so this is legal.
                self.video_src.set_property('location', next_file)
                self.pipeline.set_state(Gst.State.PLAYING)
                break
            if self.input_file_downloading:
                file_download_waiting = 30  # seconds
                logging.warning(f'Files are still downloading but no new file in the queue yet. Try again after {file_download_waiting} seconds. Waiting...')
                # NOTE(review): time.sleep() here blocks the GLib main loop
                # (bus handlers run on it), freezing all message handling for
                # the whole wait; a GLib.timeout_add retry would be safer —
                # TODO confirm before changing the threading model.
                time.sleep(file_download_waiting)
            else:
                logging.info('[Video] All files in the queue have been processed')
                self.pipeline.set_state(Gst.State.READY)
                self.loop.quit()
                break
    elif message_type == Gst.MessageType.BUFFERING:
        percent = msg.parse_buffering_percent()[1]
        print(f'Buffering... percent {percent}')
        # Live pipelines must not be paused for buffering.
        if not getattr(self, 'is_live', False):
            print(f"Buffering ({percent}%)")
            if percent < 100:
                self.pipeline.set_state(Gst.State.PAUSED)
            else:
                self.pipeline.set_state(Gst.State.PLAYING)
    elif message_type == Gst.MessageType.CLOCK_LOST:
        # Standard recovery: cycle PAUSED -> PLAYING to select a new clock.
        self.pipeline.set_state(Gst.State.PAUSED)
        self.pipeline.set_state(Gst.State.PLAYING)
def exec_pipeline(self):
    """Start the pipeline, run the main loop until quit, then tear down.

    Fixes vs. original:
    - ``self.is_live`` was only assigned on NO_PREROLL, leaving the attribute
      missing for normal files; it is now defaulted to False up front.
    - The dot-graph dump ran after the pipeline was set to NULL, capturing a
      torn-down graph; it now runs before teardown.
    """
    logging.info(f'[Video] Executing GStreamer')
    bus = self.pipeline.get_bus()
    self.is_live = False  # overwritten below if the pipeline is live
    self.pipeline.set_state(Gst.State.PAUSED)
    # NOTE(review): fixed sleep as a preroll wait is fragile; consider
    # pipeline.get_state(Gst.CLOCK_TIME_NONE) instead — TODO confirm.
    time.sleep(1)
    ret = self.pipeline.set_state(Gst.State.PLAYING)
    if ret == Gst.StateChangeReturn.FAILURE:
        logging.error('[Video] Unable to set the pipeline to the playing state.')
    elif ret == Gst.StateChangeReturn.NO_PREROLL:
        # NO_PREROLL indicates a live source; buffering handling differs.
        self.is_live = True
    self.loop = GObject.MainLoop()
    bus.add_signal_watch()
    bus.connect('message', self.on_bus_message)
    self.loop.run()
    # Dump the graph while the elements are still assembled (requires
    # GST_DEBUG_DUMP_DOT_DIR to be set to take effect).
    Gst.debug_bin_to_dot_file(self.pipeline, Gst.DebugGraphDetails.ALL, "pipeline_graph")
    self.pipeline.set_state(Gst.State.NULL)
Here’s the last part of the debug logs:
0:00:09.027687000 55609 0x600003682350 LOG queue_dataflow gstqueue.c:1207:gst_queue_chain_buffer_or_list:<audio_post_dec_queue> received buffer 0x7faea9f6fa30 of size 4096, time 0:00:05.151000000, duration 0:00:00.021333333
0:00:09.027692000 55609 0x600003682350 LOG queue gstqueue.c:639:apply_buffer:<audio_post_dec_queue> sink position updated to 0:00:05.172333333
0:00:09.027695000 55609 0x600003682350 LOG queue gstqueue.c:534:update_time_level:<audio_post_dec_queue> update sink time
0:00:09.027700000 55609 0x600003682350 LOG queue gstqueue.c:552:update_time_level:<audio_post_dec_queue> sink +0:00:05.172333333, src +0:00:00.035333333
0:00:09.027904000 55609 0x600003682350 DEBUG GST_SCHEDULING gstpad.c:4467:gst_pad_chain_data_unchecked:<audio_post_dec_queue:sink> called chainfunction &gst_queue_chain with buffer 0x7faea9f6fa30, returned ok
0:00:09.027908000 55609 0x600003682350 LOG GST_SCHEDULING gstpad.c:3846:do_probe_callbacks:<decodebin_audio:src_0> do probes
0:00:09.027912000 55609 0x600003682350 LOG GST_SCHEDULING gstpad.c:3730:probe_hook_marshal:<decodebin_audio:src_0> hook 1 with flags 0x00003040 does not match 00001001
0:00:09.027916000 55609 0x600003682350 DEBUG GST_SCHEDULING gstpad.c:4467:gst_pad_chain_data_unchecked:<src_0:proxypad3> called chainfunction &gst_proxy_pad_chain_default with buffer 0x7faea9f6fa30, returned ok
0:00:09.028092000 55609 0x600003682350 LOG audiodecoder gstaudiodecoder.c:1121:gst_audio_decoder_output:<avdec_aac0> buffer pushed: ok
0:00:09.028099000 55609 0x600003682350 DEBUG libav gstavauddec.c:548:gst_ffmpegauddec_audio_frame:<avdec_aac0> Need more data
0:00:09.028103000 55609 0x600003682350 DEBUG libav gstavauddec.c:562:gst_ffmpegauddec_audio_frame:<avdec_aac0> return flow ok, out 0x0, got_frame 0
0:00:09.028106000 55609 0x600003682350 DEBUG libav gstavauddec.c:595:gst_ffmpegauddec_frame:<avdec_aac0> We didn't get a decoded buffer
0:00:09.028110000 55609 0x600003682350 LOG audiodecoder gstaudiodecoder.c:1376:gst_audio_decoder_finish_frame_or_subframe:<avdec_aac0> accepting 0 bytes == 0 samples for 1 frames
0:00:09.028114000 55609 0x600003682350 DEBUG audiodecoder gstaudiodecoder.c:1432:gst_audio_decoder_finish_frame_or_subframe:<avdec_aac0> leading frame ts 0:00:05.151000000
0:00:09.028295000 55609 0x600003682350 LOG GST_BUFFER gstbuffer.c:790:_gst_buffer_free: finalize 0x7fae69f272a0
0:00:09.028299000 55609 0x600003682350 DEBUG GST_MEMORY gstmemory.c:89:_gst_memory_free: free memory 0x600002787cc0
0:00:09.028307000 55609 0x600003682350 LOG audiodecoder gstaudiodecoder.c:1751:gst_audio_decoder_push_buffers:<avdec_aac0> done pushing to subclass
0:00:09.028309000 55609 0x600003682350 LOG audiodecoder gstaudiodecoder.c:1856:gst_audio_decoder_chain_forward:<avdec_aac0> chain-done
0:00:09.028314000 55609 0x600003682350 DEBUG GST_SCHEDULING gstpad.c:4467:gst_pad_chain_data_unchecked:<avdec_aac0:sink> called chainfunction &gst_audio_decoder_chain with buffer 0x7fae69f272a0, returned ok
0:00:09.028524000 55609 0x600003682350 LOG baseparse gstbaseparse.c:2608:gst_base_parse_push_frame:<aacparse0> frame pushed, flow ok
0:00:09.028530000 55609 0x600003682350 DEBUG GST_SCHEDULING gstpad.c:4467:gst_pad_chain_data_unchecked:<aacparse0:sink> called chainfunction &gst_base_parse_chain with buffer 0x7fae69f272a0, returned ok
0:00:09.028534000 55609 0x600003682350 DEBUG GST_SCHEDULING gstpad.c:4467:gst_pad_chain_data_unchecked:<typefind:sink> called chainfunction &gst_type_find_element_chain with buffer 0x7fae69f272a0, returned ok
0:00:09.028538000 55609 0x600003682350 DEBUG GST_SCHEDULING gstpad.c:4467:gst_pad_chain_data_unchecked:<decodebin_audio:sink> called chainfunction &gst_proxy_pad_chain_default with buffer 0x7fae69f272a0, returned ok
0:00:09.028755000 55609 0x600003682350 LOG baseparse gstbaseparse.c:2608:gst_base_parse_push_frame:<aac_parser> frame pushed, flow ok
0:00:09.028760000 55609 0x600003682350 DEBUG GST_SCHEDULING gstpad.c:4467:gst_pad_chain_data_unchecked:<aac_parser:sink> called chainfunction &gst_base_parse_chain with buffer 0x7fae69f272a0, returned ok
0:00:09.028764000 55609 0x600003682350 DEBUG queue_dataflow gstqueue.c:1520:gst_queue_loop:<audio_queue> queue is empty
0:00:09.028774000 55609 0x600003682350 LOG queue_dataflow gstqueue.c:1529:gst_queue_loop:<audio_queue> (audio_queue:src) wait for ADD: 0 of 0-200 buffers, 0 of 0-10485760 bytes, 0 of 0-5000000000 ns, 0 items