Good day.
I use AI for create script for recording from USB camera.
Still is issue with working.
Please check code and GStream, thanks.
“”"
Camera Manager - Max FPS Pipeline (No Decoding in Recording Branch)
“”"
import subprocess
import threading
import time
import signal
import sys
import os
import shutil
from datetime import datetime
from typing import Optional, Callable, Tuple
from dataclasses import dataclass
import logging
try:
import gi
gi.require_version(‘Gst’, ‘1.0’)
from gi.repository import Gst, GLib
except ImportError:
print(“Error: PyGObject and GStreamer 1.0 are required”)
sys.exit(1)
@dataclass
class CameraStats:
fps: float
exposure: int
iso: int
gain: int
white_balance: int
class CameraManager:
def init(self, logger: logging.Logger):
self.logger = logger
self.pipeline: Optional[Gst.Pipeline] = None
self.bus: Optional[Gst.Bus] = None
self.sample_callback: Optional[Callable] = None
self.is_running = False
self.is_recording = False
self.camera_device = ‘/dev/video1’
self.temp_file_path = "/tmp/lumo_temp_recording.mkv"
self.filesink_elem: Optional[Gst.Element] = None
self.tee: Optional[Gst.Element] = None
Gst.init(None)
self._setup_signal_handlers()
def _setup_signal_handlers(self):
signal.signal(signal.SIGINT, self._signal_handler)
signal.signal(signal.SIGTERM, self._signal_handler)
signal.signal(signal.SIGHUP, self._signal_handler)
def _signal_handler(self, signum, frame):
self.logger.warning(f"Signal {signum}, shutting down...")
self.stop_pipeline()
sys.exit(0)
def detect_camera(self) -> bool:
try:
result = subprocess.run(['v4l2-ctl', '--list-devices'], capture_output=True, text=True, timeout=5)
if result.returncode == 0:
self.logger.info("Available video devices:\n" + result.stdout)
if 'Global Shutter' in result.stdout or os.path.exists('/dev/video1'):
self.camera_device = '/dev/video1'
self.logger.info("Aptina AR0234 Global Shutter Camera detected!")
return True
return False
except Exception as e:
self.logger.error(f"Camera detection failed: {e}")
return False
def get_camera_stats(self) -> CameraStats:
try:
result = subprocess.run(['v4l2-ctl', '-d', self.camera_device, '--all'], capture_output=True, text=True, timeout=2)
lines = result.stdout.split('\n')
fps, exp, iso, gain, wb = 0.0, 0, 0, 0, 0
for line in lines:
if 'fps' in line.lower():
try: fps = float(line.split(':')[1].strip())
except: pass
elif 'exposure' in line.lower() and 'auto' not in line.lower():
try: exp = int(line.split(':')[1].strip())
except: pass
elif 'iso' in line.lower():
try: iso = int(line.split(':')[1].strip())
except: pass
elif 'gain' in line.lower() and 'digital' not in line.lower():
try: gain = int(line.split(':')[1].strip())
except: pass
elif 'white_balance' in line.lower():
try: wb = int(line.split(':')[1].strip())
except: pass
return CameraStats(fps, exp, iso, gain, wb)
except Exception as e:
self.logger.error(f"Stats error: {e}")
return CameraStats(0, 0, 0, 0, 0)
def create_live_view_pipeline(self) -> bool:
"""
120 FPS Pipeline.
Recording Branch: Raw MJPEG -> filesink (No Muxer, saves as .mjpeg)
Live View Branch: MJPEG -> Decode -> RGB
"""
try:
# We save as .mjpeg (raw MJPEG stream) to avoid muxer overhead and negotiation errors.
# Files can be played directly in VLC/MPV.
pipeline_str = (
f"v4l2src device={self.camera_device} ! "
f"image/jpeg,width=1920,height=1200 ! "
f"tee name=t ! "
# Live View Branch: 120fps
f"queue leaky=2 max-size-buffers=10 ! "
f"jpegdec ! "
f"videoconvert ! "
f"video/x-raw,format=RGB ! "
f"appsink name=sink sync=false drop=true "
# Recording Branch: 120fps Raw MJPEG -> File
f"t. ! "
f"queue max-size-buffers=1000 ! "
f"filesink name=rec_sink location={self.temp_file_path} sync=false async=false"
)
self.logger.info("Creating 120 FPS pipeline (Raw MJPEG Recording)...")
self.pipeline = Gst.parse_launch(pipeline_str)
if not self.pipeline:
self.logger.error("Pipeline is None")
return False
self.pipeline.set_state(Gst.State.PLAYING)
sink = self.pipeline.get_by_name('sink')
self.filesink_elem = self.pipeline.get_by_name('rec_sink')
self.tee = self.pipeline.get_by_name('t')
if not self.tee or not self.filesink_elem:
self.logger.error("Missing tee or filesink element")
return False
if sink:
sink.set_property('emit-signals', True)
sink.connect('new-sample', self._on_new_sample)
self.bus = self.pipeline.get_bus()
self.bus.add_watch(GLib.PRIORITY_DEFAULT, self._on_bus_message)
self.is_running = True
self.logger.info("Pipeline started at 120 FPS (Live + Record)")
return True
except Exception as e:
self.logger.error(f"Pipeline failed: {e}")
import traceback
self.logger.error(traceback.format_exc())
return False
def start_recording(self, output_file: str) -> bool:
if not self.is_running: return False
try:
self.pipeline.set_state(Gst.State.PAUSED)
time.sleep(0.2)
if os.path.exists(self.temp_file_path):
shutil.move(self.temp_file_path, output_file)
self.logger.info(f"Moved old temp to {output_file}")
else:
self.logger.warning(f"No old temp file: {self.temp_file_path}")
new_temp = f"/tmp/lumo_temp_{int(time.time())}_{os.getpid()}.mkv"
if self.filesink_elem:
self.filesink_elem.set_property('location', new_temp)
self.temp_file_path = new_temp
self.pipeline.set_state(Gst.State.PLAYING)
# Wait for file creation
start_wait = time.time()
while not os.path.exists(self.temp_file_path):
if time.time() - start_wait > 2.0:
self.logger.error(f"Timeout: File {self.temp_file_path} not created")
open(self.temp_file_path, 'w').close()
break
time.sleep(0.1)
self.is_recording = True
self.logger.info(f"Recording started: {output_file} (Temp: {self.temp_file_path})")
return True
except Exception as e:
self.logger.error(f"Start error: {e}")
if self.pipeline: self.pipeline.set_state(Gst.State.PLAYING)
return False
def stop_recording(self) -> bool:
if not self.is_recording: return False
try:
self.pipeline.set_state(Gst.State.PAUSED)
time.sleep(0.5)
timestamp = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
final_file = f"/home/peter/recordings/manual_{timestamp}.mkv"
if os.path.exists(self.temp_file_path):
shutil.move(self.temp_file_path, final_file)
self.logger.info(f"Moved temp to {final_file}")
else:
self.logger.warning(f"Temp file missing: {self.temp_file_path}")
open(final_file, 'w').close()
new_temp = f"/tmp/lumo_temp_{int(time.time())}_{os.getpid()}.mkv"
if self.filesink_elem:
self.filesink_elem.set_property('location', new_temp)
self.temp_file_path = new_temp
self.pipeline.set_state(Gst.State.PLAYING)
self.is_recording = False
self.logger.info(f"Recording stopped: {final_file}")
return True
except Exception as e:
self.logger.error(f"Stop error: {e}")
if self.pipeline: self.pipeline.set_state(Gst.State.PLAYING)
return False
def _on_new_sample(self, sink) -> Gst.FlowReturn:
if self.sample_callback:
try:
sample = sink.emit('pull-sample')
if sample:
buffer = sample.get_buffer()
caps = sample.get_caps()
success, map_info = buffer.map(Gst.MapFlags.READ)
if success:
self.sample_callback(map_info.data, caps)
buffer.unmap(map_info)
except Exception as e:
self.logger.error(f"Sample error: {e}")
return Gst.FlowReturn.OK
def _on_bus_message(self, bus, message) -> bool:
if message.type == Gst.MessageType.ERROR:
err, debug = message.parse_error()
self.logger.error(f"Error: {err.message}")
self.is_running = False
return True
def set_sample_callback(self, callback: Callable) -> None:
self.sample_callback = callback
def stop_pipeline(self) -> None:
if self.is_recording:
self.stop_recording()
if self.pipeline:
self.pipeline.send_event(Gst.Event.new_eos())
time.sleep(0.5)
self.pipeline.set_state(Gst.State.NULL)
self.logger.info("Pipeline stopped")
self.is_running = False
self.is_recording = False
def get_status(self) -> dict:
state = "stopped"
if self.pipeline:
try:
ret, state_val, pending = self.pipeline.get_state(Gst.CLOCK_TIME_NONE)
state_map = {Gst.State.NULL: "stopped", Gst.State.READY: "ready", Gst.State.PAUSED: "paused", Gst.State.PLAYING: "playing"}
state = state_map.get(state_val, "unknown")
except: pass
return {'running': self.is_running, 'recording': self.is_recording, 'state': state, 'fps': 0}
“”"
Configuration management for Lumo Camera Application
“”"
import os
import json
from pathlib import Path
from typing import Dict, Any, Optional
class Config:
“”“Application configuration handler”“”
DEFAULT_CONFIG = {
'camera': {
'device': '/dev/video1', # Updated to match your detected camera
'resolution_width': 1920,
'resolution_height': 1200,
'fps': 120,
'bitrate': 50000000, # 50 Mbps
'jpeg_quality': 100,
'auto_white_balance': True,
'auto_exposure': True,
},
'recording': {
'segment_duration_seconds': 600, # 10 minutes
'output_dir': str(Path.home() / 'recordings'),
'log_dir': str(Path.home() / 'logs'),
'motion_threshold': 30,
'motion_pre_buffer_seconds': 10,
'motion_post_buffer_seconds': 10,
},
'gui': {
'window_title': 'Lumo Camera Recorder',
'min_width': 800,
'min_height': 600,
},
'logging': {
'level': 'DEBUG',
'format': '%(asctime)s - %(levelname)s - %(name)s - %(message)s',
}
}
def __init__(self, config_path: Optional[str] = None):
self.config_path = config_path or str(Path.home() / '.lumo_camera_config.json')
self._config: Dict[str, Any] = {}
self.load()
def load(self) -> None:
"""Load configuration from file or use defaults"""
if os.path.exists(self.config_path):
try:
with open(self.config_path, 'r') as f:
loaded = json.load(f)
self._config = self._deep_merge(self.DEFAULT_CONFIG, loaded)
except (json.JSONDecodeError, IOError) as e:
print(f"Warning: Could not load config, using defaults: {e}")
self._config = self.DEFAULT_CONFIG.copy()
else:
self._config = self.DEFAULT_CONFIG.copy()
self.save()
def save(self) -> None:
"""Save current configuration to file"""
try:
os.makedirs(os.path.dirname(self.config_path), exist_ok=True)
with open(self.config_path, 'w') as f:
json.dump(self._config, f, indent=2)
except IOError as e:
print(f"Warning: Could not save config: {e}")
def _deep_merge(self, base: Dict, override: Dict) -> Dict:
"""Deep merge two dictionaries"""
result = base.copy()
for key, value in override.items():
if key in result and isinstance(result[key], dict) and isinstance(value, dict):
result[key] = self._deep_merge(result[key], value)
else:
result[key] = value
return result
def get(self, section: str, key: str, default: Any = None) -> Any:
"""Get configuration value"""
try:
return self._config.get(section, {}).get(key, default)
except (TypeError, AttributeError):
return default
def set(self, section: str, key: str, value: Any) -> None:
"""Set configuration value"""
if section not in self._config:
self._config[section] = {}
self._config[section][key] = value
self.save()
@property
def camera_device(self) -> str:
return self.get('camera', 'device', '/dev/video1')
@property
def resolution(self) -> tuple:
return (
self.get('camera', 'resolution_width', 1920),
self.get('camera', 'resolution_height', 1200)
)
@property
def fps(self) -> int:
return self.get('camera', 'fps', 120)
@property
def segment_duration(self) -> int:
return self.get('recording', 'segment_duration_seconds', 600)
@property
def output_dir(self) -> str:
return self.get('recording', 'output_dir', str(Path.home() / 'recordings'))
@property
def log_dir(self) -> str:
return self.get('recording', 'log_dir', str(Path.home() / 'logs'))
“”"
GUI - Max FPS + Telemetry + Manual Control
“”"
import gi
gi.require_version(‘Gtk’, ‘4.0’)
gi.require_version(‘Gdk’, ‘4.0’)
gi.require_version(‘Gio’, ‘2.0’)
from gi.repository import Gtk, Gdk, GLib, GdkPixbuf, Gio
import numpy as np
from typing import Optional
import logging
import os
import cv2
class CameraGUI(Gtk.ApplicationWindow):
def init(self, app, camera_manager, recorder, logger=None):
super().init(application=app)
self.camera_manager = camera_manager
self.recorder = recorder
self.logger = logger or logging.getLogger(name)
self._frame_buffer = None
self._stats = None
self.set_title("Lumo Max FPS Recorder")
self.set_default_size(1280, 720)
self.set_resizable(True)
self._build_ui()
self.camera_manager.set_sample_callback(self._on_new_frame)
self.recorder.set_file_callback(self._on_file_completed)
# Update stats every 1 second
GLib.timeout_add(1000, self._update_telemetry)
GLib.timeout_add(33, self._update_status) # ~30Hz UI refresh
def _build_ui(self):
main_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=0)
main_box.set_margin_start(10)
main_box.set_margin_end(10)
main_box.set_margin_top(10)
main_box.set_margin_bottom(10)
self.set_child(main_box)
# Header
header = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=10)
header.set_margin_bottom(10)
title = Gtk.Label(label="LUMO MAX FPS")
header.append(title)
spacer = Gtk.Box()
spacer.set_hexpand(True)
header.append(spacer)
# Manual Record Button
self.btn_record = Gtk.Button(label="● RECORD (R)")
self.btn_record.add_css_class("suggested-action")
self.btn_record.connect('clicked', self._on_toggle_record)
header.append(self.btn_record)
btn_quit = Gtk.Button(label="Quit (Q)")
btn_quit.connect('clicked', lambda b: self.get_application().quit())
header.append(btn_quit)
main_box.append(header)
# Live View
self.live_view = Gtk.DrawingArea()
self.live_view.set_content_width(1280)
self.live_view.set_content_height(720)
self.live_view.set_hexpand(True)
self.live_view.set_vexpand(True)
self.live_view.set_draw_func(self._draw_frame)
main_box.append(self.live_view)
# Telemetry Bar
telemetry_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=15)
telemetry_box.set_margin_top(10)
telemetry_box.set_margin_bottom(10)
self.lbl_fps = Gtk.Label(label="FPS: --")
self.lbl_ev = Gtk.Label(label="EV: --")
self.lbl_iso = Gtk.Label(label="ISO: --")
self.lbl_gain = Gtk.Label(label="Gain: --")
self.lbl_wb = Gtk.Label(label="WB: --")
self.lbl_status = Gtk.Label(label="Status: Idle")
for lbl in [self.lbl_fps, self.lbl_ev, self.lbl_iso, self.lbl_gain, self.lbl_wb, self.lbl_status]:
lbl.set_halign(Gtk.Align.START)
telemetry_box.append(lbl)
main_box.append(telemetry_box)
# Keyboard
ctrl = Gtk.EventControllerKey.new()
ctrl.connect('key-pressed', self._on_key_pressed)
self.add_controller(ctrl)
def _on_key_pressed(self, ctrl, keyval, keycode, state):
key = chr(keycode).lower()
if key == 'r':
self._on_toggle_record(None)
return True
if key == 'q':
self.get_application().quit()
return True
return False
def _on_toggle_record(self, btn):
path = self.recorder.toggle_recording()
if path:
self.lbl_status.set_text(f"Status: Recording {os.path.basename(path)}")
self.btn_record.set_label("■ STOP (R)")
self.btn_record.remove_css_class("suggested-action")
self.btn_record.add_css_class("destructive-action")
else:
self.lbl_status.set_text("Status: Stopped")
self.btn_record.set_label("● RECORD (R)")
self.btn_record.remove_css_class("destructive-action")
self.btn_record.add_css_class("suggested-action")
def _on_new_frame(self, data, caps):
try:
w = caps.get_structure(0).get_int('width')[1]
h = caps.get_structure(0).get_int('height')[1]
frame = np.frombuffer(data, dtype=np.uint8).reshape((h, w, 3))
self._frame_buffer = frame
# FEED FRAME TO MOTION DETECTOR
if self.recorder._motion_detector:
if self.recorder._motion_detector.process_frame(frame):
# Trigger save if motion detected
self.recorder._on_motion_detected()
self.live_view.queue_draw()
except Exception as e:
self.logger.error(f"Frame error: {e}")
def _draw_frame(self, widget, ctx, w, h):
if self._frame_buffer is None:
ctx.set_source_rgb(0.1, 0.1, 0.1)
ctx.rectangle(0, 0, w, h)
ctx.fill()
return
try:
fh, fw, _ = self._frame_buffer.shape
scale = min(w/fw, h/fh)
nw, nh = int(fw*scale), int(fh*scale)
resized = cv2.resize(self._frame_buffer, (nw, nh))
rgba = cv2.cvtColor(resized, cv2.COLOR_RGB2RGBA)
pix = GdkPixbuf.Pixbuf.new_from_data(rgba.tobytes(), GdkPixbuf.Colorspace.RGB, True, 8, nw, nh, nw*4, None, None)
x, y = (w-nw)//2, (h-nh)//2
ctx.set_source_rgb(0.1, 0.1, 0.1)
ctx.rectangle(0, 0, w, h)
ctx.fill()
Gdk.cairo_set_source_pixbuf(ctx, pix, x, y)
ctx.paint()
except Exception as e:
self.logger.error(f"Draw error: {e}")
def _update_telemetry(self):
stats = self.camera_manager.get_camera_stats()
self._stats = stats
self.lbl_fps.set_text(f"FPS: {stats.fps:.1f}")
self.lbl_ev.set_text(f"EV: {stats.exposure}")
self.lbl_iso.set_text(f"ISO: {stats.iso}")
self.lbl_gain.set_text(f"Gain: {stats.gain}")
self.lbl_wb.set_text(f"WB: {stats.white_balance}")
return True
def _update_status(self):
status = self.recorder.get_recording_status()
if status['is_recording']:
self.lbl_status.set_text(f"Status: REC {os.path.basename(status['current_file'])}")
else:
self.lbl_status.set_text("Status: Idle")
return True
def _on_file_completed(self, filepath):
pass
class CameraApp(Gtk.Application):
def init(self, camera_manager, recorder, logger):
super().init(application_id=“me.lumo.camera”, flags=Gio.ApplicationFlags.FLAGS_NONE)
self.camera_manager = camera_manager
self.recorder = recorder
self.logger = logger
self.main_window = None
def do_activate(self):
self.main_window = CameraGUI(self, self.camera_manager, self.recorder, self.logger)
self.main_window.present()
if not self.camera_manager.create_live_view_pipeline():
self.logger.error("Pipeline failed")
def main():
… (main.py logic remains same, just instantiate Recorder without motion) …
pass
“”"
Logging setup for Lumo Camera Application
“”"
import logging
import os
from pathlib import Path
from datetime import datetime
from typing import Optional
def setup_logger(
name: str,
log_dir: str,
level: str = ‘DEBUG’,
console: bool = True,
file: bool = True
) → logging.Logger:
“”"
Set up a logger with both console and file handlers
Args:
name: Logger name
log_dir: Directory for log files
level: Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
console: Enable console output
file: Enable file output
Returns:
Configured logger instance
"""
logger = logging.getLogger(name)
logger.setLevel(getattr(logging, level.upper(), logging.DEBUG))
# Clear existing handlers
logger.handlers.clear()
# Create formatter
formatter = logging.Formatter(
'%(asctime)s - %(levelname)s - %(name)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
# Console handler
if console:
ch = logging.StreamHandler()
ch.setLevel(level)
ch.setFormatter(formatter)
logger.addHandler(ch)
# File handler
if file:
os.makedirs(log_dir, exist_ok=True)
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
log_file = os.path.join(log_dir, f'lumo_camera_{timestamp}.log')
fh = logging.FileHandler(log_file)
fh.setLevel(logging.DEBUG)
fh.setFormatter(formatter)
logger.addHandler(fh)
# Also create a symlink to latest log
latest_link = os.path.join(log_dir, 'latest.log')
if os.path.islink(latest_link):
os.remove(latest_link)
os.symlink(log_file, latest_link)
return logger
#!/usr/bin/env python3
import sys
import argparse
import logging
from pathlib import Path
from config import Config
from logger import setup_logger
from camera_manager import CameraManager
from motion_detector import MotionDetector # Import detector
from recorder import Recorder
from gui import CameraApp
from utils import check_dependencies, ensure_directories, get_system_info
def main():
parser = argparse.ArgumentParser(description=“Lumo Max FPS Recorder”)
parser.add_argument(‘–config’, type=str, default=None, help=‘Path to config file’)
args = parser.parse_args()
config = Config(args.config)
logger = setup_logger(name='lumo_camera', log_dir=config.log_dir, level='INFO')
logger.info("Starting Lumo Max FPS Recorder (Manual + Motion Mode)")
missing = check_dependencies()
if missing:
logger.error("Missing: " + ", ".join(missing))
sys.exit(1)
ensure_directories([config.output_dir, config.log_dir])
camera_manager = CameraManager(logger)
# Initialize Motion Detector
# Threshold: 20 (sensitivity), Min Area: 1000 (ignore small noise), Cooldown: 3s
motion_detector = MotionDetector(threshold=20, min_area=1000, cooldown=3.0, logger=logger)
recorder = Recorder(
output_dir=config.output_dir,
logger=logger,
camera_manager=camera_manager
)
# ENABLE MOTION DETECTION
recorder.enable_motion_detection(motion_detector)
if not camera_manager.detect_camera():
logger.error("Camera detection failed.")
sys.exit(1)
app = CameraApp(camera_manager, recorder, logger)
try:
app.run(sys.argv)
except KeyboardInterrupt:
logger.info("Interrupted by user")
finally:
logger.info("Shutting down...")
recorder.disable_motion_detection() # Stop detector
camera_manager.stop_pipeline()
logger.info("Done")
if name == “main”:
main()
“”"
Motion Detection Module
“”"
import numpy as np
import cv2
import threading
import time
from typing import Optional, Callable
import logging
class MotionDetector:
def init(self, threshold: int = 20, min_area: int = 1000, cooldown: float = 3.0, logger: Optional[logging.Logger] = None):
“”"
Initialize Motion Detector.
:param threshold: Sensitivity (0-255). Lower = more sensitive.
:param min_area: Minimum pixel area to count as motion.
:param cooldown: Seconds to wait between detections.
“”"
self.threshold = threshold
self.min_area = min_area
self.cooldown = cooldown
self.logger = logger or logging.getLogger(name)
self._is_running = False
self._lock = threading.Lock()
self._previous_frame: Optional[np.ndarray] = None
self._motion_callback: Optional[Callable] = None
self._last_motion_time = 0.0
def start(self) -> None:
with self._lock:
self._is_running = True
self._previous_frame = None
self.logger.info("Motion detector started")
def stop(self) -> None:
with self._lock:
self._is_running = False
self._previous_frame = None
self.logger.info("Motion detector stopped")
def process_frame(self, frame: np.ndarray) -> bool:
"""Returns True if motion is detected and cooldown passed."""
if not self._is_running:
return False
with self._lock:
# Convert to grayscale
gray = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
blurred = cv2.GaussianBlur(gray, (21, 21), 0)
if self._previous_frame is None:
self._previous_frame = blurred
return False
# Diff
frame_delta = cv2.absdiff(self._previous_frame, blurred)
_, thresh = cv2.threshold(frame_delta, self.threshold, 255, cv2.THRESH_BINARY)
thresh = cv2.dilate(thresh, None, iterations=2)
contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
self._previous_frame = blurred
for contour in contours:
if cv2.contourArea(contour) >= self.min_area:
current_time = time.time()
if current_time - self._last_motion_time >= self.cooldown:
self._last_motion_time = current_time
return True
return False
def set_motion_callback(self, callback: Callable) -> None:
self._motion_callback = callback
“”"
Recorder - Manual + Motion Mode
“”"
import os
import time
import threading
from datetime import datetime
from typing import Optional, Callable
import logging
class Recorder:
def init(self, output_dir: str, logger=None, camera_manager=None):
self.output_dir = output_dir
self.logger = logger or logging.getLogger(name)
self.camera_manager = camera_manager
self._is_recording = False
self._current_file: Optional[str] = None
self._lock = threading.Lock()
self._file_callback: Optional[Callable[[str], None]] = None
# Motion settings
self._motion_active = False
self._motion_detector = None
self._motion_callback_ref = None
os.makedirs(output_dir, exist_ok=True)
def enable_motion_detection(self, detector) -> None:
"""Enable automatic motion recording."""
self._motion_detector = detector
self._motion_active = True
# Bind the callback
self._motion_callback_ref = self._on_motion_detected
self._motion_detector.set_motion_callback(self._motion_callback_ref)
self._motion_detector.start()
self.logger.info("Motion detection ENABLED")
def disable_motion_detection(self) -> None:
"""Disable automatic motion recording."""
self._motion_active = False
if self._motion_detector:
self._motion_detector.stop()
self.logger.info("Motion detection DISABLED")
def _on_motion_detected(self) -> None:
"""Callback triggered by MotionDetector."""
if not self._motion_active:
return
# Trigger a save (same logic as manual R press)
self._save_current_clip()
def _save_current_clip(self) -> Optional[str]:
"""Internal method to save the temp file."""
if not self.camera_manager:
return None
timestamp = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
filename = f"motion_{timestamp}.mkv"
filepath = os.path.join(self.output_dir, filename)
success = self.camera_manager.start_recording(filepath)
if success:
self._current_file = filepath
self.logger.info(f"MOTION DETECTED: Saved {filename}")
if self._file_callback:
self._file_callback(filepath)
return filepath
return None
def toggle_recording(self) -> Optional[str]:
"""Manual Start/Stop (Press R)."""
with self._lock:
if self._is_recording:
if self.camera_manager:
self.camera_manager.stop_recording()
self._is_recording = False
self.logger.info("Manual recording stopped")
return self._current_file
else:
timestamp = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
filename = f"manual_{timestamp}.mkv"
filepath = os.path.join(self.output_dir, filename)
if self.camera_manager:
success = self.camera_manager.start_recording(filepath)
if success:
self._is_recording = True
self._current_file = filepath
self.logger.info("Manual recording started")
return filepath
return None
def get_recording_status(self) -> dict:
with self._lock:
return {
'is_recording': self._is_recording,
'current_file': self._current_file
}
# --- MISSING METHOD ADDED BELOW ---
def set_file_callback(self, callback: Callable[[str], None]) -> None:
"""Set callback for when a file is saved."""
self._file_callback = callback
# ----------------------------------