Some bother constructing a SRT to RTP relay dynamically

Hello hackers!

I am trying my luck with a pipeline that receives audio and/or video contained in mpeg-ts over SRT. The audio and video will then be relayed to via RTP using the rtpbin.

The idea is to handle different types of codecs (right now h.264/h.265 and aac) dynamically. So when pad appears on the tsdemux we check the caps and decide how to build rest of the pipeline.

demux.connect_pad_added(move |_, pad| {
    // When new pad appears:
    // 1. Check caps
    // 2. Create appropriate processing branch
    // 3. Link to rtpbin
});

And in the method called from the callback to link rtpbin we then connect to the pad-added of the rtpbin giving us a pipeline sort of like:

[srtsrc]
   |
[queue]
   |
[tsdemux]
  /     \  <-  on pad-added
  |       \--------
  V               V
[queue]        [queue]
   |              |
[capsfilter] [capsfilter]
   |              |
[parser]       [parser]
   |              |
[capsfilter] [capsfilter]
     |         ___|  
      \       / 
    [rtpbin]
   /   |   \  <- on pad-aded
  /    |    \
[udpsink_rtp1] [udpsink_rtcp1]
[udpsink_rtp2] [udpsink_rtcp2]

This works quite well … most of the time … I occasionally get errors saying the pipeline is not linked … and sometimes get strange freezes, so I think I am not being careful enough. I would like some guidence here …
Is all this fine todo in the pad-added callbacks or do I need to use blocking probes at some stage to make this safe? Or is it ok to wait for pad-added on rtpbin and have the pipeline flowing?

I also wonder when I add elements dynamically to this pipeline at what stages I need to do the sync_state_with_parent calls to make sure the pipeline is in playing …

The error in log I get from time to time is:

{}"timestamp":"2025-02-07T14:02:22.914727Z","level":"INFO","message":"New demux pad detected","pad":"audio_1_0042"}
{"timestamp":"2025-02-07T14:02:22.914816Z","level":"INFO","message":"Detected incoming media","srt_stream_id":"ingestion-node-static-1","caps":"Caps(audio/mpeg(memory:SystemMemory) { mpegversion: (gint) 4, stream-format: (gchararray) \"adts\" })"}
{"timestamp":"2025-02-07T14:02:22.914913Z","level":"INFO","message":"Identified incoming audio stream","srt_stream_id":"ingestion-node-static-1","codec":"Mp4aLatm"}
{"timestamp":"2025-02-07T14:02:22.915431Z","level":"INFO","message":"End of stream","srt_stream_id":"ingestion-node-static-1","span":{"stream":"ingestion-node-static-1","name":"pipeline"}}
0:00:31.255691907 915410 0x7f5ae4000f60 WARN                 basesrc gstbasesrc.c:3177:gst_base_src_loop:<src> error: Internal data stream error.
0:00:31.255735328 915410 0x7f5ae4000f60 WARN                 basesrc gstbasesrc.c:3177:gst_base_src_loop:<src> error: streaming stopped, reason not-linked (-1)
0:00:31.255858947 915410 0x7f5ae4000f60 WARN                   queue gstqueue.c:1040:gst_queue_handle_sink_event:<srt_queue> error: Internal data stream error.
0:00:31.255878583 915410 0x7f5ae4000f60 WARN                   queue gstqueue.c:1040:gst_queue_handle_sink_event:<srt_queue> error: streaming stopped, reason not-linked (-1)

All help and guidance welcome!
Jonas

Seems I did not handle the pad-removed of the tsdemux properly …