Hello everyone! I’m developing a custom WebRTC system built upon Gstreamer’s webrtcbin element and gi library. The objective is to have 1 Linux machine stream a video to another Linux machine, where it is displayed. All of course inside a WebRTC framework.
The current state of the application is the following: signaling is being handled correctly, ICE candidates and SDP offer/answer are exchanged through a custom signalin server, therefore the stream is indeed routed from the first to the second machine, but here comes the issue:
The receiving pipeline only displays (autovideosink) a single frame, then it gets stuck. I figure that something is wrong with just the receiving pipeline, since wireshark shows that the packets coming from the sender do keep on flowing into the receiving machine even after the immediate freeze.
I here provide the code of the receiving script, as well as its logs and the still picture shown by autovideosink.
wrtc_receiver.py
import ssl
import websockets
import asyncio
import json
import gi
import sys
gi.require_version('Gst', '1.0')
from gi.repository import Gst
gi.require_version('GstWebRTC', '1.0')
from gi.repository import GstWebRTC
gi.require_version('GstSdp', '1.0')
from gi.repository import GstSdp
class WebRTCReceiver:
def __init__(self, server):
self.server = server or 'ws://192.168.1.154:8080'
self.conn = None
self.pipe = None
self.webrtc = None
self.sender_id = None # Initialize sender_id
self.ice_candidates = [] # Store receiver's ICE candidates until SDP offer is received
async def connect(self):
print("Connecting to signaling server...")
self.conn = await websockets.connect(self.server)
print(f"Connected to signaling server at {self.server}")
self.peer_id = input("Enter a unique peer ID for this receiver (e.g., 'receiver'): ")
hello_msg = f"HELLO {self.peer_id}"
print(f"Sending HELLO message: {hello_msg}")
await self.conn.send(hello_msg)
async def send_sdp_answer(self, answer):
if self.sender_id:
print(f"Sender ID: {self.sender_id}")
else:
print("Error: No sender ID found.")
print("SDP Answer created:\n", answer)
msg = json.dumps({'sdp': {'type': 'answer', 'sdp': answer}, 'to': self.sender_id})
print(f"Sending SDP answer message:\n{msg}")
await self.conn.send(msg)
def handle_sdp_offer(self, offer_sdp):
print("Handling SDP offer...")
res, sdpmsg = GstSdp.SDPMessage.new()
GstSdp.sdp_message_parse_buffer(bytes(offer_sdp.encode()), sdpmsg)
offer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.OFFER, sdpmsg)
promise = Gst.Promise.new()
self.webrtc.emit('set-remote-description', offer, promise)
promise.wait()
print("SDP offer successfully set as remote description.")
print("Creating SDP answer...")
promise = Gst.Promise.new()
self.webrtc.emit('create-answer', None, promise)
promise.wait()
reply = promise.get_reply()
answer = reply.get_value('answer')
promise = Gst.Promise.new()
self.webrtc.emit('set-local-description', answer, promise)
print("SDP answer successfully set as local description.")
# Use await instead of asyncio.run() to avoid event loop conflicts
asyncio.create_task(self.send_sdp_answer(answer.sdp.as_text()))
# Now that the answer has been sent, send the locally collected ICE candidates
for candidate in self.ice_candidates:
self.send_ice_candidate(candidate['mlineindex'], candidate['candidate'], self.peer_id)
self.ice_candidates.clear() # Clear the list after sending candidates
def send_ice_candidate(self, mlineindex, candidate):
msg = json.dumps({'ice': {'candidate': candidate, 'sdpMLineIndex': mlineindex}, 'to': self.sender_id})
print(f"Sending ICE candidate:\n{msg}")
asyncio.run(self.conn.send(msg))
def add_ice_candidate(self, candidate, mlineindex):
print(f"Adding ICE candidate: {candidate}, mline index: {mlineindex}")
self.webrtc.emit('add-ice-candidate', mlineindex, candidate)
def on_ice_candidate(self, _, mlineindex, candidate):
# Store ICE candidates until the SDP offer has been received and processed
ret, state, pending = self.webrtc.get_state(Gst.CLOCK_TIME_NONE)
if state != Gst.State.PLAYING:
# Handle the case where the state is not playing
print(f"Storing ICE candidate until SDP is processed: {candidate}")
self.ice_candidates.append({'mlineindex': mlineindex, 'candidate': candidate})
else:
self.send_ice_candidate(mlineindex, candidate)
def on_incoming_stream(self, _, pad):
if pad.direction != Gst.PadDirection.SRC:
return
print("Incoming stream detected.")
# Use a single decodebin for all incoming streams
decodebin = self.pipe.get_by_name("decodebin")
if not decodebin:
decodebin = Gst.ElementFactory.make("decodebin", "decodebin")
self.pipe.add(decodebin)
decodebin.sync_state_with_parent()
decodebin.connect("pad-added", self.on_decodebin_pad_added)
pad.link(decodebin.get_static_pad("sink"))
def on_decodebin_pad_added(self, _, pad):
if not pad.has_current_caps():
return
name = pad.get_current_caps().to_string()
print(f"Decodebin pad added, caps: {name}")
if name.startswith("video"):
# Add a queue to handle buffering
queue = Gst.ElementFactory.make("queue", "queue")
convert = Gst.ElementFactory.make("videoconvert", "convert")
sink = Gst.ElementFactory.make("autovideosink", "sink")
# Set sync=false on the video sink
sink.set_property("sync", False)
# Add elements to the pipeline
self.pipe.add(queue)
self.pipe.add(convert)
self.pipe.add(sink)
# Link elements: pad -> queue -> convert -> sink
queue.sync_state_with_parent()
convert.sync_state_with_parent()
sink.sync_state_with_parent()
pad.link(queue.get_static_pad("sink"))
queue.link(convert)
convert.link(sink)
def start_pipeline(self):
print("Starting GStreamer pipeline...")
self.pipe = Gst.parse_launch("webrtcbin name=recvrecv ! autovideosink")
self.webrtc = self.pipe.get_by_name("recvrecv")
if not self.webrtc:
print("Error: 'recvrecv' webrtcbin element not found!")
return
self.webrtc.connect('on-ice-candidate', self.on_ice_candidate) # Collect ICE candidates
self.webrtc.connect("pad-added", self.on_incoming_stream)
self.pipe.set_state(Gst.State.PLAYING)
print("Pipeline started successfully.")
async def handle_messages(self):
async for message in self.conn:
print(f"Received message from server: {message}")
try:
msg = json.loads(message)
# Print out the raw message for better debugging
print(f"Parsed message: {msg}")
if msg.get("type") == "SESSION_REQUEST":
print(f"Session requested by {msg['from']}")
self.sender_id = msg["from"] # Store sender_id
elif "sdp" in msg:
if msg["sdp"]["type"] == "offer":
print("SDP Offer received, processing...")
self.handle_sdp_offer(msg["sdp"]["sdp"])
elif "ice" in msg:
print(f"ICE candidate received: {msg['ice']['candidate']}")
self.add_ice_candidate(msg["ice"]["candidate"], msg["ice"]["sdpMLineIndex"])
except json.JSONDecodeError:
print("Error: Received non-JSON message. Raw message:", message)
async def run(self):
await self.connect()
self.start_pipeline()
await self.handle_messages()
def check_plugins():
needed = ["opus", "vpx", "nice", "webrtc", "dtls", "srtp", "rtp",
"rtpmanager", "videotestsrc", "audiotestsrc"]
missing = list(filter(lambda p: Gst.Registry.get().find_plugin(p) is None, needed))
if len(missing):
print('Missing gstreamer plugins:', missing)
return False
return True
if __name__ == '__main__':
Gst.init(None)
if not check_plugins():
sys.exit(1)
server = input("Enter the signaling server URL: ")
receiver = WebRTCReceiver(server)
asyncio.run(receiver.run())
Receiver logs (GST_DEBUG=3) (just the final logs). The logs following the “Decodebin pad added, caps” section pop up right when the frozen autovideosink window opens.
Incoming stream detected.
Decodebin pad added, caps: video/x-raw, format=(string)I420, width=(int)320, height=(int)240, interlace-mode=(string)progressive, multiview-mode=(string)mono, multiview-flags=(GstVideoMultiviewFlagsSet)0:ffffffff:/right-view-first/left-flipped/left-flopped/right-flipped/right-flopped/half-aspect/mixed-mono, pixel-aspect-ratio=(fraction)1/1, chroma-site=(string)jpeg, colorimetry=(string)bt601, framerate=(fraction)30/1
0:00:18.819687791 6830 0x7f00c000ecc0 WARN kmssink gstkmssink.c:835:gst_kms_sink_start:<sink-actual-sink-kms> error: Could not open DRM module (NULL)
0:00:18.819744246 6830 0x7f00c000ecc0 WARN kmssink gstkmssink.c:835:gst_kms_sink_start:<sink-actual-sink-kms> error: reason: No such file or directory (2)
0:00:18.819768850 6830 0x7f00c000ecc0 WARN basesink gstbasesink.c:5367:gst_base_sink_change_state:<sink-actual-sink-kms> error: Failed to start
Here the situation I see. On the left, the sender machine displaying the test video correctly, the same it’s being sent; on the right, the still frame (glitched, sometimes is displayed with the correct color palette).
Does anyone have any tips on the matter?
Thanks!!