Hi i working on a project for live streaming video with background music but i fond a weird behavior with concat.
The behavior expected is when i load song 1.mp3 and song 2.ogg i expect 1.mp3 play before 2.ogg, but gstreamer plat first 2.ogg and than 1.mp3.
This is not expected from the concat that i expect to be a FIFO (First In First Iut) function is.
I don’t know this is a bug or the correct behavior for gstreamer.
ubuntu 25.04
GStreamer 1.26.0 (download from ubuntu repo)
The code that i use for my test:
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GObject
import sys
Gst.init(None)
YT_Key = "Youtube KEY"
YOUTUBE_URL = f"rtmp://a.rtmp.youtube.com/live2/{YT_Key}"
class ytstreamer:
def __init__(self):
audio_files = ['1.mp3','2.ogg']
path = "/home/user-name/avtest/testtracks"
self.pipeline = Gst.Pipeline.new("youtube_stream")
# Audio
self.concat = Gst.ElementFactory.make("concat", "concat")
self.pipeline.add(self.concat)
for i, file in enumerate(audio_files):
print(file)
fpath = f"file://{path}/{file}"
MusicPlayer(fpath, self.pipeline, self.concat)
self.audioconvert = Gst.ElementFactory.make("audioconvert", "audioconvert")
self.audioresample = Gst.ElementFactory.make("audioresample", "audioresample")
self.voaacenc = Gst.ElementFactory.make("voaacenc", "voaacenc")
# self.queue_audio = Gst.ElementFactory.make("queue", "queue_audio")
# Video
self.videotestsrc = Gst.ElementFactory.make("videotestsrc", "videotestsrc")
self.videotestsrc.set_property("is-live", True)
self.videotestsrc.set_property("pattern", 0) # 0 = SMPTE
self.videoconvert = Gst.ElementFactory.make("videoconvert", "videoconvert")
self.x264enc = Gst.ElementFactory.make("x264enc", "x264enc")
self.x264enc.set_property("tune", "zerolatency")
self.x264enc.set_property("bitrate", 2000)
self.x264enc.set_property("speed-preset", "veryfast")
self.queue_video = Gst.ElementFactory.make("queue", "queue_video")
# FLV Mux
self.flvmux = Gst.ElementFactory.make("flvmux", "flvmux")
self.flvmux.set_property("streamable", True)
self.rtmpsink = Gst.ElementFactory.make("rtmpsink", "rtmpsink")
self.rtmpsink.set_property("location", YOUTUBE_URL)
for elem in [self.audioconvert, self.audioresample, self.voaacenc, #self.queue_audio,
self.videotestsrc, self.videoconvert, self.x264enc, self.queue_video,
self.flvmux, self.rtmpsink]:
self.pipeline.add(elem)
# Link audio path
self.concat.link(self.audioconvert)
self.audioconvert.link(self.audioresample)
self.audioresample.link(self.voaacenc)
# self.voaacenc.link(self.queue_audio)
# self.queue_audio.link(self.flvmux)
self.voaacenc.link(self.flvmux)
# Link video path
self.videotestsrc.link(self.videoconvert)
self.videoconvert.link(self.x264enc)
self.x264enc.link(self.queue_video)
self.queue_video.link(self.flvmux)
# Link mux to sink
self.flvmux.link(self.rtmpsink)
# Start pipeline
self.pipeline.set_state(Gst.State.PLAYING)
# Wait until error or EOS
bus = self.pipeline.get_bus()
msg = bus.timed_pop_filtered(Gst.CLOCK_TIME_NONE,
Gst.MessageType.ERROR | Gst.MessageType.EOS)
self.pipeline.set_state(Gst.State.NULL)
if msg.type == Gst.MessageType.ERROR:
err, debug = msg.parse_error()
print(f"Error: {err.message}")
sys.exit(1)
else:
print("Streaming finished.")
print("done")
def on_pad_added(self, decodebin, pad, target):
sinkpad = target.get_request_pad("sink_%u")
pad.link(sinkpad)
class MusicPlayer(object):
"""docstring for MusicPlayer."""
def __init__(self, uri, pipeline, concat):
super(MusicPlayer, self).__init__()
self.track = uri
self.pipeline = pipeline
self.concat = concat
self.decodebin = Gst.ElementFactory.make("uridecodebin", None)
self.decodebin.set_property("uri", uri)
self.decodebin.connect("pad-added", self.on_pad_added)
self.pipeline.add(self.decodebin)
self.decodebin.sync_state_with_parent()
def on_pad_added(self, decodebin, pad):
print(self.track)
caps = pad.query_caps(None)
name = caps.to_string()
if name.startswith("audio/"):
# Create a queue to avoid blocking
queue = Gst.ElementFactory.make("queue", None)
self.pipeline.add(queue)
queue.sync_state_with_parent()
sink_pad = self.concat.get_request_pad("sink_%u")
pad.link(queue.get_static_pad("sink"))
queue.get_static_pad("src").link(sink_pad)
if __name__ == '__main__':
print("run")
ytstreamer()