When I try to record a LiveKit stream with node-gtk, I can't shut the pipeline down normally by sending an EOS event: the EOS message never arrives on the bus.
/**
 * Records a LiveKit room to `video.mp4` for about three seconds.
 *
 * The pipeline takes H264 video and Opus audio from `livekitwebrtcsrc`,
 * re-encodes them (x264 / AAC) and muxes both into an MP4 file. After three
 * seconds an EOS event is sent to the pipeline, and the function waits for an
 * ERROR or EOS message on the bus before releasing resources.
 *
 * Reported behaviour: with this source element the EOS message is never
 * observed on the bus, so the wait does not finish.
 */
async function main() {
  /* Initialize GStreamer */
  Gst.init(null);

  /* Build the pipeline (the description string is passed to GStreamer verbatim) */
  const pipeline = Gst.parseLaunch(`
livekitwebrtcsrc name=src
signaller::ws-url=${LIVEKIT_URL}
signaller::api-key=${LIVEKIT_API_KEY}
signaller::secret-key=${LIVEKIT_SECRET_KEY}
signaller::room-name=testroom
signaller::identity=gst-consumer
signaller::participant-name=gst-consumer
signaller::producer-peer-id=gst-producer
video-codecs=<H264>
src.
! queue
! rtph264depay
! avdec_h264
! videoconvert
! x264enc
! h264parse
! queue
! mux.video_0
src.
! queue
! rtpopusdepay
! opusdec
! audioconvert
! audioresample
! avenc_aac
! aacparse
! queue
! mux.audio_0
mp4mux name=mux
! filesink location=video.mp4
`) as Gst.Pipeline;

  /* Start playing */
  pipeline.setState(Gst.State.PLAYING);

  /* Wait until error or EOS */
  const bus = pipeline.getBus();
  const watcher = new CustomBusWatcher(bus);
  let eosTimer: ReturnType<typeof setTimeout> | undefined;

  try {
    let finished = false;

    // Plain (non-async) executor: it only registers the listener, so nothing
    // inside it can throw asynchronously and get lost as an unhandled rejection.
    const done = new Promise<void>((resolve) => {
      // Reported: the EOS message is never received with this pipeline.
      watcher.on(Gst.MessageType.ERROR | Gst.MessageType.EOS, (msg) => {
        if (msg.type === Gst.MessageType.ERROR) {
          const [err, debugInfo] = msg.parseError();
          process.stderr.write(`Error: ${err.message}`);
          process.stderr.write(`Debug information: ${debugInfo}`);
        }
        finished = true;
        resolve();
      });
    });

    // Wait 3 seconds, or less if the pipeline already reported ERROR/EOS.
    const delay = new Promise<void>((resolve) => {
      eosTimer = setTimeout(resolve, 3000);
    });
    await Promise.race([done, delay]);

    // Only send EOS while the pipeline is still alive; otherwise the event
    // would be sent after the cleanup below has already run.
    if (!finished) {
      pipeline.sendEvent(Gst.Event.newEos());
      await done;
    }
  } finally {
    clearTimeout(eosTimer);

    /* Free resources (also runs when sendEvent throws) */
    // NOTE(review): confirm that node-gtk requires these explicit unref() calls.
    watcher.dispose();
    bus.unref();
    pipeline.setState(Gst.State.NULL);
    pipeline.unref();
  }
}
/**
 * Polls a GStreamer bus on a 4 ms timer and re-emits every popped message
 * through a Node `EventEmitter`, once per `Gst.MessageType` flag contained in
 * the message type. The event name is the flag's enum name
 * (`Gst.MessageType[flag]`).
 *
 * Polling starts in the constructor; call `dispose()` to stop it.
 */
class CustomBusWatcher {
  /** Cap on messages handled per tick so a very busy bus cannot monopolise the event loop. */
  private static readonly MAX_MESSAGES_PER_TICK = 64;

  private readonly emitter = new EventEmitter();
  private intervalId: NodeJS.Timeout | null = null;

  /**
   * @param bus Bus to poll; stored via the parameter property.
   */
  constructor(private readonly bus: Gst.Bus) {
    this.run();
  }

  /**
   * Returns every numeric `Gst.MessageType` flag (except `UNKNOWN`) fully
   * contained in `mask`, in reverse enumeration order.
   */
  private static flagsIn(mask: number): number[] {
    const flags: number[] = [];
    for (const key of Object.values(Gst.MessageType).reverse()) {
      // Skip non-numeric entries and the zero-valued UNKNOWN, which would
      // otherwise match every mask.
      if (typeof key !== 'number' || key === Gst.MessageType.UNKNOWN) {
        continue;
      }
      if ((mask & key) === key) {
        flags.push(key);
      }
    }
    return flags;
  }

  /**
   * (Re)starts the polling timer. Each tick drains the pending messages
   * (up to `MAX_MESSAGES_PER_TICK`) instead of a single one, so a backlog on
   * the bus does not delay delivery of later messages.
   */
  run() {
    if (this.intervalId) {
      clearInterval(this.intervalId);
    }
    this.intervalId = setInterval(() => {
      for (let handled = 0; handled < CustomBusWatcher.MAX_MESSAGES_PER_TICK; handled++) {
        const msg = this.bus.pop();
        if (!msg) {
          break;
        }
        for (const key of CustomBusWatcher.flagsIn(msg.type)) {
          // Listeners run outside the polling callback.
          setImmediate(() => this.emitter.emit(Gst.MessageType[key], msg));
        }
      }
    }, 4);
  }

  /**
   * Registers `listener` for every message-type flag contained in `event`.
   *
   * @param event One `Gst.MessageType` flag or a bitwise OR of several.
   * @param listener Called with each matching message.
   * @returns This watcher, for chaining.
   */
  on(event: Gst.MessageType, listener: (message: Gst.Message) => void): this {
    for (const key of CustomBusWatcher.flagsIn(event)) {
      this.emitter.on(Gst.MessageType[key], listener);
    }
    return this;
  }

  /** Stops polling and removes all registered listeners. */
  dispose() {
    if (this.intervalId) {
      clearInterval(this.intervalId);
      this.intervalId = null;
    }
    this.emitter.removeAllListeners();
  }
}
I tested the same code with videotestsrc and audiotestsrc instead, and there it works as expected:
/**
 * Control experiment: records test video and audio to `video.mp4` for about
 * three seconds.
 *
 * Live `videotestsrc` / `audiotestsrc` sources are encoded (x264 / AAC) and
 * muxed into an MP4 file. After three seconds an EOS event is sent to the
 * pipeline, and the function waits for an ERROR or EOS message on the bus
 * before releasing resources.
 *
 * Reported behaviour: with these sources the EOS message is received.
 */
async function main() {
  /* Initialize GStreamer */
  Gst.init(null);

  /* Build the pipeline (the description string is passed to GStreamer verbatim) */
  const pipeline = Gst.parseLaunch(`
videotestsrc is-live=true ! videoconvert ! x264enc ! h264parse ! queue ! mux.
audiotestsrc is-live=true ! audioconvert ! audioresample ! avenc_aac ! aacparse ! queue ! mux.
mp4mux name=mux ! filesink location=video.mp4
`) as Gst.Pipeline;

  /* Start playing */
  pipeline.setState(Gst.State.PLAYING);

  /* Wait until error or EOS */
  const bus = pipeline.getBus();
  const watcher = new CustomBusWatcher(bus);
  let eosTimer: ReturnType<typeof setTimeout> | undefined;

  try {
    let finished = false;

    // Plain (non-async) executor: it only registers the listener, so nothing
    // inside it can throw asynchronously and get lost as an unhandled rejection.
    const done = new Promise<void>((resolve) => {
      // Reported: the EOS message is received with this pipeline.
      watcher.on(Gst.MessageType.ERROR | Gst.MessageType.EOS, (msg) => {
        if (msg.type === Gst.MessageType.ERROR) {
          const [err, debugInfo] = msg.parseError();
          process.stderr.write(`Error: ${err.message}`);
          process.stderr.write(`Debug information: ${debugInfo}`);
        }
        finished = true;
        resolve();
      });
    });

    // Wait 3 seconds, or less if the pipeline already reported ERROR/EOS.
    const delay = new Promise<void>((resolve) => {
      eosTimer = setTimeout(resolve, 3000);
    });
    await Promise.race([done, delay]);

    // Only send EOS while the pipeline is still alive; otherwise the event
    // would be sent after the cleanup below has already run.
    if (!finished) {
      pipeline.sendEvent(Gst.Event.newEos());
      await done;
    }
  } finally {
    clearTimeout(eosTimer);

    /* Free resources (also runs when sendEvent throws) */
    // NOTE(review): confirm that node-gtk requires these explicit unref() calls.
    watcher.dispose();
    bus.unref();
    pipeline.setState(Gst.State.NULL);
    pipeline.unref();
  }
}
This has been bothering me for a long time.
Has anyone run into a similar problem with EOS not being delivered?