OK, I have created as simple a reproducer as I can. I have a single Python app that acts as the WebRTC peer for two other WebRTC clients running in the browser.
When I run it and connect the second browser client, the video in the first browser window freezes until the second browser client completes its WebRTC connection.
For the browser web page, I'm using the example in the `sendrecv/js`
folder of the GStreamer repo, with one minor change so that it connects to the signalling server at webrtc.gstreamer.net:
diff --git a/subprojects/gst-examples/webrtc/sendrecv/js/webrtc.js b/subprojects/gst-examples/webrtc/sendrecv/js/webrtc.js
index 83fdad31ec..f4f4f5e647 100644
--- a/subprojects/gst-examples/webrtc/sendrecv/js/webrtc.js
+++ b/subprojects/gst-examples/webrtc/sendrecv/js/webrtc.js
@@ -8,8 +8,8 @@
*/
// Set this to override the automatic detection in websocketServerConnect()
-var ws_server;
-var ws_port;
+var ws_server = "webrtc.gstreamer.net";
+var ws_port = 8443;
Python code:
#!/usr/bin/env python3
import argparse
import asyncio
import json
import logging
import sys
import threading
import gi
import websockets
from websockets.version import version as wsv
gi.require_version("Gst", "1.0")
from gi.repository import Gst # NOQA
gi.require_version("GstWebRTC", "1.0")
from gi.repository import GstWebRTC # NOQA
gi.require_version("GstSdp", "1.0")
from gi.repository import GstSdp # NOQA
# Ensure that gst-python is installed. The gi.overrides.Gst module is imported
# only to fail fast with a readable message when the gst-python overrides are
# missing; the imported name itself is never used. The original ImportError is
# re-raised after printing the hint.
try:
    from gi.overrides import Gst as _
except ImportError:
    print("gstreamer-python binding overrides aren't available, please install them")
    raise
# Shared media pipeline, launched once by WebRTCClient.start_pipeline().
# - "videotee": VP8 test video, already RTP-payloaded (pt=97).
# - "audiotee": silent Opus audio, already RTP-payloaded (pt=96).
# Each tee has one permanent "queue ! fakesink" branch (presumably so the tee
# keeps flowing while no peer is attached); every WebRTCPeer requests another
# "src_%u" pad on each tee and links its own per-peer bin to it.
# - "audio-mixer": mixes the decoded remote audio from all peers into a
#   fakesink; peers request "sink_%u" pads on it.
GST_PIPELINE_TEMPLATE = """
videotestsrc is-live=true pattern=ball ! vp8enc deadline=1 keyframe-max-dist=2000 ! rtpvp8pay pt=97 picture-id-mode=15-bit !
tee name=videotee ! queue ! fakesink sync=true
audiotestsrc is-live=true wave=silence ! audioconvert ! audio/x-raw,channels=1,rate=24000 ! audioresample
! opusenc audio-type=voice bitrate=28000 bandwidth=superwideband perfect-timestamp=true ! rtpopuspay pt=96
! application/x-rtp,encoding-name=OPUS !
tee name=audiotee ! queue ! fakesink sync=true
audiomixer name=audio-mixer force-live=true ! audioconvert !
audioresample ! audio/x-raw,rate=48000 ! fakesink sync=false
"""
def add_pad_to_element(element: Gst.Element, pad: Gst.Pad) -> None:
    """Attach ``pad`` to ``element``.

    Raises:
        RuntimeError: when ``element.add_pad`` reports failure (falsy result).
    """
    added = element.add_pad(pad)
    if added:
        return
    logging.error("Unable to add gstreamer pad to element")
    raise RuntimeError("A media error occurred")
def add_to_bin(gst_bin: Gst.Bin, element: Gst.Element) -> None:
    """Add ``element`` to ``gst_bin``.

    A ``None`` result is treated as success. Presumably this is because the
    gst-python override of ``Bin.add`` returns ``None``; confirm against the
    installed bindings. Only an explicit falsy non-``None`` result counts as a
    failure.

    Raises:
        RuntimeError: when the add is reported as failed.
    """
    rc = gst_bin.add(element)
    if rc is None or rc:
        return
    logging.error(f"Unable to add gstreamer element to bin. rc={rc}")
    raise RuntimeError("A media error occurred")
def sync_state_with_parent(element: Gst.Element) -> None:
    """Move ``element`` to the state of its parent.

    Raises:
        RuntimeError: when GStreamer reports that the sync failed.
    """
    synced = element.sync_state_with_parent()
    if synced:
        return
    logging.error("Gstreamer element could not be synced with parent")
    raise RuntimeError("A media error occurred")
def link_pads(srcpad: Gst.Pad, sinkpad: Gst.Pad) -> None:
    """Link ``srcpad`` to ``sinkpad``.

    Raises:
        RuntimeError: when the link result is anything other than
            ``Gst.PadLinkReturn.OK``.
    """
    result = srcpad.link(sinkpad)
    if result == Gst.PadLinkReturn.OK:
        return
    logging.error("Unable to link gstreamer pads")
    raise RuntimeError("A media error occurred")
def print_status(msg):
    """Write an informational line, prefixed with ``---``, to stdout."""
    print("--- {}".format(msg))
def print_error(msg):
    """Write an error line, prefixed with ``!!!``, to stderr."""
    print("!!! {}".format(msg), file=sys.stderr)
class WebRTCPeer:
def __init__(
self,
device_id: str,
peer_id: str,
client,
channel_type: str,
channel_name: str,
webrtc_key: str,
pipeline: Gst.Pipeline,
audio_mixer,
turn_servers: list[str] | None,
disconnect_on_timeout: bool = False,
):
self.device_id = device_id
self._peer_id = peer_id
self.channel_type = channel_type
self.channel_name = channel_name
self.webrtc_key = webrtc_key
self._pipeline = pipeline
self._audio_mixer = audio_mixer
self.inference_enabled = False
self._client = client
self._peer_listener = None
# to be created in start()
self._webrtc = None # webrtcbin gst element
self._bin: Gst.Bin = None
self._audiotee: Gst.Element = None
self._videotee: Gst.Element = None
self._data_channel = None
self._data_channel_open = False
self.is_peer_connected = False
self.sdp_offer_sent = False
self._turn_servers = turn_servers
self._sdp_offer: str = None
self._ice_candidates = []
self._is_stopping: bool = False
self._disconnect_on_timeout = disconnect_on_timeout
@property
def peer_id(self) -> str:
return self._peer_id
@property
def turn_servers(self) -> list[str] | None:
return self._turn_servers
def on_offer_created(self, promise, _, __):
if promise.wait() != Gst.PromiseResult.REPLIED:
logging.warning("promise was not replied")
reply = promise.get_reply()
offer = reply.get_value("offer")
promise = Gst.Promise.new()
self._webrtc.emit("set-local-description", offer, promise)
promise.interrupt()
# self.send_sdp_offer(offer)
self.sdp_offer_sent = True
self._client.send_sdp(offer)
logging.info("end offer created")
def on_negotiation_needed(self, element):
logging.info("negotiation needed")
promise = Gst.Promise.new_with_change_func(self.on_offer_created, element, None)
element.emit("create-offer", None, promise)
logging.info("create-offer created")
def on_ice_candidate(self, _, mlineindex, candidate):
logging.info(f"adding local ice {candidate}")
self._client.send_ice_candidate_message(_, mlineindex, candidate)
def on_incoming_stream(self, _, pad: Gst.Pad):
if pad.direction != Gst.PadDirection.SRC:
logging.info(f"ignoring pad {pad.get_name()}")
return
logging.info(f"incoming stream setup for pad: {pad}")
caps = pad.get_current_caps()
s = caps.get_structure(0)
logging.info(f"caps: {caps.to_string()}. s: {s}")
media = s.get_string("media")
enc = s.get_string("encoding-name")
if media.startswith("audio"):
logging.info("configuring incoming audio stream...")
# need an audio converter for incoming audio streams that will decode the binary
# stream and connect it to the audio mixer for audio output to the device
if enc == "OPUS":
logging.info("using opus decoder")
audio_conv_bin: Gst.Bin = Gst.parse_bin_from_description(
"rtpopusdepay name=dbin ! queue ! opusdec ! audioresample name=src",
False,
)
else:
audio_conv_bin: Gst.Bin = Gst.parse_bin_from_description(
"decodebin name=dbin ! queue ! audioconvert ! audioresample name=src sinc-filter-mode=1",
False,
)
# Add a ghost pad on our conv bin that proxies the sink pad of the decodebin
dbin = audio_conv_bin.get_by_name("dbin")
sinkpad = Gst.GhostPad.new("sink", dbin.get_static_pad("sink"))
add_pad_to_element(audio_conv_bin, sinkpad)
# And another one that proxies the source pad of the last element
src = audio_conv_bin.get_by_name("src")
srcpad = Gst.GhostPad.new("src", src.get_static_pad("src"))
add_pad_to_element(audio_conv_bin, srcpad)
# add to our peer bin
add_to_bin(self._bin, audio_conv_bin)
# sync the playing state with the parent peer bin
sync_state_with_parent(audio_conv_bin)
# connect incoming audio pad to our decoder bin
link_pads(pad, sinkpad)
# add ghost output pad to peer bin
audio_src_pad = Gst.GhostPad.new("audio_src", srcpad)
audio_src_pad.set_active(True)
add_pad_to_element(
self._bin, audio_src_pad
) # should trigger our on_peer_bin_added function
else:
logging.error(f"Ignoring stream {s.to_string()} with media type {media}")
def on_peer_bin_pad_added(self, _, pad):
logging.info(f"peer bin pad added: {pad.get_name()}")
# only receive audio streams directly
if pad.get_name() == "audio_src":
logging.info("got audio_src")
audiomixer_sink_pad = self._audio_mixer.get_request_pad("sink_%u")
link_pads(pad, audiomixer_sink_pad)
# listen for unlink event in order to cleanup
audiomixer_sink_pad.connect("unlinked", self.on_audio_pad_unlinked)
self._pipeline.recalculate_latency()
else:
logging.warning(f"ignoring peer bin pad {pad.get_name()}")
def on_audio_pad_unlinked(self, pad, peer):
logging.info("audio pad unlinked")
audiomixer = pad.get_parent()
audiomixer.release_request_pad(pad)
# audiomixer.unref() # decrement ref count from get_parent per docs
logging.info("audio pad released from mixer")
def on_pad_probe(self, pad, info):
logging.info(f"pad probe: {pad.get_name()}; {str(info)}")
return Gst.PadProbeReturn.OK
def on_connection_state_change(self, _bin, prop) -> None:
value = self._webrtc.get_property(prop.name)
logging.info(f"peer {self._peer_id} connection state changed to {value}")
if value == GstWebRTC.WebRTCPeerConnectionState.CONNECTED:
# recalculate latency after peer connection is established. we get warnings from rtpsession if we don't
webrtc_latency = self._webrtc.get_property("latency") # in ms
self._audio_mixer.set_property(
"min-upstream-latency", webrtc_latency * 1000000
) # set value in ns
rc = self._pipeline.recalculate_latency()
if not rc:
logging.warning("failed to recalculate latency")
self.is_peer_connected = True
logging.info(f"peer {self._peer_id} connected successfully")
self._client.switch_conn()
logging.info("stopped ws conn")
else:
self.is_peer_connected = False
def on_ice_connection_state_change(self, _bin, prop) -> None:
value = self._webrtc.get_property(prop.name)
logging.info(f"peer {self._peer_id} ice state changed to {value}")
def on_signaling_state_change(self, _bin, prop) -> None:
value = self._webrtc.get_property(prop.name)
logging.info(f"peer {self._peer_id} signaling state changed to {value}")
def handle_sdp_answer(self, sdp_dict: dict):
sdp = sdp_dict["sdp"]
logging.info("Received answer:\n%s" % sdp)
res, sdpmsg = GstSdp.SDPMessage.new()
GstSdp.sdp_message_parse_buffer(bytes(sdp.encode()), sdpmsg)
answer = GstWebRTC.WebRTCSessionDescription.new(
GstWebRTC.WebRTCSDPType.ANSWER, sdpmsg
)
logging.info("sending set-remote-descr")
promise = Gst.Promise.new()
self._webrtc.emit("set-remote-description", answer, promise)
promise.interrupt()
logging.info("set-remote-descr sent")
def handle_ice(self, ice_dict: dict):
candidate = ice_dict["candidate"]
if candidate:
sdpmlineindex = ice_dict["sdpMLineIndex"]
self._webrtc.emit("add-ice-candidate", sdpmlineindex, candidate)
logging.info(f"added remote ice candidate {sdpmlineindex} {str(candidate)}")
else:
logging.error(f"unexpected ice payload: {str(ice_dict)}")
def _handle_disconnection(self, error: str | None) -> None:
if self._peer_listener:
# need to start from a new thread b/c callback is async
t = threading.Thread(
target=self._peer_listener.on_peer_disconnected,
args=(self, error),
name="webrtc-peer-disconnect",
)
t.start()
def _on_data_channel_error(self, channel, error) -> None:
logging.error(f"peer {self._peer_id} data channel error: {error}")
def start(self) -> None:
logging.info(f"starting peer {self._peer_id}")
# based on https://gitlab.freedesktop.org/gstreamer/gst-examples/-/blob/1.18/webrtc/multiparty-sendrecv/gst-rust/src/main.rs # noqa: E501
peer_bin = Gst.parse_bin_from_description(
"queue name=video-queue ! webrtcbin. queue name=audio-queue ! webrtcbin. webrtcbin name=webrtcbin",
False,
)
self._bin = peer_bin
webrtc = peer_bin.get_by_name("webrtcbin")
webrtc.set_property("stun-server", "stun://stun.l.google.com:19302")
webrtc.set_property("bundle-policy", "max-bundle")
if self._turn_servers:
for turn_url in self._turn_servers:
# The uri of the server of the form turn(s)://username:password@host:port
webrtc.emit("add-turn-server", turn_url)
self._webrtc = webrtc
audio_queue = peer_bin.get_by_name("audio-queue")
audio_sink_pad = Gst.GhostPad.new(
"audio_sink", audio_queue.get_static_pad("sink")
)
# peer_bin.add_pad(audio_sink_pad)
add_pad_to_element(peer_bin, audio_sink_pad)
video_queue = peer_bin.get_by_name("video-queue")
video_sink_pad = Gst.GhostPad.new(
"video_sink", video_queue.get_static_pad("sink")
)
# peer_bin.add_pad(video_sink_pad)
add_pad_to_element(peer_bin, video_sink_pad)
# self._pipeline.add(peer_bin)
add_to_bin(self._pipeline, peer_bin)
webrtc.connect("on-negotiation-needed", self.on_negotiation_needed)
webrtc.connect("on-ice-candidate", self.on_ice_candidate)
webrtc.connect("pad-added", self.on_incoming_stream)
webrtc.connect(
"notify::ice-connection-state", self.on_ice_connection_state_change
)
webrtc.connect("notify::connection-state", self.on_connection_state_change)
webrtc.connect("notify::signaling-state", self.on_signaling_state_change)
# need to create a data channel to coordinate mesh connections for user-to-user connections
webrtc.set_state(Gst.State.READY)
data_channel = webrtc.emit("create-data-channel", "signal-data", None)
if data_channel:
self._data_channel = data_channel
logging.info("data channel created")
data_channel.connect("on-error", self._on_data_channel_error)
else:
logging.error("data channel not created")
peer_bin.connect("pad-added", self.on_peer_bin_pad_added)
audiotee = self._pipeline.get_by_name("audiotee")
self._audiotee = audiotee
audio_src_pad = audiotee.get_request_pad("src_%u")
audio_block = audio_src_pad.add_probe(
Gst.PadProbeType.BLOCK_DOWNSTREAM, self.on_pad_probe
)
# ret = audio_src_pad.link(audio_sink_pad)
link_pads(audio_src_pad, audio_sink_pad)
videotee = self._pipeline.get_by_name("videotee")
self._videotee = videotee
video_src_pad = videotee.get_request_pad("src_%u")
video_block = video_src_pad.add_probe(
Gst.PadProbeType.BLOCK_DOWNSTREAM, self.on_pad_probe
)
# ret = video_src_pad.link(video_sink_pad)
link_pads(video_src_pad, video_sink_pad)
# declare that we will only send (not receive) video
trans = webrtc.emit("get-transceiver", 0)
trans.set_property(
"direction", GstWebRTC.WebRTCRTPTransceiverDirection.SENDONLY
)
logging.info("set sendonly")
# closure to unblock pads
def set_bin_playing(b):
# rc = b.sync_state_with_parent()
sync_state_with_parent(b)
audio_src_pad.remove_probe(audio_block)
video_src_pad.remove_probe(video_block)
logging.info("peer block removed")
peer_bin.call_async(set_bin_playing)
logging.info("add peer complete")
def stop(self) -> None:
self._is_stopping = True
logging.info(f"stopping peer {self._peer_id} on channel {self.channel_name}")
if self._data_channel:
try:
self._data_channel.emit("close")
logging.info("data channel closed")
except Exception:
logging.exception("error closing data channel")
try:
# Block the tees shortly for removal
audiotee_sinkpad = self._audiotee.get_static_pad("sink")
if audiotee_sinkpad:
audio_block = audiotee_sinkpad.add_probe(
Gst.PadProbeType.BLOCK_DOWNSTREAM, self.on_pad_probe
)
else:
logging.warning("unable to get audiotee sink pad")
audio_block = None
videotee_sinkpad = self._videotee.get_static_pad("sink")
if videotee_sinkpad:
video_block = videotee_sinkpad.add_probe(
Gst.PadProbeType.BLOCK_DOWNSTREAM, self.on_pad_probe
)
else:
logging.warning("unable to get videotee sink pad")
video_block = None
# Release the tee pads and unblock
audio_sinkpad = self._bin.get_static_pad("audio_sink")
video_sinkpad = self._bin.get_static_pad("video_sink")
if audio_sinkpad:
audiotee_srcpad = audio_sinkpad.get_peer()
if audiotee_srcpad:
audiotee_srcpad.unlink(audio_sinkpad)
self._audiotee.release_request_pad(audiotee_srcpad)
else:
logging.warning("unable to get audiotee srcpad")
else:
logging.warning("unable to get audio sinkpad")
if audiotee_sinkpad and audio_block:
audiotee_sinkpad.remove_probe(audio_block)
logging.info("removed audio block probe")
if video_sinkpad:
videotee_srcpad = video_sinkpad.get_peer()
if videotee_srcpad:
videotee_srcpad.unlink(video_sinkpad)
self._videotee.release_request_pad(videotee_srcpad)
else:
logging.error("unable to get videotee srcpad")
if videotee_sinkpad and video_block:
videotee_sinkpad.remove_probe(video_block)
logging.info("removed video block probe")
except Exception:
logging.exception("Exeption while stopping peer")
# Then remove the peer bin gracefully from the pipeline
self._pipeline.remove(self._bin)
self._bin.set_state(Gst.State.NULL)
# release references
self._pipeline = None
self._audio_mixer = None
self._webrtc = None
self._bin = None
self._audiotee = None
self._videotee = None
self._pubsub_client = None
self._sdp_offer = None
self._ice_candidates = None
self._data_channel = None
self._peer_listener = None
self._turn_servers = None
logging.info("peer removed")
class WebRTCClient:
    """Signalling client that serves two browser peers from one shared pipeline.

    One websocket connection is opened per hard-coded browser id ("5001" and
    "5002", see ``start_conn_thread``/``start_loop``). Each connection runs on
    its own asyncio event loop in its own thread. ``event_loop`` is the
    main-thread loop that polls the GStreamer bus.

    Outgoing signalling goes out on ``current_conn``. ``switch_conn`` moves it
    from the first connection to the second once a peer reports CONNECTED.
    """

    def __init__(
        self,
        loop,
        our_id,
        peer_id,
        server,
        remote_is_offerer,
        video_encoding,
        source_type,
    ):
        """Store configuration; connections are opened later by ``start_loop``.

        ``video_encoding`` and ``source_type`` are accepted for interface
        compatibility but are not used by this class.
        """
        self.current_conn = None
        # Connections for the "5001" / "5002" browser peers, set by start_loop().
        self.conn1 = None
        self.conn2 = None
        # Event loop owning each connection. asyncio transports are not
        # thread-safe, so a connection is only driven from its own loop.
        self._conn_loops = {}
        # Peer created for each connection, used to route incoming SDP/ICE.
        self._peers_by_conn = {}
        self.pipe = None
        self.webrtc_peers = []
        self.event_loop = loop
        self.server = server
        # An optional user-specified ID we can use to register
        self.our_id = our_id
        # The actual ID we used to register
        self.id_ = None
        # An optional peer ID we should connect to
        self.peer_id = peer_id
        # Whether we will send the offer or the remote peer will
        self.remote_is_offerer = remote_is_offerer

    async def send(self, conn, msg):
        """Send ``msg`` on websocket ``conn``, echoing it to stdout."""
        assert conn
        print(f">>> {msg}")
        await conn.send(msg)

    def switch_conn(self):
        """Route subsequent outgoing signalling to the second connection.

        The current connection is kept (with a warning) if the second
        connection has not been established yet.
        """
        if self.conn2 is None:
            logging.warning("conn2 not established yet; keeping current connection")
            return
        self.current_conn = self.conn2
        print("switched to conn2")

    def start_conn_thread(self, conn_name):
        """Run ``start_loop(conn_name)`` on a new thread."""
        t = threading.Thread(
            target=self.start_loop, args=(conn_name,), name=f"conn-thread-{conn_name}"
        )
        t.start()

    def start_loop(self, conn_name):
        """Connect to the signalling server on a new event loop and serve messages.

        The connection's loop is registered before the connection is published
        on ``conn1``/``conn2``/``current_conn``. That way ``send_soon`` always
        finds the loop that owns it.
        """
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        conn = loop.run_until_complete(websockets.connect(self.server))
        self._conn_loops[conn] = loop
        if conn_name == "5001":
            self.conn1 = conn
            self.current_conn = conn
        else:
            self.conn2 = conn
        loop.run_until_complete(self.send(conn, f"HELLO {conn_name}"))
        loop.run_until_complete(self.loop(conn, conn_name))

    async def setup_call(self, conn, peer_id):
        assert peer_id
        await self.send(conn, f"SESSION {peer_id}")

    def send_soon(self, msg):
        """Schedule ``msg`` to be sent on ``current_conn``; safe to call from any thread.

        The send runs on the event loop that owns the connection. If that loop
        is not known, it falls back to ``event_loop``.

        Returns:
            The ``concurrent.futures.Future`` of the scheduled send.
        """
        conn = self.current_conn
        owner = self._conn_loops.get(conn, self.event_loop)
        return asyncio.run_coroutine_threadsafe(self.send(conn, msg), owner)

    def on_bus_poll_cb(self, bus):
        """Drain pending bus messages; stop ``event_loop`` on ERROR or EOS."""

        def remove_bus_poll():
            self.event_loop.remove_reader(bus.get_pollfd().fd)
            self.event_loop.stop()

        while bus.peek():
            msg = bus.pop()
            if msg.type == Gst.MessageType.ERROR:
                err = msg.parse_error()
                print("ERROR:", err.gerror, err.debug)
                remove_bus_poll()
                break
            elif msg.type == Gst.MessageType.EOS:
                remove_bus_poll()
                break
            elif msg.type == Gst.MessageType.LATENCY:
                self.pipe.recalculate_latency()

    def send_sdp(self, offer):
        """Serialize an SDP offer/answer to JSON and send it on ``current_conn``."""
        text = offer.sdp.as_text()
        if offer.type == GstWebRTC.WebRTCSDPType.OFFER:
            print_status("Sending offer:\n%s" % text)
            msg = json.dumps({"sdp": {"type": "offer", "sdp": text}})
        elif offer.type == GstWebRTC.WebRTCSDPType.ANSWER:
            print_status("Sending answer:\n%s" % text)
            msg = json.dumps({"sdp": {"type": "answer", "sdp": text}})
        else:
            raise AssertionError(offer.type)
        self.send_soon(msg)

    def send_ice_candidate_message(self, _, mlineindex, candidate):
        """Send a local ICE candidate as JSON on ``current_conn``."""
        icemsg = json.dumps({"ice": {"candidate": candidate, "sdpMLineIndex": mlineindex}})
        self.send_soon(icemsg)

    def start_pipeline(self, create_offer=True, audio_pt=96, video_pt=97):
        """Launch the shared pipeline once; later calls are no-ops.

        ``audio_pt``/``video_pt`` are unused; the payload types are fixed in
        ``GST_PIPELINE_TEMPLATE``.
        """
        if self.pipe:
            return
        print_status(f"Creating pipeline, create_offer: {create_offer}")
        self.pipe = Gst.parse_launch(GST_PIPELINE_TEMPLATE)
        bus = self.pipe.get_bus()
        # This runs on a websocket thread, and asyncio loops are not
        # thread-safe, so register the bus fd from event_loop's own thread.
        self.event_loop.call_soon_threadsafe(
            self.event_loop.add_reader, bus.get_pollfd().fd, self.on_bus_poll_cb, bus
        )
        self.pipe.set_state(Gst.State.PLAYING)

    def _webrtc_for(self, conn):
        """Return the webrtcbin serving ``conn``, falling back to the newest peer.

        Returns ``None`` when no peer exists yet.
        """
        peer = self._peers_by_conn.get(conn) if conn is not None else None
        if peer is None and self.webrtc_peers:
            peer = self.webrtc_peers[-1]
        return peer._webrtc if peer is not None else None

    def handle_json(self, message, conn=None):
        """Apply an incoming JSON SDP answer or ICE candidate.

        Args:
            message: raw JSON text from the signalling server.
            conn: connection the message arrived on. It selects which peer the
                message is applied to; when omitted or unknown, the most
                recently added peer is used.

        Raises:
            json.decoder.JSONDecodeError: if ``message`` is not valid JSON.
        """
        logging.info(f"incoming json message: {message}")
        try:
            msg = json.loads(message)
        except json.decoder.JSONDecodeError:
            print_error("Failed to parse JSON message, this might be a bug")
            raise
        if "sdp" in msg:
            sdp = msg["sdp"]["sdp"]
            if msg["sdp"]["type"] != "answer":
                print_status("Received offer:\n%s" % sdp)
                return
            print_status("Received answer:\n%s" % sdp)
            webrtc = self._webrtc_for(conn)
            if webrtc is None:
                print_error("Received an answer but there is no peer to apply it to")
                return
            res, sdpmsg = GstSdp.SDPMessage.new_from_text(sdp)
            answer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.ANSWER, sdpmsg)
            promise = Gst.Promise.new()
            webrtc.emit("set-remote-description", answer, promise)
            promise.interrupt()  # we don't care about the result, discard it
        elif "ice" in msg:
            webrtc = self._webrtc_for(conn)
            if webrtc is None:
                print_error("Received an ICE candidate but there is no peer to apply it to")
                return
            ice = msg["ice"]
            webrtc.emit("add-ice-candidate", ice["sdpMLineIndex"], ice["candidate"])
            print("added remote ice candidate")
        else:
            print_error("Unknown JSON message")

    def close_pipeline(self):
        """Stop and drop the shared pipeline, if any."""
        if self.pipe:
            self.pipe.set_state(Gst.State.NULL)
            self.pipe = None
        self.webrtc = None

    def add_peer(self, conn=None):
        """Create and start a new WebRTCPeer on the shared pipeline.

        Args:
            conn: optional connection this peer is negotiated over. It is
                recorded so that incoming messages on ``conn`` reach this peer.

        Returns:
            The started ``WebRTCPeer``.
        """
        peer_id = len(self.webrtc_peers) + 1000 + 1
        audio_mixer = self.pipe.get_by_name("audio-mixer")
        peer = WebRTCPeer(
            "foo",
            peer_id,
            self,
            "test",
            "test",
            "test-key",
            self.pipe,
            audio_mixer,
            None,
        )
        peer.start()
        self.webrtc_peers.append(peer)
        if conn is not None:
            self._peers_by_conn[conn] = peer
        return peer

    async def loop(self, conn, conn_name):
        """Process signalling messages from ``conn`` until it closes.

        Returns:
            1 if the server reported an ERROR, otherwise 0.
        """
        print("starting loop")
        assert conn
        async for message in conn:
            print(f"<<< {message}")
            if message == "HELLO":
                print_status("waiting for for offer request")
            elif message == "SESSION_OK":
                self.start_pipeline()
            elif message == "OFFER_REQUEST":
                print_status("Incoming call: we have been asked to create the offer")
                self.start_pipeline()
                self.add_peer(conn)
            elif message.startswith("ERROR"):
                print_error(message)
                self.close_pipeline()
                return 1
            else:
                self.handle_json(message, conn)
        print("loop finished")
        return 0

    async def stop(self):
        """Close any open signalling connections.

        Each connection is closed on the event loop that owns it when that loop
        is running in another thread; otherwise it is closed directly. Safe to
        call when no connection was ever opened (previously this raised
        ``AttributeError`` on the nonexistent ``self.conn``).
        """
        current_loop = asyncio.get_running_loop()
        for attr in ("conn1", "conn2"):
            conn = getattr(self, attr, None)
            if conn is None:
                continue
            owner = self._conn_loops.pop(conn, None)
            if owner is not None and owner is not current_loop and owner.is_running():
                await asyncio.wrap_future(asyncio.run_coroutine_threadsafe(conn.close(), owner))
            else:
                await conn.close()
            setattr(self, attr, None)
        self.current_conn = None
if __name__ == "__main__":
    # Configure logging first; every record carries the name of its thread.
    logging.basicConfig(
        format="%(levelname)s [%(threadName)s] %(message)s",
        level=logging.INFO,
    )
    Gst.init(None)

    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument(
        "--our-id",
        default="5001",
        help="String ID that the peer can use to connect to us",
    )
    arg_parser.add_argument(
        "--server",
        default="wss://webrtc.gstreamer.net:8443",
        help='Signalling server to connect to, eg "wss://127.0.0.1:8443"',
    )
    cli_args = arg_parser.parse_args()

    # Main-thread loop: polls the GStreamer bus. Each signalling connection
    # runs its own loop on its own thread (see start_conn_thread).
    main_loop = asyncio.new_event_loop()
    client = WebRTCClient(
        loop=main_loop,
        our_id=cli_args.our_id,
        peer_id=None,
        server=cli_args.server,
        remote_is_offerer=True,
        video_encoding="vp8",
        source_type="test",
    )
    # The ids registered with the signalling server are hard-coded here;
    # --our-id is only stored on the client.
    for conn_name in ("5001", "5002"):
        client.start_conn_thread(conn_name)
    main_loop.run_forever()
    sys.exit(0)
Here are my repro steps:
- Start the Python program.
- Open `index.html` from the GStreamer repo locally using a file URL (e.g. file:///Users/james/dev/gstreamer/subprojects/gst-examples/webrtc/sendrecv/js/index.html). This was tested in Chrome incognito mode.
- Enter `5001` into the peer ID box, tick the "Remote offerer" checkbox, then click Connect.
- You should be presented with a browser dialog asking for access to the microphone/camera; click Allow. At this point you should see the test video of a bouncing ball.
- Open a new browser window and go to the same file URL as described above.
- Enter `5002` into the peer ID box, tick the "Remote offerer" checkbox, then click Connect.
- Before clicking Allow to grant access to your microphone/camera, notice that the video in the first window is now frozen; it will not resume until you click Allow in the second browser window.
If you have any problems, let me know. Thanks so much for your help.