Hi my Gstreamer friends,
I’m wanting to create a pipeline which never stalls and work as a stand-alone application. My shmsink source is raw video.
I need help on working what I’m not doing right.
(audio part yet to be implemented)
Basically I want is to use video and audio shmsink to have 1-n instances receiving video and audio from a one application (switcher) I’m wanting to make these backend robust to not die when the shmsink falls or the switcher app is not running or restarting.
At first mt backend apps await for the shm file to appear in /tmp then build the shmsrc bin and attach to the running pipeline using input-selector.
At start-up the input-selector shows the fallbacksource which is a simple videotestsrc. When the shmsink path appears (we have video) I’m able to build and attach the bin and begin showing the shmscr video source.
Unfortunately, when a close the shmsink the last frame is shown frozen and I’m unable to switch to the fallback and detach the broken shmsrc (destroy it) and repeat the process awaiting for the shmsink path to appear.
Has anybody successfully created such a program and if so, what is the process in adding and detaching ghostpads to pipelines?
I tried fallbackswitch however, couldn’t get it working with raw video. I would have thought fallbackswitch to be the best solution.
Google shows hardly any examples in using fallbackswitch with raw video or audio? Only slowmo’s blog is the only idea and some topics here on discourse.
Here’s my code using Go…
package main
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/go-gst/go-gst/gst"
)
var (
fallbackActivated sync.Once
socketPath = "/tmp/channel-1-video"
shmsrcBin *gst.Element // Track shmsrcBin directly
)
func main() {
// Initialize GStreamer
gst.Init(nil)
// Define pipeline with input-selector and videotestsrc fallback
pipelineStr := `
input-selector name=selector
videotestsrc pattern=black is-live=true
! video/x-raw,format=I420,width=1920,height=1080,framerate=25/1,interlace-mode=progressive
! textoverlay text="Waiting for video stream" font-desc="Sans, 30" halignment=center valignment=center
! selector.
selector. ! videoconvert ! autovideosink`
// Create the main pipeline
pipeline, err := gst.NewPipelineFromString(pipelineStr)
if err != nil {
fmt.Printf("Failed to create pipeline: %v\n", err)
return
}
// Retrieve the input-selector element by name
selector, err := pipeline.GetElementByName("selector")
if err != nil {
fmt.Printf("Failed to get input-selector: %v\n", err)
return
}
// Start the pipeline with fallback video
pipeline.SetState(gst.StatePlaying)
// Handle graceful shutdown with context
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Capture SIGINT and SIGTERM to trigger graceful shutdown
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigs
cancel()
}()
// Watch the bus for EOS and errors
go watchBus(ctx, pipeline, selector)
// Start monitoring the socket path for changes
go monitorSocket(ctx, selector, pipeline)
// Block until context is canceled (e.g., on Ctrl+C)
<-ctx.Done()
fmt.Println("Shutting down...")
pipeline.SetState(gst.StateNull)
}
// createShmSrcBin creates a new shmsrc bin and returns it as *gst.Element
func createShmSrcBin() *gst.Element {
binStr := fmt.Sprintf(`
shmsrc socket-path=%s do-timestamp=true
! video/x-raw,format=I420,width=1920,height=1080,framerate=25/1,interlace-mode=progressive
! queue max-size-bytes=20971520 max-size-time=0
`, socketPath)
// Create the bin from the string
shmsrcBin, err := gst.NewBinFromString(binStr, true)
if err != nil {
log.Fatalf("Failed to create shmsrc bin: %v", err)
}
// Set the shmsrc bin to READY state
shmsrcBin.SetState(gst.StateReady)
shmsrcBin.SyncStateWithParent()
return shmsrcBin.Element
}
// linkShmSrcToSelector links shmsrcBin's ghost pad to the input-selector
func linkShmSrcToSelector(selector *gst.Element, shmsrcBin *gst.Element) {
ghostPad := shmsrcBin.GetStaticPad("src")
selectorSinkPad := selector.GetRequestPad("sink_%u")
ghostPad.Link(selectorSinkPad)
}
// switchToFallback forcefully activates videotestsrc as the fallback
func switchToFallback(selector *gst.Element) {
fmt.Println("Switching to fallback videotestsrc...")
fallbackActivated.Do(func() {
pads, err := selector.GetSinkPads()
if err != nil {
fmt.Printf("Failed to get selector sink pads %v", err)
return
}
if len(pads) < 2 {
fmt.Println("Error: Not enough pads to switch input-selector.")
return
}
fallbackPad := pads[1]
err = selector.SetProperty("active-pad", fallbackPad)
if err != nil {
fmt.Printf("Failed to switch to fallback pad: %v\n", err)
}
})
}
// watchBus monitors the bus for EOS and errors, removing shmsrc and switching to fallback on these events
func watchBus(ctx context.Context, pipeline *gst.Pipeline, selector *gst.Element) {
bus := pipeline.GetBus()
defer bus.Unref()
for {
select {
case <-ctx.Done():
fmt.Println("Stopping bus monitoring...")
return
default:
msg := bus.PopFiltered(gst.MessageError | gst.MessageEOS)
if msg != nil {
switch msg.Type() {
case gst.MessageEOS:
fmt.Println("End-of-Stream reached.")
removeShmSrcAndFallback(pipeline, selector)
case gst.MessageError:
gerr := msg.ParseError()
fmt.Printf("Error: %s\nDebug Info: %s\n", gerr.Error(), gerr.DebugString())
removeShmSrcAndFallback(pipeline, selector)
}
}
}
time.Sleep(10 * time.Millisecond) // Small sleep to reduce CPU usage
}
}
// monitorSocket checks for the presence of the socket and manages shmsrc accordingly
func monitorSocket(ctx context.Context, selector *gst.Element, pipeline *gst.Pipeline) {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
// Handle shutdown signal
fmt.Println("Stopping socket monitoring...")
if shmsrcBin != nil {
shmsrcBin.SetState(gst.StateNull)
shmsrcBin.SyncStateWithParent()
pipeline.Remove(shmsrcBin)
}
return
case <-ticker.C:
// Check if the socket is available
if _, err := os.Stat(socketPath); err == nil {
if shmsrcBin == nil {
fmt.Println("Socket detected, creating and attaching shmsrc...")
// Reset fallback flag to allow reactivation
fallbackActivated = sync.Once{}
// Create a new shmsrc bin and attach it
shmsrcBin = createShmSrcBin()
pipeline.Add(shmsrcBin)
linkShmSrcToSelector(selector, shmsrcBin)
shmsrcBin.SetState(gst.StatePlaying)
// Switch to shmsrc input on selector
switchToShmSrc(selector)
}
} else if shmsrcBin != nil {
fmt.Println("Socket not available, detaching shmsrc...")
// Fully detach and clean up shmsrc
removeShmSrcAndFallback(pipeline, selector)
shmsrcBin = nil
}
}
}
}
// removeShmSrcAndFallback removes shmsrc and switches to fallback
func removeShmSrcAndFallback(pipeline *gst.Pipeline, selector *gst.Element) {
if shmsrcBin != nil {
fmt.Println("Removing shmsrc and switching to fallback.")
shmsrcBin.SetState(gst.StateNull)
shmsrcBin.SyncStateWithParent()
pipeline.Remove(shmsrcBin)
shmsrcBin = nil
}
// Forcefully switch to fallback
switchToFallback(selector)
}
// switchToShmSrc switches the input-selector to use the shmsrcBin
func switchToShmSrc(selector *gst.Element) {
fmt.Println("Switching to shmsrc input...")
pads, err := selector.GetSinkPads()
if err != nil {
fmt.Printf("Failed to get selector sink pads %v", err)
return
}
// Get the shmsrc bin pad (assumes shmsrc is added last)
shmPad := pads[len(pads)-1]
err = selector.SetProperty("active-pad", shmPad)
if err != nil {
fmt.Printf("Failed to switch to shmsrc pad: %v\n", err)
}
}
Here’s my debug info
dev@dev:~/repeater/dvb-broadcast/fallback$ go run .
Socket detected, creating and attaching shmsrc...
Switching to shmsrc input...
Error: Failed to read from shmsrc
Debug Info: ../subprojects/gst-plugins-bad/sys/shm/gstshmsrc.c(356): gst_shm_src_create (): /GstPipeline:pipeline0/GstBin:bin0/GstShmSrc:shmsrc0:
Control socket has closed
Removing shmsrc and switching to fallback.
Socket not available, detaching shmsrc...
Removing shmsrc and switching to fallback.
^CShutting down...
And shmsink if you want to recreate
gst-launch-1.0 \
videotestsrc is-live=true ! video/x-raw,format=I420,width=1920,height=1080,framerate=25/1 ! shmsink wait-for-connection=0 sync=true socket-path=/tmp/channel-1-video shm-size=$(expr 1920 \* 1080 \* 4 \* 22) sync=true \
audiotestsrc is-live=true ! audio/x-raw,format=S16LE,channels=2,rate=48000,layout=interleaved ! shmsink wait-for-connection=0 sync=true socket-path=/tmp/channel-1-audio shm-size=$(expr 48000 \* 2 \* 2) sync=true