Presentation timestamp from udpsrc

I am using multicast to stream a video-only file over UDP, and I am wondering how the chunk timestamps are generated - they are obviously not the position in the file I'm streaming. If I start the client multiple times while the file stream is running, the timestamps on the receiving side always seem to start from 0 again, which makes me think they are related to the time the receiver pipeline was started?

➜  python git:(vp8) ✗ python3 receiver-file.py
2025-10-05 14:32:26,795 - INFO - 🚀 Pipeline running, waiting for multicast stream on 239.255.42.42:5000...
2025-10-05 14:32:26,796 - INFO - 📁 Output directory: chunks
2025-10-05 14:32:27,186 - INFO - 🎬 Pipeline started receiving data at 14:32:27
2025-10-05 14:32:27,413 - INFO - First cluster found. Writing init segment.
2025-10-05 14:32:28,905 - INFO - 🔑 Writing KEYFRAME chunk 1 (123706 bytes) - webm_timestamp=391ms (0.391s) | pipeline_elapsed=1.720s
2025-10-05 14:32:29,906 - INFO - 🔑 Writing KEYFRAME chunk 2 (106671 bytes) - webm_timestamp=1887ms (1.887s) | pipeline_elapsed=2.721s
2025-10-05 14:32:30,909 - INFO - 🔑 Writing KEYFRAME chunk 3 (108166 bytes) - webm_timestamp=2887ms (2.887s) | pipeline_elapsed=3.724s
2025-10-05 14:32:31,910 - INFO - 🔑 Writing KEYFRAME chunk 4 (109291 bytes) - webm_timestamp=3887ms (3.887s) | pipeline_elapsed=4.725s
2025-10-05 14:32:32,905 - INFO - 🔑 Writing KEYFRAME chunk 5 (116803 bytes) - webm_timestamp=4887ms (4.887s) | pipeline_elapsed=5.719s
2025-10-05 14:32:33,908 - INFO - 🔑 Writing KEYFRAME chunk 6 (105452 bytes) - webm_timestamp=5887ms (5.887s) | pipeline_elapsed=6.723s
^C
2025-10-05 14:32:34,462 - INFO - Stopping pipeline...
2025-10-05 14:32:34,462 - INFO - 🔑 Flushing final KEYFRAME chunk 7 (77832 bytes) - timestamp=6887ms (6.887s)

... stream is still running, restarting receiver... 
➜  python git:(vp8) ✗ python3 receiver-file.py
2025-10-05 14:32:35,742 - INFO - 🚀 Pipeline running, waiting for multicast stream on 239.255.42.42:5000...
2025-10-05 14:32:35,742 - INFO - 📁 Output directory: chunks
2025-10-05 14:32:36,189 - INFO - 🎬 Pipeline started receiving data at 14:32:36
2025-10-05 14:32:36,412 - INFO - First cluster found. Writing init segment.
2025-10-05 14:32:37,908 - INFO - 🔑 Writing KEYFRAME chunk 1 (162525 bytes) - webm_timestamp=444ms (0.444s) | pipeline_elapsed=1.718s
2025-10-05 14:32:38,910 - INFO - 🔑 Writing KEYFRAME chunk 2 (97975 bytes) - webm_timestamp=1940ms (1.940s) | pipeline_elapsed=2.720s
2025-10-05 14:32:39,907 - INFO - 🔑 Writing KEYFRAME chunk 3 (144099 bytes) - webm_timestamp=2940ms (2.940s) | pipeline_elapsed=3.718s
^C
2025-10-05 14:32:40,508 - INFO - Stopping pipeline...
2025-10-05 14:32:40,508 - INFO - 🔑 Flushing final KEYFRAME chunk 4 (89054 bytes) - timestamp=3940ms (3.940s)

Here is the receiver script:

import os
import sys
import io
import logging
import argparse
import time

# --- VENDORING SETUP ---
_APP_DIR = os.path.dirname(os.path.abspath(__file__))
_VENDOR_DIR = os.path.join(_APP_DIR, 'vendor')
if os.path.isdir(_VENDOR_DIR):
    sys.path.insert(0, _VENDOR_DIR)
else:
    print(f"Vendor directory not found at {_VENDOR_DIR}")

try:
    import gi
    import ebmlite
except ImportError as e:
    print(f"Missing dependency: {e}")
    sys.exit(1)

gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib

# Initialize GStreamer
Gst.init(None)

# Parse command line arguments
parser = argparse.ArgumentParser(description="WebM multicast receiver with file output")
parser.add_argument("--port", type=int, default=5000, help="Multicast port (default: 5000)")
parser.add_argument("--group", default="239.255.42.42", help="Multicast group (default: 239.255.42.42)")
parser.add_argument("--output-dir", default="chunks", help="Output directory for chunks (default: chunks)")
args = parser.parse_args()

# Use command line arguments
MULTICAST_GROUP = args.group
MULTICAST_PORT = args.port
CHUNK_DIR = args.output_dir
os.makedirs(CHUNK_DIR, exist_ok=True)

# Initialize logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Load WebM schema
try:
    webm_schema = ebmlite.loadSchema("matroska.xml")
except Exception as e:
    logging.critical(f"Could not load matroska.xml schema: {e}")
    sys.exit(1)

# Files and settings
INIT_FILE = f"{CHUNK_DIR}/init.webm"
CHUNK_PREFIX = "chunk_"

# --- State Variables ---
# Use a bytearray for efficient appending and slicing
stream_buffer = bytearray() 
first_cluster_received = False
chunk_index = 1
# EBML ID of a Cluster element. Note: a raw byte search can in principle
# also match this sequence inside block payloads; good enough here.
CLUSTER_ID = b"\x1F\x43\xB6\x75"
pipeline_start_time = None  # Track when pipeline starts receiving data


def get_vint_length(first_byte: int) -> int:
    """Get EBML variable-length integer size (1-8 bytes) from its first byte."""
    for i in range(8):
        if first_byte & (0x80 >> i):
            return i + 1
    return 1  # invalid vint marker byte; fall back to 1


def parse_is_keyframe(cluster_bytes: bytes) -> bool:
    """Detect if cluster contains keyframe using SimpleBlock flags."""
    try:
        cluster_stream = io.BytesIO(cluster_bytes)
        document = webm_schema.load(cluster_stream)
        for element in document:
            if element.name == "Cluster":
                for sub_element in element:
                    if sub_element.name == "SimpleBlock":
                        block_data = sub_element.value
                        if len(block_data) < 4:
                            continue
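                        # SimpleBlock layout: track-number vint, 2-byte signed
                        # timecode, then a flags byte (MSB = keyframe).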
                        vint_len = get_vint_length(block_data[0])
                        flags_offset = vint_len + 2
                        if flags_offset < len(block_data):
                            flags_byte = block_data[flags_offset]
                            if flags_byte & 0x80:
                                return True
    except Exception as e:
        logging.error(f"EBML parse failed: {e}")
        return False
    return False


def parse_cluster_timestamp(cluster_bytes: bytes) -> int:
    """Extract timestamp from WebM cluster."""
    try:
        cluster_stream = io.BytesIO(cluster_bytes)
        document = webm_schema.load(cluster_stream)
        for element in document:
            if element.name == "Cluster":
                for sub_element in element:
                    if sub_element.name == "Timecode":
                        return sub_element.value
    except Exception as e:
        logging.debug(f"Failed to parse cluster timestamp: {e}")
    return 0

# GStreamer pipeline: receive RTP/VP8 from the multicast group, depayload it,
# and remux into streamable WebM, delivered chunk-wise to an appsink.
pipeline = Gst.parse_launch(
    f"udpsrc multicast-group={MULTICAST_GROUP} port={MULTICAST_PORT} caps=\"application/x-rtp, media=video, encoding-name=VP8, payload=96, clock-rate=90000\" ! "
    "rtpjitterbuffer latency=200 ! "
    "rtpvp8depay ! "
    "webmmux streamable=true min-cluster-duration=1000000000 name=mux ! appsink name=sink emit-signals=true max-buffers=5 drop=true"
)

appsink = pipeline.get_by_name("sink")

def on_new_sample(sink):
    global stream_buffer, first_cluster_received, chunk_index, pipeline_start_time
    
    # Track when we first receive data
    if pipeline_start_time is None:
        pipeline_start_time = time.time()
        logging.info(f"🎬 Pipeline started receiving data at {time.strftime('%H:%M:%S', time.localtime(pipeline_start_time))}")
    
    sample = sink.emit("pull-sample")
    if not sample:
        return Gst.FlowReturn.OK
    
    buf = sample.get_buffer()
    success, mapinfo = buf.map(Gst.MapFlags.READ)
    if not success:
        return Gst.FlowReturn.OK

    # Append new data to our persistent stream buffer
    stream_buffer.extend(mapinfo.data)
    buf.unmap(mapinfo)

    if not first_cluster_received:
        # Find the first cluster to write the init segment
        header_end_pos = stream_buffer.find(CLUSTER_ID)
        if header_end_pos != -1:
            logging.info("First cluster found. Writing init segment.")
            # Write the header
            with open(INIT_FILE, "wb") as f:
                f.write(stream_buffer[:header_end_pos])
            
            # Remove the header data from our buffer
            stream_buffer = stream_buffer[header_end_pos:]
            first_cluster_received = True
            # Fall through: the remaining buffer now starts with the first cluster.
    
    if first_cluster_received:
        # Loop to extract all complete clusters from the buffer
        while True:
            # Search for the *next* cluster ID, starting after the first byte
            # to ensure we find the beginning of the *next* cluster, not the current one.
            next_cluster_pos = stream_buffer.find(CLUSTER_ID, 1)

            if next_cluster_pos != -1:
                # We found the start of the next cluster, so the current one is complete.
                chunk_data = stream_buffer[:next_cluster_pos]
                
                # Parse cluster properties
                is_keyframe = parse_is_keyframe(chunk_data)
                timestamp = parse_cluster_timestamp(chunk_data)
                
                # Calculate time since pipeline started
                elapsed_time = time.time() - pipeline_start_time if pipeline_start_time else 0
                
                # Enhanced logging with keyframe detection and timing
                if is_keyframe:
                    logging.info(f"🔑 Writing KEYFRAME chunk {chunk_index} ({len(chunk_data)} bytes) - "
                               f"webm_timestamp={timestamp}ms ({timestamp/1000:.3f}s) | "
                               f"pipeline_elapsed={elapsed_time:.3f}s")
                else:
                    logging.info(f"Writing chunk {chunk_index} ({len(chunk_data)} bytes) - "
                               f"webm_timestamp={timestamp}ms ({timestamp/1000:.3f}s) | "
                               f"pipeline_elapsed={elapsed_time:.3f}s")
                
                with open(os.path.join(CHUNK_DIR, f"{CHUNK_PREFIX}{chunk_index}.webm"), "wb") as f:
                    f.write(chunk_data)
                
                chunk_index += 1
                
                # Remove the written chunk from the buffer
                stream_buffer = stream_buffer[next_cluster_pos:]
            else:
                # No more complete clusters in the buffer for now.
                # Break the loop and wait for more data.
                break

    return Gst.FlowReturn.OK

appsink.connect("new-sample", on_new_sample)

# Start pipeline
pipeline.set_state(Gst.State.PLAYING)

# Run main loop
loop = GLib.MainLoop()
try:
    logging.info(f"🚀 Pipeline running, waiting for multicast stream on {MULTICAST_GROUP}:{MULTICAST_PORT}...")
    logging.info(f"📁 Output directory: {CHUNK_DIR}")
    loop.run()
except KeyboardInterrupt:
    print()
    logging.info("Stopping pipeline...")
    # --- Final Flush ---
    # When stopping, any remaining data in the buffer is the last chunk.
    if stream_buffer:
        # Parse final chunk properties
        is_keyframe = parse_is_keyframe(stream_buffer)
        timestamp = parse_cluster_timestamp(stream_buffer)
        
        if is_keyframe:
            logging.info(f"🔑 Flushing final KEYFRAME chunk {chunk_index} ({len(stream_buffer)} bytes) - timestamp={timestamp}ms ({timestamp/1000:.3f}s)")
        else:
            logging.info(f"Flushing final chunk {chunk_index} ({len(stream_buffer)} bytes) - timestamp={timestamp}ms")
        
        with open(os.path.join(CHUNK_DIR, f"{CHUNK_PREFIX}{chunk_index}.webm"), "wb") as f:
            f.write(stream_buffer)
    
    pipeline.set_state(Gst.State.NULL)

Ok, I guess I might not be the first beginner to get confused by this behaviour; it's a little hard to navigate the docs - but RTFM :slight_smile: udpsrc is definitely a live source, so I will have to get the original timestamps somewhere else. I think I read something about metadata…

Non-live sources timestamp buffers with a running-time starting from 0. After a flushing seek, they will produce buffers again from a running-time of 0.
Live sources need to timestamp buffers with a running-time matching the pipeline running-time when the first byte of the buffer was captured.
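If I read that right, the running-time a live source stamps onto buffers is just clock-time minus base-time, which is easy to check from Python (a minimal sketch against my already-playing pipeline; running_time is just my own variable name):

# Sketch: running-time = clock-time - base-time on a PLAYING pipeline.
clock = pipeline.get_clock()
if clock is not None:
    clock_time = clock.get_time()          # absolute pipeline-clock time (ns)
    base_time = pipeline.get_base_time()   # clock time when the pipeline went PLAYING
    running_time = clock_time - base_time  # what live sources stamp onto buffers
    logging.info(f"pipeline running-time: {running_time / Gst.SECOND:.3f}s")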

udpsrc simply sets a timestamp based on arrival time. This is fed into either an RTP jitterbuffer or the new live MPEG-TS source, and comes out as an approximation of the remote clock and stream time. Other protocols may require other methods. Note that in RTP, having a working RTCP communication greatly helps timestamp stability.


Thanks, looking into it. So far I didn't see the need for RTCP, but getting the original timestamps would help me sync clients…

@ndufresne - Thanks again for your hint, but I'm having a hard time understanding it. After some tinkering I can get some stats from the jitterbuffer and send them via RTCP to the multicast sender - but what now? What should the sender do with multiple receiver reports? I've seen that there are adaptations for a single unicast receiver, but I have many of them…
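For reference, this is how I'm pulling the stats right now - a sketch that assumes the jitterbuffer got a name in the launch string:

# Sketch: read the rtpjitterbuffer 'stats' property (a GstStructure);
# assumes "rtpjitterbuffer name=jb latency=200" in the launch string.
jb = pipeline.get_by_name("jb")
if jb is not None:
    stats = jb.get_property("stats")
    logging.info(f"jitterbuffer stats: {stats.to_string()}")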

If you use the rtpsrc/rtpsink elements, these will create and configure the RTCP channel for you. For multicast it's a bit more complex, but the important part is that it's the port + 1 that is used for RTCP, as per the standard.
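As an untested sketch, with the group/port and encoding from your thread, rtpsrc would then handle RTCP on 5001 for you:

# Sketch: rtpsrc wraps udpsrc + rtpbin and configures RTCP on port + 1 (5001 here).
# parse_launch handles the delayed linking of rtpsrc's dynamic src pad.
pipeline = Gst.parse_launch(
    'rtpsrc uri="rtp://239.255.42.42:5000" encoding-name=VP8 latency=200 ! '
    'rtpvp8depay ! webmmux streamable=true ! appsink name=sink emit-signals=true'
)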

Thanks, I've checked the rtpsink element, but it just doesn't seem to fit my use case.

“it's the port + 1 that is used for RTCP, as per the standard.”

No idea what you mean by that :slight_smile:
Basically, what I would like to achieve is a way to see how the PTS of the file I'm streaming is matched by the different clients.
I guess that's what this does, but I need to figure it out…
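What I have in mind is something like a pad probe right after the depayloader, so every client can log the PTS it sees - a sketch only, assuming the depayloader was named depay in the launch string:

# Sketch: log the PTS of every buffer leaving the depayloader;
# assumes "rtpvp8depay name=depay" in the launch string.
def on_depay_buffer(pad, info):
    buf = info.get_buffer()
    if buf is not None and buf.pts != Gst.CLOCK_TIME_NONE:
        logging.info(f"depay PTS: {buf.pts / Gst.SECOND:.3f}s")
    return Gst.PadProbeReturn.OK

depay = pipeline.get_by_name("depay")
depay.get_static_pad("src").add_probe(Gst.PadProbeType.BUFFER, on_depay_buffer)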

I have no idea if this is a good way to implement it, but at least I got something working :slight_smile:
Along the way I ran into a couple of problems: after watching the !GREAT! video by @slomo Synchronised multi-room media playback and distributed live media processing and mixing with GStreamer - GStreamer conferences, I thought I at least had a glimpse of how stuff was supposed to work - but since I'm on multicast I couldn't get rtpbin to work properly (and the "batteries included" rtpbin would have made everything a lot easier).
Tried to use "add-reference-timestamp-meta" on the jitterbuffer, but couldn't read the meta on the receiver side in Python, gave up.
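For the record, this is roughly what I was trying in the sample callback - I'm not certain the "timestamp/x-rtp" reference caps are the right ones, and whether the meta's fields are readable seems to depend on the PyGObject/GStreamer versions:

# Sketch: read the GstReferenceTimestampMeta that rtpjitterbuffer attaches
# when add-reference-timestamp-meta=true; "timestamp/x-rtp" is my assumption.
ref_caps = Gst.Caps.from_string("timestamp/x-rtp")
meta = buf.get_reference_timestamp_meta(ref_caps)
if meta is not None:
    logging.info(f"reference timestamp: {meta.timestamp / Gst.SECOND:.3f}s")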

Probably this should be done in C anyway, but I'm not fluent.
Here is what finally worked:

File VP8 frame (PTS: file_time)
  ↓ matroskademux
1. Raw VP8 (PTS: file_time)
  ↓ rtpvp8pay
2. RTP packet (timestamp: file_time * 90000 / 1e9)
  ↓ network (multicast)
3. RTP packet (same timestamp)
  ↓ rtpvp8depay
4. VP8 frame (PTS: timestamp * 1e9 / 90000)
  ↓ normalization
5. VP8 frame (PTS: pts - first_pts)
  ↓ webmmux
6. WebM chunk (relative timestamps)
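The normalization in step 5 is a pad probe that shifts every PTS relative to the first one before webmmux - a sketch of what I ended up with (it assumes the buffers are writable in the probe, which seemed to hold for me, and reuses the depay element from the earlier sketch):

# Sketch of step 5: rebase PTS so the first frame after the depayloader is 0,
# which gives webmmux relative timestamps. Assumes writable buffers.
first_pts = None

def normalize_pts(pad, info):
    global first_pts
    buf = info.get_buffer()
    if buf is None or buf.pts == Gst.CLOCK_TIME_NONE:
        return Gst.PadProbeReturn.OK
    if first_pts is None:
        first_pts = buf.pts
    buf.pts = buf.pts - first_pts
    return Gst.PadProbeReturn.OK

depay.get_static_pad("src").add_probe(Gst.PadProbeType.BUFFER, normalize_pts)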

Looks like this is way more complicated than I originally thought - there are a lot of timestamps involved, and synchronization is a hard problem. @ndufresne, “a bit more complex” is probably a little bit of a euphemism :smiley: I'll go do my homework…