Record video from data received from appsrc (shared memory)

Hi everyone,
I’m using gstreamer bindings for Golang to record MP4 videos from frame data I receive from shared memory. The data I receive is constantly delivered as a byte array. This data is managed with appsrc. When I run the app, after the first 1.5 seconds, I start getting the error:

GStreamer-CRITICAL **: 11:52:23.352: gst_segment_to_running_time: assertion ‘segment->format == format’ failed

When the execution completes, I get an unplayable MP4 file that’s always 851 bytes in size. Any ideas on how to record MP4 files? Or what could be wrong with my code? From shared memory, I recieve raw data, UYVY format, framerate 30000/1001 and 1080i5994 resolution, and I need to compress it with the format below.

My code is:

package main

import (
	"encoding/binary"
	"fmt"
	"log"
	"time"

	"github.com/gen2brain/shm"
	"github.com/go-gst/go-glib/glib"
	"github.com/go-gst/go-gst/examples"
	"github.com/go-gst/go-gst/gst"
	"github.com/go-gst/go-gst/gst/app"
)

const (
	videoShmKey = 1111
	headerSize  = 24 // 6 * 4 bytes
	numSlots    = 10
)

const fpsNum = 30000
const fpsDen = 1001

var frameDuration = time.Second * time.Duration(fpsDen) / time.Duration(fpsNum)

var (
	slotActual = 0
	dataVideo  []byte
)

func main() {
	examples.RunLoop(func(loop *glib.MainLoop) error {
		var pipeline *gst.Pipeline
		var err error
		if pipeline, err = createPipeline(); err != nil {
			return err
		}
		return mainLoop(loop, pipeline)
	})
}

func createPipeline() (*gst.Pipeline, error) {
	gst.Init(nil)

	pipeline, err := gst.NewPipeline("")
	if err != nil {
		return nil, err
	}

	appsrc, _ := gst.NewElement("appsrc")
	videoconvert, _ := gst.NewElement("videoconvert")
	x264enc, _ := gst.NewElement("x264enc")
	mp4mux, _ := gst.NewElement("mp4mux")
	filesink, _ := gst.NewElement("filesink")

	// Config  H.264 codec
	x264enc.SetProperty("bframes", 0)
	x264enc.SetProperty("speed-preset", "veryfast")
	x264enc.SetProperty("key-int-max", 60)

	// Config destiny file
	filesink.SetProperty("location", "grabacion.mp4")

	// Add and link elements
	pipeline.AddMany(appsrc, videoconvert, x264enc, mp4mux, filesink)
	gst.ElementLinkMany(appsrc, videoconvert, x264enc, mp4mux, filesink)

	// Config appsrc
	src := app.SrcFromElement(appsrc)
	src.SetCaps(gst.NewCapsFromString("video/x-raw, format=UYVY, width=1920, height=1080, layout=interleaved, framerate=30000/1001"))
	src.SetProperty("format", gst.FormatTime)
	src.SetProperty("stream-type", 0)
	err_tmsp := src.SetProperty("do-timestamp", true)
	if err_tmsp != nil {
		log.Printf("Error setting do-timestamp: %v", err_tmsp)
	}
	src.SetProperty("is-live", true)

	// Conect to shared memory
	log.Println("Conectando a memoria compartida...")
	shmidVideo, err := shm.Get(videoShmKey, 0, 0666)
	if err != nil {
		log.Printf("Error acceso video SHM: %v", err)
	}
	dataVideo, err = shm.At(shmidVideo, 0, 0)
	if err != nil {
		log.Printf("Error mapeo video: %v", err)
	}

	// Callback to send frames for 1 minute
	var frameCount int
	start := time.Now()

	src.SetCallbacks(&app.SourceCallbacks{
		NeedDataFunc: func(self *app.Source, _ uint) {
			// Calcular puntero al slot actual
			offset := slotActual * 4147224 // tamaño total del slot
			slot := dataVideo[offset : offset+4147224]
			log.Println("Produciendo frame:", frameCount)

			if time.Since(start) > 10*time.Second {
				log.Println("cerrando stream")
				self.EndStream()
				return
			}

			sizeExpected := 1920 * 1080 * 2 * 10
			if len(dataVideo) < sizeExpected {
				log.Printf("Tamaño insuficiente de datos en memoria compartida: %d < %d", len(dataVideo), sizeExpected)
				return
			}
			// Leer datos de video (después de los 24 bytes de header)
			dataVideo_MOD := slot[headerSize:]

			log.Println("Duration", time.Duration(frameCount)*frameDuration)
			buf := gst.NewBufferFromBytes(dataVideo_MOD)
			pts := gst.ClockTime(time.Duration(frameCount) * frameDuration)
			buf.SetPresentationTimestamp(pts)
			buf.SetDuration(gst.ClockTime(frameDuration))

			self.PushBuffer(buf)
			buf.Unref()
			frameCount++
			// Marcar este slot como libre en memoria compartida
			binary.LittleEndian.PutUint32(slot[0:4], 0)

			// Avanzar al siguiente slot circularmente
			slotActual = (slotActual + 1) % numSlots
		},
	})

	return pipeline, nil
}

func mainLoop(loop *glib.MainLoop, pipeline *gst.Pipeline) error {

	pipeline.SetState(gst.StatePlaying)

	go func() {
		time.Sleep(10 * time.Second)
		log.Println("Done")
		pipeline.SendEvent(gst.NewEOSEvent())
	}()

	pipeline.GetPipelineBus().AddWatch(func(msg *gst.Message) bool {
		switch msg.Type() {
		case gst.MessageEOS:
			log.Println("Recording complete")
			loop.Quit()
		case gst.MessageError:
			err := msg.ParseError()
			log.Printf("Error en pipeline: %v", err)
			loop.Quit()
		}
		return true
	})

	loop.Run()
	pipeline.SetState(gst.StateNull)

	if err := shm.Dt(dataVideo); err != nil {
		log.Println("Error al liberar memoria compartida de video:", err)
	}

	return nil
}

This is only a test, then I will add audio.

I cannot immediately see what could be wrong, but what may help is looking at the pipeline graph via the DebugDump* functions.

Also you are dropping messages other than EOS and Error, the other ones may contain the information you are missing, I suggest logging them in the Bus watch

I’ve struggled with this, too, in my case recording robot video cameras from ROS. I still can’t claim to understand it fully but got things to work using h264parse ! mpegtsmux ! multifilesink. In the context of your application I’d say you need the h264parse. But I’m also mentioning the mpegtsmux here because, from what I recall, I couldn’t get mp4mux ! filesink to work.