Webrtcbin video is frozen until new peer is connected

I have a multi-party WebRTC Python application modeled as closely as possible on the multiparty-sendrecv Rust example.

I’m using GStreamer 1.22.0 on Debian ARM.

This works great overall, but as soon as I add a new webrtcbin element to the pipeline, the video freezes for all users already connected to this pipeline until the peer for that new webrtcbin element is connected. In practice, this hasn’t been noticeable because the new peer connects within a second. The problem is noticeable, however, if there is a delay in that peer connecting or something happens to the peer before the connection can be completed.

Does anyone have any insight on why the pipeline is blocked for existing peers when adding a new webrtcbin to the pipeline? Any suggestions on how to address this problem?

Thanks

My impression is that the pipeline went back to PAUSED in order to re-establish synchronization between sinks. Put differently, it’s prerolling again because a new sink is present.

If you don’t need the peers to be synchronized, you can disable this with the async property on the audio/video sinks.

That makes sense and for this particular case it’s ok if the peers are not perfectly synchronized. I’m not clear on what sink elements I would set that for however. Here’s my pipeline:

v4l2src device=/dev/video22 !
video/x-raw,width=1280,height=720,format=NV12,framerate=30/1
  ! vp8enc deadline=1 keyframe-max-dist=2000 ! rtpvp8pay pt=97 picture-id-mode=15-bit !
tee name=videotee ! queue ! fakesink sync=true
alsasrc ! audioconvert ! audio/x-raw,channels=1,rate=24000 ! audioresample
  ! opusenc audio-type=voice bitrate=28000 bandwidth=superwideband perfect-timestamp=true ! rtpopuspay pt=96
  ! application/x-rtp,encoding-name=OPUS !
tee name=audiotee ! queue ! fakesink sync=true
audiomixer name=audio-mixer force-live=true ! audioconvert !
audioresample ! audio/x-raw,rate=48000 ! alsasink sync=false

As each peer connects, I create this bin

queue name=video-queue ! webrtcbin. \
             queue name=audio-queue ! webrtcbin. \
             webrtcbin name=webrtcbin

and then insert it into the pipeline.

This is for a video monitoring application where multiple peers can view the video. The audio is two-way.

On the “playback” sinks (from a quick read, the two fakesinks and the alsasink), add async=false. If you add a branch to your tee, that branch’s sink will need the same. Note that I had assumed you would have multiple webrtc peers, so it behaves differently than I thought.
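As a rough sketch of that suggestion, the snippet below appends `async=false` to each sink in a launch-style description before it is handed to `Gst.parse_launch`. The helper name `with_async_disabled` and the `SINK_NAMES` tuple are hypothetical, and the string handling is deliberately naive: it assumes elements are separated by `!`, that the sink factory name is the first token of its segment, and that each sink is the last element on its line, as in the pipeline above.

```python
# Hypothetical helper, not part of the original program: add async=false
# to the sink elements of a gst-launch style description. Assumes each
# sink is the last element on its line.
SINK_NAMES = ("fakesink", "alsasink")


def with_async_disabled(description: str) -> str:
    out_lines = []
    for line in description.splitlines():
        segments = line.split("!")
        for i, segment in enumerate(segments):
            tokens = segment.split()
            if tokens and tokens[0] in SINK_NAMES and "async=false" not in tokens:
                # Keep the existing properties and append the new one.
                segments[i] = segment.rstrip() + " async=false"
        out_lines.append("!".join(segments))
    return "\n".join(out_lines)
```

For example, passing `"tee name=videotee ! queue ! fakesink sync=true"` yields the same description with `async=false` added to the fakesink. Segments that already carry `async=false` are left alone.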

Thanks but unfortunately setting async=false didn’t help.

Maybe you should attach a small reproducer program, so we’d be able to see exactly what you are doing and what your specific issue may be?

OK, I have created as simple a reproducer as I can: a single Python app that acts as a WebRTC client for two other WebRTC clients in the browser.

When I run this and connect the second browser client, the video in the first browser window is frozen until the second browser client completes the WebRTC connection.

For the browser web page, I’m using the example in the sendrecv/js folder of the GStreamer repo, with one minor change to connect to the signalling server at webrtc.gstreamer.net:

diff --git a/subprojects/gst-examples/webrtc/sendrecv/js/webrtc.js b/subprojects/gst-examples/webrtc/sendrecv/js/webrtc.js
index 83fdad31ec..f4f4f5e647 100644
--- a/subprojects/gst-examples/webrtc/sendrecv/js/webrtc.js
+++ b/subprojects/gst-examples/webrtc/sendrecv/js/webrtc.js
@@ -8,8 +8,8 @@
  */

 // Set this to override the automatic detection in websocketServerConnect()
-var ws_server;
-var ws_port;
+var ws_server = "webrtc.gstreamer.net";
+var ws_port = 8443;

python code:

#!/usr/bin/env python3

import argparse
import asyncio
import json
import logging
import sys
import threading

import gi
import websockets
from websockets.version import version as wsv

gi.require_version("Gst", "1.0")
from gi.repository import Gst  # NOQA

gi.require_version("GstWebRTC", "1.0")
from gi.repository import GstWebRTC  # NOQA

gi.require_version("GstSdp", "1.0")
from gi.repository import GstSdp  # NOQA

# Ensure that gst-python is installed
try:
    from gi.overrides import Gst as _
except ImportError:
    print("gstreamer-python binding overrides aren't available, please install them")
    raise


GST_PIPELINE_TEMPLATE = """
videotestsrc is-live=true pattern=ball ! vp8enc deadline=1 keyframe-max-dist=2000 ! rtpvp8pay pt=97 picture-id-mode=15-bit !
tee name=videotee ! queue ! fakesink sync=true
audiotestsrc is-live=true wave=silence ! audioconvert ! audio/x-raw,channels=1,rate=24000 ! audioresample
  ! opusenc audio-type=voice bitrate=28000 bandwidth=superwideband perfect-timestamp=true ! rtpopuspay pt=96
  ! application/x-rtp,encoding-name=OPUS !
tee name=audiotee ! queue ! fakesink sync=true
audiomixer name=audio-mixer force-live=true ! audioconvert !
audioresample ! audio/x-raw,rate=48000 ! fakesink sync=false
"""


def add_pad_to_element(element: Gst.Element, pad: Gst.Pad) -> None:
    if not element.add_pad(pad):
        logging.error("Unable to add gstreamer pad to element")
        raise RuntimeError("A media error occurred")


def add_to_bin(gst_bin: Gst.Bin, element: Gst.Element) -> None:
    rc = gst_bin.add(element)
    if not rc and rc is not None:
        logging.error(f"Unable to add gstreamer element to bin. rc={rc}")
        raise RuntimeError("A media error occurred")


def sync_state_with_parent(element: Gst.Element) -> None:
    if not element.sync_state_with_parent():
        logging.error("Gstreamer element could not be synced with parent")
        raise RuntimeError("A media error occurred")


def link_pads(srcpad: Gst.Pad, sinkpad: Gst.Pad) -> None:
    if srcpad.link(sinkpad) != Gst.PadLinkReturn.OK:
        logging.error("Unable to link gstreamer pads")
        raise RuntimeError("A media error occurred")


def print_status(msg):
    print(f"--- {msg}")


def print_error(msg):
    print(f"!!! {msg}", file=sys.stderr)


class WebRTCPeer:
    def __init__(
        self,
        device_id: str,
        peer_id: str,
        client,
        channel_type: str,
        channel_name: str,
        webrtc_key: str,
        pipeline: Gst.Pipeline,
        audio_mixer,
        turn_servers: list[str] | None,
        disconnect_on_timeout: bool = False,
    ):
        self.device_id = device_id
        self._peer_id = peer_id
        self.channel_type = channel_type
        self.channel_name = channel_name
        self.webrtc_key = webrtc_key
        self._pipeline = pipeline
        self._audio_mixer = audio_mixer
        self.inference_enabled = False
        self._client = client
        self._peer_listener = None

        # to be created in start()
        self._webrtc = None  # webrtcbin gst element
        self._bin: Gst.Bin = None
        self._audiotee: Gst.Element = None
        self._videotee: Gst.Element = None
        self._data_channel = None
        self._data_channel_open = False

        self.is_peer_connected = False
        self.sdp_offer_sent = False
        self._turn_servers = turn_servers

        self._sdp_offer: str = None
        self._ice_candidates = []
        self._is_stopping: bool = False
        self._disconnect_on_timeout = disconnect_on_timeout

    @property
    def peer_id(self) -> str:
        return self._peer_id

    @property
    def turn_servers(self) -> list[str] | None:
        return self._turn_servers

    def on_offer_created(self, promise, _, __):
        if promise.wait() != Gst.PromiseResult.REPLIED:
            logging.warning("promise was not replied")
        reply = promise.get_reply()
        offer = reply.get_value("offer")
        promise = Gst.Promise.new()
        self._webrtc.emit("set-local-description", offer, promise)
        promise.interrupt()
        # self.send_sdp_offer(offer)
        self.sdp_offer_sent = True
        self._client.send_sdp(offer)
        logging.info("end offer created")

    def on_negotiation_needed(self, element):
        logging.info("negotiation needed")
        promise = Gst.Promise.new_with_change_func(self.on_offer_created, element, None)
        element.emit("create-offer", None, promise)
        logging.info("create-offer created")

    def on_ice_candidate(self, _, mlineindex, candidate):
        logging.info(f"adding local ice {candidate}")
        self._client.send_ice_candidate_message(_, mlineindex, candidate)

    def on_incoming_stream(self, _, pad: Gst.Pad):
        if pad.direction != Gst.PadDirection.SRC:
            logging.info(f"ignoring pad {pad.get_name()}")
            return

        logging.info(f"incoming stream setup for pad: {pad}")
        caps = pad.get_current_caps()
        s = caps.get_structure(0)
        logging.info(f"caps: {caps.to_string()}. s: {s}")
        media = s.get_string("media")
        enc = s.get_string("encoding-name")
        if media.startswith("audio"):
            logging.info("configuring incoming audio stream...")
            # need an audio converter for incoming audio streams that will decode the binary
            # stream and connect it to the audio mixer for audio output to the device
            if enc == "OPUS":
                logging.info("using opus decoder")
                audio_conv_bin: Gst.Bin = Gst.parse_bin_from_description(
                    "rtpopusdepay name=dbin ! queue ! opusdec ! audioresample name=src",
                    False,
                )
            else:
                audio_conv_bin: Gst.Bin = Gst.parse_bin_from_description(
                    "decodebin name=dbin ! queue ! audioconvert ! audioresample name=src sinc-filter-mode=1",
                    False,
                )
            # Add a ghost pad on our conv bin that proxies the sink pad of the decodebin
            dbin = audio_conv_bin.get_by_name("dbin")
            sinkpad = Gst.GhostPad.new("sink", dbin.get_static_pad("sink"))
            add_pad_to_element(audio_conv_bin, sinkpad)

            # And another one that proxies the source pad of the last element
            src = audio_conv_bin.get_by_name("src")
            srcpad = Gst.GhostPad.new("src", src.get_static_pad("src"))
            add_pad_to_element(audio_conv_bin, srcpad)

            # add to our peer bin
            add_to_bin(self._bin, audio_conv_bin)

            # sync the playing state with the parent peer bin
            sync_state_with_parent(audio_conv_bin)

            # connect incoming audio pad to our decoder bin
            link_pads(pad, sinkpad)

            # add ghost output pad to peer bin
            audio_src_pad = Gst.GhostPad.new("audio_src", srcpad)
            audio_src_pad.set_active(True)
            add_pad_to_element(
                self._bin, audio_src_pad
            )  # should trigger our on_peer_bin_pad_added function
        else:
            logging.error(f"Ignoring stream {s.to_string()} with media type {media}")

    def on_peer_bin_pad_added(self, _, pad):
        logging.info(f"peer bin pad added: {pad.get_name()}")
        # only receive audio streams directly
        if pad.get_name() == "audio_src":
            logging.info("got audio_src")
            audiomixer_sink_pad = self._audio_mixer.get_request_pad("sink_%u")
            link_pads(pad, audiomixer_sink_pad)
            # listen for unlink event in order to cleanup
            audiomixer_sink_pad.connect("unlinked", self.on_audio_pad_unlinked)

            self._pipeline.recalculate_latency()
        else:
            logging.warning(f"ignoring peer bin pad {pad.get_name()}")

    def on_audio_pad_unlinked(self, pad, peer):
        logging.info("audio pad unlinked")
        audiomixer = pad.get_parent()
        audiomixer.release_request_pad(pad)
        # audiomixer.unref()  # decrement ref count from get_parent per docs
        logging.info("audio pad released from mixer")

    def on_pad_probe(self, pad, info):
        logging.info(f"pad probe: {pad.get_name()}; {str(info)}")
        return Gst.PadProbeReturn.OK

    def on_connection_state_change(self, _bin, prop) -> None:
        value = self._webrtc.get_property(prop.name)
        logging.info(f"peer {self._peer_id} connection state changed to {value}")
        if value == GstWebRTC.WebRTCPeerConnectionState.CONNECTED:
            # recalculate latency after peer connection is established. we get warnings from rtpsession if we don't
            webrtc_latency = self._webrtc.get_property("latency")  # in ms
            self._audio_mixer.set_property(
                "min-upstream-latency", webrtc_latency * 1000000
            )  # set value in ns
            rc = self._pipeline.recalculate_latency()
            if not rc:
                logging.warning("failed to recalculate latency")

            self.is_peer_connected = True
            logging.info(f"peer {self._peer_id} connected successfully")
            self._client.switch_conn()
            logging.info("stopped ws conn")
        else:
            self.is_peer_connected = False

    def on_ice_connection_state_change(self, _bin, prop) -> None:
        value = self._webrtc.get_property(prop.name)
        logging.info(f"peer {self._peer_id} ice state changed to {value}")

    def on_signaling_state_change(self, _bin, prop) -> None:
        value = self._webrtc.get_property(prop.name)
        logging.info(f"peer {self._peer_id} signaling state changed to {value}")

    def handle_sdp_answer(self, sdp_dict: dict):
        sdp = sdp_dict["sdp"]

        logging.info("Received answer:\n%s" % sdp)
        res, sdpmsg = GstSdp.SDPMessage.new()
        GstSdp.sdp_message_parse_buffer(bytes(sdp.encode()), sdpmsg)
        answer = GstWebRTC.WebRTCSessionDescription.new(
            GstWebRTC.WebRTCSDPType.ANSWER, sdpmsg
        )
        logging.info("sending set-remote-descr")
        promise = Gst.Promise.new()
        self._webrtc.emit("set-remote-description", answer, promise)
        promise.interrupt()
        logging.info("set-remote-descr sent")

    def handle_ice(self, ice_dict: dict):
        candidate = ice_dict["candidate"]
        if candidate:
            sdpmlineindex = ice_dict["sdpMLineIndex"]
            self._webrtc.emit("add-ice-candidate", sdpmlineindex, candidate)
            logging.info(f"added remote ice candidate {sdpmlineindex} {str(candidate)}")
        else:
            logging.error(f"unexpected ice payload: {str(ice_dict)}")

    def _handle_disconnection(self, error: str | None) -> None:
        if self._peer_listener:
            # need to start from a new thread b/c callback is async
            t = threading.Thread(
                target=self._peer_listener.on_peer_disconnected,
                args=(self, error),
                name="webrtc-peer-disconnect",
            )
            t.start()

    def _on_data_channel_error(self, channel, error) -> None:
        logging.error(f"peer {self._peer_id} data channel error: {error}")

    def start(self) -> None:
        logging.info(f"starting peer {self._peer_id}")
        # based on https://gitlab.freedesktop.org/gstreamer/gst-examples/-/blob/1.18/webrtc/multiparty-sendrecv/gst-rust/src/main.rs  # noqa: E501
        peer_bin = Gst.parse_bin_from_description(
            "queue name=video-queue ! webrtcbin. queue name=audio-queue ! webrtcbin. webrtcbin name=webrtcbin",
            False,
        )
        self._bin = peer_bin

        webrtc = peer_bin.get_by_name("webrtcbin")
        webrtc.set_property("stun-server", "stun://stun.l.google.com:19302")
        webrtc.set_property("bundle-policy", "max-bundle")
        if self._turn_servers:
            for turn_url in self._turn_servers:
                # The uri of the server of the form turn(s)://username:password@host:port
                webrtc.emit("add-turn-server", turn_url)
        self._webrtc = webrtc

        audio_queue = peer_bin.get_by_name("audio-queue")
        audio_sink_pad = Gst.GhostPad.new(
            "audio_sink", audio_queue.get_static_pad("sink")
        )
        # peer_bin.add_pad(audio_sink_pad)
        add_pad_to_element(peer_bin, audio_sink_pad)

        video_queue = peer_bin.get_by_name("video-queue")
        video_sink_pad = Gst.GhostPad.new(
            "video_sink", video_queue.get_static_pad("sink")
        )
        # peer_bin.add_pad(video_sink_pad)
        add_pad_to_element(peer_bin, video_sink_pad)

        # self._pipeline.add(peer_bin)
        add_to_bin(self._pipeline, peer_bin)

        webrtc.connect("on-negotiation-needed", self.on_negotiation_needed)
        webrtc.connect("on-ice-candidate", self.on_ice_candidate)
        webrtc.connect("pad-added", self.on_incoming_stream)
        webrtc.connect(
            "notify::ice-connection-state", self.on_ice_connection_state_change
        )
        webrtc.connect("notify::connection-state", self.on_connection_state_change)
        webrtc.connect("notify::signaling-state", self.on_signaling_state_change)

        # need to create a data channel to coordinate mesh connections for user-to-user connections
        webrtc.set_state(Gst.State.READY)
        data_channel = webrtc.emit("create-data-channel", "signal-data", None)
        if data_channel:
            self._data_channel = data_channel
            logging.info("data channel created")
            data_channel.connect("on-error", self._on_data_channel_error)
        else:
            logging.error("data channel not created")

        peer_bin.connect("pad-added", self.on_peer_bin_pad_added)

        audiotee = self._pipeline.get_by_name("audiotee")
        self._audiotee = audiotee
        audio_src_pad = audiotee.get_request_pad("src_%u")
        audio_block = audio_src_pad.add_probe(
            Gst.PadProbeType.BLOCK_DOWNSTREAM, self.on_pad_probe
        )
        # ret = audio_src_pad.link(audio_sink_pad)
        link_pads(audio_src_pad, audio_sink_pad)

        videotee = self._pipeline.get_by_name("videotee")
        self._videotee = videotee
        video_src_pad = videotee.get_request_pad("src_%u")
        video_block = video_src_pad.add_probe(
            Gst.PadProbeType.BLOCK_DOWNSTREAM, self.on_pad_probe
        )
        # ret = video_src_pad.link(video_sink_pad)
        link_pads(video_src_pad, video_sink_pad)

        # declare that we will only send (not receive) video
        trans = webrtc.emit("get-transceiver", 0)
        trans.set_property(
            "direction", GstWebRTC.WebRTCRTPTransceiverDirection.SENDONLY
        )
        logging.info("set sendonly")

        # closure to unblock pads
        def set_bin_playing(b):
            # rc = b.sync_state_with_parent()
            sync_state_with_parent(b)

            audio_src_pad.remove_probe(audio_block)
            video_src_pad.remove_probe(video_block)
            logging.info("peer block removed")

        peer_bin.call_async(set_bin_playing)

        logging.info("add peer complete")

    def stop(self) -> None:
        self._is_stopping = True
        logging.info(f"stopping peer {self._peer_id} on channel {self.channel_name}")

        if self._data_channel:
            try:
                self._data_channel.emit("close")
                logging.info("data channel closed")
            except Exception:
                logging.exception("error closing data channel")

        try:
            # Block the tees shortly for removal
            audiotee_sinkpad = self._audiotee.get_static_pad("sink")
            if audiotee_sinkpad:
                audio_block = audiotee_sinkpad.add_probe(
                    Gst.PadProbeType.BLOCK_DOWNSTREAM, self.on_pad_probe
                )
            else:
                logging.warning("unable to get audiotee sink pad")
                audio_block = None

            videotee_sinkpad = self._videotee.get_static_pad("sink")
            if videotee_sinkpad:
                video_block = videotee_sinkpad.add_probe(
                    Gst.PadProbeType.BLOCK_DOWNSTREAM, self.on_pad_probe
                )
            else:
                logging.warning("unable to get videotee sink pad")
                video_block = None

            # Release the tee pads and unblock
            audio_sinkpad = self._bin.get_static_pad("audio_sink")
            video_sinkpad = self._bin.get_static_pad("video_sink")

            if audio_sinkpad:
                audiotee_srcpad = audio_sinkpad.get_peer()
                if audiotee_srcpad:
                    audiotee_srcpad.unlink(audio_sinkpad)
                    self._audiotee.release_request_pad(audiotee_srcpad)
                else:
                    logging.warning("unable to get audiotee srcpad")
            else:
                logging.warning("unable to get audio sinkpad")

            if audiotee_sinkpad and audio_block:
                audiotee_sinkpad.remove_probe(audio_block)
                logging.info("removed audio block probe")

            if video_sinkpad:
                videotee_srcpad = video_sinkpad.get_peer()
                if videotee_srcpad:
                    videotee_srcpad.unlink(video_sinkpad)
                    self._videotee.release_request_pad(videotee_srcpad)
                else:
                    logging.error("unable to get videotee srcpad")

            if videotee_sinkpad and video_block:
                videotee_sinkpad.remove_probe(video_block)
                logging.info("removed video block probe")
        except Exception:
            logging.exception("Exception while stopping peer")

        # Then remove the peer bin gracefully from the pipeline
        self._pipeline.remove(self._bin)
        self._bin.set_state(Gst.State.NULL)
        # release references
        self._pipeline = None
        self._audio_mixer = None
        self._webrtc = None
        self._bin = None
        self._audiotee = None
        self._videotee = None
        self._pubsub_client = None
        self._sdp_offer = None
        self._ice_candidates = None
        self._data_channel = None
        self._peer_listener = None
        self._turn_servers = None
        logging.info("peer removed")


class WebRTCClient:
    def __init__(
        self,
        loop,
        our_id,
        peer_id,
        server,
        remote_is_offerer,
        video_encoding,
        source_type,
    ):
        self.current_conn = None
        self.pipe = None
        self.webrtc_peers = []
        self.event_loop = loop
        self.server = server
        # An optional user-specified ID we can use to register
        self.our_id = our_id
        # The actual ID we used to register
        self.id_ = None
        # An optional peer ID we should connect to
        self.peer_id = peer_id
        # Whether we will send the offer or the remote peer will
        self.remote_is_offerer = remote_is_offerer

    async def send(self, conn, msg):
        assert conn
        print(f">>> {msg}")
        await conn.send(msg)

    def switch_conn(self):
        self.current_conn = self.conn2
        print("switched to conn2")

    def start_conn_thread(self, conn_name):
        t = threading.Thread(
            target=self.start_loop, args=(conn_name,), name=f"conn-thread-{conn_name}"
        )
        t.start()

    def start_loop(self, conn_name):
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        if conn_name == "5001":
            self.conn1 = loop.run_until_complete(websockets.connect(self.server))
            conn = self.conn1
            self.current_conn = conn
        else:
            self.conn2 = loop.run_until_complete(websockets.connect(self.server))
            conn = self.conn2
        loop.run_until_complete(self.send(conn, f"HELLO {conn_name}"))
        loop.run_until_complete(self.loop(conn, conn_name))

    async def setup_call(self, conn, peer_id):
        assert peer_id
        await self.send(conn, f"SESSION {peer_id}")

    def send_soon(self, msg):
        asyncio.run_coroutine_threadsafe(
            self.send(self.current_conn, msg), self.event_loop
        )

    def on_bus_poll_cb(self, bus):
        def remove_bus_poll():
            self.event_loop.remove_reader(bus.get_pollfd().fd)
            self.event_loop.stop()

        while bus.peek():
            msg = bus.pop()
            if msg.type == Gst.MessageType.ERROR:
                err = msg.parse_error()
                print("ERROR:", err.gerror, err.debug)
                remove_bus_poll()
                break
            elif msg.type == Gst.MessageType.EOS:
                remove_bus_poll()
                break
            elif msg.type == Gst.MessageType.LATENCY:
                self.pipe.recalculate_latency()

    def send_sdp(self, offer):
        text = offer.sdp.as_text()
        if offer.type == GstWebRTC.WebRTCSDPType.OFFER:
            print_status("Sending offer:\n%s" % text)
            msg = json.dumps({"sdp": {"type": "offer", "sdp": text}})
        elif offer.type == GstWebRTC.WebRTCSDPType.ANSWER:
            print_status("Sending answer:\n%s" % text)
            msg = json.dumps({"sdp": {"type": "answer", "sdp": text}})
        else:
            raise AssertionError(offer.type)
        self.send_soon(msg)

    def send_ice_candidate_message(self, _, mlineindex, candidate):
        icemsg = json.dumps(
            {"ice": {"candidate": candidate, "sdpMLineIndex": mlineindex}}
        )
        self.send_soon(icemsg)

    def start_pipeline(self, create_offer=True, audio_pt=96, video_pt=97):
        if self.pipe:
            return
        print_status(f"Creating pipeline, create_offer: {create_offer}")
        desc = GST_PIPELINE_TEMPLATE
        self.pipe = Gst.parse_launch(desc)
        bus = self.pipe.get_bus()
        self.event_loop.add_reader(bus.get_pollfd().fd, self.on_bus_poll_cb, bus)
        self.pipe.set_state(Gst.State.PLAYING)

    def handle_json(self, message):
        logging.info(f"incoming json message: {message}")
        try:
            msg = json.loads(message)
        except json.decoder.JSONDecodeError:
            print_error("Failed to parse JSON message, this might be a bug")
            raise
        if "sdp" in msg:
            sdp = msg["sdp"]["sdp"]
            if msg["sdp"]["type"] == "answer":
                print_status("Received answer:\n%s" % sdp)
                res, sdpmsg = GstSdp.SDPMessage.new_from_text(sdp)
                answer = GstWebRTC.WebRTCSessionDescription.new(
                    GstWebRTC.WebRTCSDPType.ANSWER, sdpmsg
                )
                promise = Gst.Promise.new()
                # just use the last peer
                webrtc = self.webrtc_peers[-1]._webrtc
                webrtc.emit("set-remote-description", answer, promise)
                promise.interrupt()  # we don't care about the result, discard it
            else:
                print_status("Received offer:\n%s" % sdp)
        elif "ice" in msg:
            assert self.webrtc_peers[-1]._webrtc
            ice = msg["ice"]
            candidate = ice["candidate"]
            sdpmlineindex = ice["sdpMLineIndex"]
            webrtc = self.webrtc_peers[-1]._webrtc
            webrtc.emit("add-ice-candidate", sdpmlineindex, candidate)
            print("added remote ice candidate")
        else:
            print_error("Unknown JSON message")

    def close_pipeline(self):
        if self.pipe:
            self.pipe.set_state(Gst.State.NULL)
            self.pipe = None
        self.webrtc = None

    def add_peer(self):
        peer_id = len(self.webrtc_peers) + 1000 + 1
        audio_mixer = self.pipe.get_by_name("audio-mixer")
        peer = WebRTCPeer(
            "foo",
            peer_id,
            self,
            "test",
            "test",
            "test-key",
            self.pipe,
            audio_mixer,
            None,
        )
        peer.start()
        self.webrtc_peers.append(peer)

    async def loop(self, conn, conn_name):
        print("starting loop")
        assert conn
        async for message in conn:
            print(f"<<< {message}")
            if message == "HELLO":
                print_status("waiting for offer request")
            elif message == "SESSION_OK":
                self.start_pipeline()
            elif message == "OFFER_REQUEST":
                print_status("Incoming call: we have been asked to create the offer")
                self.start_pipeline()
                self.add_peer()
            elif message.startswith("ERROR"):
                print_error(message)
                self.close_pipeline()
                return 1
            else:
                self.handle_json(message)
        # self.close_pipeline()
        print("loop finished")
        return 0

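    async def stop(self):
        # self.conn is never assigned; close the connections start_loop() opened
        for name in ("conn1", "conn2"):
            conn = getattr(self, name, None)
            if conn:
                await conn.close()
            setattr(self, name, None)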


if __name__ == "__main__":
    logging.basicConfig(
        level=logging.INFO, format="%(levelname)s [%(threadName)s] %(message)s"
    )
    Gst.init(None)
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--our-id",
        default="5001",
        help="String ID that the peer can use to connect to us",
    )
    parser.add_argument(
        "--server",
        default="wss://webrtc.gstreamer.net:8443",
        help='Signalling server to connect to, eg "wss://127.0.0.1:8443"',
    )
    args = parser.parse_args()
    loop = asyncio.new_event_loop()
    c = WebRTCClient(
        loop,
        args.our_id,
        None,
        args.server,
        True,
        "vp8",
        "test"
    )
    c.start_conn_thread("5001")
    c.start_conn_thread("5002")
    loop.run_forever()
    sys.exit(0)

Here are my repro steps:

  1. Start python program
  2. Open the index.html from the gstreamer repo locally using a file url (e.g. file:///Users/james/dev/gstreamer/subprojects/gst-examples/webrtc/sendrecv/js/index.html). This was tested in Chrome incognito mode
  3. Enter 5001 into the peer id box. Click the Remote offerer checkbox then click Connect
  4. You should be presented with a dialog from the browser to allow access to the microphone/camera; click allow. At this point you should see the test video of a bouncing ball.
  5. Open a new browser window and go to the same file url as described above
  6. Enter 5002 into the peer id box. Click the Remote offerer checkbox then click Connect
  7. Before clicking allow to enable access to your mic/camera, notice the video in the first window is now frozen and will not resume until you click allow in the second browser window.

If you have any problems let me know. Thanks so much for your help.

I think I may have made a breakthrough. When I dynamically add a new peer to the pipeline I add queues before the webrtcbin element like in the gstreamer multiparty example. In the gstreamer example, the queues use all default properties. In another implementation of multiparty that I found elsewhere I noticed the queues are created with properties silent=true and leaky=downstream. When I add those properties to my queues, I no longer have the freezing problem.
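For reference, a minimal sketch of what that change looks like against the reproducer above. The helper name `peer_bin_description` and its `leaky` flag are hypothetical; only the queue properties `leaky=downstream` and `silent=true` and the element names come from this thread.

```python
# Hypothetical helper: builds the per-peer bin description from the
# reproducer, optionally adding the queue properties described above.
def peer_bin_description(leaky: bool = True) -> str:
    queue_props = " leaky=downstream silent=true" if leaky else ""
    return (
        f"queue name=video-queue{queue_props} ! webrtcbin. "
        f"queue name=audio-queue{queue_props} ! webrtcbin. "
        "webrtcbin name=webrtcbin"
    )
```

The returned string would take the place of the literal passed to `Gst.parse_bin_from_description` in `WebRTCPeer.start()`. Calling `peer_bin_description(False)` reproduces the original description unchanged.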

Are there any problems that may occur with using those flags here? Is there a reason they aren’t present in the gstreamer example?
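One likely trade-off, offered as an assumption rather than a confirmed diagnosis: a default queue makes its upstream wait once it is full, and since a tee pushes each buffer to its branches in turn, one stalled branch can hold up the others. With `leaky=downstream` the queue instead discards its oldest buffers. The toy model below is plain Python, not GStreamer; `ToyQueue` and `feed` are hypothetical names that only illustrate the two overflow policies.

```python
from collections import deque


class ToyQueue:
    """Toy bounded queue: not GStreamer, just the two overflow policies."""

    def __init__(self, max_buffers: int, leaky_downstream: bool) -> None:
        self.max_buffers = max_buffers
        self.leaky_downstream = leaky_downstream
        self.buffers = deque()
        self.dropped = 0

    def push(self, buf: int) -> bool:
        """Return True if accepted, False if the producer would have to wait."""
        if len(self.buffers) >= self.max_buffers:
            if not self.leaky_downstream:
                return False  # a non-leaky queue would block the producer here
            self.buffers.popleft()  # discard the oldest queued buffer
            self.dropped += 1
        self.buffers.append(buf)
        return True


def feed(q: ToyQueue, count: int) -> int:
    """Push buffers 0..count-1 with nothing draining the queue.

    Returns how many buffers were accepted before the first stall.
    """
    accepted = 0
    for buf in range(count):
        if not q.push(buf):
            break
        accepted += 1
    return accepted
```

With `ToyQueue(3, leaky_downstream=False)`, `feed` stops after three buffers, which models the producer stalling. With `leaky_downstream=True`, every push is accepted and the oldest buffers are discarded. On the encoded video branch, discarded buffers may look like packet loss to the receiver, so a viewer could see artifacts until the next keyframe, and `keyframe-max-dist=2000` makes keyframes infrequent.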

Thanks
