Help Please, create a fallback for shmsrc to make resilence in receiver app

Hi my Gstreamer friends,

I’m wanting to create a pipeline which never stalls and work as a stand-alone application. My shmsink source is raw video.

I need help on working what I’m not doing right.

(audio part yet to be implemented)

Basically I want is to use video and audio shmsink to have 1-n instances receiving video and audio from a one application (switcher) I’m wanting to make these backend robust to not die when the shmsink falls or the switcher app is not running or restarting.

At first mt backend apps await for the shm file to appear in /tmp then build the shmsrc bin and attach to the running pipeline using input-selector.
At start-up the input-selector shows the fallbacksource which is a simple videotestsrc. When the shmsink path appears (we have video) I’m able to build and attach the bin and begin showing the shmscr video source.

Unfortunately, when a close the shmsink the last frame is shown frozen and I’m unable to switch to the fallback and detach the broken shmsrc (destroy it) and repeat the process awaiting for the shmsink path to appear.

Has anybody successfully created such a program and if so, what is the process in adding and detaching ghostpads to pipelines?

I tried fallbackswitch however, couldn’t get it working with raw video. I would have thought fallbackswitch to be the best solution.

Google shows hardly any examples in using fallbackswitch with raw video or audio? Only slowmo’s blog is the only idea and some topics here on discourse.

Here’s my code using Go…

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"

	"github.com/go-gst/go-gst/gst"
)

var (
	fallbackActivated sync.Once
	socketPath        = "/tmp/channel-1-video"
	shmsrcBin         *gst.Element // Track shmsrcBin directly
)

func main() {
	// Initialize GStreamer
	gst.Init(nil)

	// Define pipeline with input-selector and videotestsrc fallback
	pipelineStr := `
        input-selector name=selector
        videotestsrc pattern=black is-live=true
        ! video/x-raw,format=I420,width=1920,height=1080,framerate=25/1,interlace-mode=progressive
        ! textoverlay text="Waiting for video stream" font-desc="Sans, 30" halignment=center valignment=center
        ! selector.
        selector. ! videoconvert ! autovideosink`

	// Create the main pipeline
	pipeline, err := gst.NewPipelineFromString(pipelineStr)
	if err != nil {
		fmt.Printf("Failed to create pipeline: %v\n", err)
		return
	}

	// Retrieve the input-selector element by name
	selector, err := pipeline.GetElementByName("selector")
	if err != nil {
		fmt.Printf("Failed to get input-selector: %v\n", err)
		return
	}

	// Start the pipeline with fallback video
	pipeline.SetState(gst.StatePlaying)

	// Handle graceful shutdown with context
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Capture SIGINT and SIGTERM to trigger graceful shutdown
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-sigs
		cancel()
	}()

	// Watch the bus for EOS and errors
	go watchBus(ctx, pipeline, selector)

	// Start monitoring the socket path for changes
	go monitorSocket(ctx, selector, pipeline)

	// Block until context is canceled (e.g., on Ctrl+C)
	<-ctx.Done()
	fmt.Println("Shutting down...")
	pipeline.SetState(gst.StateNull)
}

// createShmSrcBin creates a new shmsrc bin and returns it as *gst.Element
func createShmSrcBin() *gst.Element {
	binStr := fmt.Sprintf(`
        shmsrc socket-path=%s do-timestamp=true
        ! video/x-raw,format=I420,width=1920,height=1080,framerate=25/1,interlace-mode=progressive
        ! queue max-size-bytes=20971520 max-size-time=0
    `, socketPath)

	// Create the bin from the string
	shmsrcBin, err := gst.NewBinFromString(binStr, true)
	if err != nil {
		log.Fatalf("Failed to create shmsrc bin: %v", err)
	}

	// Set the shmsrc bin to READY state
	shmsrcBin.SetState(gst.StateReady)
	shmsrcBin.SyncStateWithParent()

	return shmsrcBin.Element
}

// linkShmSrcToSelector links shmsrcBin's ghost pad to the input-selector
func linkShmSrcToSelector(selector *gst.Element, shmsrcBin *gst.Element) {
	ghostPad := shmsrcBin.GetStaticPad("src")
	selectorSinkPad := selector.GetRequestPad("sink_%u")
	ghostPad.Link(selectorSinkPad)
}

// switchToFallback forcefully activates videotestsrc as the fallback
func switchToFallback(selector *gst.Element) {
	fmt.Println("Switching to fallback videotestsrc...")
	fallbackActivated.Do(func() {
		pads, err := selector.GetSinkPads()
		if err != nil {
			fmt.Printf("Failed to get selector sink pads %v", err)
			return
		}

		if len(pads) < 2 {
			fmt.Println("Error: Not enough pads to switch input-selector.")
			return
		}

		fallbackPad := pads[1]
		err = selector.SetProperty("active-pad", fallbackPad)
		if err != nil {
			fmt.Printf("Failed to switch to fallback pad: %v\n", err)
		}
	})
}

// watchBus monitors the bus for EOS and errors, removing shmsrc and switching to fallback on these events
func watchBus(ctx context.Context, pipeline *gst.Pipeline, selector *gst.Element) {
	bus := pipeline.GetBus()
	defer bus.Unref()

	for {
		select {
		case <-ctx.Done():
			fmt.Println("Stopping bus monitoring...")
			return
		default:
			msg := bus.PopFiltered(gst.MessageError | gst.MessageEOS)
			if msg != nil {
				switch msg.Type() {
				case gst.MessageEOS:
					fmt.Println("End-of-Stream reached.")
					removeShmSrcAndFallback(pipeline, selector)
				case gst.MessageError:
					gerr := msg.ParseError()
					fmt.Printf("Error: %s\nDebug Info: %s\n", gerr.Error(), gerr.DebugString())
					removeShmSrcAndFallback(pipeline, selector)
				}
			}
		}
		time.Sleep(10 * time.Millisecond) // Small sleep to reduce CPU usage
	}
}

// monitorSocket checks for the presence of the socket and manages shmsrc accordingly
func monitorSocket(ctx context.Context, selector *gst.Element, pipeline *gst.Pipeline) {
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			// Handle shutdown signal
			fmt.Println("Stopping socket monitoring...")
			if shmsrcBin != nil {
				shmsrcBin.SetState(gst.StateNull)
				shmsrcBin.SyncStateWithParent()
				pipeline.Remove(shmsrcBin)
			}
			return

		case <-ticker.C:
			// Check if the socket is available
			if _, err := os.Stat(socketPath); err == nil {
				if shmsrcBin == nil {
					fmt.Println("Socket detected, creating and attaching shmsrc...")

					// Reset fallback flag to allow reactivation
					fallbackActivated = sync.Once{}

					// Create a new shmsrc bin and attach it
					shmsrcBin = createShmSrcBin()
					pipeline.Add(shmsrcBin)
					linkShmSrcToSelector(selector, shmsrcBin)
					shmsrcBin.SetState(gst.StatePlaying)

					// Switch to shmsrc input on selector
					switchToShmSrc(selector)
				}
			} else if shmsrcBin != nil {
				fmt.Println("Socket not available, detaching shmsrc...")

				// Fully detach and clean up shmsrc
				removeShmSrcAndFallback(pipeline, selector)
				shmsrcBin = nil
			}
		}
	}
}

// removeShmSrcAndFallback removes shmsrc and switches to fallback
func removeShmSrcAndFallback(pipeline *gst.Pipeline, selector *gst.Element) {
	if shmsrcBin != nil {
		fmt.Println("Removing shmsrc and switching to fallback.")
		shmsrcBin.SetState(gst.StateNull)
		shmsrcBin.SyncStateWithParent()
		pipeline.Remove(shmsrcBin)
		shmsrcBin = nil
	}

	// Forcefully switch to fallback
	switchToFallback(selector)
}

// switchToShmSrc switches the input-selector to use the shmsrcBin
func switchToShmSrc(selector *gst.Element) {
	fmt.Println("Switching to shmsrc input...")
	pads, err := selector.GetSinkPads()
	if err != nil {
		fmt.Printf("Failed to get selector sink pads %v", err)
		return
	}

	// Get the shmsrc bin pad (assumes shmsrc is added last)
	shmPad := pads[len(pads)-1]
	err = selector.SetProperty("active-pad", shmPad)
	if err != nil {
		fmt.Printf("Failed to switch to shmsrc pad: %v\n", err)
	}
}

Here’s my debug info


dev@dev:~/repeater/dvb-broadcast/fallback$ go run .
Socket detected, creating and attaching shmsrc...
Switching to shmsrc input...
Error: Failed to read from shmsrc
Debug Info: ../subprojects/gst-plugins-bad/sys/shm/gstshmsrc.c(356): gst_shm_src_create (): /GstPipeline:pipeline0/GstBin:bin0/GstShmSrc:shmsrc0:
Control socket has closed
Removing shmsrc and switching to fallback.
Socket not available, detaching shmsrc...
Removing shmsrc and switching to fallback.
^CShutting down...

And shmsink if you want to recreate

gst-launch-1.0 \
videotestsrc is-live=true ! video/x-raw,format=I420,width=1920,height=1080,framerate=25/1 ! shmsink wait-for-connection=0 sync=true socket-path=/tmp/channel-1-video shm-size=$(expr 1920 \* 1080 \* 4 \* 22) sync=true \
audiotestsrc is-live=true ! audio/x-raw,format=S16LE,channels=2,rate=48000,layout=interleaved ! shmsink wait-for-connection=0 sync=true socket-path=/tmp/channel-1-audio shm-size=$(expr 48000 \* 2 \* 2) sync=true

Nobody here working with Gstreamer in Go?