Video synchronization with external sensors

Hello!

I need to record a video and save it into a file and, simultaneously, record the readings coming from other sensors (this is done independently in a different script). After having recorded both, video and sensor readings must be used synchronously withing an error < 0.01 s. For doing it, I’m trying to use the CPU time as base time for the database.

The sensors readings are directly stored alongside the CPU time, and I’m trying to do the same with the video frames. Actually, I would only need to know precisely when the first frame is captured (measured using the CPU time), and the time for the subsequent frames will be estimated adding their PTS to that initial time.

I have created a basic example to show how I’m estimating the database time for the frames:

import gi

gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib

import time


class Camera:

    def __init__(self):

        self._first_frame = True
        self._t_ini_ns = None
        self._t_db_pts_origin_ns = None

        Gst.init(None)

        resolution = (640, 480)
        fps = 30

        self._pipeline_str = (
            f'v4l2src device="/dev/video0" ! identity name=iden ! '
            f'video/x-raw, width={resolution[0]}, height={resolution[1]}, framerate={fps}/1 ! '
            f'videoconvert ! '
            f'x265enc ! '
            f'mpegtsmux ! '
            f'fakesink'
        )

    def start_recording(self, t_ini_ns):
        self._t_ini_ns = t_ini_ns

        pipeline = Gst.parse_launch(self._pipeline_str)
        pipeline.get_by_name("iden").connect("handoff", self._on_new_camera_frame)

        pipeline.set_state(Gst.State.PLAYING)

        loop = GLib.MainLoop()
        loop.run()

    def _on_new_camera_frame(self, element, buffer):
        frame_pts = buffer.pts

        if self._first_frame:
            self._first_frame = False
            self._t_db_pts_origin_ns = int(time.time_ns() - self._t_ini_ns - frame_pts)
            print(f'First frame PTS: {frame_pts / 1e9:.3f} s')
            print(f'Database PTS origin frame: {self._t_db_pts_origin_ns / 1e9:.3f} s \n')

        else:
            frame_t_db_ns = frame_pts + self._t_db_pts_origin_ns
            print('New frame')
            print(f'Frame PTS: {frame_pts / 1e9:.3f} s')
            print(f'Frame DB time: {frame_t_db_ns / 1e9:.3f} s \n')

if __name__ == '__main__':

    camera = Camera()
    camera.start_recording(time.time_ns())

I’m using the handoff signal of the identity element to associate the PTS of the frames with the CPU time.

With this approach, recording a specific type of database, I’m able to estimate the time mismatch between the frames and the sensor recordings, which is approximately 0.2 seconds (way to high).

I would not be a problem if this mismatch was constant among the different database recordings, but my concern is that this depends on the CPU load or any other uncontrolled condition (for instance, recording more than one video at the same time, or recording more sensors).

My question is: is there any reliable way of associating the CPU time to the first frame when it is captured? Or, is there any better way of synchronizing the video with external data (not produced nor managed by Gstreamer)?

Thanks!