Hi gurus,
I have a shmsink
sending raw video with caps applied and able to connect multiple receivers to it to attempt decoupling gstreamer instances.
I’m wanting to keep the running pipeline alive when the shmsink
disappears which shmsrc
errors out. I’ve created a basic test switcher using input-select
which fits the bill. Tried with compositor
however, couldn’t get it working since the format is byte and not time. The newer rust fallsbackswitch
element suffered the same issue.
Below is my attempt using go with the go-gst/go-gst bindings. I’m creating a basic switch with one input videotestsrc black with a text overlay. When the shmsink fires up, I what for the shared memory file created and then attempt to build the shmsrc pipeline with buildShmSrc() and attach the pad to the input-select
element and switch to the shmsrc source. I’m using the identity
element to signal handoff monitoring the data flow and on the event the data flow stops wait 1 second then fallback to videotestsrc with switchToLostVideo() then, simply return to monitoring for the shmsink path to appear again.
I’m sure all of this has been done before however, I can’t seem to find any working example to port from C or python to fit my application.
Maybe it’s my newbie understand on using pads and links to add pipelines. I also tried uses bins and building and joining these to the running pipeline on the fly.
I’m struggling with the error below to which I can’t deref the sink_0 to re-use again?
Sending raw video and audio via shm maybe not be the best approach however, I tried a container format like matroska and had issues with the receiver not able to demux which connected on the run. The shm approach seems the logic solution other than encoding and decoding at each end on the local machine. the shm is local memory on the machine. I’m running Ubuntu 24.04 LTS with gstreamer 1.24.
go run .
/tmp/channel-1-video found. Attaching shmsrc on startup.
Building and attaching new shmsrc element.
(test:237683): GStreamer-CRITICAL **: 20:18:58.974: Element selector already has a pad named sink_0, the behaviour of gst_element_get_request_pad() for existing pads is undefined!
New shmsrc element attached.
Pipeline running. Press Ctrl+C to stop...
/tmp/channel-1-video does not exist. Removing shmsrc.
Removing shmsrc element from pipeline.
shmsrc element not found.
identity element not found.
Pad sink_0 released and unref'd.
Switching to Lost Video.
Failed to get Lost Video pad.
Switching to Lost Video.
Failed to get Lost Video pad.
Switching to Lost Video.
Failed to get Lost Video pad.
/tmp/channel-1-video exists. Building shmsrc.
Building and attaching new shmsrc element.
(test:237683): GStreamer-CRITICAL **: 20:19:15.986: Element selector already has a pad named sink_0, the behaviour of gst_element_get_request_pad() for existing pads is undefined!
New shmsrc element attached.
Here’s my code with heaps of debug info.
package main
import (
"fmt"
"os"
"os/signal"
"syscall"
"time"
"github.com/go-gst/go-gst/gst"
)
var (
switchInterval = 1 * time.Second // Interval for monitoring shmsrc updates
fallback = false // Whether fallback is active
selector *gst.Element // Input selector element
pipeline *gst.Pipeline // GStreamer pipeline
sinkPad *gst.Pad // Pad for shmsrc
shmsrcLastUpdate = time.Now() // Timestamp of the last update from shmsrc
shmsrcAttached = false // Flag to track if shmsrc is attached
)
func main() {
// Initialize GStreamer
gst.Init(nil)
// Define the main pipeline string for the Lost Video stream
pipelineString := `
input-selector name=selector ! videoconvert ! autovideosink
videotestsrc pattern=black is-live=true ! queue ! textoverlay name=txtoverlay text="Lost Video" valignment=center halignment=center font-desc="Sans, 48" ! video/x-raw,format=I420,width=1920,height=1080,framerate=25/1 ! selector.sink_1
`
// Create the main pipeline from the string
var err error
pipeline, err = gst.NewPipelineFromString(pipelineString)
if err != nil {
fmt.Println("Failed to create pipeline:", err)
os.Exit(1)
}
// Get a reference to the input-selector element
selector, err = pipeline.GetElementByName("selector")
if err != nil {
fmt.Println("Failed to get selector element by name:", err)
os.Exit(1)
}
// Check for the existence of /tmp/channel-1-video at startup
if _, err := os.Stat("/tmp/channel-1-video"); err == nil {
// If the file exists, build and attach shmsrc
fmt.Println("/tmp/channel-1-video found. Attaching shmsrc on startup.")
buildShmSrc()
} else {
// Otherwise, start with Lost Video
fmt.Println("/tmp/channel-1-video not found. Starting with Lost Video.")
switchToLostVideo()
}
// Handle signals to cleanly exit
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
// Start a routine to monitor the shmsink and switch between sources
go startShmSrcMonitor()
// Set the pipeline to the PLAYING state
pipeline.SetState(gst.StatePlaying)
fmt.Println("Pipeline running. Press Ctrl+C to stop...")
// Wait for a signal to stop
<-sigs
// Cleanup: Set the pipeline state to NULL
pipeline.SetState(gst.StateNull)
fmt.Println("Pipeline stopped.")
}
// buildShmSrc creates a new shmsrc element and connects it to the pipeline using NewElementMany
func buildShmSrc() {
fmt.Println("Building and attaching new shmsrc element.")
// Create the shmsrc and identity elements using NewElementMany
elements, err := gst.NewElementMany("shmsrc", "queue", "identity")
if err != nil {
fmt.Println("Failed to create shmsrc pipeline elements:", err)
return
}
shmsrc := elements[0]
identity := elements[2]
// Set properties for the shmsrc element
shmsrc.SetProperty("socket-path", "/tmp/channel-1-video")
shmsrc.SetProperty("is-live", true)
shmsrc.SetProperty("do-timestamp", true)
// Add the shmsrc elements to the pipeline
pipeline.AddMany(elements...)
// Link the elements together
shmsrc.Link(elements[1]) // shmsrc -> queue
elements[1].Link(identity) // queue -> identity
// Request a new pad if one doesn't already exist
if sinkPad == nil || sinkPad.IsLinked() {
sinkPad = selector.GetRequestPad("sink_0")
}
// Link the identity's src pad to the input-selector's sink_0 pad
identityPad := identity.GetStaticPad("src")
if identityPad != nil && sinkPad != nil {
if identityPad.Link(sinkPad) != gst.PadLinkOK {
fmt.Println("Failed to link identity src pad to selector sink_0 pad.")
}
} else {
fmt.Println("Failed to get sink_0 pad or identity src pad for selector.")
}
// Monitor data flow from the shmsrc
identity.Connect("handoff", func() {
shmsrcLastUpdate = time.Now() // Update the timestamp when data flows
if fallback {
// If in fallback, switch back to shmsrc when data is flowing
fmt.Println("Data flowing through shmsrc. Switching back to shmsrc.")
selector.SetProperty("active-pad", sinkPad)
fallback = false
}
})
// Sync the states of the new elements
for _, element := range elements {
element.SyncStateWithParent()
}
shmsrcAttached = true
fmt.Println("New shmsrc element attached.")
}
// removeShmSrc removes the shmsrc element from the pipeline
func removeShmSrc() {
if !shmsrcAttached {
return
}
fmt.Println("Removing shmsrc element from pipeline.")
shmsrc, _ := pipeline.GetElementByName("shmsrc")
if shmsrc != nil {
shmsrc.SetState(gst.StateNull)
pipeline.Remove(shmsrc)
fmt.Println("shmsrc element removed.")
} else {
fmt.Println("shmsrc element not found.")
}
identity, _ := pipeline.GetElementByName("identity")
if identity != nil {
identity.SetState(gst.StateNull)
pipeline.Remove(identity)
fmt.Println("identity element removed.")
} else {
fmt.Println("identity element not found.")
}
// Release the pad on selector if it exists
if sinkPad != nil && sinkPad.IsLinked() {
selector.ReleaseRequestPad(sinkPad)
sinkPad.Unref() // Unref the pad to ensure it's properly cleaned up
sinkPad = nil // Reset the reference to avoid reuse
fmt.Println("Pad sink_0 released and unref'd.")
} else {
fmt.Println("No sink_0 pad to release.")
}
shmsrcAttached = false
}
// startShmSrcMonitor monitors for the existence of the shmsink and switches between sources
func startShmSrcMonitor() {
for {
time.Sleep(switchInterval)
// Check if the shmsink file exists
if _, err := os.Stat("/tmp/channel-1-video"); err == nil {
// If shmsink is available and shmsrc is not attached, build and attach it
if !shmsrcAttached {
fmt.Println("/tmp/channel-1-video exists. Building shmsrc.")
buildShmSrc()
} else if time.Since(shmsrcLastUpdate) > 1*time.Second {
// If shmsrc is attached but no data has flowed for 1 second, remove it
fmt.Println("No data from shmsrc for 1 second. Switching to Lost Video.")
removeShmSrc()
switchToLostVideo()
}
} else {
// If shmsink is not available, ensure we are showing Lost Video
if shmsrcAttached {
fmt.Println("/tmp/channel-1-video does not exist. Removing shmsrc.")
removeShmSrc()
}
switchToLostVideo()
}
}
}
// switchToLostVideo switches the active pad to the Lost Video source
func switchToLostVideo() {
if !fallback {
fmt.Println("Switching to Lost Video.")
pad := selector.GetStaticPad("sink_1")
if pad != nil {
selector.SetProperty("active-pad", pad)
fallback = true
} else {
fmt.Println("Failed to get Lost Video pad.")
}
}
}