Recording stream from a webrtcsrc

Hi,

I’m trying to save a remote video stream using a webrtcsrc client:
gst-launch-1.0 webrtcsrc signaller::producer-peer-id=<peer_id> ! rtph264depay ! h264parse ! mp4mux ! filesink location=test.mp4

However, test.mp4 is not written properly. I suppose some metadata is missing:

$ gst-discoverer-1.0 test.mp4
Analyzing file://test.mp4
Done discovering file://test.mp4
An error was encountered while discovering the file
This file contains no playable streams.

Adding -e to the above command line to force the EOS doesn’t help, although it is caught.

Setting pipeline to PAUSED …
Pipeline is live and does not need PREROLL …
Pipeline is PREROLLED …
Setting pipeline to PLAYING …
New clock: GstSystemClock
Redistribute latency…
Redistribute latency…
Redistribute latency…
Redistribute latency…
Redistribute latency…
^Chandling interrupt.
Interrupt: Stopping pipeline …
EOS on shutdown enabled – Forcing EOS on the pipeline
Waiting for EOS…
0:00:15.1 / 99:99:99.
^C00:16.2 / 99:99:99.

If the server (webrtcsink) stops, this client ends properly and test.mp4 is valid.

Any thoughts about this?

I would also have expected -e to solve the problem. I don’t have a fix, but a workaround might be to use mpegts instead of mp4, since it does not require “finalization”; you can just stop whenever and the file will work. You could transmux to mp4 afterwards.
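
A minimal sketch of that workaround, expressed as two pipeline descriptions built in Python. The choice of mpegtsmux for recording and tsdemux for the later transmux, the file names and the helper names are assumptions for illustration; none of this was tested in this thread.

```python
import shlex


def record_ts_description(peer_id: str, ts_path: str) -> str:
    """Describe a pipeline that records the H.264 stream into MPEG-TS."""
    return (
        f"webrtcsrc signaller::producer-peer-id={peer_id} "
        "! rtph264depay ! h264parse ! mpegtsmux "
        f"! filesink location={ts_path}"
    )


def transmux_description(ts_path: str, mp4_path: str) -> str:
    """Describe a pipeline that rewraps the recorded MPEG-TS into mp4."""
    return (
        f"filesrc location={ts_path} ! tsdemux ! h264parse "
        f"! mp4mux ! filesink location={mp4_path}"
    )


def to_argv(description: str) -> list[str]:
    """Turn a pipeline description into a gst-launch-1.0 argument list."""
    return ["gst-launch-1.0", *shlex.split(description)]
```

The transmux step reads from a file, so it reaches EOS on its own and mp4mux can finalize the output. Either argument list can be passed to `subprocess.run`.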

Thanks. I’ve also tried with mkv, which also works in these conditions.

Actually, my problem is that I have two video streams in the webrtcsrc. So I am doing the equivalent of this pipeline in Python to be able to record both streams:

import argparse

import gi

gi.require_version("Gst", "1.0")
from gi.repository import Gst


class GstRecorder:
    def __init__(self, signalling_host: str, signalling_port: int, peer_id: str, filename: str) -> None:
        Gst.init(None)

        self.pipeline = Gst.Pipeline.new("webRTC-recorder")
        source = Gst.ElementFactory.make("webrtcsrc")
        self.mux = Gst.ElementFactory.make("mp4mux")
        filesink = Gst.ElementFactory.make("filesink")
        filesink.set_property("location", filename)

        if not self.pipeline or not source or not filesink or not self.mux:
            print("Not all elements could be created.")
            exit(-1)

        # Set up the pipeline
        self.pipeline.add(source)
        self.pipeline.add(self.mux)
        self.pipeline.add(filesink)
        self.mux.link(filesink)

        source.connect("pad-added", self.webrtcsrc_pad_added_cb)
        signaller = source.get_property("signaller")
        signaller.set_property("producer-peer-id", peer_id)
        signaller.set_property("uri", f"ws://{signalling_host}:{signalling_port}")

    def webrtcsrc_pad_added_cb(self, webrtcsrc, pad) -> None:  # type: ignore[no-untyped-def]
        if pad.get_name().startswith("video"):
            receiver = Gst.ElementFactory.make("rtph264depay")
            h264parser = Gst.ElementFactory.make("h264parse")
            self.pipeline.add(receiver)
            self.pipeline.add(h264parser)
            pad.link(receiver.get_static_pad("sink"))
            receiver.link(h264parser)
            h264parser.link(self.mux)
            receiver.sync_state_with_parent()
            h264parser.sync_state_with_parent()
            self.mux.sync_state_with_parent()

    def __del__(self) -> None:
        Gst.deinit()

    def get_bus(self):  # type: ignore[no-untyped-def]
        return self.pipeline.get_bus()

    def record(self) -> None:
        # Start playing
        ret = self.pipeline.set_state(Gst.State.PLAYING)
        if ret == Gst.StateChangeReturn.FAILURE:
            print("Error starting playback.")
            exit(-1)

    def stop(self) -> None:
        print("stopping")
        self.pipeline.send_event(Gst.Event.new_eos())
        self.pipeline.set_state(Gst.State.NULL)


def process_msg(bus) -> bool:  # type: ignore[no-untyped-def]
    msg = bus.timed_pop_filtered(10 * Gst.MSECOND, Gst.MessageType.ANY)
    if msg:
        if msg.type == Gst.MessageType.ERROR:
            err, debug = msg.parse_error()
            print(f"Error: {err}, {debug}")
            return False
        elif msg.type == Gst.MessageType.EOS:
            print("End-Of-Stream reached.")
            return False
        # else:
        #    print(f"Message: {msg.type}")
    return True


def main() -> None:
    parser = argparse.ArgumentParser(description="webrtc gstreamer simple recorder")
    parser.add_argument("--signaling-host", default="127.0.0.1", help="Gstreamer signaling host")
    parser.add_argument("--signaling-port", type=int, default=8443, help="Gstreamer signaling port")
    parser.add_argument(
        "--remote-producer-peer-id",
        type=str,
        help="producer peer_id",
        required=True,
    )
    parser.add_argument(
        "--output",
        type=str,
        help="output file",
        required=True,
    )

    args = parser.parse_args()

    recorder = GstRecorder(args.signaling_host, args.signaling_port, args.remote_producer_peer_id, args.output)
    recorder.record()

    # Wait until error or EOS
    bus = recorder.get_bus()  # type: ignore[no-untyped-call]
    try:
        while True:
            if not process_msg(bus):
                break

    except KeyboardInterrupt:
        print("User exit")
    finally:
        # Free resources
        recorder.stop()


if __name__ == "__main__":
    main()

In this context, mkv doesn’t properly record the second track, and mpegts doesn’t work (with the right muxer in self.mux).
The EOS is only caught when I use mp4mux and if the connection is closed on the server side. Even forcing the EOS in the stop method doesn’t work.
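
A side note on stop() in the script above: it sets the pipeline to NULL right after sending the EOS event, so even when the EOS does travel through the pipeline, the muxer gets no time to finalize the file. Below is a minimal sketch of a stop helper that waits on the bus first. The function name, the `gst` parameter (the `gi.repository.Gst` module, passed in explicitly) and the 5-second timeout are assumptions. It does not help by itself if the EOS never reaches the bus; in that case the wait simply times out.

```python
def stop_and_wait_for_eos(pipeline, gst, timeout_seconds: int = 5) -> bool:
    """Send EOS, wait for it on the bus, then set the pipeline to NULL.

    `gst` is the gi.repository.Gst module (assumption: passed in by the caller).
    Returns True if an EOS message arrived before the timeout.
    """
    pipeline.send_event(gst.Event.new_eos())
    bus = pipeline.get_bus()
    # Block until EOS or ERROR shows up, or until the timeout expires.
    msg = bus.timed_pop_filtered(
        timeout_seconds * gst.SECOND,
        gst.MessageType.EOS | gst.MessageType.ERROR,
    )
    # Only now is it safe to tear the pipeline down.
    pipeline.set_state(gst.State.NULL)
    return msg is not None and msg.type == gst.MessageType.EOS
```

In the recorder class this could be called from stop() as `stop_and_wait_for_eos(self.pipeline, Gst)`.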

I looked at the webrtcsrc code and, as you observed, it only propagates EOS from the signaller’s ‘session-ended’ handler. That’s the reason why you get a usable mp4 file when you close the producer and not when you get gst-launch to push EOS.

I propose the following workaround. The idea is to record individual streams as GStreamer Data Protocol payloads. You can then mux those into an mp4:

Set up a pipeline with webrtcsrc configured as usual and connect the following ‘pad-added’ handler.

    def webrtcsrc_pad_added(webrtcsrc, pad):
        if pad.get_name().startswith('video'):
            videodepay = Gst.ElementFactory.make("rtph264depay")
            gdppay = Gst.ElementFactory.make("gdppay")
            filesink = Gst.ElementFactory.make("filesink")
            filesink.set_property("location", f"{pad.get_name()}.gdp")

            pipeline.add(videodepay, gdppay, filesink)
            Gst.Element.link_many(videodepay, gdppay, filesink)
            pad.link(videodepay.get_static_pad('sink'))

            videodepay.sync_state_with_parent()
            gdppay.sync_state_with_parent()
            filesink.sync_state_with_parent()

    webrtcsrc.connect('pad-added', webrtcsrc_pad_added)

This will record the video streams using their pad’s name, so with 2 video streams, you should get video_0.gdp & video_1.gdp.

Then you can mux them together:

gst-launch-1.0 \
    mp4mux name=mux ! filesink location=recording.mp4 \
    filesrc location=video_0.gdp ! gdpdepay ! h264parse ! queue ! mux. \
    filesrc location=video_1.gdp ! gdpdepay ! h264parse ! queue ! mux.
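
The same mux command can be generated for any number of recorded streams. This is a minimal sketch: the helper name and the (path, parser) pair convention are assumptions for illustration, and the parser element for each stream has to be picked by the caller (h264parse for the video files here).

```python
def build_mux_argv(streams, output, muxer="mp4mux"):
    """Build a gst-launch-1.0 argument list that muxes GDP recordings.

    streams: list of (gdp_path, parser_element) pairs, one per recorded pad.
    """
    argv = ["gst-launch-1.0", muxer, "name=mux", "!", "filesink", f"location={output}"]
    for path, parser in streams:
        # One branch per recording, each ending on a request pad of the muxer.
        argv += [
            "filesrc", f"location={path}", "!", "gdpdepay", "!",
            parser, "!", "queue", "!", "mux.",
        ]
    return argv
```

For the two video streams above, `build_mux_argv([("video_0.gdp", "h264parse"), ("video_1.gdp", "h264parse")], "recording.mp4")` yields the arguments of the command shown, ready for `subprocess.run`.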

To play both streams, use:

gst-launch-1.0 \
    filesrc location=recording.mp4 ! qtdemux name=demux \
    demux.video_0 ! decodebin3 ! autovideosink \
    demux.video_1 ! decodebin3 ! autovideosink

Works well, thanks!
I just had to add and link the elements manually. The lines below raised errors:

            pipeline.add(videodepay, gdppay, filesink)
            Gst.Element.link_many(videodepay, gdppay, filesink)

Interesting! :slight_smile: What error did you get?

Those

    self.pipeline.add(videodepay, gdppay, filesink)
TypeError: Gst.Bin.add() takes exactly 2 arguments (4 given)
    Gst.Element.link_many(videodepay, gdppay, filesink)
AttributeError: type object 'Element' has no attribute 'link_many'. Did you mean: 'link_pads'?

I didn’t even try to investigate :sweat_smile:

FYI my config:
Python 3.10.13 (conda env)
Gstreamer 1.22 (debian packages)
Debian 12
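
Both errors are consistent with the GStreamer Python overrides not being loaded. To my knowledge, the multi-argument `add()` and `Element.link_many()` come from the gst-python overrides rather than from plain PyGObject introspection, and those may be missing in a conda environment; that is an assumption about this setup, not something verified here. A minimal sketch of a helper that relies only on the single-element `add()` and on `link()` (the helper name is hypothetical):

```python
def add_and_link(pipeline, *elements):
    """Add each element to the pipeline, then link them in the given order."""
    for element in elements:
        # Gst.Bin.add() takes one element and returns False on failure.
        if not pipeline.add(element):
            raise RuntimeError(f"could not add {element.get_name()}")
    for upstream, downstream in zip(elements, elements[1:]):
        if not upstream.link(downstream):
            raise RuntimeError(
                f"could not link {upstream.get_name()} -> {downstream.get_name()}"
            )
```

In the ‘pad-added’ handler, the two failing lines would then become `add_and_link(pipeline, videodepay, gdppay, filesink)`.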

Hi @fengalin,

Using the same technique, I have some trouble with the audio stream. Sometimes it works, but most of the time I get the following error:

$ gst-launch-1.0 \
    matroskamux name=mux ! filesink location=recording.mkv \
    filesrc location=video_0.gdp ! gdpdepay ! h264parse ! queue ! mux. \
    filesrc location=video_1.gdp ! gdpdepay ! h264parse ! queue ! mux. \
    filesrc location=audio_0.gdp ! gdpdepay ! opusparse ! queue ! mux.
Setting pipeline to PAUSED ...
Pipeline is PREROLLING ...
Redistribute latency...
Redistribute latency...
Redistribute latency...
Redistribute latency...
Pipeline is PREROLLED ...
Setting pipeline to PLAYING ...
Redistribute latency...
New clock: GstSystemClock
ERROR: from element /GstPipeline:pipeline0/GstGDPDepay:gdpdepay2: Could not decode stream.
Additional debug info:
../gst/gdp/gstgdpdepay.c(518): gst_gdp_depay_chain (): /GstPipeline:pipeline0/GstGDPDepay:gdpdepay2:
could not create event from GDP packet
ERROR: from element /GstPipeline:pipeline0/GstFileSrc:filesrc2: Internal data stream error.
Execution ended after 0:00:00.002439507
Additional debug info:
../libs/gst/base/gstbasesrc.c(3132): gst_base_src_loop (): /GstPipeline:pipeline0/GstFileSrc:filesrc2:
streaming stopped, reason error (-5)
Setting pipeline to NULL ...
Freeing pipeline ...

The gdp file size looks right. I don’t know why the audio stream cannot be decoded. Is there a way to check the integrity of the streams?
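
One way to narrow this down is to run each recording on its own through gdpdepay into a fakesink and see which file fails. This is a minimal sketch and not a real integrity check: it only tells whether gdpdepay can read the file through to EOS. The function name, the `gst` parameter (the `gi.repository.Gst` module, passed in explicitly) and the timeout are assumptions.

```python
def gdp_file_is_readable(path, gst, timeout_seconds: int = 30) -> bool:
    """Return True if gdpdepay reads the whole file and the pipeline reaches EOS.

    `gst` is the gi.repository.Gst module (assumption: passed in by the caller).
    """
    pipeline = gst.parse_launch(f'filesrc location="{path}" ! gdpdepay ! fakesink')
    pipeline.set_state(gst.State.PLAYING)
    # Wait for the first EOS or ERROR message, or give up after the timeout.
    msg = pipeline.get_bus().timed_pop_filtered(
        timeout_seconds * gst.SECOND,
        gst.MessageType.EOS | gst.MessageType.ERROR,
    )
    pipeline.set_state(gst.State.NULL)
    return msg is not None and msg.type == gst.MessageType.EOS
```

Calling it for audio_0.gdp, video_0.gdp and video_1.gdp in turn should show whether only the audio recording is affected.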

FYI, I get this message with gst-discoverer:

$ gst-discoverer-1.0 audio_0.gdp 
Analyzing file:///home/fabien/Dev/Python/gst-signalling-py/audio_0.gdp
Done discovering file:///home/fabien/Dev/Python/gst-signalling-py/audio_0.gdp
An error was encountered while discovering the file
 No valid frames decoded before end of stream

But I also get this for the video streams:

$ gst-discoverer-1.0 video_0.gdp 
Analyzing file:///home/fabien/Dev/Python/gst-signalling-py/video_0.gdp
Done discovering file:///home/fabien/Dev/Python/gst-signalling-py/video_0.gdp
An error was encountered while discovering the file
 Could not determine type of stream.