How to send EOS to `livekitwebrtcsrc`?

When I try to record a LiveKit stream with node-gtk, I can't shut the pipeline down cleanly by sending an EOS event.

/**
 * Records a LiveKit room into `video.mp4`.
 *
 * Builds a pipeline that pulls H264 video and Opus audio from
 * `livekitwebrtcsrc`, re-encodes them to H.264/AAC, muxes them with `mp4mux`
 * and writes the result through `filesink`. Three seconds after start-up an
 * EOS event is sent to the pipeline; the function then waits for an ERROR or
 * EOS message on the bus before tearing everything down.
 *
 * @returns A promise that settles once the pipeline has been shut down.
 */
async function main() {
  /* Initialize GStreamer */
  Gst.init(null);

  /* Build the pipeline */
  const pipeline = Gst.parseLaunch(`
    livekitwebrtcsrc name=src
      signaller::ws-url=${LIVEKIT_URL}
      signaller::api-key=${LIVEKIT_API_KEY}
      signaller::secret-key=${LIVEKIT_SECRET_KEY}
      signaller::room-name=testroom
      signaller::identity=gst-consumer
      signaller::participant-name=gst-consumer
      signaller::producer-peer-id=gst-producer
      video-codecs=<H264>
    src.
    ! queue
    ! rtph264depay
    ! avdec_h264
    ! videoconvert
    ! x264enc
    ! h264parse
    ! queue
    ! mux.video_0
    src.
    ! queue
    ! rtpopusdepay
    ! opusdec
    ! audioconvert
    ! audioresample
    ! avenc_aac
    ! aacparse
    ! queue
    ! mux.audio_0
    mp4mux name=mux
    ! filesink location=video.mp4
  `) as Gst.Pipeline;

  /* Start playing */
  pipeline.setState(Gst.State.PLAYING);

  /* Wait until error or EOS */
  const bus = pipeline.getBus();
  const watcher = new CustomBusWatcher(bus);

  // Handle of the delayed-EOS timer. It is kept so the timer can be cancelled
  // if the wait ends early (e.g. on ERROR); otherwise the callback would call
  // sendEvent() on a pipeline that has already been torn down below.
  let eosTimer: ReturnType<typeof setTimeout> | undefined;

  try {
    // The executor is deliberately synchronous: an async executor would
    // swallow any exception thrown inside it instead of rejecting the promise.
    await new Promise<void>((resolve) => {
      // Cannot receive EOS signal
      watcher.on(Gst.MessageType.ERROR | Gst.MessageType.EOS, (msg) => {
        if (msg.type === Gst.MessageType.ERROR) {
          const [err, debugInfo] = msg.parseError();
          // Trailing newlines keep the two diagnostics on separate lines.
          process.stderr.write(`Error: ${err.message}\n`);
          process.stderr.write(`Debug information: ${debugInfo}\n`);
        }
        resolve();
      });

      // Wait 3 seconds and send EOS
      eosTimer = setTimeout(() => {
        pipeline.sendEvent(Gst.Event.newEos()); // Already sent out
      }, 3000);
    });
  } finally {
    clearTimeout(eosTimer);

    /* Free resources */
    watcher.dispose();
    bus.unref();
    pipeline.setState(Gst.State.NULL);
    pipeline.unref();
  }
}
/**
 * Polls a `Gst.Bus` on a timer and re-dispatches every popped message through
 * an `EventEmitter`, keyed by the name of the message type.
 *
 * Polling starts as soon as the watcher is constructed. Call `dispose()` to
 * stop the timer and drop all listeners.
 */
class CustomBusWatcher {
  private readonly emitter = new EventEmitter();
  private intervalId: NodeJS.Timeout | null = null;

  /**
   * @param bus The bus to poll for messages.
   */
  constructor(private bus: Gst.Bus) {
    this.run();
  }

  /**
   * Lists the individual `Gst.MessageType` values whose bits are all set in
   * `mask`.
   *
   * `UNKNOWN` is skipped because it would otherwise match every mask. Both
   * sides of the comparison are normalised with `>>> 0` because `&` yields a
   * signed 32-bit result, which would never equal an enum value that uses
   * bit 31 when that value is stored as a positive number.
   */
  private static matchingTypes(mask: number): Gst.MessageType[] {
    return Object.values(Gst.MessageType)
      .reverse()
      .filter(
        (key): key is Gst.MessageType =>
          typeof key === 'number' &&
          key !== Gst.MessageType.UNKNOWN &&
          ((mask & key) >>> 0) === (key >>> 0),
      );
  }

  /**
   * (Re)starts the polling timer. Any previously running timer is stopped
   * first, so calling this more than once never leaves two timers active.
   */
  run() {
    if (this.intervalId) {
      clearInterval(this.intervalId);
    }
    this.intervalId = setInterval(() => {
      // Drain everything that is queued on this tick rather than a single
      // message, so a busy bus does not fall behind the 4 ms polling rate.
      let msg = this.bus.pop();
      while (msg) {
        const current = msg;
        for (const key of CustomBusWatcher.matchingTypes(current.type)) {
          // Listeners are invoked on a later turn of the event loop, not from
          // inside the polling callback.
          setImmediate(() => this.emitter.emit(Gst.MessageType[key], current));
        }
        msg = this.bus.pop();
      }
    }, 4);
  }

  /**
   * Registers `listener` for every message type contained in `event`.
   *
   * @param event A single message type or several OR-ed together.
   * @param listener Called with each matching message.
   * @returns This watcher, to allow chaining.
   */
  on(event: Gst.MessageType, listener: (message: Gst.Message) => void): this {
    for (const key of CustomBusWatcher.matchingTypes(event)) {
      this.emitter.on(Gst.MessageType[key], listener);
    }

    return this;
  }

  /**
   * Stops polling and removes every registered listener.
   */
  dispose() {
    if (this.intervalId) {
      clearInterval(this.intervalId);
      // Forget the handle so it does not refer to a timer that no longer runs.
      this.intervalId = null;
    }
    this.emitter.removeAllListeners();
  }
}

I tested the same code with `videotestsrc` instead, and it worked as expected: the EOS message was received.

/**
 * Records live test sources into `video.mp4`.
 *
 * Builds a pipeline from `videotestsrc` and `audiotestsrc` (both with
 * `is-live=true`), encodes them to H.264/AAC, muxes them with `mp4mux` and
 * writes the result through `filesink`. Three seconds after start-up an EOS
 * event is sent to the pipeline; the function then waits for an ERROR or EOS
 * message on the bus before tearing everything down.
 *
 * @returns A promise that settles once the pipeline has been shut down.
 */
async function main() {
  /* Initialize GStreamer */
  Gst.init(null);

  /* Build the pipeline */
  const pipeline = Gst.parseLaunch(`
    videotestsrc is-live=true ! videoconvert ! x264enc ! h264parse ! queue ! mux.
    audiotestsrc is-live=true ! audioconvert ! audioresample ! avenc_aac ! aacparse ! queue ! mux.
    mp4mux name=mux ! filesink location=video.mp4
  `) as Gst.Pipeline;

  /* Start playing */
  pipeline.setState(Gst.State.PLAYING);

  /* Wait until error or EOS */
  const bus = pipeline.getBus();
  const watcher = new CustomBusWatcher(bus);

  // Handle of the delayed-EOS timer. It is kept so the timer can be cancelled
  // if the wait ends early (e.g. on ERROR); otherwise the callback would call
  // sendEvent() on a pipeline that has already been torn down below.
  let eosTimer: ReturnType<typeof setTimeout> | undefined;

  try {
    // The executor is deliberately synchronous: an async executor would
    // swallow any exception thrown inside it instead of rejecting the promise.
    await new Promise<void>((resolve) => {
      // Can receive EOS signal
      watcher.on(Gst.MessageType.ERROR | Gst.MessageType.EOS, (msg) => {
        if (msg.type === Gst.MessageType.ERROR) {
          const [err, debugInfo] = msg.parseError();
          // Trailing newlines keep the two diagnostics on separate lines.
          process.stderr.write(`Error: ${err.message}\n`);
          process.stderr.write(`Debug information: ${debugInfo}\n`);
        }
        resolve();
      });

      // Wait 3 seconds and send EOS
      eosTimer = setTimeout(() => {
        pipeline.sendEvent(Gst.Event.newEos()); // Already sent out
      }, 3000);
    });
  } finally {
    clearTimeout(eosTimer);

    /* Free resources */
    watcher.dispose();
    bus.unref();
    pipeline.setState(Gst.State.NULL);
    pipeline.unref();
  }
}

This has been bothering me for a long time. :sob:

Has anyone encountered a situation related to EOS?

Try using the following command:
// Turn on the pipeline's "message-forward" property (suggested workaround for the missing EOS).
pipeline.setProperty("message-forward", true);
This instructs the pipeline to forward the EOS message from the filesink immediately, instead of holding it back until an EOS message has also been received from the livekitwebrtcsrc.

1 Like

Thanks for your reply, but it still doesn’t seem to work :cry:

Here is the log after sending EOS; there is no sign of the pipeline stopping:

Waiting 3 seconds
Sending EOS
0:00:10.205868460 144952 0x7ee5440017f0 INFO               baseparse gstbaseparse.c:4111:gst_base_parse_set_latency:<h264parse0> min/max latency 0:00:00.000000000, 0:00:00.000000000
0:00:10.661243081 144952 0x7ee544000e90 INFO              aggregator gstaggregator.c:4045:gst_aggregator_update_segment:<mux> Updating srcpad segment: bytes segment start=40, offset=0, stop=-1, rate=1.000000, applied_rate=1.000000, flags=0x00, time=0, base=0, position 0, duration -1
0:00:10.661284413 144952 0x7ee544000e90 INFO               GST_EVENT gstevent.c:998:gst_event_new_segment: creating segment event bytes segment start=40, offset=0, stop=-1, rate=1.000000, applied_rate=1.000000, flags=0x00, time=0, base=0, position 0, duration -1
0:00:10.706258120 144952 0x7ee5440017f0 INFO               baseparse gstbaseparse.c:4111:gst_base_parse_set_latency:<h264parse0> min/max latency 0:00:00.000000000, 0:00:00.000000000
0:00:10.706668420 144952 0x7ee544000e90 INFO                   qtmux gstqtmux.c:4680:gst_qt_mux_robust_recording_rewrite_moov:<mux> Reserved 100278 header bytes. Used 6214 in 0:00:08.469578002. Remaining now 94064 or approx 160902428115 ns

0:00:10.706687302 144952 0x7ee544000e90 INFO              aggregator gstaggregator.c:4045:gst_aggregator_update_segment:<mux> Updating srcpad segment: bytes segment start=32, offset=0, stop=-1, rate=1.000000, applied_rate=1.000000, flags=0x00, time=0, base=0, position 0, duration -1
0:00:10.706697938 144952 0x7ee544000e90 INFO               GST_EVENT gstevent.c:998:gst_event_new_segment: creating segment event bytes segment start=32, offset=0, stop=-1, rate=1.000000, applied_rate=1.000000, flags=0x00, time=0, base=0, position 0, duration -1
0:00:10.713735078 144952 0x7ee544000e90 INFO              aggregator gstaggregator.c:4045:gst_aggregator_update_segment:<mux> Updating srcpad segment: bytes segment start=2291658, offset=0, stop=-1, rate=1.000000, applied_rate=1.000000, flags=0x00, time=0, base=0, position 0, duration -1
0:00:10.713765783 144952 0x7ee544000e90 INFO               GST_EVENT gstevent.c:998:gst_event_new_segment: creating segment event bytes segment start=2291658, offset=0, stop=-1, rate=1.000000, applied_rate=1.000000, flags=0x00, time=0, base=0, position 0, duration -1
0:00:11.207472264 144952 0x7ee5440017f0 INFO               baseparse gstbaseparse.c:4111:gst_base_parse_set_latency:<h264parse0> min/max latency 0:00:00.000000000, 0:00:00.000000000
0:00:11.657800459 144952 0x7ee544000e90 INFO              aggregator gstaggregator.c:4045:gst_aggregator_update_segment:<mux> Updating srcpad segment: bytes segment start=100326, offset=0, stop=-1, rate=1.000000, applied_rate=1.000000, flags=0x00, time=0, base=0, position 0, duration -1
0:00:11.657832089 144952 0x7ee544000e90 INFO               GST_EVENT gstevent.c:998:gst_event_new_segment: creating segment event bytes segment start=100326, offset=0, stop=-1, rate=1.000000, applied_rate=1.000000, flags=0x00, time=0, base=0, position 0, duration -1
0:00:11.696527477 144952 0x7ee544000e90 INFO                   qtmux gstqtmux.c:4680:gst_qt_mux_robust_recording_rewrite_moov:<mux> Reserved 100278 header bytes. Used 6850 in 0:00:09.324988937. Remaining now 93428 or approx 155855898493 ns

0:00:11.696548630 144952 0x7ee544000e90 INFO              aggregator gstaggregator.c:4045:gst_aggregator_update_segment:<mux> Updating srcpad segment: bytes segment start=32, offset=0, stop=-1, rate=1.000000, applied_rate=1.000000, flags=0x00, time=0, base=0, position 0, duration -1
0:00:11.696559048 144952 0x7ee544000e90 INFO               GST_EVENT gstevent.c:998:gst_event_new_segment: creating segment event bytes segment start=32, offset=0, stop=-1, rate=1.000000, applied_rate=1.000000, flags=0x00, time=0, base=0, position 0, duration -1
0:00:11.701887482 144952 0x7ee544000e90 INFO              aggregator gstaggregator.c:4045:gst_aggregator_update_segment:<mux> Updating srcpad segment: bytes segment start=2589051, offset=0, stop=-1, rate=1.000000, applied_rate=1.000000, flags=0x00, time=0, base=0, position 0, duration -1
0:00:11.703362944 144952 0x7ee5440017f0 INFO               baseparse gstbaseparse.c:4111:gst_base_parse_set_latency:<h264parse0> min/max latency 0:00:00.000000000, 0:00:00.000000000
0:00:11.703503382 144952 0x7ee544000e90 INFO               GST_EVENT gstevent.c:998:gst_event_new_segment: creating segment event bytes segment start=2589051, offset=0, stop=-1, rate=1.000000, applied_rate=1.000000, flags=0x00, time=0, base=0, position 0, duration -1
0:00:12.207179219 144952 0x7ee5440017f0 INFO               baseparse gstbaseparse.c:4111:gst_base_parse_set_latency:<h264parse0> min/max latency 0:00:00.000000000, 0:00:00.000000000
0:00:12.661354376 144952 0x7ee544000e90 INFO              aggregator gstaggregator.c:4045:gst_aggregator_update_segment:<mux> Updating srcpad segment: bytes segment start=40, offset=0, stop=-1, rate=1.000000, applied_rate=1.000000, flags=0x00, time=0, base=0, position 0, duration -1
0:00:12.661392847 144952 0x7ee544000e90 INFO               GST_EVENT gstevent.c:998:gst_event_new_segment: creating segment event bytes segment start=40, offset=0, stop=-1, rate=1.000000, applied_rate=1.000000, flags=0x00, time=0, base=0, position 0, duration -1
0:00:12.674517450 144952 0x7ee544001e30 INFO          dtlsconnection gstdtlsconnection.c:1071:openssl_poll:<dtlsconnection0> handshake is completed
0:00:12.674547924 144952 0x7ee544001e30 INFO        GST_ELEMENT_PADS gstelement.c:1016:gst_element_get_static_pad: found pad sctpdec0:src_1
0:00:12.695381997 144952 0x7ee544000e90 INFO                   qtmux gstqtmux.c:4680:gst_qt_mux_robust_recording_rewrite_moov:<mux> Reserved 100278 header bytes. Used 7430 in 0:00:10.326983135. Remaining now 92848 or approx 155358213608 ns

0:00:12.695412484 144952 0x7ee544000e90 INFO              aggregator gstaggregator.c:4045:gst_aggregator_update_segment:<mux> Updating srcpad segment: bytes segment start=32, offset=0, stop=-1, rate=1.000000, applied_rate=1.000000, flags=0x00, time=0, base=0, position 0, duration -1
0:00:12.695426723 144952 0x7ee544000e90 INFO               GST_EVENT gstevent.c:998:gst_event_new_segment: creating segment event bytes segment start=32, offset=0, stop=-1, rate=1.000000, applied_rate=1.000000, flags=0x00, time=0, base=0, position 0, duration -1
0:00:12.702741424 144952 0x7ee544000e90 INFO              aggregator gstaggregator.c:4045:gst_aggregator_update_segment:<mux> Updating srcpad segment: bytes segment start=2856440, offset=0, stop=-1, rate=1.000000, applied_rate=1.000000, flags=0x00, time=0, base=0, position 0, duration -1
0:00:12.708492625 144952 0x7ee5440017f0 INFO               baseparse gstbaseparse.c:4111:gst_base_parse_set_latency:<h264parse0> min/max latency 0:00:00.000000000, 0:00:00.000000000
0:00:12.708739656 144952 0x7ee544000e90 INFO               GST_EVENT gstevent.c:998:gst_event_new_segment: creating segment event bytes segment start=2856440, offset=0, stop=-1, rate=1.000000, applied_rate=1.000000, flags=0x00, time=0, base=0, position 0, duration -1
0:00:13.204558687 144952 0x7ee5440017f0 INFO               baseparse gstbaseparse.c:4111:gst_base_parse_set_latency:<h264parse0> min/max latency 0:00:00.000000000, 0:00:00.000000000
0:00:13.660600981 144952 0x7ee544000e90 INFO              aggregator gstaggregator.c:4045:gst_aggregator_update_segment:<mux> Updating srcpad segment: bytes segment start=100326, offset=0, stop=-1, rate=1.000000, applied_rate=1.000000, flags=0x00, time=0, base=0, position 0, duration -1
0:00:13.660633374 144952 0x7ee544000e90 INFO               GST_EVENT gstevent.c:998:gst_event_new_segment: creating segment event bytes segment start=100326, offset=0, stop=-1, rate=1.000000, applied_rate=1.000000, flags=0x00, time=0, base=0, position 0, duration -1
0:00:13.701237346 144952 0x7ee544000e90 INFO                   qtmux gstqtmux.c:4680:gst_qt_mux_robust_recording_rewrite_moov:<mux> Reserved 100278 header bytes. Used 8238 in 0:00:11.386526931. Remaining now 92040 or approx 150077002690 ns