How to probe for the latency of every element of a pipeline?

I am using GStreamer 1.20.3 on JetPack 6 with an AGX Orin.
I am trying to benchmark the latency, in ms, of every element within a pipeline. I have been using pad probes and a custom meta (gst_latency_meta_api_get_type() etc., shown below).

#include "GstLatencyMeta.hpp"
/* ───────────── Callbacks used by GstMetaInfo ───────────── */

static gboolean /* correct signature for GstMetaInitFunction     */
meta_init(GstMeta *meta, gpointer /*params*/, GstBuffer * /*buf*/) {
  reinterpret_cast<GstLatencyMeta *>(meta)->enter_ts = GST_CLOCK_TIME_NONE;
  return TRUE;
}

static void /* correct signature for GstMetaFreeFunction     */
meta_free(GstMeta * /*meta*/, GstBuffer * /*buf*/) {
  /* nothing to do – timestamp is a POD value */
}

static gboolean /* GstMetaTransformFunction                      */
meta_transform(GstBuffer *dest, GstMeta *src_meta, GstBuffer * /*src*/,
               GQuark /*type*/, gpointer /*data*/) {
  const auto *src = reinterpret_cast<const GstLatencyMeta *>(src_meta);
  auto *dest_meta = reinterpret_cast<GstLatencyMeta *>(
      gst_buffer_add_meta(dest, gst_latency_meta_get_info(), nullptr));
  dest_meta->enter_ts = src->enter_ts; /* copy timestamp forward   */
  return TRUE;
}

/* ───────────── API‑type singleton (C++11 thread‑safe) ──────────────── */
GType gst_latency_meta_api_get_type(void) {
  /* 1.  Define a static NULL‑terminated tag array */
  static const gchar *tags[] = {"latency", nullptr};
  /* 2.  Register once, get a non‑zero GType back                    */
  static const GType api_type =
      gst_meta_api_type_register("GstLatencyMetaAPI", tags);
  return api_type;
}

/* ───────────── Implementation‑info singleton ───────────────────────── */

const GstMetaInfo *gst_latency_meta_get_info(void) {
  static const GstMetaInfo *info =
      gst_meta_register(GST_LATENCY_META_API_TYPE, /* api type  */
                        "GstLatencyMeta",          /* impl name */
                        sizeof(GstLatencyMeta),    /* struct sz */
                        meta_init, meta_free, meta_transform);
  return info;
}

I have been having problems getting the latency in ms of encoders and parsers, which create brand-new buffers, even though I implemented the meta_transform callback to copy the meta onto new or transformed buffers.

My main goal is to be able to turn the pipeline latency benchmark on and off programmatically through an API, but I haven't managed to make it work.


There is no GstLatencyMeta in GStreamer; you might want to contact the author of that code instead.

What is available upstream is the latency tracer, though doing live analysis of its output would require overriding the default log function and parsing the logs.


So it would require modifying core GStreamer files and using the tracer API, is that correct?

Here’s the code for GstLatencyMeta:

#pragma once
#include <gst/gst.h>

G_BEGIN_DECLS

/** Lightweight meta that carries the time a buffer entered an element. */
struct GstLatencyMeta {
  GstMeta meta;
  GstClockTime enter_ts; /* monotonic nanoseconds */
};

GType gst_latency_meta_api_get_type(void) G_GNUC_CONST;
const GstMetaInfo *gst_latency_meta_get_info(void) G_GNUC_CONST;

/* Convenience */
#define GST_LATENCY_META_API_TYPE (gst_latency_meta_api_get_type())
#define gst_buffer_get_latency_meta(b)                                         \
  ((GstLatencyMeta *)gst_buffer_get_meta((b), GST_LATENCY_META_API_TYPE))

G_END_DECLS

The benchmark:

#pragma once
#include <atomic>
#include <cstdint>
#include <gst/gst.h>
#include <map>
#include <memory>
#include <string>
#include <vector>

#include "GstLatencyMeta.hpp"

struct ElementStats {
  std::atomic<uint64_t> count{0};
  std::atomic<uint64_t> sum_ns{0};
  std::atomic<uint64_t> min_ns{G_MAXUINT64};
  std::atomic<uint64_t> max_ns{0};

  /* ---------- make the type copy‑constructible ---------- */
  ElementStats() = default;
  ElementStats(const ElementStats &o) noexcept
      : count(o.count.load(std::memory_order_relaxed)),
        sum_ns(o.sum_ns.load(std::memory_order_relaxed)),
        min_ns(o.min_ns.load(std::memory_order_relaxed)),
        max_ns(o.max_ns.load(std::memory_order_relaxed)) {}
  ElementStats &operator=(const ElementStats &o) noexcept {
    count.store(o.count.load(std::memory_order_relaxed));
    sum_ns.store(o.sum_ns.load(std::memory_order_relaxed));
    min_ns.store(o.min_ns.load(std::memory_order_relaxed));
    max_ns.store(o.max_ns.load(std::memory_order_relaxed));
    return *this;
  }
  /* ---------------------------------------------- */

  double avg_ms() const noexcept {
    const auto n = count.load(std::memory_order_relaxed);
    const auto ss = sum_ns.load(std::memory_order_relaxed);
    return n ? static_cast<double>(ss) / n / 1e6 : 0.0;
  }

  double min_ms() const noexcept {
    const auto v = min_ns.load(std::memory_order_relaxed);
    return (v == G_MAXUINT64) ? 0.0 : static_cast<double>(v) / 1e6;
  }

  double max_ms() const noexcept {
    const auto v = max_ns.load(std::memory_order_relaxed);
    return static_cast<double>(v) / 1e6; // 0 → 0.0 automatically
  }
};
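One detail worth noting: min_ns and max_ns cannot be updated with a plain store when several streaming threads hit the same element, so they need a small compare-exchange loop. A sketch of a hypothetical helper (not part of the header above):

/* Hypothetical helper: lock-free update of the atomic min/max fields. */
static inline void update_min_max(ElementStats &s, uint64_t sample_ns) {
  uint64_t cur = s.min_ns.load(std::memory_order_relaxed);
  while (sample_ns < cur &&
         !s.min_ns.compare_exchange_weak(cur, sample_ns,
                                         std::memory_order_relaxed)) {
    /* cur was refreshed by the failed CAS; retry while we are still smaller */
  }
  cur = s.max_ns.load(std::memory_order_relaxed);
  while (sample_ns > cur &&
         !s.max_ns.compare_exchange_weak(cur, sample_ns,
                                         std::memory_order_relaxed)) {
    /* same idea for the maximum */
  }
}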

class GstLatencyBench final {
public:
  explicit GstLatencyBench(GstElement *pipeline);
  ~GstLatencyBench();

  static void process_one_buffer(GstBuffer *buf, ElementStats &stats);

  /** Install probes on *all* sink+src static pads.  idempotent. */
  void enable();
  /** Remove all probes again.  Safe to call even if not enabled. */
  void disable();

  /** Snapshot of statistics for external presentation. */
  std::map<std::string, ElementStats> snapshot() const;
  /** Convenience debugging helper */
  void dump_to_stdout() const;

  GstLatencyBench(const GstLatencyBench &) = delete;
  GstLatencyBench &operator=(const GstLatencyBench &) = delete;
  GstLatencyBench(GstLatencyBench &&) = delete;
  GstLatencyBench &operator=(GstLatencyBench &&) = delete;

private:
  struct ProbePair {
    GstPad *sink_pad{};
    GstPad *src_pad{};
    gulong id_sink{0}, id_src{0};
    std::shared_ptr<ElementStats> stats;
  };
  GstElement *pipeline_;
  std::vector<ProbePair> probes_;

  /* --- static probe callbacks --- */
  static GstPadProbeReturn sink_probe(GstPad *, GstPadProbeInfo *, gpointer);
  static GstPadProbeReturn src_probe(GstPad *, GstPadProbeInfo *, gpointer);
};
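For context, the two probe callbacks boil down to something like this (a simplified sketch, not the full implementation: the ProbePair bookkeeping is omitted and the ElementStats is assumed to be passed directly as the probe user data):

GstPadProbeReturn GstLatencyBench::sink_probe(GstPad * /*pad*/,
                                              GstPadProbeInfo *info,
                                              gpointer /*user_data*/) {
  GstBuffer *buf = GST_PAD_PROBE_INFO_BUFFER(info);
  buf = gst_buffer_make_writable(buf); /* meta can only go on a writable buffer */
  auto *m = reinterpret_cast<GstLatencyMeta *>(
      gst_buffer_add_meta(buf, gst_latency_meta_get_info(), nullptr));
  m->enter_ts = gst_util_get_timestamp(); /* monotonic nanoseconds */
  GST_PAD_PROBE_INFO_DATA(info) = buf;    /* hand the (possibly new) buffer back */
  return GST_PAD_PROBE_OK;
}

GstPadProbeReturn GstLatencyBench::src_probe(GstPad * /*pad*/,
                                             GstPadProbeInfo *info,
                                             gpointer user_data) {
  auto *stats = static_cast<ElementStats *>(user_data);
  GstBuffer *buf = GST_PAD_PROBE_INFO_BUFFER(info);
  GstLatencyMeta *m = gst_buffer_get_latency_meta(buf);
  if (m && m->enter_ts != GST_CLOCK_TIME_NONE) {
    const guint64 delta = gst_util_get_timestamp() - m->enter_ts;
    stats->count.fetch_add(1, std::memory_order_relaxed);
    stats->sum_ns.fetch_add(delta, std::memory_order_relaxed);
    /* min/max would use a compare-exchange loop like the helper sketched above */
  }
  return GST_PAD_PROBE_OK;
}

enable() installs sink_probe/src_probe on each element's static pads with gst_pad_add_probe() and disable() removes them again with gst_pad_remove_probe(), so the benchmark can be switched on and off at runtime.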

I’m also trying to benchmark the latency of GStreamer elements this way:

#include <algorithm>
#include <chrono>
#include <limits>
#include <mutex>
#include <string>
#include <unordered_map>
#include <vector>

#include <gst/gst.h>

struct ElemProfiling {
  std::string name;
  std::unordered_map<guint64, std::chrono::steady_clock::time_point> start;
  std::mutex m;

  double min = std::numeric_limits<double>::max();
  double max = 0.0;
  double acc = 0.0;
  size_t n = 0;
};

ElemProfiling* add_elem_probes(GstElement *element) {
  GstPad *in  = gst_element_get_static_pad(element,"sink");
  GstPad *out = gst_element_get_static_pad(element,"src");
  if (!in || !out) { if(in) gst_object_unref(in); if(out) gst_object_unref(out); return nullptr; }

  gchar *name = gst_element_get_name(element); /* returns a copy, must be freed */
  auto *prof = new ElemProfiling{ name ? name : "" };
  g_free(name);

  gst_pad_add_probe(in, GST_PAD_PROBE_TYPE_BUFFER,
    [](GstPad*, GstPadProbeInfo *info, gpointer data){
      auto *p = static_cast<ElemProfiling*>(data);
      GstBuffer *buf = GST_PAD_PROBE_INFO_BUFFER(info);

      guint64 key = GST_BUFFER_DTS_OR_PTS(buf);
      if (key == GST_CLOCK_TIME_NONE)
        key = GST_BUFFER_OFFSET(buf);

      const auto now = std::chrono::steady_clock::now();
      {
        std::lock_guard<std::mutex> lk(p->m);
        /* keep the first timestamp if the same key shows up again */
        p->start.emplace(key, now);
      }

      return GST_PAD_PROBE_OK;
    }, prof, nullptr);

  gst_pad_add_probe(out, GST_PAD_PROBE_TYPE_BUFFER,
    [](GstPad*, GstPadProbeInfo *info, gpointer data){
      auto *p = static_cast<ElemProfiling*>(data);
      GstBuffer *buf = GST_PAD_PROBE_INFO_BUFFER(info);

      guint64 key = GST_BUFFER_DTS_OR_PTS(buf);
      if (key == GST_CLOCK_TIME_NONE)
        key = GST_BUFFER_OFFSET(buf);

      double ms = 0.0;
      {
        std::lock_guard<std::mutex> lk(p->m);

        auto it = p->start.find(key);
        if (it == p->start.end())
          return GST_PAD_PROBE_OK;

        ms = std::chrono::duration<double,std::milli>(
                  std::chrono::steady_clock::now() - it->second).count();
        p->start.erase(it);
      }
      p->min = std::min(p->min, ms);
      p->max = std::max(p->max, ms);
      p->acc += ms;
      p->n++;

      return GST_PAD_PROBE_OK;
    }, prof, nullptr);

  gst_object_unref(in);
  gst_object_unref(out);
  return prof;
}
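To cover every element of the pipeline, the probes can be attached by walking the bin. A sketch (only iterates the top level; nested bins would need recursion, and the returned profiles still have to be freed by the caller):

std::vector<ElemProfiling*> add_pipeline_probes(GstElement *pipeline) {
  std::vector<ElemProfiling*> profs;
  GstIterator *it = gst_bin_iterate_elements(GST_BIN(pipeline));
  GValue item = G_VALUE_INIT;
  gboolean done = FALSE;
  while (!done) {
    switch (gst_iterator_next(it, &item)) {
      case GST_ITERATOR_OK: {
        auto *elem = GST_ELEMENT(g_value_get_object(&item));
        if (auto *p = add_elem_probes(elem)) /* skips elements without sink+src */
          profs.push_back(p);
        g_value_reset(&item);
        break;
      }
      case GST_ITERATOR_RESYNC:
        gst_iterator_resync(it);
        break;
      case GST_ITERATOR_ERROR:
      case GST_ITERATOR_DONE:
        done = TRUE;
        break;
    }
  }
  g_value_unset(&item);
  gst_iterator_free(it);
  return profs;
}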

However, with this approach I’m encountering issues with elements that create brand-new buffers.
I don’t know if there is a way to benchmark the latency of an element this way.

Using the latency tracer is likely an easier solution. It places a serialized event between buffers and determines the latency based on when the event is received. It still has some corner cases, but it remains a good tool.

And no, you don’t need to modify the core to override the log function. I believe you can register a log function with gst_debug_add_log_function().
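Something along these lines should work (an untested sketch: the tracer has to be enabled before gst_init(), and the text handed to the log function is a GstStructure-style string that still needs parsing):

/* Sketch: enable the latency tracer and capture its output in-process.
 * Command-line equivalent:
 *   GST_TRACERS="latency(flags=element)" GST_DEBUG="GST_TRACER:7" gst-launch-1.0 ...
 */
#include <gst/gst.h>

static void
tracer_log_func(GstDebugCategory *category, GstDebugLevel /*level*/,
                const gchar * /*file*/, const gchar * /*function*/, gint /*line*/,
                GObject * /*object*/, GstDebugMessage *message,
                gpointer /*user_data*/) {
  /* Only the tracer's own category is interesting here. */
  if (g_strcmp0(gst_debug_category_get_name(category), "GST_TRACER") != 0)
    return;
  g_print("%s\n", gst_debug_message_get(message)); /* one measurement per line */
}

int main(int argc, char **argv) {
  /* Tracers are read from the environment when gst_init() runs. */
  g_setenv("GST_TRACERS", "latency(flags=element)", TRUE);
  gst_init(&argc, &argv);

  gst_debug_set_active(TRUE);
  gst_debug_set_threshold_for_name("GST_TRACER", GST_LEVEL_TRACE);
  gst_debug_add_log_function(tracer_log_func, nullptr, nullptr);
  gst_debug_remove_log_function(gst_debug_log_default); /* keep stderr quiet */

  /* ... build, run and tear down the pipeline as usual ... */
  return 0;
}

The per-element numbers come from the flags=element parameter of the tracer; the default output only reports end-to-end (source to sink) latency.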