Issue
I am working on a GStreamer-based recording pipeline on an ARM64 platform. My goal is to record 10-second video clips, where the first 5 seconds are continuously buffered in a circular queue, and the remaining 5 seconds are captured with the probes unblocked before being written to a file. To achieve this, I use pad probes to pause both the video and audio streams before multiplexing. I found this snippet, which describes how to block the video stream, but I am unsure if I am handling audio and video synchronization correctly.Expected Behavior
- The recorded video clips should have perfectly synchronized video and audio.
- Both the video and audio streams should pause and resume simultaneously when the probes are blocked and unblocked.
Observed Behavior
- Video behaves as expected: frames are recorded correctly.
- Audio is completely desynchronized and appears to continue playing, even when the probe is supposed to block it.
- The resulting files contain video with correct timing, but the audio does not align properly.
Setup
- Device: [Raspberry pi 5]
- GStreamer Version: [1.24.11]
Code
Here is the Python script I am using:import gi
import time
import threading
gi.require_version('Gst', '1.0')
gi.require_version('GLib', '2.0')
from gi.repository import Gst, GLib
class RecordApp:
def __init__(self):
Gst.init(None)
self.chunk_count = 0
self.buffer_count_video = 0
self.buffer_count_audio = 0
self.pipeline = Gst.parse_launch(
"audiotestsrc is-live=true ! audioconvert ! tee name=atee "
"atee. ! queue ! fakesink "
"atee. ! queue name=arecq ! voaacenc name=voaacenc ! aacparse name=aacparse ! mux. "
"videotestsrc is-live=true ! video/x-raw,width=1920,height=1080,format=I420 ! "
"timeoverlay ! x264enc key-int-max=1 tune=zerolatency bitrate=8000 ! "
"queue name=vrecq ! mp4mux name=mux ! "
"filesink location=video.mp4 name=awss3sink"
)
self.pipeline.set_property("message-forward", True)
self.vrecq = self.pipeline.get_by_name("vrecq")
self.vrecq.set_property("max-size-time", 5*Gst.SECOND)
self.vrecq.set_property("max-size-bytes", 0)
self.vrecq.set_property("max-size-buffers", 0)
self.vrecq.set_property("leaky", 2)
self.vrecq_src = self.vrecq.get_static_pad("src")
self.vrecq_src_probe_id = self.vrecq_src.add_probe(Gst.PadProbeType.BLOCK | Gst.PadProbeType.BUFFER, self.block_probe_cb, None)
self.awss3sink = self.pipeline.get_by_name("awss3sink")
self.muxer = self.pipeline.get_by_name("mux")
self.update_awss3sink_key()
self.voaacenc = self.pipeline.get_by_name("voaacenc")
self.aacparse = self.pipeline.get_by_name("aacparse")
# Audio recording
self.arecq = self.pipeline.get_by_name("arecq")
self.arecq.set_property("max-size-time", 5*Gst.SECOND)
self.arecq.set_property("max-size-bytes", 0)
self.arecq.set_property("max-size-buffers", 0)
self.arecq.set_property("leaky", 2)
self.arecq_src = self.arecq.get_static_pad("src")
self.arecq_src_probe_id = self.arecq_src.add_probe(Gst.PadProbeType.BLOCK | Gst.PadProbeType.BUFFER, self.block_probe_cb_audio, None)
self.loop = GLib.MainLoop()
bus = self.pipeline.get_bus()
bus.add_watch(0, self.bus_cb, None)
self.pipeline.set_state(Gst.State.PLAYING)
GLib.timeout_add_seconds(10, self.start_recording_cb)
def update_awss3sink_key(self):
key = f"{self.chunk_count:03d}.mp4"
print(f"Setting awss3sink key to '{key}'")
self.chunk_count += 1
self.awss3sink.set_property("location", f"video_{key}")
def bus_cb(self, bus, msg, user_data):
if msg.type == Gst.MessageType.ERROR:
print("Error!")
print(msg.parse_error())
self.loop.quit()
return False
elif msg.type == Gst.MessageType.ELEMENT:
s = msg.get_structure()
if s and s.has_name("GstBinForwarded"):
forward_msg = s.get_value("message")
if forward_msg and forward_msg.type == Gst.MessageType.EOS:
print("EOS from element", forward_msg.src.get_name())
if forward_msg.src.get_name() != "awss3sink":
return True
self.awss3sink.set_state(Gst.State.NULL)
self.muxer.set_state(Gst.State.NULL)
self.voaacenc.set_state(Gst.State.NULL)
self.aacparse.set_state(Gst.State.NULL)
self.update_awss3sink_key()
self.awss3sink.set_state(Gst.State.PLAYING)
self.muxer.set_state(Gst.State.PLAYING)
self.voaacenc.set_state(Gst.State.PLAYING)
self.aacparse.set_state(Gst.State.PLAYING)
GLib.timeout_add_seconds(10, self.start_recording_cb)
return True
def block_probe_cb(self, pad, info, user_data):
print(f"Pad {pad.get_name()} blocked! video")
return Gst.PadProbeReturn.OK
def block_probe_cb_audio(self, pad, info, user_data):
print(f"Pad {pad.get_name()} blocked! audio")
return Gst.PadProbeReturn.OK
def probe_drop_one_cb(self, pad, info, user_data):
buf = info.get_buffer()
if self.buffer_count_video == 0:
print(f"Drop one buffer with ts {buf.pts}")
self.buffer_count_video += 1
return Gst.PadProbeReturn.DROP
is_keyframe = not buf.has_flags(Gst.BufferFlags.DELTA_UNIT)
print(f"Buffer with ts {buf.pts} (keyframe={is_keyframe})")
return Gst.PadProbeReturn.REMOVE if is_keyframe else Gst.PadProbeReturn.DROP
def probe_drop_one_cb_audio(self, pad, info, user_data):
buf = info.get_buffer()
if self.buffer_count_audio == 0:
self.buffer_count_audio += 1
return Gst.PadProbeReturn.DROP
return Gst.PadProbeReturn.REMOVE
def push_eos_thread(self):
vpeer = self.vrecq_src.get_peer()
apeer = self.arecq_src.get_peer()
print(f"Pushing EOS event on pad {vpeer.get_name()}")
vpeer.send_event(Gst.Event.new_eos())
print(f"Pushing EOS event on pad {apeer.get_name()}")
apeer.send_event(Gst.Event.new_eos())
def stop_recording_cb(self, user_data):
print("Stop recording, aiuto")
self.arecq_src_probe_id = self.arecq_src.add_probe(Gst.PadProbeType.BLOCK | Gst.PadProbeType.BUFFER, self.block_probe_cb_audio, None)
self.vrecq_src_probe_id = self.vrecq_src.add_probe(Gst.PadProbeType.BLOCK | Gst.PadProbeType.BUFFER, self.block_probe_cb, None)
threading.Thread(target=self.push_eos_thread).start()
return False
def start_recording_cb(self):
print("Timeout, unblocking pad to start recording")
self.buffer_count_video = 0
self.vrecq_src.add_probe(Gst.PadProbeType.BUFFER, self.probe_drop_one_cb, None)
self.vrecq_src.remove_probe(self.vrecq_src_probe_id)
self.vrecq_src_probe_id = 0
# audio
self.buffer_count_audio = 0
self.arecq_src.add_probe(Gst.PadProbeType.BUFFER, self.probe_drop_one_cb_audio, None)
self.arecq_src.remove_probe(self.arecq_src_probe_id)
self.arecq_src_probe_id = 0
GLib.timeout_add_seconds(5, self.stop_recording_cb, None)
return False
def run(self):
self.loop.run()
self.pipeline.set_state(Gst.State.NULL)
if __name__ == "__main__":
app = RecordApp()
app.run()
Any advice or suggestions would be greatly appreciated! Thanks in advance.