Livekitwebrtcsrc with dynamic user add and reconnects

I’m writing a Python application which should connect to a LiveKit room, receive the audio of all members, mix it into a single audio stream and record it to a file. I’m using the element’s pad-added signal (code below). When a user reconnects I get this error:

basesrc gstbasesrc.c:3132:gst_base_src_loop:<nicesrc0> error: Internal data stream error.
basesrc gstbasesrc.c:3132:gst_base_src_loop:<nicesrc0> error: streaming stopped, reason error (-5)
queue gstqueue.c:992:gst_queue_handle_sink_event:<queue1> error: Internal data stream error.
queue gstqueue.c:992:gst_queue_handle_sink_event:<queue1> error: streaming stopped, reason error (-5)

Can someone help with that?

# gst-launch style description of the static part of the pipeline.
#   * "amix" (audiomixer) is fed by an always-running live audiotestsrc and its
#     output goes through queue ! avenc_aac ! identity ! fakesink.
#   * "srclive" (livekitwebrtcsrc) is declared without any links here.
#   * "mux" (mpegtsmux) is declared but not linked to anything in this string.
# NOTE(review): wave=4 should be "silence" per the audiotestsrc wave enum - confirm.
# NOTE(review): the API key and secret are hard-coded in this string; presumably
# they are local development credentials - confirm before sharing or committing.
DEFAULT_PIPELINE = "\
    audiomixer name=amix \
    mpegtsmux name=mux \
    livekitwebrtcsrc async-handling=false name=srclive video-codecs=<VP8> signaller::ws-url=ws://127.0.0.1:7880   signaller::api-key=APIMBu7ZiHwC5t9   signaller::secret-key=KIRGA9hjzgfgaTInW0upUiTVRkJCJKhfhwL3g0cAGo4   \
        signaller::room-name=my-first-room   \
        signaller::identity=gst-consumer signaller::participant-name=gst-consumer \
    audiotestsrc is-live=true wave=4 freq=200 ! audioconvert ! audioresample  ! amix.  \
    amix. ! queue ! avenc_aac ! identity dump=false ! fakesink dump=false" 

...

def _create_branch(pad, factory_names):
    """Create one element per factory name for the branch that serves *pad*.

    Each element is named "<factory>_<pad name>". Returns a dict keyed by
    factory name whose insertion order is the link order, or None when a
    factory is not available.
    """
    branch = {}
    for factory_name in factory_names:
        node = Gst.ElementFactory.make(factory_name, factory_name + "_" + str(pad.name))
        if node is None:
            logger.error("Could not create element %s for pad %s", factory_name, pad.name)
            return None
        branch[factory_name] = node
    return branch


def _discard_branch(nodes):
    """Stop the given elements and take the ones already added out of the pipeline."""
    global pipeline

    for node in nodes:
        if node.get_parent() is not None:
            node.set_state(Gst.State.NULL)
            pipeline.remove(node)


def pad_added(element, pad, **data):
    """Build and attach a processing branch for a pad exposed by the source.

    Pads whose name contains "video" get queue ! identity ! fakesink. Every
    other pad gets queue ! identity ! rtpopusdepay ! opusdec ! audioconvert !
    audioresample and is linked to the element named "amix".

    The created elements are stored in ``element_dict[pad.name]`` keyed by
    factory name once the branch is fully attached.

    Args:
        element: The element that emitted "pad-added".
        pad: The newly added source pad.
        **data: Unused extra signal data.

    Returns:
        Gst.PadLinkReturn.OK when the pad was already linked or the branch was
        attached; otherwise the failing link result, or
        Gst.PadLinkReturn.REFUSED when the branch could not be built.
    """
    global pipeline
    global element_dict

    if pad.is_linked():
        return Gst.PadLinkReturn.OK

    logger.info("#############################################%s", pad.name)
    logger.info("PAD_ADD %s-----%s-----%s-----%s",
                pipeline.name, element.name, pad.name, pad.direction)

    if "video" in pad.name:
        factory_names = ("queue", "identity", "fakesink")
        tail = None
    else:
        factory_names = ("queue", "identity", "rtpopusdepay", "opusdec",
                         "audioconvert", "audioresample")
        tail = pipeline.get_by_name("amix")
        if tail is None:
            logger.error("No element named amix in %s", pipeline.name)
            return Gst.PadLinkReturn.REFUSED

    branch = _create_branch(pad, factory_names)
    if branch is None:
        return Gst.PadLinkReturn.REFUSED

    # 2 is the queue's "downstream" leak mode (drops old buffers when full).
    branch["queue"].set_property("leaky", 2)
    branch["queue"].set_property("flush-on-eos", True)
    branch["identity"].set_property("dump", False)
    if "fakesink" in branch:
        branch["fakesink"].set_property("can-activate-pull", True)
        branch["fakesink"].set_property("sync", False)

    chain = list(branch.values())

    for node in chain:
        if not pipeline.add(node):
            # NOTE(review): a bin rejects elements whose name is already
            # taken, which would happen if a pad name is ever reused after a
            # reconnect - confirm against the source element's pad naming.
            logger.error("Could not add %s to %s", node.name, pipeline.name)
            _discard_branch(chain)
            return Gst.PadLinkReturn.REFUSED

    # Link the branch internally (and to the mixer for audio) before any data
    # can reach it.
    downstream_nodes = chain[1:] + ([tail] if tail is not None else [])
    for upstream, downstream in zip(chain, downstream_nodes):
        if not upstream.link(downstream):
            logger.error("Could not link %s to %s", upstream.name, downstream.name)
            _discard_branch(chain)
            return Gst.PadLinkReturn.REFUSED

    # Start downstream elements first so each one is ready before the element
    # feeding it starts pushing.
    for node in reversed(chain):
        node.sync_state_with_parent()

    # The source pad is linked last: buffers only start flowing into a branch
    # that is completely linked and running.
    link_result = pad.link(branch["queue"].get_static_pad("sink"))
    if link_result != Gst.PadLinkReturn.OK:
        logger.error("Could not link pad %s: %s", pad.name, link_result)
        _discard_branch(chain)
        return link_result

    logger.info("Pads linked successfully")
    element_dict[pad.name] = branch

    logger.info("PAD_ADDED %s-----%s-----%s-----%s",
                pipeline.name, element.name, pad.name, pad.direction)
    return Gst.PadLinkReturn.OK
...
    # Look up the element named "srclive" and, if it exists, run pad_added()
    # for every pad it adds.
    kivekitbin = pipeline.get_by_name("srclive") 
    if kivekitbin:
        kivekitbin.connect('pad-added', pad_added)

Is this still a problem for you?

The code you have looks like it’s more complicated than it needs to be. Can you try this C code that should do roughly what you want, except it displays a visualization of the mixer output?

#include <gst/gst.h>

/* "pad-added" handler: requests a new "sink_%u" pad on the mixer and links
 * the source pad to it.
 *
 * src:    element that emitted the signal (unused)
 * srcpad: the newly added source pad
 * mixer:  user data, the element to request the sink pad from
 */
static void
on_srcpad_added (G_GNUC_UNUSED GstElement * src, GstPad * srcpad, GstElement * mixer)
{
  gst_printerrln ("pad added %" GST_PTR_FORMAT, srcpad);
  g_autoptr(GstPad) sinkpad = gst_element_request_pad_simple (mixer, "sink_%u");
  if (sinkpad == NULL) {
    /* Nothing to link to; do not hand a NULL pad to gst_pad_link(). */
    gst_printerrln ("failed to request a sink pad from %" GST_PTR_FORMAT,
        mixer);
    return;
  }
  if (!GST_PAD_LINK_SUCCESSFUL (gst_pad_link (srcpad, sinkpad))) {
    gst_printerrln ("failed to link %" GST_PTR_FORMAT "=>%" GST_PTR_FORMAT,
        srcpad, sinkpad);
    /* Give the unused request pad back so it does not stay on the mixer. */
    gst_element_release_request_pad (mixer, sinkpad);
  }
}

/* Bus watch: quits the main loop on end-of-stream or on an error message.
 *
 * bus:  the bus the message came from (unused)
 * msg:  the message to inspect
 * loop: user data, the main loop to quit
 *
 * Always returns TRUE so the watch stays installed.
 */
static gboolean
on_bus_message (G_GNUC_UNUSED GstBus * bus, GstMessage * msg, GMainLoop * loop)
{
  switch (GST_MESSAGE_TYPE (msg)) {
    case GST_MESSAGE_EOS:
      /* End of stream is a normal termination, so it is not reported as an
       * error. */
      gst_printerrln ("end of stream %" GST_PTR_FORMAT, msg);
      g_main_loop_quit (loop);
      break;
    case GST_MESSAGE_ERROR:
      gst_printerrln ("error %" GST_PTR_FORMAT, msg);
      g_main_loop_quit (loop);
      break;
    default:
      break;
  }
  return TRUE;
}

/* Builds the pipeline from a launch description, links every pad added by
 * the element named "src" to the element named "mix" via on_srcpad_added(),
 * and runs a main loop until on_bus_message() quits it.
 *
 * Returns 0 after the main loop ends, -1 if the pipeline description could
 * not be parsed.
 */
gint
main (gint argc, gchar ** argv)
{
  gst_init (&argc, &argv);

  g_autoptr(GMainLoop) loop = g_main_loop_new (NULL, FALSE);

  /* The source and the mixer chain are not linked in the description; the
   * link is made at runtime in on_srcpad_added(). */
  g_autoptr(GError) error = NULL;
  g_autoptr(GstElement) pipe = gst_parse_launch (
      "livekitwebrtcsrc name=src "
      " signaller::ws-url=ws://localhost:7880 "
      " signaller::api-key=devkey signaller::secret-key=secret "
      " signaller::room-name=test "
      " video-codecs=<> "
      "audiomixer name=mix "
      "! queue "
      "! audioconvert "
      "! audioresample "
      "! spectrascope ! timeoverlay ! videoconvert ! autovideosink "
      "",
      &error
  );

  if (error) {
    gst_printerrln ("failed to set up pipeline: %s", error->message);
    return -1;
  }

  /* Deliver bus messages to on_bus_message() with the main loop as user
   * data. */
  g_autoptr(GstBus) bus = gst_pipeline_get_bus (GST_PIPELINE (pipe));
  gst_bus_add_watch (bus, (GstBusFunc) on_bus_message, loop);

  g_autoptr(GstElement) src = gst_bin_get_by_name (GST_BIN_CAST (pipe), "src");
  g_autoptr(GstElement) mixer = gst_bin_get_by_name (GST_BIN_CAST (pipe), "mix");

  /* Connected before the pipeline is set to PLAYING. */
  g_signal_connect (src, "pad-added", G_CALLBACK (on_srcpad_added), mixer);

  gst_element_set_state (pipe, GST_STATE_PLAYING);
  g_main_loop_run (loop);
  /* Shut the pipeline down once the loop has been quit. */
  gst_element_set_state (pipe, GST_STATE_NULL);

  return 0;
}
1 Like

Thanks for the response, and many thanks for the example you provided. The main idea of my code is to keep writing audio all the time, even if all room members are silent (mic off). In that case I need to write silence (or a sine wave, anything). That’s why I’m using audiomixer and audiotestsrc.
But the main problem is that if room members reconnect (bad internet connection), I get “Internal data stream error”.

I have tried to minimize your example to get a better understanding of what it’s doing.
This is the result in python:

#!/usr/bin/env python3

import gi

gi.require_version('GLib', '2.0')
gi.require_version('Gst', '1.0')
from gi.repository import GLib, Gst

# Static part of the pipeline, in gst-launch syntax.
#   * "amix" (audiomixer) is fed by a live audiotestsrc and its output is
#     rendered through wavescope into autovideosink.
#   * "srclive" (livekitwebrtcsrc) is declared without links here.
#   * "mux" (mpegtsmux) is declared but not linked to anything in this string.
DEFAULT_PIPELINE = '''
    audiomixer name=amix force-live=1 ignore-inactive-pads=1
    mpegtsmux name=mux
    livekitwebrtcsrc name=srclive
        signaller::ws-url=ws://127.0.0.1:7880
        signaller::api-key=devkey
        signaller::secret-key=secret
        signaller::room-name=test
        signaller::identity=gst-consumer
        signaller::participant-name=gst-consumer
    audiotestsrc is-live=true wave=ticks ! audioconvert ! audioresample ! queue ! amix.
    amix. ! queue ! audioconvert ! wavescope ! timeoverlay ! videoconvert ! queue ! autovideosink
'''
# Branch description for pads whose name contains "video": the data ends in a
# fakesink.
VIDEO_BRANCH = '''
    queue leaky=downstream flush-on-eos=true
    ! fakesink can-activate-pull=true
'''
# Branch description for pads whose name contains "audio".
# NOTE(review): there is no depayloader/decoder here, so this assumes the source
# pads already carry raw audio - confirm.
AUDIO_BRANCH = '''
    queue leaky=downstream flush-on-eos=true
    ! audioconvert
    ! audioresample
'''

# Maps a source pad name to the branch bin that was created for it.
element_dict = {}

# Assigned by main(). A module-level `global` statement declares nothing, so
# the names are initialised explicitly and exist before main() runs.
pipeline = None
mixer = None

def main():
    """Build the pipeline, hook up pad handling and run the main loop.

    Stores the parsed pipeline and the element named 'amix' in the module
    globals ``pipeline`` and ``mixer``. The pipeline is always set back to
    NULL when the main loop stops, including on KeyboardInterrupt, which
    still propagates to the caller.
    """
    global pipeline, mixer

    # No command-line arguments are handed to GStreamer.
    Gst.init(None)

    mainloop = GLib.MainLoop()
    pipeline = Gst.parse_launch(DEFAULT_PIPELINE)

    src = pipeline.get_by_name('srclive')
    mixer = pipeline.get_by_name('amix')

    # Connected before the pipeline is set to PLAYING.
    src.connect('pad-added', pad_added)

    pipeline.set_state(Gst.State.PLAYING)

    try:
        mainloop.run()
    finally:
        pipeline.set_state(Gst.State.NULL)

def pad_added(element, pad, **data):
    """Attach a newly parsed branch bin to a pad added by the source.

    Pads whose name contains 'video' get VIDEO_BRANCH; pads whose name
    contains 'audio' get AUDIO_BRANCH, which is also linked to the mixer.
    Other pads are left alone. The bin is stored in ``element_dict`` under
    the pad name.

    Args:
        element: The element that emitted 'pad-added'.
        pad: The newly added source pad.
        **data: Unused extra signal data.
    """
    print(f'PAD ADDED {pad.name}')

    if 'video' in pad.name:
        description, downstream = VIDEO_BRANCH, None
    elif 'audio' in pad.name:
        description, downstream = AUDIO_BRANCH, mixer
    else:
        return

    branch = Gst.parse_bin_from_description(description, True)
    pipeline.add(branch)

    # Link towards the mixer first so the branch has somewhere to push data
    # before the source pad is connected.
    if downstream is not None and not branch.link(downstream):
        print(f'FAILED TO LINK BRANCH OF {pad.name} TO {downstream.name}')

    branch.sync_state_with_parent()

    link_result = pad.link(branch.get_static_pad('sink'))
    if link_result != Gst.PadLinkReturn.OK:
        print(f'FAILED TO LINK {pad.name}: {link_result}')

    element_dict[pad.name] = branch

# Run main() only when this file is executed as a script.
if __name__ == '__main__':
    main()

Do you have any more information about how the users are disconnecting and reconnecting (are they cleanly exiting and re-entering the room or is their network connection dropped or something else)? I have tried disconnecting and reconnecting with this code and using multiple computers, also disabling and reconnecting the wireless network connection for one client and I don’t see any errors with this code. I’m using GStreamer 1.22.11 along with the latest version of gst-plugins-rs.

I’m testing by just pressing Refresh (F5).
livekitwebrtcsrc : Version 0.12.3-RELEASE
GStreamer 1.24.1