I am using multicast to stream a video-only file over UDP. I am wondering how the chunk timestamps are generated — they are obviously not the position in the file I'm streaming. If I start the client multiple times while the file stream is running, the timestamps on the receiving side always seem to start from 0 again, which makes me think they are related to the time the receiver pipeline was started?
➜ python git:(vp8) ✗ python3 receiver-file.py
2025-10-05 14:32:26,795 - INFO - 🚀 Pipeline running, waiting for multicast stream on 239.255.42.42:5000...
2025-10-05 14:32:26,796 - INFO - 📁 Output directory: chunks
2025-10-05 14:32:27,186 - INFO - 🎬 Pipeline started receiving data at 14:32:27
2025-10-05 14:32:27,413 - INFO - First cluster found. Writing init segment.
2025-10-05 14:32:28,905 - INFO - 🔑 Writing KEYFRAME chunk 1 (123706 bytes) - webm_timestamp=391ms (0.391s) | pipeline_elapsed=1.720s
2025-10-05 14:32:29,906 - INFO - 🔑 Writing KEYFRAME chunk 2 (106671 bytes) - webm_timestamp=1887ms (1.887s) | pipeline_elapsed=2.721s
2025-10-05 14:32:30,909 - INFO - 🔑 Writing KEYFRAME chunk 3 (108166 bytes) - webm_timestamp=2887ms (2.887s) | pipeline_elapsed=3.724s
2025-10-05 14:32:31,910 - INFO - 🔑 Writing KEYFRAME chunk 4 (109291 bytes) - webm_timestamp=3887ms (3.887s) | pipeline_elapsed=4.725s
2025-10-05 14:32:32,905 - INFO - 🔑 Writing KEYFRAME chunk 5 (116803 bytes) - webm_timestamp=4887ms (4.887s) | pipeline_elapsed=5.719s
2025-10-05 14:32:33,908 - INFO - 🔑 Writing KEYFRAME chunk 6 (105452 bytes) - webm_timestamp=5887ms (5.887s) | pipeline_elapsed=6.723s
^C
2025-10-05 14:32:34,462 - INFO - Stopping pipeline...
2025-10-05 14:32:34,462 - INFO - 🔑 Flushing final KEYFRAME chunk 7 (77832 bytes) - timestamp=6887ms (6.887s)
... stream is still running, restarting receiver...
➜ python git:(vp8) ✗ python3 receiver-file.py
2025-10-05 14:32:35,742 - INFO - 🚀 Pipeline running, waiting for multicast stream on 239.255.42.42:5000...
2025-10-05 14:32:35,742 - INFO - 📁 Output directory: chunks
2025-10-05 14:32:36,189 - INFO - 🎬 Pipeline started receiving data at 14:32:36
2025-10-05 14:32:36,412 - INFO - First cluster found. Writing init segment.
2025-10-05 14:32:37,908 - INFO - 🔑 Writing KEYFRAME chunk 1 (162525 bytes) - webm_timestamp=444ms (0.444s) | pipeline_elapsed=1.718s
2025-10-05 14:32:38,910 - INFO - 🔑 Writing KEYFRAME chunk 2 (97975 bytes) - webm_timestamp=1940ms (1.940s) | pipeline_elapsed=2.720s
2025-10-05 14:32:39,907 - INFO - 🔑 Writing KEYFRAME chunk 3 (144099 bytes) - webm_timestamp=2940ms (2.940s) | pipeline_elapsed=3.718s
^C
2025-10-05 14:32:40,508 - INFO - Stopping pipeline...
2025-10-05 14:32:40,508 - INFO - 🔑 Flushing final KEYFRAME chunk 4 (89054 bytes) - timestamp=3940ms (3.940s)
import os
import sys
import io
import logging
import argparse
import time
# --- VENDORING SETUP ---
# Prefer a bundled `vendor/` directory (if present) over site-packages so the
# script can ship its own copies of gi/ebmlite.
_APP_DIR = os.path.dirname(os.path.abspath(__file__))
_VENDOR_DIR = os.path.join(_APP_DIR, 'vendor')
if os.path.isdir(_VENDOR_DIR):
    sys.path.insert(0, _VENDOR_DIR)
else:
    print(f"Vendor directory not found at {_VENDOR_DIR}")

try:
    import gi
    import ebmlite
except ImportError as e:
    print(f"Missing dependency: {e}")
    sys.exit(1)

gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib

# Initialize GStreamer before any pipeline is built.
Gst.init(None)

# Parse command line arguments
parser = argparse.ArgumentParser(description="WebM multicast receiver with file output")
parser.add_argument("--port", type=int, default=5000, help="Multicast port (default: 5000)")
parser.add_argument("--group", default="239.255.42.42", help="Multicast group (default: 239.255.42.42)")
parser.add_argument("--output-dir", default="chunks", help="Output directory for chunks (default: chunks)")
args = parser.parse_args()

MULTICAST_GROUP = args.group
MULTICAST_PORT = args.port
CHUNK_DIR = args.output_dir
os.makedirs(CHUNK_DIR, exist_ok=True)

# Initialize logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Load the Matroska/WebM EBML schema used to parse clusters.
try:
    webm_schema = ebmlite.loadSchema("matroska.xml")
except Exception as e:
    logging.critical(f"Could not load matroska.xml schema: {e}")
    sys.exit(1)  # fix: was bare exit(); sys.exit is the explicit, portable form

# Files and settings
# fix: build the path with os.path.join for consistency with the chunk paths below.
INIT_FILE = os.path.join(CHUNK_DIR, "init.webm")
CHUNK_PREFIX = "chunk_"

# --- State Variables ---
stream_buffer = bytearray()        # rolling buffer of muxed WebM bytes (efficient append/slice)
first_cluster_received = False     # set once the init segment has been written
chunk_index = 1                    # 1-based index of the next chunk file
CLUSTER_ID = b"\x1F\x43\xB6\x75"   # The EBML ID for a Cluster element
pipeline_start_time = None         # wall-clock time of the first received sample
def get_vint_length(first_byte: int) -> int:
    """Return the byte length of an EBML variable-size integer (VINT).

    The length is encoded by the position of the first set bit in the
    leading byte: 0b1xxxxxxx -> 1 byte, 0b01xxxxxx -> 2 bytes, ... down to
    0b00000001 -> 8 bytes.

    Fix: the original only recognized lengths 1-4 and silently returned 1
    for 5-8-byte VINTs, which would misplace the SimpleBlock flags offset
    in parse_is_keyframe. All 8 legal lengths are handled now.

    :param first_byte: first byte of the VINT (0-255).
    :return: length in bytes (1-8); 1 as a fallback for the invalid byte 0.
    """
    for length, marker in enumerate(
            (0x80, 0x40, 0x20, 0x10, 0x08, 0x04, 0x02, 0x01), start=1):
        if first_byte & marker:
            return length
    # 0x00 is not a valid VINT leading byte; keep the original fallback.
    return 1
def parse_is_keyframe(cluster_bytes: bytes) -> bool:
    """Return True if the cluster contains at least one keyframe.

    Walks the parsed EBML tree looking at each SimpleBlock: the flags byte
    (located after the track-number VINT and the 2-byte relative timecode)
    has its high bit (0x80) set for keyframes.
    """
    try:
        document = webm_schema.load(io.BytesIO(cluster_bytes))
        for cluster in document:
            if cluster.name != "Cluster":
                continue
            for child in cluster:
                if child.name != "SimpleBlock":
                    continue
                payload = child.value
                if len(payload) < 4:
                    continue
                # flags byte sits after the track VINT + 2-byte timecode
                offset = get_vint_length(payload[0]) + 2
                if offset < len(payload) and payload[offset] & 0x80:
                    return True
    except Exception as e:
        logging.error(f"EBML parse failed: {e}")
        return False
    return False
def parse_cluster_timestamp(cluster_bytes: bytes) -> int:
    """Return the Timecode of the first Cluster in *cluster_bytes*.

    The value is in Matroska timecode units (milliseconds by default).
    Returns 0 if no Timecode element is found or parsing fails.
    """
    try:
        document = webm_schema.load(io.BytesIO(cluster_bytes))
        for cluster in document:
            if cluster.name != "Cluster":
                continue
            for child in cluster:
                if child.name == "Timecode":
                    return child.value
    except Exception as e:
        logging.debug(f"Failed to parse cluster timestamp: {e}")
    return 0
# GStreamer pipeline
# This is the original pipeline requested by the user to read from a multicast source.
#
# NOTE(review): the chunk timestamps seen downstream come from webmmux, which
# appears to stamp clusters relative to when *this* receiver's muxer started
# running, not from any position in the sender's file — presumably why every
# receiver restart logs timestamps beginning near 0 again. Confirm against
# the GStreamer webmmux documentation.
#
# Elements: udpsrc (joins the multicast group, RTP/VP8 caps) ->
# rtpjitterbuffer (200 ms reordering latency) -> rtpvp8depay ->
# webmmux (streamable, ~1 s min cluster duration in ns) ->
# appsink (emits "new-sample"; keeps at most 5 buffers, drops on overflow).
pipeline = Gst.parse_launch(
    f"udpsrc multicast-group={MULTICAST_GROUP} port={MULTICAST_PORT} caps=\"application/x-rtp, media=video, encoding-name=VP8, payload=96, clock-rate=90000\" ! "
    "rtpjitterbuffer latency=200 ! "
    "rtpvp8depay ! "
    "webmmux streamable=true min-cluster-duration=1000000000 name=mux ! appsink name=sink emit-signals=true max-buffers=5 drop=true"
)
appsink = pipeline.get_by_name("sink")
def on_new_sample(sink):
    """Appsink callback: accumulate muxed WebM bytes and split them into files.

    The first bytes up to the first Cluster ID become the init segment
    (EBML header + Segment info + Tracks); after that, every complete
    Cluster is written out as its own chunk file. A Cluster is considered
    complete once the *next* Cluster ID appears in the buffer.

    NOTE(review): the Cluster boundary detection is a raw byte scan for the
    4-byte ID 0x1F43B675; that sequence could in principle occur inside
    frame payload and cause a false split — a robust version would parse
    EBML element sizes instead. Confirm whether this matters for VP8 data.

    :param sink: the GstAppSink that emitted "new-sample".
    :return: Gst.FlowReturn.OK always (errors are swallowed to keep streaming).
    """
    global stream_buffer, first_cluster_received, chunk_index, pipeline_start_time
    # Track when we first receive data (wall clock, for the elapsed-time log).
    if pipeline_start_time is None:
        pipeline_start_time = time.time()
        logging.info(f"🎬 Pipeline started receiving data at {time.strftime('%H:%M:%S', time.localtime(pipeline_start_time))}")
    sample = sink.emit("pull-sample")
    if not sample:
        return Gst.FlowReturn.OK
    buf = sample.get_buffer()
    success, mapinfo = buf.map(Gst.MapFlags.READ)
    if not success:
        return Gst.FlowReturn.OK
    # Append new data to our persistent stream buffer
    stream_buffer.extend(mapinfo.data)
    buf.unmap(mapinfo)
    if not first_cluster_received:
        # Find the first cluster to write the init segment
        header_end_pos = stream_buffer.find(CLUSTER_ID)
        if header_end_pos != -1:
            logging.info("First cluster found. Writing init segment.")
            # Write the header
            with open(INIT_FILE, "wb") as f:
                f.write(stream_buffer[:header_end_pos])
            # Remove the header data from our buffer
            stream_buffer = stream_buffer[header_end_pos:]
            first_cluster_received = True
            # Fall through to process the rest of the buffer which may contain the first cluster
    if first_cluster_received:
        # Loop to extract all complete clusters from the buffer
        while True:
            # Search for the *next* cluster ID, starting after the first byte
            # to ensure we find the beginning of the *next* cluster, not the current one.
            next_cluster_pos = stream_buffer.find(CLUSTER_ID, 1)
            if next_cluster_pos != -1:
                # We found the start of the next cluster, so the current one is complete.
                chunk_data = stream_buffer[:next_cluster_pos]
                # Parse cluster properties
                is_keyframe = parse_is_keyframe(chunk_data)
                # webm_timestamp: the Cluster's own Timecode element.
                # NOTE(review): it appears to count from the receiver-side mux
                # start, not the source file position — confirm (see pipeline note).
                timestamp = parse_cluster_timestamp(chunk_data)
                # Calculate time since pipeline started
                elapsed_time = time.time() - pipeline_start_time if pipeline_start_time else 0
                # Enhanced logging with keyframe detection and timing
                if is_keyframe:
                    logging.info(f"🔑 Writing KEYFRAME chunk {chunk_index} ({len(chunk_data)} bytes) - "
                                 f"webm_timestamp={timestamp}ms ({timestamp/1000:.3f}s) | "
                                 f"pipeline_elapsed={elapsed_time:.3f}s")
                else:
                    logging.info(f"Writing chunk {chunk_index} ({len(chunk_data)} bytes) - "
                                 f"webm_timestamp={timestamp}ms ({timestamp/1000:.3f}s) | "
                                 f"pipeline_elapsed={elapsed_time:.3f}s")
                with open(os.path.join(CHUNK_DIR, f"{CHUNK_PREFIX}{chunk_index}.webm"), "wb") as f:
                    f.write(chunk_data)
                chunk_index += 1
                # Remove the written chunk from the buffer
                stream_buffer = stream_buffer[next_cluster_pos:]
            else:
                # No more complete clusters in the buffer for now.
                # Break the loop and wait for more data.
                break
    return Gst.FlowReturn.OK
appsink.connect("new-sample", on_new_sample)

# Start pipeline
pipeline.set_state(Gst.State.PLAYING)

# Run main loop until interrupted (Ctrl-C).
loop = GLib.MainLoop()
try:
    logging.info(f"🚀 Pipeline running, waiting for multicast stream on {MULTICAST_GROUP}:{MULTICAST_PORT}...")
    logging.info(f"📁 Output directory: {CHUNK_DIR}")
    loop.run()
except KeyboardInterrupt:
    print()
    logging.info("Stopping pipeline...")
finally:
    # --- Final Flush ---
    # Fix: moved into `finally` so the trailing chunk is written and the
    # pipeline is torn down even if loop.run() exits with an unexpected error,
    # not only on KeyboardInterrupt.
    if stream_buffer:
        # Parse final chunk properties
        is_keyframe = parse_is_keyframe(stream_buffer)
        timestamp = parse_cluster_timestamp(stream_buffer)
        if is_keyframe:
            logging.info(f"🔑 Flushing final KEYFRAME chunk {chunk_index} ({len(stream_buffer)} bytes) - timestamp={timestamp}ms ({timestamp/1000:.3f}s)")
        else:
            # Fix: include the seconds form for consistency with the keyframe branch.
            logging.info(f"Flushing final chunk {chunk_index} ({len(stream_buffer)} bytes) - timestamp={timestamp}ms ({timestamp/1000:.3f}s)")
        with open(os.path.join(CHUNK_DIR, f"{CHUNK_PREFIX}{chunk_index}.webm"), "wb") as f:
            f.write(stream_buffer)
    # Release all GStreamer resources.
    pipeline.set_state(Gst.State.NULL)