How to send EOS to `livekitwebrtcsrc`?

When I try to record a LiveKit stream with node-gtk, I can’t shut the pipeline down cleanly by sending it an EOS event.

/**
 * Records the H264 video and Opus audio received through `livekitwebrtcsrc`
 * into `video.mp4`, sends an EOS event to the pipeline after three seconds,
 * and tears the pipeline down once an ERROR or EOS message shows up on the bus.
 */
async function main() {
  /* Initialize GStreamer */
  Gst.init(null);

  /* Build the pipeline */
  const pipeline = Gst.parseLaunch(`
    livekitwebrtcsrc name=src
      signaller::ws-url=${LIVEKIT_URL}
      signaller::api-key=${LIVEKIT_API_KEY}
      signaller::secret-key=${LIVEKIT_SECRET_KEY}
      signaller::room-name=testroom
      signaller::identity=gst-consumer
      signaller::participant-name=gst-consumer
      signaller::producer-peer-id=gst-producer
      video-codecs=<H264>
    src.
    ! queue
    ! rtph264depay
    ! avdec_h264
    ! videoconvert
    ! x264enc
    ! h264parse
    ! queue
    ! mux.video_0
    src.
    ! queue
    ! rtpopusdepay
    ! opusdec
    ! audioconvert
    ! audioresample
    ! avenc_aac
    ! aacparse
    ! queue
    ! mux.audio_0
    mp4mux name=mux
    ! filesink location=video.mp4
  `) as Gst.Pipeline;

  /* Start playing */
  pipeline.setState(Gst.State.PLAYING);

  /* Wait until error or EOS */
  const bus = pipeline.getBus();
  const watcher = new CustomBusWatcher(bus);

  // Plain (non-async) executor: it only wires the bus listener to `resolve`.
  const finished = new Promise<void>((resolve) => {
    // Cannot receive EOS signal
    watcher.on(Gst.MessageType.ERROR | Gst.MessageType.EOS, (msg) => {
      if (msg.type === Gst.MessageType.ERROR) {
        const [err, debugInfo] = msg.parseError();
        // Trailing newlines keep the two diagnostics on separate lines.
        process.stderr.write(`Error: ${err.message}\n`);
        process.stderr.write(`Debug information: ${debugInfo}\n`);
      }
      resolve();
    });
  });

  // Wait 3 seconds and send EOS
  const eosTimer = setTimeout(() => {
    // sendEvent reports whether the event was handled; surface a refusal
    // instead of waiting silently for an EOS message that may never come.
    if (!pipeline.sendEvent(Gst.Event.newEos())) {
      process.stderr.write('EOS event was not handled by the pipeline\n');
    }
  }, 3000);

  try {
    await finished;
  } finally {
    // If an ERROR ended the wait early, do not fire EOS at a pipeline that is
    // about to be (or already was) shut down below.
    clearTimeout(eosTimer);
  }

  /* Free resources */
  watcher.dispose();
  bus.unref();
  pipeline.setState(Gst.State.NULL);
  pipeline.unref();
}
/**
 * Polls a `Gst.Bus` on a timer and re-emits every popped message through an
 * internal `EventEmitter`, keyed by the name of each `Gst.MessageType` flag
 * set in the message's `type`.
 *
 * Polling starts as soon as the watcher is constructed; call `dispose()` to
 * stop it and drop all listeners.
 */
class CustomBusWatcher {
  private emitter: EventEmitter;
  private intervalId: NodeJS.Timeout | null = null;

  /**
   * @param bus Bus to poll. Stored by the parameter property, so no explicit
   *   assignment is needed in the body.
   */
  constructor(private bus: Gst.Bus) {
    this.emitter = new EventEmitter();
    this.run();
  }

  /**
   * Numeric `Gst.MessageType` flags contained in `mask`, in reverse
   * declaration order and without `UNKNOWN`.
   */
  private static flagsIn(mask: number): number[] {
    const flags: number[] = [];
    for (const key of Object.values(Gst.MessageType).reverse()) {
      // Object.values() on the enum also yields non-numeric entries; skip them.
      if (typeof key !== 'number' || key === Gst.MessageType.UNKNOWN) {
        continue;
      }
      if ((mask & key) === key) {
        flags.push(key);
      }
    }
    return flags;
  }

  /** (Re)starts the polling timer, replacing any timer that is already running. */
  run() {
    if (this.intervalId) {
      clearInterval(this.intervalId);
    }
    this.intervalId = setInterval(() => {
      // Drain everything that is queued on each tick. Popping a single message
      // per tick would make a burst of messages trickle out 4 ms apart.
      let msg = this.bus.pop();
      while (msg) {
        this.dispatch(msg);
        msg = this.bus.pop();
      }
    }, 4);
  }

  /** Emits `msg` once per matching flag, deferred with `setImmediate`. */
  private dispatch(msg: Gst.Message) {
    for (const key of CustomBusWatcher.flagsIn(msg.type)) {
      setImmediate(() => this.emitter.emit(Gst.MessageType[key], msg));
    }
  }

  /**
   * Registers `listener` for every message type flag contained in `event`.
   *
   * @param event One `Gst.MessageType` flag or several OR-ed together.
   * @param listener Called with each matching message.
   * @returns This watcher, to allow chaining.
   */
  on(event: Gst.MessageType, listener: (message: Gst.Message) => void): this {
    for (const key of CustomBusWatcher.flagsIn(event)) {
      this.emitter.on(Gst.MessageType[key], listener);
    }

    return this;
  }

  /** Stops polling and removes every registered listener. */
  dispose() {
    if (this.intervalId) {
      clearInterval(this.intervalId);
      // Forget the cleared handle so the watcher does not hold a stale timer.
      this.intervalId = null;
    }
    this.emitter.removeAllListeners();
  }
}

I tested the same code with videotestsrc and audiotestsrc instead, and there the EOS message arrives as expected:

/**
 * Records `videotestsrc` and `audiotestsrc` into `video.mp4`, sends an EOS
 * event to the pipeline after three seconds, and tears the pipeline down once
 * an ERROR or EOS message shows up on the bus.
 */
async function main() {
  /* Initialize GStreamer */
  Gst.init(null);

  /* Build the pipeline */
  const pipeline = Gst.parseLaunch(`
    videotestsrc is-live=true ! videoconvert ! x264enc ! h264parse ! queue ! mux.
    audiotestsrc is-live=true ! audioconvert ! audioresample ! avenc_aac ! aacparse ! queue ! mux.
    mp4mux name=mux ! filesink location=video.mp4
  `) as Gst.Pipeline;

  /* Start playing */
  pipeline.setState(Gst.State.PLAYING);

  /* Wait until error or EOS */
  const bus = pipeline.getBus();
  const watcher = new CustomBusWatcher(bus);

  // Plain (non-async) executor: it only wires the bus listener to `resolve`.
  const finished = new Promise<void>((resolve) => {
    // Can receive EOS signal
    watcher.on(Gst.MessageType.ERROR | Gst.MessageType.EOS, (msg) => {
      if (msg.type === Gst.MessageType.ERROR) {
        const [err, debugInfo] = msg.parseError();
        // Trailing newlines keep the two diagnostics on separate lines.
        process.stderr.write(`Error: ${err.message}\n`);
        process.stderr.write(`Debug information: ${debugInfo}\n`);
      }
      resolve();
    });
  });

  // Wait 3 seconds and send EOS
  const eosTimer = setTimeout(() => {
    // sendEvent reports whether the event was handled; surface a refusal
    // instead of waiting silently for an EOS message that may never come.
    if (!pipeline.sendEvent(Gst.Event.newEos())) {
      process.stderr.write('EOS event was not handled by the pipeline\n');
    }
  }, 3000);

  try {
    await finished;
  } finally {
    // If an ERROR ended the wait early, do not fire EOS at a pipeline that is
    // about to be (or already was) shut down below.
    clearTimeout(eosTimer);
  }

  /* Free resources */
  watcher.dispose();
  bus.unref();
  pipeline.setState(Gst.State.NULL);
  pipeline.unref();
}

This has been bothering me for a long time. :sob:

Has anyone encountered a situation related to EOS?

Try using the following command:
pipeline.setProperty("message-forward", true);
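This instructs the pipeline to forward the EOS message from the filesink immediately, instead of holding it back until an EOS message has also been received from the livekitwebrtcsrc. Note that forwarded messages are posted on the bus as ELEMENT messages carrying a `GstBinForwarded` structure, so a listener that only watches for EOS messages will not see them.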

Thanks for your reply, but it still doesn’t seem to work :cry:

Here is the log after sending EOS; the pipeline shows no sign of stopping:

Waiting 3 seconds
Sending EOS
0:00:10.205868460 144952 0x7ee5440017f0 INFO               baseparse gstbaseparse.c:4111:gst_base_parse_set_latency:<h264parse0> min/max latency 0:00:00.000000000, 0:00:00.000000000
0:00:10.661243081 144952 0x7ee544000e90 INFO              aggregator gstaggregator.c:4045:gst_aggregator_update_segment:<mux> Updating srcpad segment: bytes segment start=40, offset=0, stop=-1, rate=1.000000, applied_rate=1.000000, flags=0x00, time=0, base=0, position 0, duration -1
0:00:10.661284413 144952 0x7ee544000e90 INFO               GST_EVENT gstevent.c:998:gst_event_new_segment: creating segment event bytes segment start=40, offset=0, stop=-1, rate=1.000000, applied_rate=1.000000, flags=0x00, time=0, base=0, position 0, duration -1
0:00:10.706258120 144952 0x7ee5440017f0 INFO               baseparse gstbaseparse.c:4111:gst_base_parse_set_latency:<h264parse0> min/max latency 0:00:00.000000000, 0:00:00.000000000
0:00:10.706668420 144952 0x7ee544000e90 INFO                   qtmux gstqtmux.c:4680:gst_qt_mux_robust_recording_rewrite_moov:<mux> Reserved 100278 header bytes. Used 6214 in 0:00:08.469578002. Remaining now 94064 or approx 160902428115 ns

0:00:10.706687302 144952 0x7ee544000e90 INFO              aggregator gstaggregator.c:4045:gst_aggregator_update_segment:<mux> Updating srcpad segment: bytes segment start=32, offset=0, stop=-1, rate=1.000000, applied_rate=1.000000, flags=0x00, time=0, base=0, position 0, duration -1
0:00:10.706697938 144952 0x7ee544000e90 INFO               GST_EVENT gstevent.c:998:gst_event_new_segment: creating segment event bytes segment start=32, offset=0, stop=-1, rate=1.000000, applied_rate=1.000000, flags=0x00, time=0, base=0, position 0, duration -1
0:00:10.713735078 144952 0x7ee544000e90 INFO              aggregator gstaggregator.c:4045:gst_aggregator_update_segment:<mux> Updating srcpad segment: bytes segment start=2291658, offset=0, stop=-1, rate=1.000000, applied_rate=1.000000, flags=0x00, time=0, base=0, position 0, duration -1
0:00:10.713765783 144952 0x7ee544000e90 INFO               GST_EVENT gstevent.c:998:gst_event_new_segment: creating segment event bytes segment start=2291658, offset=0, stop=-1, rate=1.000000, applied_rate=1.000000, flags=0x00, time=0, base=0, position 0, duration -1
0:00:11.207472264 144952 0x7ee5440017f0 INFO               baseparse gstbaseparse.c:4111:gst_base_parse_set_latency:<h264parse0> min/max latency 0:00:00.000000000, 0:00:00.000000000
0:00:11.657800459 144952 0x7ee544000e90 INFO              aggregator gstaggregator.c:4045:gst_aggregator_update_segment:<mux> Updating srcpad segment: bytes segment start=100326, offset=0, stop=-1, rate=1.000000, applied_rate=1.000000, flags=0x00, time=0, base=0, position 0, duration -1
0:00:11.657832089 144952 0x7ee544000e90 INFO               GST_EVENT gstevent.c:998:gst_event_new_segment: creating segment event bytes segment start=100326, offset=0, stop=-1, rate=1.000000, applied_rate=1.000000, flags=0x00, time=0, base=0, position 0, duration -1
0:00:11.696527477 144952 0x7ee544000e90 INFO                   qtmux gstqtmux.c:4680:gst_qt_mux_robust_recording_rewrite_moov:<mux> Reserved 100278 header bytes. Used 6850 in 0:00:09.324988937. Remaining now 93428 or approx 155855898493 ns

0:00:11.696548630 144952 0x7ee544000e90 INFO              aggregator gstaggregator.c:4045:gst_aggregator_update_segment:<mux> Updating srcpad segment: bytes segment start=32, offset=0, stop=-1, rate=1.000000, applied_rate=1.000000, flags=0x00, time=0, base=0, position 0, duration -1
0:00:11.696559048 144952 0x7ee544000e90 INFO               GST_EVENT gstevent.c:998:gst_event_new_segment: creating segment event bytes segment start=32, offset=0, stop=-1, rate=1.000000, applied_rate=1.000000, flags=0x00, time=0, base=0, position 0, duration -1
0:00:11.701887482 144952 0x7ee544000e90 INFO              aggregator gstaggregator.c:4045:gst_aggregator_update_segment:<mux> Updating srcpad segment: bytes segment start=2589051, offset=0, stop=-1, rate=1.000000, applied_rate=1.000000, flags=0x00, time=0, base=0, position 0, duration -1
0:00:11.703362944 144952 0x7ee5440017f0 INFO               baseparse gstbaseparse.c:4111:gst_base_parse_set_latency:<h264parse0> min/max latency 0:00:00.000000000, 0:00:00.000000000
0:00:11.703503382 144952 0x7ee544000e90 INFO               GST_EVENT gstevent.c:998:gst_event_new_segment: creating segment event bytes segment start=2589051, offset=0, stop=-1, rate=1.000000, applied_rate=1.000000, flags=0x00, time=0, base=0, position 0, duration -1
0:00:12.207179219 144952 0x7ee5440017f0 INFO               baseparse gstbaseparse.c:4111:gst_base_parse_set_latency:<h264parse0> min/max latency 0:00:00.000000000, 0:00:00.000000000
0:00:12.661354376 144952 0x7ee544000e90 INFO              aggregator gstaggregator.c:4045:gst_aggregator_update_segment:<mux> Updating srcpad segment: bytes segment start=40, offset=0, stop=-1, rate=1.000000, applied_rate=1.000000, flags=0x00, time=0, base=0, position 0, duration -1
0:00:12.661392847 144952 0x7ee544000e90 INFO               GST_EVENT gstevent.c:998:gst_event_new_segment: creating segment event bytes segment start=40, offset=0, stop=-1, rate=1.000000, applied_rate=1.000000, flags=0x00, time=0, base=0, position 0, duration -1
0:00:12.674517450 144952 0x7ee544001e30 INFO          dtlsconnection gstdtlsconnection.c:1071:openssl_poll:<dtlsconnection0> handshake is completed
0:00:12.674547924 144952 0x7ee544001e30 INFO        GST_ELEMENT_PADS gstelement.c:1016:gst_element_get_static_pad: found pad sctpdec0:src_1
0:00:12.695381997 144952 0x7ee544000e90 INFO                   qtmux gstqtmux.c:4680:gst_qt_mux_robust_recording_rewrite_moov:<mux> Reserved 100278 header bytes. Used 7430 in 0:00:10.326983135. Remaining now 92848 or approx 155358213608 ns

0:00:12.695412484 144952 0x7ee544000e90 INFO              aggregator gstaggregator.c:4045:gst_aggregator_update_segment:<mux> Updating srcpad segment: bytes segment start=32, offset=0, stop=-1, rate=1.000000, applied_rate=1.000000, flags=0x00, time=0, base=0, position 0, duration -1
0:00:12.695426723 144952 0x7ee544000e90 INFO               GST_EVENT gstevent.c:998:gst_event_new_segment: creating segment event bytes segment start=32, offset=0, stop=-1, rate=1.000000, applied_rate=1.000000, flags=0x00, time=0, base=0, position 0, duration -1
0:00:12.702741424 144952 0x7ee544000e90 INFO              aggregator gstaggregator.c:4045:gst_aggregator_update_segment:<mux> Updating srcpad segment: bytes segment start=2856440, offset=0, stop=-1, rate=1.000000, applied_rate=1.000000, flags=0x00, time=0, base=0, position 0, duration -1
0:00:12.708492625 144952 0x7ee5440017f0 INFO               baseparse gstbaseparse.c:4111:gst_base_parse_set_latency:<h264parse0> min/max latency 0:00:00.000000000, 0:00:00.000000000
0:00:12.708739656 144952 0x7ee544000e90 INFO               GST_EVENT gstevent.c:998:gst_event_new_segment: creating segment event bytes segment start=2856440, offset=0, stop=-1, rate=1.000000, applied_rate=1.000000, flags=0x00, time=0, base=0, position 0, duration -1
0:00:13.204558687 144952 0x7ee5440017f0 INFO               baseparse gstbaseparse.c:4111:gst_base_parse_set_latency:<h264parse0> min/max latency 0:00:00.000000000, 0:00:00.000000000
0:00:13.660600981 144952 0x7ee544000e90 INFO              aggregator gstaggregator.c:4045:gst_aggregator_update_segment:<mux> Updating srcpad segment: bytes segment start=100326, offset=0, stop=-1, rate=1.000000, applied_rate=1.000000, flags=0x00, time=0, base=0, position 0, duration -1
0:00:13.660633374 144952 0x7ee544000e90 INFO               GST_EVENT gstevent.c:998:gst_event_new_segment: creating segment event bytes segment start=100326, offset=0, stop=-1, rate=1.000000, applied_rate=1.000000, flags=0x00, time=0, base=0, position 0, duration -1
0:00:13.701237346 144952 0x7ee544000e90 INFO                   qtmux gstqtmux.c:4680:gst_qt_mux_robust_recording_rewrite_moov:<mux> Reserved 100278 header bytes. Used 8238 in 0:00:11.386526931. Remaining now 92040 or approx 150077002690 ns