Webrtcbin getting stuck on the first frame

Hello everyone! I’m developing a custom WebRTC system built upon Gstreamer’s webrtcbin element and gi library. The objective is to have 1 Linux machine stream a video to another Linux machine, where it is displayed. All of course inside a WebRTC framework.

The current state of the application is the following: signaling is being handled correctly, ICE candidates and SDP offer/answer are exchanged through a custom signalin server, therefore the stream is indeed routed from the first to the second machine, but here comes the issue:

The receiving pipeline only displays (autovideosink) a single frame, then it gets stuck. I figure that something is wrong with just the receiving pipeline, since wireshark shows that the packets coming from the sender do keep on flowing into the receiving machine even after the immediate freeze.

I here provide the code of the receiving script, as well as its logs and the still picture shown by autovideosink.

wrtc_receiver.py

import ssl
import websockets
import asyncio
import json
import gi
import sys
gi.require_version('Gst', '1.0')
from gi.repository import Gst
gi.require_version('GstWebRTC', '1.0')
from gi.repository import GstWebRTC
gi.require_version('GstSdp', '1.0')
from gi.repository import GstSdp

class WebRTCReceiver:
    def __init__(self, server):
        self.server = server or 'ws://192.168.1.154:8080'
        self.conn = None
        self.pipe = None
        self.webrtc = None
        self.sender_id = None  # Initialize sender_id
        self.ice_candidates = []  # Store receiver's ICE candidates until SDP offer is received


    async def connect(self):
        print("Connecting to signaling server...")
        self.conn = await websockets.connect(self.server)
        print(f"Connected to signaling server at {self.server}")
        self.peer_id = input("Enter a unique peer ID for this receiver (e.g., 'receiver'): ")
        hello_msg = f"HELLO {self.peer_id}"
        print(f"Sending HELLO message: {hello_msg}")
        await self.conn.send(hello_msg)

    async def send_sdp_answer(self, answer):
        if self.sender_id:
            print(f"Sender ID: {self.sender_id}")
        else:
            print("Error: No sender ID found.")

        print("SDP Answer created:\n", answer)
        msg = json.dumps({'sdp': {'type': 'answer', 'sdp': answer}, 'to': self.sender_id})
        print(f"Sending SDP answer message:\n{msg}")
        await self.conn.send(msg)

    def handle_sdp_offer(self, offer_sdp):
        print("Handling SDP offer...")
        res, sdpmsg = GstSdp.SDPMessage.new()
        GstSdp.sdp_message_parse_buffer(bytes(offer_sdp.encode()), sdpmsg)
        offer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.OFFER, sdpmsg)
        promise = Gst.Promise.new()
        self.webrtc.emit('set-remote-description', offer, promise)
        promise.wait()
        print("SDP offer successfully set as remote description.")
        print("Creating SDP answer...")
        promise = Gst.Promise.new()
        self.webrtc.emit('create-answer', None, promise)
        promise.wait()
        reply = promise.get_reply()
        answer = reply.get_value('answer')
        promise = Gst.Promise.new()
        self.webrtc.emit('set-local-description', answer, promise)
        print("SDP answer successfully set as local description.")
        
        # Use await instead of asyncio.run() to avoid event loop conflicts
        asyncio.create_task(self.send_sdp_answer(answer.sdp.as_text()))

        # Now that the answer has been sent, send the locally collected ICE candidates
        for candidate in self.ice_candidates:
            self.send_ice_candidate(candidate['mlineindex'], candidate['candidate'], self.peer_id)
        self.ice_candidates.clear()  # Clear the list after sending candidates

    def send_ice_candidate(self, mlineindex, candidate):
        msg = json.dumps({'ice': {'candidate': candidate, 'sdpMLineIndex': mlineindex}, 'to': self.sender_id})
        print(f"Sending ICE candidate:\n{msg}")
        asyncio.run(self.conn.send(msg))

    def add_ice_candidate(self, candidate, mlineindex):
        print(f"Adding ICE candidate: {candidate}, mline index: {mlineindex}")
        self.webrtc.emit('add-ice-candidate', mlineindex, candidate)

    def on_ice_candidate(self, _, mlineindex, candidate):
        # Store ICE candidates until the SDP offer has been received and processed
        ret, state, pending = self.webrtc.get_state(Gst.CLOCK_TIME_NONE)
        if state != Gst.State.PLAYING:
            # Handle the case where the state is not playing
            print(f"Storing ICE candidate until SDP is processed: {candidate}")
            self.ice_candidates.append({'mlineindex': mlineindex, 'candidate': candidate})
        else:
            self.send_ice_candidate(mlineindex, candidate)

    def on_incoming_stream(self, _, pad):
        if pad.direction != Gst.PadDirection.SRC:
            return
        print("Incoming stream detected.")

        # Use a single decodebin for all incoming streams
        decodebin = self.pipe.get_by_name("decodebin")
        if not decodebin:
            decodebin = Gst.ElementFactory.make("decodebin", "decodebin")
            self.pipe.add(decodebin)
            decodebin.sync_state_with_parent()
        
        decodebin.connect("pad-added", self.on_decodebin_pad_added)
        pad.link(decodebin.get_static_pad("sink"))

    def on_decodebin_pad_added(self, _, pad):
        if not pad.has_current_caps():
            return
        name = pad.get_current_caps().to_string()
        print(f"Decodebin pad added, caps: {name}")

        if name.startswith("video"):
            # Add a queue to handle buffering
            queue = Gst.ElementFactory.make("queue", "queue")
            convert = Gst.ElementFactory.make("videoconvert", "convert")
            sink = Gst.ElementFactory.make("autovideosink", "sink")
            
            # Set sync=false on the video sink
            sink.set_property("sync", False)

            # Add elements to the pipeline
            self.pipe.add(queue)
            self.pipe.add(convert)
            self.pipe.add(sink)

            # Link elements: pad -> queue -> convert -> sink
            queue.sync_state_with_parent()
            convert.sync_state_with_parent()
            sink.sync_state_with_parent()
            pad.link(queue.get_static_pad("sink"))
            queue.link(convert)
            convert.link(sink)

    def start_pipeline(self):
        print("Starting GStreamer pipeline...")
        self.pipe = Gst.parse_launch("webrtcbin name=recvrecv ! autovideosink")
        self.webrtc = self.pipe.get_by_name("recvrecv")
        if not self.webrtc:
            print("Error: 'recvrecv' webrtcbin element not found!")
            return
        self.webrtc.connect('on-ice-candidate', self.on_ice_candidate)  # Collect ICE candidates
        self.webrtc.connect("pad-added", self.on_incoming_stream)
        self.pipe.set_state(Gst.State.PLAYING)
        print("Pipeline started successfully.")

    async def handle_messages(self):
        async for message in self.conn:
            print(f"Received message from server: {message}")
            try:
                msg = json.loads(message)
                # Print out the raw message for better debugging
                print(f"Parsed message: {msg}")
                if msg.get("type") == "SESSION_REQUEST":
                    print(f"Session requested by {msg['from']}")
                    self.sender_id = msg["from"]  # Store sender_id
                elif "sdp" in msg:
                    if msg["sdp"]["type"] == "offer":
                        print("SDP Offer received, processing...")
                        self.handle_sdp_offer(msg["sdp"]["sdp"])
                elif "ice" in msg:
                    print(f"ICE candidate received: {msg['ice']['candidate']}")
                    self.add_ice_candidate(msg["ice"]["candidate"], msg["ice"]["sdpMLineIndex"])
            except json.JSONDecodeError:
                print("Error: Received non-JSON message. Raw message:", message)

    async def run(self):
        await self.connect()
        self.start_pipeline()
        await self.handle_messages()

def check_plugins():
    needed = ["opus", "vpx", "nice", "webrtc", "dtls", "srtp", "rtp",
            "rtpmanager", "videotestsrc", "audiotestsrc"]
    missing = list(filter(lambda p: Gst.Registry.get().find_plugin(p) is None, needed))
    if len(missing):
        print('Missing gstreamer plugins:', missing)
        return False
    return True

if __name__ == '__main__':
    Gst.init(None)
    if not check_plugins():
        sys.exit(1)
    server = input("Enter the signaling server URL: ")
    receiver = WebRTCReceiver(server)
    asyncio.run(receiver.run())

Receiver logs (GST_DEBUG=3) (just the final logs). The logs following the “Decodebin pad added, caps” section pop up right when the frozen autovideosink window opens.

Incoming stream detected.
Decodebin pad added, caps: video/x-raw, format=(string)I420, width=(int)320, height=(int)240, interlace-mode=(string)progressive, multiview-mode=(string)mono, multiview-flags=(GstVideoMultiviewFlagsSet)0:ffffffff:/right-view-first/left-flipped/left-flopped/right-flipped/right-flopped/half-aspect/mixed-mono, pixel-aspect-ratio=(fraction)1/1, chroma-site=(string)jpeg, colorimetry=(string)bt601, framerate=(fraction)30/1
0:00:18.819687791  6830 0x7f00c000ecc0 WARN                 kmssink gstkmssink.c:835:gst_kms_sink_start:<sink-actual-sink-kms> error: Could not open DRM module (NULL)
0:00:18.819744246  6830 0x7f00c000ecc0 WARN                 kmssink gstkmssink.c:835:gst_kms_sink_start:<sink-actual-sink-kms> error: reason: No such file or directory (2)
0:00:18.819768850  6830 0x7f00c000ecc0 WARN                basesink gstbasesink.c:5367:gst_base_sink_change_state:<sink-actual-sink-kms> error: Failed to start

Here the situation I see. On the left, the sender machine displaying the test video correctly, the same it’s being sent; on the right, the still frame (glitched, sometimes is displayed with the correct color palette).

Does anyone have any tips on the matter?
Thanks!!

I am stuck on a similar problem.
I have 2 pipelines.
The first one is: filesrc → tsdemux → h264parse → rtph264pay → appsink.
the other one is: appsrc → identity (for debugging purposes) → webrtcbin

In the logs I see only one line from identity input:
4100000000 (0x7977ec002ce8): 80 60 4b 07 90 f6 c2 8a b9 f5 be a9 09 10 .`K…

and when I print the buffer size of appsrc (the element before identity) I see its only getting bigger (means its not flowing out of appsrc).

Before I split the pipeline into 2 (with appsink and appsrc) I had one pipeline (rtph264pay was linked with webrtcbin) and it worked fine.

Did you manage to solve this?