Switching between fallback source and live video through a sharedmemory using go

Hi gurus,

I have a shmsink sending raw video with caps applied and able to connect multiple receivers to it to attempt decoupling gstreamer instances.

I’m wanting to keep the running pipeline alive when the shmsink disappears which shmsrc errors out. I’ve created a basic test switcher using input-select which fits the bill. Tried with compositor however, couldn’t get it working since the format is byte and not time. The newer rust fallsbackswitch element suffered the same issue.

Below is my attempt using go with the go-gst/go-gst bindings. I’m creating a basic switch with one input videotestsrc black with a text overlay. When the shmsink fires up, I what for the shared memory file created and then attempt to build the shmsrc pipeline with buildShmSrc() and attach the pad to the input-select element and switch to the shmsrc source. I’m using the identity element to signal handoff monitoring the data flow and on the event the data flow stops wait 1 second then fallback to videotestsrc with switchToLostVideo() then, simply return to monitoring for the shmsink path to appear again.

I’m sure all of this has been done before however, I can’t seem to find any working example to port from C or python to fit my application.
Maybe it’s my newbie understand on using pads and links to add pipelines. I also tried uses bins and building and joining these to the running pipeline on the fly.

I’m struggling with the error below to which I can’t deref the sink_0 to re-use again?

Sending raw video and audio via shm maybe not be the best approach however, I tried a container format like matroska and had issues with the receiver not able to demux which connected on the run. The shm approach seems the logic solution other than encoding and decoding at each end on the local machine. the shm is local memory on the machine. I’m running Ubuntu 24.04 LTS with gstreamer 1.24.

go run .
/tmp/channel-1-video found. Attaching shmsrc on startup.
Building and attaching new shmsrc element.
(test:237683): GStreamer-CRITICAL **: 20:18:58.974: Element selector already has a pad named sink_0, the behaviour of  gst_element_get_request_pad() for existing pads is undefined!
New shmsrc element attached.
Pipeline running. Press Ctrl+C to stop...
/tmp/channel-1-video does not exist. Removing shmsrc.
Removing shmsrc element from pipeline.
shmsrc element not found.
identity element not found.
Pad sink_0 released and unref'd.
Switching to Lost Video.
Failed to get Lost Video pad.
Switching to Lost Video.
Failed to get Lost Video pad.
Switching to Lost Video.
Failed to get Lost Video pad.
/tmp/channel-1-video exists. Building shmsrc.
Building and attaching new shmsrc element.

(test:237683): GStreamer-CRITICAL **: 20:19:15.986: Element selector already has a pad named sink_0, the behaviour of  gst_element_get_request_pad() for existing pads is undefined!
New shmsrc element attached.

Here’s my code with heaps of debug info.

package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/go-gst/go-gst/gst"
)

var (
	switchInterval   = 1 * time.Second // Interval for monitoring shmsrc updates
	fallback         = false           // Whether fallback is active
	selector         *gst.Element      // Input selector element
	pipeline         *gst.Pipeline     // GStreamer pipeline
	sinkPad          *gst.Pad          // Pad for shmsrc
	shmsrcLastUpdate = time.Now()      // Timestamp of the last update from shmsrc
	shmsrcAttached   = false           // Flag to track if shmsrc is attached
)

func main() {
	// Initialize GStreamer
	gst.Init(nil)

	// Define the main pipeline string for the Lost Video stream
	pipelineString := `
		input-selector name=selector ! videoconvert ! autovideosink
		videotestsrc pattern=black is-live=true ! queue ! textoverlay name=txtoverlay text="Lost Video" valignment=center halignment=center font-desc="Sans, 48" ! video/x-raw,format=I420,width=1920,height=1080,framerate=25/1 ! selector.sink_1
	`

	// Create the main pipeline from the string
	var err error
	pipeline, err = gst.NewPipelineFromString(pipelineString)
	if err != nil {
		fmt.Println("Failed to create pipeline:", err)
		os.Exit(1)
	}

	// Get a reference to the input-selector element
	selector, err = pipeline.GetElementByName("selector")
	if err != nil {
		fmt.Println("Failed to get selector element by name:", err)
		os.Exit(1)
	}

	// Check for the existence of /tmp/channel-1-video at startup
	if _, err := os.Stat("/tmp/channel-1-video"); err == nil {
		// If the file exists, build and attach shmsrc
		fmt.Println("/tmp/channel-1-video found. Attaching shmsrc on startup.")
		buildShmSrc()
	} else {
		// Otherwise, start with Lost Video
		fmt.Println("/tmp/channel-1-video not found. Starting with Lost Video.")
		switchToLostVideo()
	}

	// Handle signals to cleanly exit
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

	// Start a routine to monitor the shmsink and switch between sources
	go startShmSrcMonitor()

	// Set the pipeline to the PLAYING state
	pipeline.SetState(gst.StatePlaying)

	fmt.Println("Pipeline running. Press Ctrl+C to stop...")

	// Wait for a signal to stop
	<-sigs

	// Cleanup: Set the pipeline state to NULL
	pipeline.SetState(gst.StateNull)
	fmt.Println("Pipeline stopped.")
}

// buildShmSrc creates a new shmsrc element and connects it to the pipeline using NewElementMany
func buildShmSrc() {
	fmt.Println("Building and attaching new shmsrc element.")

	// Create the shmsrc and identity elements using NewElementMany
	elements, err := gst.NewElementMany("shmsrc", "queue", "identity")
	if err != nil {
		fmt.Println("Failed to create shmsrc pipeline elements:", err)
		return
	}

	shmsrc := elements[0]
	identity := elements[2]

	// Set properties for the shmsrc element
	shmsrc.SetProperty("socket-path", "/tmp/channel-1-video")
	shmsrc.SetProperty("is-live", true)
	shmsrc.SetProperty("do-timestamp", true)

	// Add the shmsrc elements to the pipeline
	pipeline.AddMany(elements...)

	// Link the elements together
	shmsrc.Link(elements[1])   // shmsrc -> queue
	elements[1].Link(identity) // queue -> identity

	// Request a new pad if one doesn't already exist
	if sinkPad == nil || sinkPad.IsLinked() {
		sinkPad = selector.GetRequestPad("sink_0")
	}

	// Link the identity's src pad to the input-selector's sink_0 pad
	identityPad := identity.GetStaticPad("src")
	if identityPad != nil && sinkPad != nil {
		if identityPad.Link(sinkPad) != gst.PadLinkOK {
			fmt.Println("Failed to link identity src pad to selector sink_0 pad.")
		}
	} else {
		fmt.Println("Failed to get sink_0 pad or identity src pad for selector.")
	}

	// Monitor data flow from the shmsrc
	identity.Connect("handoff", func() {
		shmsrcLastUpdate = time.Now() // Update the timestamp when data flows
		if fallback {
			// If in fallback, switch back to shmsrc when data is flowing
			fmt.Println("Data flowing through shmsrc. Switching back to shmsrc.")
			selector.SetProperty("active-pad", sinkPad)
			fallback = false
		}
	})

	// Sync the states of the new elements
	for _, element := range elements {
		element.SyncStateWithParent()
	}

	shmsrcAttached = true
	fmt.Println("New shmsrc element attached.")
}

// removeShmSrc removes the shmsrc element from the pipeline
func removeShmSrc() {
	if !shmsrcAttached {
		return
	}
	fmt.Println("Removing shmsrc element from pipeline.")
	shmsrc, _ := pipeline.GetElementByName("shmsrc")
	if shmsrc != nil {
		shmsrc.SetState(gst.StateNull)
		pipeline.Remove(shmsrc)
		fmt.Println("shmsrc element removed.")
	} else {
		fmt.Println("shmsrc element not found.")
	}

	identity, _ := pipeline.GetElementByName("identity")
	if identity != nil {
		identity.SetState(gst.StateNull)
		pipeline.Remove(identity)
		fmt.Println("identity element removed.")
	} else {
		fmt.Println("identity element not found.")
	}

	// Release the pad on selector if it exists
	if sinkPad != nil && sinkPad.IsLinked() {
		selector.ReleaseRequestPad(sinkPad)
		sinkPad.Unref() // Unref the pad to ensure it's properly cleaned up
		sinkPad = nil   // Reset the reference to avoid reuse
		fmt.Println("Pad sink_0 released and unref'd.")
	} else {
		fmt.Println("No sink_0 pad to release.")
	}

	shmsrcAttached = false
}

// startShmSrcMonitor monitors for the existence of the shmsink and switches between sources
func startShmSrcMonitor() {
	for {
		time.Sleep(switchInterval)

		// Check if the shmsink file exists
		if _, err := os.Stat("/tmp/channel-1-video"); err == nil {
			// If shmsink is available and shmsrc is not attached, build and attach it
			if !shmsrcAttached {
				fmt.Println("/tmp/channel-1-video exists. Building shmsrc.")
				buildShmSrc()
			} else if time.Since(shmsrcLastUpdate) > 1*time.Second {
				// If shmsrc is attached but no data has flowed for 1 second, remove it
				fmt.Println("No data from shmsrc for 1 second. Switching to Lost Video.")
				removeShmSrc()
				switchToLostVideo()
			}
		} else {
			// If shmsink is not available, ensure we are showing Lost Video
			if shmsrcAttached {
				fmt.Println("/tmp/channel-1-video does not exist. Removing shmsrc.")
				removeShmSrc()
			}
			switchToLostVideo()
		}
	}
}

// switchToLostVideo switches the active pad to the Lost Video source
func switchToLostVideo() {
	if !fallback {
		fmt.Println("Switching to Lost Video.")
		pad := selector.GetStaticPad("sink_1")
		if pad != nil {
			selector.SetProperty("active-pad", pad)
			fallback = true
		} else {
			fmt.Println("Failed to get Lost Video pad.")
		}
	}
}