RTSP Disconnect and Reconnect on Error during PLAY

I have a Python GStreamer pipeline that streams from several RTSP sources and processes frames arriving at an appsink element. I’ve attached a simple two-camera example of the setup/pipeline graph. The plan is to run this pipeline for several hours, and during that time one of the cameras may disconnect or have connection issues that leave rtspsrc receiving no data. What is the best way to detect and recover from this?

Is there a way to add some sort of callback on the src pad of the RtspSourceBin() -> Gst.Bin output that listens for a certain signal, or would I need to listen on the bus with a message handler that watches for messages from the RtspSourceBin? I see that rtspsrc posts a timeout message named “GstRTSPSrcTimeout”; perhaps I should listen for that?
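For example, I imagine routing bus messages through something like this (a sketch; `classify_bus_message` and the "reconnect" action are my own names, and the wiring comment assumes the usual PyGObject Gst API):

```python
def classify_bus_message(msg_type, struct_name, src_name):
    """Map a bus message to an action for per-camera reconnect logic.

    msg_type:    "element" or "error" (derived from Gst.MessageType)
    struct_name: message.get_structure().get_name(), or None
    src_name:    message.src.get_name()
    """
    if msg_type == "element" and struct_name == "GstRTSPSrcTimeout":
        # rtspsrc posts this element message when no data has arrived
        # within its "timeout" property window (microseconds)
        return ("reconnect", src_name)
    if msg_type == "error":
        # hard errors (e.g. connection refused) should also trigger a retry
        return ("reconnect", src_name)
    return ("ignore", src_name)

# Inside the real bus handler this would be wired up roughly as:
#
#   def bus_msg_handler(bus, message, loop):
#       if message.type == Gst.MessageType.ELEMENT:
#           s = message.get_structure()
#           action, who = classify_bus_message(
#               "element", s.get_name() if s else None,
#               message.src.get_name())
#           ...
```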

Ideally, when the stream connection is lost I’d like to disconnect the offending bin and attempt to re-establish a new connection while the pipeline is still playing.
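What I have in mind for the swap is roughly the following (an untested sketch: `replace_source_bin` and `make_bin` are my own names, the Gst import is deferred only so the snippet stands alone, and I assume this runs on the GLib main loop, e.g. via GLib.idle_add, not from a streaming thread):

```python
def replace_source_bin(pipeline, streammux, old_bin, index, make_bin):
    """Swap a dead RTSP source bin for a fresh one while the pipeline
    stays PLAYING. make_bin is a callable returning a new, unlinked
    source bin (e.g. wrapping RtspSourceBin.create_rtsp_source_bin)."""
    from gi.repository import Gst  # deferred so the sketch imports without gi

    sinkpad = streammux.get_static_pad(f"sink_{index}")

    # 1. Stop the old bin and take it out of the running pipeline
    old_bin.set_state(Gst.State.NULL)
    old_bin.get_static_pad("src").unlink(sinkpad)
    pipeline.remove(old_bin)

    # 2. Build, add and link the replacement
    new_bin = make_bin(index)
    pipeline.add(new_bin)
    new_bin.get_static_pad("src").link(sinkpad)

    # 3. Bring the new bin up to the pipeline's running state
    new_bin.sync_state_with_parent()
    return new_bin
```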

In the code below I have a method that creates a Gst.Bin object containing the elements for RTSP streaming. This isn’t set in stone; I can make changes if necessary.

Pipeline creation method

def create_pipeline(self):
        """
        Creates the multi-rtsp stream deepstream pipeline
        @return:
        """

        if self.num_cameras == 0:
            print('No cameras initialized in DSSystem!')
            exit()

        # Handle Some initial settings
        self.decode_mode = DecoderEnum(self.ds_config['decode_mode'])
        if not self.ds_config['new_nvstreammux']:
            exit('NEW_NVSTREAMMUX must be set to True [ 1 ]')
        else:
            print('Setting env. var. USE_NEW_NVSTREAMMUX: yes')
            os.environ["USE_NEW_NVSTREAMMUX"] = "yes"

        if self.MyEnvironmentVar.PERF_DATA_BOOL:
            global perf_data
            perf_data = PERF_DATA(self.num_cameras)

        # PREPARE GSTREAMER AND CREATE NEW PIPELINE
        Gst.init(None)
        self.gst_pipeline = Gst.Pipeline.new('multi-stream-pipeline')
        # PREPARE GSTREAMER AND CREATE NEW PIPELINE

        # LINK RTSPinputBIN AND NVSTREAMMUX PADS
        # For precise control use property ("config-file-path", /my/path/conf.cfg)
        # path = 'configs/nvstreammux_conf.txt'
        # nvstrmux.set_property('config-file-path', path)
        # print(nvstrmux.get_property('config-file-path'))
        # Create nvstreammux instance to form batches from one or more sources.

        nvstrmux = Gst.ElementFactory.make("nvstreammux", "nvstrmux")
        if not nvstrmux:
            sys.stderr.write(" Unable to create NvStreamMux \n")

        self.gst_pipeline.add(nvstrmux)
        for ix, cam in enumerate(self.cameras):
            cam: CameraSource
            rtspbin = RtspSourceBin().create_rtsp_source_bin(index=ix,
                                                             camera_src=cam,
                                                             dec_mode=self.decode_mode.name)
            cam.gst_bin = rtspbin
            self.gst_pipeline.add(cam.gst_bin)

            sinkpad = nvstrmux.get_request_pad(f'sink_{ix}')
            if not sinkpad:
                sys.stderr.write('Unable to create sink pad bin for nvstreammux \n')
            sourcepad = cam.gst_bin.get_static_pad('src')
            if not sourcepad:
                sys.stderr.write('Unable to create src pad bin \n')

            sourcepad.link(sinkpad)

        # TODO: If required add output queue mux --> queue --> appsink
        # PREPARE QUEUE FOR MUX SRC PAD
        # infer_queue = Gst.ElementFactory.make('queue', 'infer_queue')
        # if not infer_queue:
        #     sys.stderr.write(" Unable to create infer_queue \n")
        # infer_queue.set_property("leaky", 2)
        # infer_queue.set_property("max-size-buffers", 1)
        # # Prepare Queue/Buffer between PGIE and Tracker
        # self.gst_pipeline.add(infer_queue)

        # PIPELINE TERMINATION
        appsink = Gst.ElementFactory.make('appsink', 'batch_sink')
        appsink.set_property("emit-signals", True)
        sink_handler_id = appsink.connect("new-sample", on_new_sample, self.buffer_queue)

        # PIPELINE ADDING AND LINKING
        print("Adding elements to Pipeline \n")
        self.gst_pipeline.add(appsink)
        nvstrmux.link(appsink)

        # PREPARE EVENT/BUS LOOP AND MESSAGING
        self.gst_loop = GLib.MainLoop()
        self.gst_bus = self.gst_pipeline.get_bus()

        # Bus will be polled to watch for GObject.connect('message type', ..., ...)
        self.gst_bus.add_signal_watch()

        # TODO: Multiple connects could be used here and message::eos, or message::error
        #       can be used to only call certain callback functions
        #       (ie. specific callbacks to handle certain message types)
        handler_id = self.gst_bus.connect("message", cb_gstloop_bus.bus_msg_handler, self.gst_loop)
        # connect returns a handler ID for the callback
        # could connect several different callbacks to the message signal
        # PREPARE EVENT/BUS LOOP AND MESSAGING

        if self.MyEnvironmentVar.SAVE_DS_GRAPH:
            # SAVE THE PIPELINE GRAPH
            self.save_pipeline_as_pdf('multi_stream_pipeline_graph_NULL')

        # PIPELINE PLAYING AND NULL CLEANUP
        print("Starting pipeline \n")
        self.gst_pipeline.set_state(Gst.State.PLAYING)

        if self.MyEnvironmentVar.SAVE_DS_GRAPH:
            # SAVE THE PIPELINE GRAPH
            self.save_pipeline_as_pdf('multi_stream_pipeline_graph_PLAY')

        t_setup = (time.time_ns() - DSSystem.t0) * 1e-9
        print(f'Pipeline setup took {t_setup:.2f} s\nStarting DSSystem Run Time Clock')

RTSP source bin creation method

class RtspSourceBin:

    @classmethod
    def create_rtsp_source_bin(cls, index: int, camera_src: CameraSource,
                               dec_mode="HARDWARE", format="RGBA"):
        # TODO: For Python RGBA is only supported in this case, ideally we could use BGRx,
        #       however, as of this DeepStream version 6.2 it does not seem to be supported for our use case

        # SOURCE BIN
        # Create a source GstBin to abstract this bin's content from the rest of the pipeline
        bin_name = "source-bin-%02d" % index
        print(bin_name)
        rtsp_source_bin = Gst.Bin.new(bin_name)
        if not rtsp_source_bin:
            logger.error(" Unable to create source bin \n")

        # INTERNAL BIN ELEMENTS
        # TODO: Add GsTProp class to CameraSource class
        rtspsrc = Gst.ElementFactory.make("rtspsrc", f"rtspsrc_{index}")
        rtspsrc.set_property("location", camera_src.source_uri)
        # rtspsrc.set_property("protocols", "tcp")  # def: tcp+udp-mcast+udp
        rtspsrc.set_property("latency", 2000)  # Amount of ms to buffer
        rtspsrc.set_property("udp-buffer-size", 2000000)  # Size of the kernel UDP receive buffer in bytes.
        rtspsrc.set_property("drop-on-latency", 1)
        rtspsrc.set_property("timeout", 5000000)  # Retry TCP transport after UDP timeout microseconds
        # rtspsrc.set_property("debug", 1)

        # # For NTP RTCP Sender reports see:
        # # https://docs.nvidia.com/metropolis/deepstream/6.2/dev-guide/text/DS_NTP_Timestamp.html?highlight=ntp
        GST_HELPER_LIB = ctypes.CDLL("/opt/nvidia/deepstream/deepstream/lib/libnvdsgst_helper.so")
        GST_HELPER_LIB.configure_source_for_ntp_sync(hash(rtspsrc))

        # Extracts H*** video from RTP packets (RFC 3984)
        rtpdepay = None
        if camera_src.parameters.compression == "H264":
            rtpdepay = Gst.ElementFactory.make("rtph264depay", f"rtpdepay_{index}")
            debug("Creating H264 rtpdepay")
        elif camera_src.parameters.compression == "H265":
            rtpdepay = Gst.ElementFactory.make("rtph265depay", f"rtpdepay_{index}")
            debug("Creating H265 rtpdepay")
        if not rtpdepay:
            sys.stderr.write(f"ERROR: Unable to create rtpdepay_{index}\n")
            sys.exit(1)

        # Parses H*** streams
        h26parse = None
        if camera_src.parameters.compression == "H264":
            h26parse = Gst.ElementFactory.make("h264parse", f"h26parse_{index}")
            debug("Creating H264 parser")
        elif camera_src.parameters.compression == "H265":
            h26parse = Gst.ElementFactory.make("h265parse", f"h26parse_{index}")
            debug("Creating H265 parser")
        if not h26parse:
            sys.stderr.write(f"ERROR: Unable to create h26parse_{index}\n")
            sys.exit(1)

        if dec_mode == "SOFTWARE":
            print('Using software decoding')
            if camera_src.parameters.compression == "H264":
                decoder = Gst.ElementFactory.make("avdec_h264")
                debug("Creating H264 dec")
            elif camera_src.parameters.compression == "H265":
                decoder = Gst.ElementFactory.make("avdec_h265")
                debug("Creating H265 dec")
        elif dec_mode == "HARDWARE":
            print('Using hardware decoding')
            # https://docs.nvidia.com/metropolis/deepstream/dev-guide/text/DS_plugin_gst-nvvideo4linux2.html
            decoder = Gst.ElementFactory.make("nvv4l2decoder", f"nvv4l2decoder_{index}")
            decoder.set_property("cudadec-memtype", int(pyds.NVBUF_MEM_CUDA_DEVICE))
            # decoder.set_property("num-extra-surfaces", settings.DEEPSTREAM_NVV4L2DECODER_NUM_EXTRA_SURFACES)

        # https://docs.nvidia.com/metropolis/deepstream/dev-guide/text/DS_plugin_gst-nvvideoconvert.html
        nvvideoconvert = Gst.ElementFactory.make("nvvideoconvert", f"nvvidconv_rtspsrc{index}")
        nvvideoconvert.set_property("nvbuf-memory-type", int(pyds.NVBUF_MEM_CUDA_UNIFIED))
        # nvvideoconvert.set_property("output-buffers", settings.DEEPSTREAM_NVVIDEOCONVERT_OUTPUT_BUFFERS)

        # This queue is necessary to avoid a memory leak;
        # rtpjitterbuffer can still cause buffer overflows
        # queue = Gst.ElementFactory.make("queue", f"queue_drop_frames{index}")
        # queue.set_property("leaky", 2)
        # # queue.set_property("max-size-buffers", 1)
        # queue.set_property("max-size-time", 5000000000)  # ns ... testing buffering

        # By converting images to RGBA we can download them from GPU whenever we want
        caps_filter = Gst.ElementFactory.make("capsfilter", f"capsfilt_rtspsrc{index}")
        caps_filter.set_property("caps", Gst.Caps.from_string(f"video/x-raw(memory:NVMM), format={format}"))

        # RTSPSRC GHOST PAD
        # rtspsrc will create a src pad once it starts decoding, for now we initialize a ghost pad
        # to be linked to next element in the bin
        # We also add a callback on the signal "pad-added" to link the actual src pad when it is initialized
        rtspsrc.add_pad(Gst.GhostPad.new_no_target("src", Gst.PadDirection.SRC))
        rtspsrc.connect("pad-added", cls.rtspsrc_callback_newpad, rtpdepay)

        # ADD ELEMENTS TO THE BIN
        Gst.Bin.add(rtsp_source_bin, rtspsrc)
        Gst.Bin.add(rtsp_source_bin, rtpdepay)
        Gst.Bin.add(rtsp_source_bin, h26parse)
        Gst.Bin.add(rtsp_source_bin, decoder)
        # Gst.Bin.add(rtsp_source_bin, queue)
        Gst.Bin.add(rtsp_source_bin, nvvideoconvert)
        Gst.Bin.add(rtsp_source_bin, caps_filter)

        # LINK ELEMENTS
        rtspsrc.link(rtpdepay)
        rtpdepay.link(h26parse)
        h26parse.link(decoder)
        # nvv4l2decoder.link(queue)
        # queue.link(nvvideoconvert)
        decoder.link(nvvideoconvert)
        nvvideoconvert.link(caps_filter)

        # BIN GHOST PAD
        # We need to create a ghost pad for the rtsp source bin which will act as a src pad
        rtsp_source_bin.add_pad(
            Gst.GhostPad.new_no_target("src", Gst.PadDirection.SRC)
        )
        ghost_pad = rtsp_source_bin.get_static_pad("src")
        if not ghost_pad:
            logger.error(" Failed to add ghost pad in source bin \n")
            return None
        ghost_pad.set_target(caps_filter.get_static_pad("src"))
        return rtsp_source_bin

    @classmethod
    def rtspsrc_callback_newpad(cls, rtsp_src, rtsp_src_new_src_pad, data):
        logger.info(f"SourceBin: added pad {rtsp_src_new_src_pad.name} to {rtsp_src.name}")
        caps = rtsp_src_new_src_pad.get_current_caps()
        gststruct = caps.get_structure(0)
        # gstname = gststruct.get_name()
        # rtph264depay = data
        # features = caps.get_features(0)

        # Check that the pad created by rtspsrc is for video and not audio.
        # TODO: are we checking if this is a video in the correct way?
        if gststruct.get_string("media") == "video":
            # Get the source bin ghost pad
            rtsp_src_ghost_pad = rtsp_src.get_static_pad("src")
            logger.info(f"padcaps {rtsp_src_ghost_pad.get_current_caps()}")
            if not rtsp_src_ghost_pad.set_target(rtsp_src_new_src_pad):
                logger.error(
                    "Failed to link decoder src pad to source bin ghost pad\n"
                )

Take a look at the fallbacksrc element. That basically does what you’re looking for here. I wrote a bit more about it a while ago.
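For reference, a minimal fallbacksrc configuration might look like this (a sketch: property names are taken from the gst-plugins-rs documentation, so verify them with `gst-inspect-1.0 fallbacksrc` on your version, and the URI is a placeholder):

```python
def fallbacksrc_description(uri, timeout_s=5):
    """Build a gst-launch-style description for a fallbacksrc-wrapped
    camera. The timeout property is a GstClockTime, i.e. nanoseconds."""
    timeout_ns = int(timeout_s * 1_000_000_000)
    return (
        f"fallbacksrc uri={uri} "
        f"timeout={timeout_ns} "  # declare the stream stalled after this long
        "restart-on-eos=true"     # treat EOS from the camera as a failure too
    )

# e.g. Gst.parse_launch(fallbacksrc_description("rtsp://camera-host/stream")
#                       + " ! ...")  # rest of the pipeline goes here
```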

Thanks! This may be a bit off topic, but I’m not sure about something: I am currently using DeepStream 6.2, which uses GStreamer 1.16.3. It seems that fallbacksrc requires GStreamer >= 1.20.
I could upgrade my GStreamer to 1.20/1.22; however, I am using some DeepStream hardware-accelerated video decoding elements (nvv4l2decoder).
Do you know if it’s possible to still use the DeepStream elements after upgrading to 1.20, or are there other options for hardware-accelerated decoding?

I’m aware of multiple people doing exactly that, so that shouldn’t be a problem.

If using DeepStream with a newer version of GStreamer does not work out, is there a different method other than the fallbacksrc element?
Given what is available in GStreamer 1.16, would it be a matter of listening for certain signals and then dynamically re-linking things?

Yes, you can re-implement what fallbacksrc is doing. You’d need a GstBin subclass that catches error messages, and the errorignore element (or a custom GstGhostPad chain function) to ignore error flow returns.
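A rough sketch of the GstBin part of that suggestion (the class name and the "source-bin-error" message name are made up here; the Gst import is deferred only so the snippet stands alone, and Gst is assumed to be initialized elsewhere):

```python
def make_error_swallowing_bin_class():
    """Build a Gst.Bin subclass whose handle_message vfunc intercepts
    ERROR messages from its children, so one dead camera cannot take
    down the whole pipeline. The swallowed error is re-posted as a
    custom element message the application can react to."""
    from gi.repository import Gst  # deferred so the sketch imports without gi

    class ErrorSwallowingBin(Gst.Bin):
        def do_handle_message(self, message):
            if message.type == Gst.MessageType.ERROR:
                err, _dbg = message.parse_error()
                # drop the fatal error; surface it as a custom element
                # message instead ("source-bin-error" is a made-up name)
                s = Gst.Structure.new_empty("source-bin-error")
                self.post_message(Gst.Message.new_element(self, s))
                return
            # everything else is forwarded as usual
            Gst.Bin.do_handle_message(self, message)

    return ErrorSwallowingBin
```

The errorignore element (or a custom ghost pad chain function) is still needed on the data path to turn error flow returns into something non-fatal; this only handles the bus side.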
