Wierd behaver with concat

Hi i working on a project for live streaming video with background music but i fond a weird behavior with concat.

The behavior expected is when i load song 1.mp3 and song 2.ogg i expect 1.mp3 play before 2.ogg, but gstreamer plat first 2.ogg and than 1.mp3.

This is not expected from the concat that i expect to be a FIFO (First In First Iut) function is.

I don’t know this is a bug or the correct behavior for gstreamer.

ubuntu 25.04
GStreamer 1.26.0 (download from ubuntu repo)

The code that i use for my test:

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GObject
import sys

Gst.init(None)

YT_Key = "Youtube KEY"

YOUTUBE_URL = f"rtmp://a.rtmp.youtube.com/live2/{YT_Key}" 

class ytstreamer:
    def __init__(self):
        audio_files = ['1.mp3','2.ogg']
        path = "/home/user-name/avtest/testtracks"

        self.pipeline = Gst.Pipeline.new("youtube_stream")



        # Audio
        self.concat = Gst.ElementFactory.make("concat", "concat")
        self.pipeline.add(self.concat)

        for i, file in enumerate(audio_files):
            print(file)
            fpath = f"file://{path}/{file}"
            MusicPlayer(fpath, self.pipeline, self.concat)



        self.audioconvert = Gst.ElementFactory.make("audioconvert", "audioconvert")
        self.audioresample = Gst.ElementFactory.make("audioresample", "audioresample")
        self.voaacenc = Gst.ElementFactory.make("voaacenc", "voaacenc")
        # self.queue_audio = Gst.ElementFactory.make("queue", "queue_audio")

        # Video

        self.videotestsrc = Gst.ElementFactory.make("videotestsrc", "videotestsrc")
        self.videotestsrc.set_property("is-live", True)
        self.videotestsrc.set_property("pattern", 0)  # 0 = SMPTE

        self.videoconvert = Gst.ElementFactory.make("videoconvert", "videoconvert")
        self.x264enc = Gst.ElementFactory.make("x264enc", "x264enc")
        self.x264enc.set_property("tune", "zerolatency")
        self.x264enc.set_property("bitrate", 2000)
        self.x264enc.set_property("speed-preset", "veryfast")
        self.queue_video = Gst.ElementFactory.make("queue", "queue_video")

        # FLV Mux
        self.flvmux = Gst.ElementFactory.make("flvmux", "flvmux")
        self.flvmux.set_property("streamable", True)

        self.rtmpsink = Gst.ElementFactory.make("rtmpsink", "rtmpsink")
        self.rtmpsink.set_property("location", YOUTUBE_URL)

        for elem in [self.audioconvert, self.audioresample, self.voaacenc, #self.queue_audio,
                self.videotestsrc, self.videoconvert, self.x264enc, self.queue_video,
                self.flvmux, self.rtmpsink]:
            self.pipeline.add(elem)

        # Link audio path
        self.concat.link(self.audioconvert)
        self.audioconvert.link(self.audioresample)
        self.audioresample.link(self.voaacenc)
        # self.voaacenc.link(self.queue_audio)
        # self.queue_audio.link(self.flvmux)
        self.voaacenc.link(self.flvmux)

        # Link video path
        self.videotestsrc.link(self.videoconvert)
        self.videoconvert.link(self.x264enc)
        self.x264enc.link(self.queue_video)
        self.queue_video.link(self.flvmux)

        # Link mux to sink
        self.flvmux.link(self.rtmpsink)

        # Start pipeline
        self.pipeline.set_state(Gst.State.PLAYING)

        # Wait until error or EOS
        bus = self.pipeline.get_bus()
        msg = bus.timed_pop_filtered(Gst.CLOCK_TIME_NONE,
            Gst.MessageType.ERROR | Gst.MessageType.EOS)

        self.pipeline.set_state(Gst.State.NULL)

        if msg.type == Gst.MessageType.ERROR:
            err, debug = msg.parse_error()
            print(f"Error: {err.message}")
            sys.exit(1)
        else:
            print("Streaming finished.")
        print("done")


    def on_pad_added(self, decodebin, pad, target):
        sinkpad = target.get_request_pad("sink_%u")
        pad.link(sinkpad)

class MusicPlayer(object):
    """docstring for MusicPlayer."""

    def __init__(self, uri, pipeline, concat):
        super(MusicPlayer, self).__init__()
        self.track = uri
        self.pipeline = pipeline
        self.concat = concat
        self.decodebin = Gst.ElementFactory.make("uridecodebin", None)
        self.decodebin.set_property("uri", uri)
        self.decodebin.connect("pad-added", self.on_pad_added)
        self.pipeline.add(self.decodebin)
        self.decodebin.sync_state_with_parent()


    def on_pad_added(self, decodebin, pad):
        print(self.track)
        caps = pad.query_caps(None)
        name = caps.to_string()
        if name.startswith("audio/"):
            # Create a queue to avoid blocking
            queue = Gst.ElementFactory.make("queue", None)
            self.pipeline.add(queue)
            queue.sync_state_with_parent()

            sink_pad = self.concat.get_request_pad("sink_%u")
            pad.link(queue.get_static_pad("sink"))
            queue.get_static_pad("src").link(sink_pad)


if __name__ == '__main__':
    print("run")
    ytstreamer()

Could it just be racy?

You’re only creating the concat sink pads when you get the pad-added signal from the respective uridecodebin, so it may depend on how quickly these happen in each case.

You could pass the file index to the MusicPlayer so that it can request an input pad f"sink_{idx}" according to the order in the list, instead of getting concat to give you the next one.