I’m writing python application with should connect to LiveKit room, receive all members audio put in one audio mix and record to file. I’m using Element signal: pad-added, code below. When user reconnects I’m getting this error:
basesrc gstbasesrc.c:3132:gst_base_src_loop:<nicesrc0> error: Internal data stream error.
basesrc gstbasesrc.c:3132:gst_base_src_loop:<nicesrc0> error: streaming stopped, reason error (-5)
queue gstqueue.c:992:gst_queue_handle_sink_event:<queue1> error: Internal data stream error.
queue gstqueue.c:992:gst_queue_handle_sink_event:<queue1> error: streaming stopped, reason error (-5)
Can someone help with that?
DEFAULT_PIPELINE = "\
audiomixer name=amix \
mpegtsmux name=mux \
livekitwebrtcsrc async-handling=false name=srclive video-codecs=<VP8> signaller::ws-url=ws://127.0.0.1:7880 signaller::api-key=APIMBu7ZiHwC5t9 signaller::secret-key=KIRGA9hjzgfgaTInW0upUiTVRkJCJKhfhwL3g0cAGo4 \
signaller::room-name=my-first-room \
signaller::identity=gst-consumer signaller::participant-name=gst-consumer \
audiotestsrc is-live=true wave=4 freq=200 ! audioconvert ! audioresample ! amix. \
amix. ! queue ! avenc_aac ! identity dump=false ! fakesink dump=false"
...
def pad_added(element, pad, **data):
global pipeline
global element_dict
if Gst.Pad.is_linked(pad):
return Gst.PadLinkReturn.OK
logger.info("#############################################"+ pad.name)
dump = False
if "video" in pad.name:
logger.info("PAD_ADD " + pipeline.name + "-----" + str(element.name) + "-----" + str(pad.name) + "-----" + str(pad.direction))
element_dict[pad.name] = {}
element_dict[pad.name]["queue"] = Gst.ElementFactory.make("queue", "queue_" + str(pad.name))
Gst.Object.set_property(element_dict[pad.name]["queue"], "leaky", 2)
Gst.Object.set_property(element_dict[pad.name]["queue"], "flush-on-eos", True)
element_dict[pad.name]["identity"] = Gst.ElementFactory.make("identity", "identity_" + str(pad.name))
Gst.Object.set_property(element_dict[pad.name]["identity"], "dump", dump)
element_dict[pad.name]["fakesink"] = Gst.ElementFactory.make("fakesink", "fakesink_" + str(pad.name))
Gst.Object.set_property(element_dict[pad.name]["fakesink"], "can-activate-pull", True)
pipeline.add(element_dict[pad.name]["queue"])
pipeline.add(element_dict[pad.name]["identity"])
pipeline.add(element_dict[pad.name]["fakesink"])
Gst.Object.set_property(element_dict[pad.name]["fakesink"], "sync", False)
Gst.Object.set_property(element_dict[pad.name]["fakesink"], "can-activate-pull", True)
sink_pad = element_dict[pad.name]["fakesink"].get_static_pad("sink")
element_dict[pad.name]["queue"].link(element_dict[pad.name]["identity"])
element_dict[pad.name]["identity"].link(element_dict[pad.name]["fakesink"])
sink_pad = element_dict[pad.name]["queue"].get_static_pad("sink")
pad.link(sink_pad)
element_dict[pad.name]["queue"].sync_state_with_parent()
element_dict[pad.name]["identity"].sync_state_with_parent()
element_dict[pad.name]["fakesink"].sync_state_with_parent()
return Gst.PadLinkReturn.OK
logger.info("PAD_ADD " + pipeline.name + "-----" + str(element.name) + "-----" + str(pad.name) + "-----" + str(pad.direction))
element_dict[pad.name] = {}
element_dict[pad.name]["queue"] = Gst.ElementFactory.make("queue", "queue_" + str(pad.name))
Gst.Object.set_property(element_dict[pad.name]["queue"], "leaky", 2)
Gst.Object.set_property(element_dict[pad.name]["queue"], "flush-on-eos", True)
element_dict[pad.name]["identity"] = Gst.ElementFactory.make("identity", "identity_" + str(pad.name))
Gst.Object.set_property(element_dict[pad.name]["identity"], "dump", dump)
element_dict[pad.name]["rtpopusdepay"] = Gst.ElementFactory.make("rtpopusdepay", "rtpopusdepay_" + str(pad.name))
element_dict[pad.name]["opusdec"] = Gst.ElementFactory.make("opusdec", "opusdec_" + str(pad.name))
element_dict[pad.name]["audioconvert"] = Gst.ElementFactory.make("audioconvert", "audioconvert_" + str(pad.name))
element_dict[pad.name]["audioresample"] = Gst.ElementFactory.make("audioresample", "audioresample_" + str(pad.name))
amix = pipeline.get_by_name("amix")
pipeline.add(element_dict[pad.name]["queue"])
pipeline.add(element_dict[pad.name]["identity"])
pipeline.add(element_dict[pad.name]["rtpopusdepay"])
pipeline.add(element_dict[pad.name]["opusdec"])
pipeline.add(element_dict[pad.name]["audioconvert"])
pipeline.add(element_dict[pad.name]["audioresample"])
sink_pad = element_dict[pad.name]["queue"].get_static_pad("sink")
if not Gst.Pad.is_linked(sink_pad):
pad.link(sink_pad)
if Gst.Pad.is_linked(sink_pad):
logger.info("Pads linked successfully")
element_dict[pad.name]["queue"].link(element_dict[pad.name]["identity"])
element_dict[pad.name]["identity"].link(element_dict[pad.name]["rtpopusdepay"])
element_dict[pad.name]["rtpopusdepay"].link(element_dict[pad.name]["opusdec"])
element_dict[pad.name]["opusdec"].link(element_dict[pad.name]["audioconvert"])
element_dict[pad.name]["audioconvert"].link(element_dict[pad.name]["audioresample"])
element_dict[pad.name]["audioresample"].link(amix)
element_dict[pad.name]["queue"].sync_state_with_parent()
element_dict[pad.name]["identity"].sync_state_with_parent()
element_dict[pad.name]["rtpopusdepay"].sync_state_with_parent()
element_dict[pad.name]["opusdec"].sync_state_with_parent()
element_dict[pad.name]["audioconvert"].sync_state_with_parent()
element_dict[pad.name]["audioresample"].sync_state_with_parent()
logger.info("PAD_ADDED " + pipeline.name + "-----" + str(element.name) + "-----" + str(pad.name) + "-----" + str(pad.direction))
return Gst.PadLinkReturn.OK
...
kivekitbin = pipeline.get_by_name("srclive")
if kivekitbin:
kivekitbin.connect('pad-added', pad_added)