Issue with GStreamer Probes for Video/Audio Synchronization

Issue

I am working on a GStreamer-based recording pipeline on an ARM64 platform. My goal is to record 10-second clips: the first 5 seconds come from a circular (leaky) queue that is continuously buffering, and the remaining 5 seconds are captured live after the blocking probes are removed, before everything is muxed and written to a file. To achieve this, I use pad probes to pause both the video and audio streams upstream of the muxer. I found this snippet, which describes how to block the video stream, but I am unsure whether I am handling audio/video synchronization correctly.
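
For reference, here is a minimal sketch of the blocking-probe / circular-buffer pattern I am building on, reduced to a single video branch (element names, caps and durations here are only illustrative, not my actual pipeline):

import gi
gi.require_version('Gst', '1.0')
gi.require_version('GLib', '2.0')
from gi.repository import Gst, GLib

Gst.init(None)

# A 5-second leaky queue acts as the circular buffer; its src pad is blocked
# so nothing reaches the sink until the probe is removed.
pipeline = Gst.parse_launch(
    "videotestsrc is-live=true ! x264enc tune=zerolatency ! "
    "queue name=recq max-size-time=5000000000 max-size-bytes=0 "
    "max-size-buffers=0 leaky=downstream ! fakesink"
)
recq_src = pipeline.get_by_name("recq").get_static_pad("src")

def block_cb(pad, info, user_data):
    # Returning OK from a BLOCK probe keeps the pad blocked.
    return Gst.PadProbeReturn.OK

probe_id = recq_src.add_probe(
    Gst.PadProbeType.BLOCK | Gst.PadProbeType.BUFFER, block_cb, None)

pipeline.set_state(Gst.State.PLAYING)

def unblock():
    # Removing the probe lets the buffered data (and anything new) flow on.
    recq_src.remove_probe(probe_id)
    return False  # one-shot timeout

GLib.timeout_add_seconds(10, unblock)

GLib.MainLoop().run()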

Expected Behavior

  • The recorded video clips should have perfectly synchronized video and audio.
  • Both the video and audio streams should pause and resume simultaneously when the probes are blocked and unblocked (see the sketch just after this list).
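
To spell out what I mean by "simultaneously": I expect to install and remove both probes from the same main-loop callback, roughly like the sketch below (illustrative names, not my actual code):

def unblock_both(video_pad, audio_pad, video_probe_id, audio_probe_id):
    # Runs in the GLib main loop, so both removals happen back to back and
    # neither branch should get a head start over the other.
    video_pad.remove_probe(video_probe_id)
    audio_pad.remove_probe(audio_probe_id)
    return False  # one-shot when scheduled via GLib.timeout_add_seconds()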

Observed Behavior

  • Video behaves as expected: frames are recorded correctly.
  • Audio is completely out of sync: it appears to keep flowing even while its probe is supposed to be blocking it.
  • The resulting files contain video with correct timing, but the audio does not line up with it.

Setup

  • Device: Raspberry Pi 5
  • GStreamer Version: 1.24.11

Code

Here is the Python script I am using:
import gi
import time
import threading

gi.require_version('Gst', '1.0')
gi.require_version('GLib', '2.0')
from gi.repository import Gst, GLib

class RecordApp:
    def __init__(self):
        Gst.init(None)
        self.chunk_count = 0
        self.buffer_count_video = 0
        self.buffer_count_audio = 0

        # Live test sources. The audio is teed: one branch is always drained
        # by a fakesink, the other feeds the recording queue and AAC encoder.
        # Both recording branches end in mp4mux -> filesink.
        self.pipeline = Gst.parse_launch(
            "audiotestsrc is-live=true ! audioconvert ! tee name=atee "
            "atee. ! queue ! fakesink "
            "atee. ! queue name=arecq ! voaacenc name=voaacenc ! aacparse name=aacparse ! mux. "
            "videotestsrc is-live=true ! video/x-raw,width=1920,height=1080,format=I420 ! "
            "timeoverlay ! x264enc key-int-max=1 tune=zerolatency bitrate=8000 ! "
            "queue name=vrecq ! mp4mux name=mux ! "
            "filesink location=video.mp4 name=awss3sink"
        )

        # Forward messages from child elements (wrapped as GstBinForwarded) so
        # we can see when the filesink itself receives EOS.
        self.pipeline.set_property("message-forward", True)

        # Video recording queue: a 5-second, time-limited circular buffer
        # (leaky=2 means "downstream", i.e. old buffers are dropped).
        self.vrecq = self.pipeline.get_by_name("vrecq")
        self.vrecq.set_property("max-size-time", 5*Gst.SECOND)
        self.vrecq.set_property("max-size-bytes", 0)
        self.vrecq.set_property("max-size-buffers", 0)
        self.vrecq.set_property("leaky", 2)

        # Block the video branch at the queue's src pad until recording starts.
        self.vrecq_src = self.vrecq.get_static_pad("src")
        self.vrecq_src_probe_id = self.vrecq_src.add_probe(Gst.PadProbeType.BLOCK | Gst.PadProbeType.BUFFER, self.block_probe_cb, None)

        self.awss3sink = self.pipeline.get_by_name("awss3sink")
        self.muxer = self.pipeline.get_by_name("mux")
        self.update_awss3sink_key()

        self.voaacenc = self.pipeline.get_by_name("voaacenc")
        self.aacparse = self.pipeline.get_by_name("aacparse")

        # Audio recording queue: same 5-second circular buffer, also blocked
        # at its src pad until recording starts.
        self.arecq = self.pipeline.get_by_name("arecq")
        self.arecq.set_property("max-size-time", 5*Gst.SECOND)
        self.arecq.set_property("max-size-bytes", 0)
        self.arecq.set_property("max-size-buffers", 0)
        self.arecq.set_property("leaky", 2)
        self.arecq_src = self.arecq.get_static_pad("src")
        self.arecq_src_probe_id = self.arecq_src.add_probe(Gst.PadProbeType.BLOCK | Gst.PadProbeType.BUFFER, self.block_probe_cb_audio, None)

        self.loop = GLib.MainLoop()
        bus = self.pipeline.get_bus()
        bus.add_watch(GLib.PRIORITY_DEFAULT, self.bus_cb, None)

        self.pipeline.set_state(Gst.State.PLAYING)
        # Give the circular buffers time to fill, then start the first clip.
        GLib.timeout_add_seconds(10, self.start_recording_cb)

    def update_awss3sink_key(self):
        # Choose the filename for the next clip (video_000.mp4, video_001.mp4, ...).
        key = f"{self.chunk_count:03d}.mp4"
        print(f"Setting awss3sink key to '{key}'")
        self.chunk_count += 1
        self.awss3sink.set_property("location", f"video_{key}")

    def bus_cb(self, bus, msg, user_data):
        if msg.type == Gst.MessageType.ERROR:
            print("Error!")
            print(msg.parse_error())
            self.loop.quit()
            return False
        elif msg.type == Gst.MessageType.ELEMENT:
            s = msg.get_structure()
            if s and s.has_name("GstBinForwarded"):
                forward_msg = s.get_value("message")
                if forward_msg and forward_msg.type == Gst.MessageType.EOS:
                    print("EOS from element", forward_msg.src.get_name())

                    if forward_msg.src.get_name() != "awss3sink":
                        return True

                    # The filesink has received EOS, so the current mp4 is
                    # finalized: cycle the downstream elements through NULL to
                    # start a fresh file, then schedule the next clip.
                    self.awss3sink.set_state(Gst.State.NULL)
                    self.muxer.set_state(Gst.State.NULL)
                    self.voaacenc.set_state(Gst.State.NULL)
                    self.aacparse.set_state(Gst.State.NULL)
                    self.update_awss3sink_key()
                    self.awss3sink.set_state(Gst.State.PLAYING)
                    self.muxer.set_state(Gst.State.PLAYING)
                    self.voaacenc.set_state(Gst.State.PLAYING)
                    self.aacparse.set_state(Gst.State.PLAYING)
                    GLib.timeout_add_seconds(10, self.start_recording_cb)
        return True

    def block_probe_cb(self, pad, info, user_data):
        # Returning OK from a BLOCK probe keeps the pad blocked, so buffers
        # accumulate in (and eventually leak out of) vrecq.
        print(f"Pad {pad.get_name()} blocked! video")
        return Gst.PadProbeReturn.OK

    def block_probe_cb_audio(self, pad, info, user_data):
        print(f"Pad {pad.get_name()} blocked! audio")
        return Gst.PadProbeReturn.OK

    def probe_drop_one_cb(self, pad, info, user_data):
        # After unblocking the video: drop the first buffer, then keep
        # dropping until a keyframe; REMOVE passes the keyframe through and
        # uninstalls this probe.
        buf = info.get_buffer()
        if self.buffer_count_video == 0:
            print(f"Drop one buffer with ts {buf.pts}")
            self.buffer_count_video += 1
            return Gst.PadProbeReturn.DROP
        is_keyframe = not buf.has_flags(Gst.BufferFlags.DELTA_UNIT)
        print(f"Buffer with ts {buf.pts} (keyframe={is_keyframe})")
        return Gst.PadProbeReturn.REMOVE if is_keyframe else Gst.PadProbeReturn.DROP

    def probe_drop_one_cb_audio(self, pad, info, user_data):
        # Audio counterpart: drop the first buffer, then remove the probe
        # (audio has no keyframes to wait for).
        buf = info.get_buffer()

        if self.buffer_count_audio == 0:
            self.buffer_count_audio += 1
            return Gst.PadProbeReturn.DROP

        return Gst.PadProbeReturn.REMOVE

    def push_eos_thread(self):
        # Send EOS to the downstream peers of the blocked queue src pads so
        # the encoder/muxer drain and the current file is finalized; run in a
        # separate thread so the main loop is not blocked.
        vpeer = self.vrecq_src.get_peer()
        apeer = self.arecq_src.get_peer()

        print(f"Pushing EOS event on pad {vpeer.get_name()}")
        vpeer.send_event(Gst.Event.new_eos())

        print(f"Pushing EOS event on pad {apeer.get_name()}")
        apeer.send_event(Gst.Event.new_eos())

    def stop_recording_cb(self, user_data):
        print("Stop recording")
        # Re-install the blocking probes so no new data reaches the muxer,
        # then push EOS from a separate thread to finalize the current clip.
        self.arecq_src_probe_id = self.arecq_src.add_probe(Gst.PadProbeType.BLOCK | Gst.PadProbeType.BUFFER, self.block_probe_cb_audio, None)
        self.vrecq_src_probe_id = self.vrecq_src.add_probe(Gst.PadProbeType.BLOCK | Gst.PadProbeType.BUFFER, self.block_probe_cb, None)
        threading.Thread(target=self.push_eos_thread).start()
        return False

    def start_recording_cb(self):
        print("Timeout, unblocking pads to start recording")
        # Install the drop-until-keyframe probes, then remove the blocking
        # probes: the ~5 seconds buffered in the queues plus the next live
        # data flow on to the muxer.
        self.buffer_count_video = 0
        self.vrecq_src.add_probe(Gst.PadProbeType.BUFFER, self.probe_drop_one_cb, None)
        self.vrecq_src.remove_probe(self.vrecq_src_probe_id)
        self.vrecq_src_probe_id = 0

        # audio
        self.buffer_count_audio = 0
        self.arecq_src.add_probe(Gst.PadProbeType.BUFFER, self.probe_drop_one_cb_audio, None)
        self.arecq_src.remove_probe(self.arecq_src_probe_id)
        self.arecq_src_probe_id = 0

        GLib.timeout_add_seconds(5, self.stop_recording_cb, None)
        return False

    def run(self):
        self.loop.run()
        self.pipeline.set_state(Gst.State.NULL)

if __name__ == "__main__":
    app = RecordApp()
    app.run()

Any advice or suggestions would be greatly appreciated! Thanks in advance.