Auto reconnect in whipclientsink

OK. Is there any other signal or property I can watch to anticipate a bad connection? If not, I guess that, now that I have written a full app using the API instead of the command line, I can add my own connectivity-check code in another thread.

This raises another question: I use gst_bus_timed_pop_filtered to block my main thread, as per the tutorial.

Should I just send a GST_MESSAGE_APPLICATION message to the bus from another thread to end the pipeline using gst_bus_post and catch that in my main thread?

I added a ping monitor and now the reconnection logic works.

I wrote the code in Zig, but it is similar to C.

It’s quick and dirty, but it works.

const std = @import("std");
const gst = @import("gst.zig");

/// Health verdict for one connectivity signal.
const State = enum(u8) {
    Good,
    Bad,
};

/// Outcome of the most recent probe made by `ping`; read by `main`.
var ping_state: std.atomic.Value(State) = std.atomic.Value(State).init(.Good);
/// Outcome of the most recent ICE state change seen by `ice_notify`; read by `main`.
var ice_state: std.atomic.Value(State) = std.atomic.Value(State).init(.Good);

/// Entry point: sets the capture volumes, starts the background ping probe,
/// then runs the WHIP streaming pipeline in an endless supervise/restart loop.
///
/// Each iteration waits until the ping probe reports `.Good`, builds a fresh
/// pipeline, and tears it down again as soon as the bus reports an error or
/// end-of-stream, or either `ping_state` or `ice_state` turns `.Bad`.
///
/// Returns an error if `pactl` or the ping thread cannot be started, or if
/// the pipeline description cannot be turned into a pipeline.
pub fn main() !void {
    var argc: c_int = 0;
    var argv: [*c][*c]u8 = null;
    gst.gst_init(&argc, &argv);

    const video_device = "/dev/v4l/by-id/usb-Video_Grabber_HDMI_to_U3_capture_20000130041415-video-index0";
    const mic_device = "alsa_input.pci-0000_05_00.6.3.analog-stereo";
    const hdmi_device = "alsa_input.usb-Video_Grabber_HDMI_to_U3_capture_20000130041415-02.analog-stereo";
    const url = "http://kuon:xxx@xxx:8889/live/fly/whip";

    // Input volume of the HDMI grabber's audio source.
    _ = try std.process.Child.run(.{ .allocator = std.heap.page_allocator, .argv = &.{
        "pactl",
        "set-source-volume",
        hdmi_device,
        "3%",
    } });

    // NOTE(review): this source name ("...05_00.6.analog-stereo") differs from
    // `mic_device` ("...05_00.6.3.analog-stereo"); one of the two is probably
    // a typo. Left untouched until confirmed against `pactl list sources`.
    _ = try std.process.Child.run(.{ .allocator = std.heap.page_allocator, .argv = &.{
        "pactl",
        "set-source-volume",
        "alsa_input.pci-0000_05_00.6.analog-stereo",
        "10%",
    } });

    // The probe runs for the lifetime of the process, so it is never joined.
    const ping_thread = try std.Thread.spawn(.{}, ping, .{});
    ping_thread.detach();

    while (true) {
        // Wait for the ping to be good
        while (ping_state.load(.unordered) != .Good) {
            std.time.sleep(1 * std.time.ns_per_s);
        }

        // The previous session may have left `ice_state` at `.Bad` (that is
        // one of the reasons sessions end). Clear it, otherwise the new
        // pipeline would be torn down again on the first poll below.
        ice_state.store(.Good, .unordered);

        // Start pipeline
        const pipeline =
            gst.gst_parse_launch(std.fmt.comptimePrint(
            \\  whipclientsink signaller::whip-endpoint="{s}" name=ws video-caps="video/x-h264"
            \\  max-bitrate=5000000 start-bitrate=4000000 min-bitrate=1000000
            \\  v4l2src device={s} !
            \\  video/x-raw,format=YUY2,width=1920,height=1080,framerate=30/1
            \\  ! tee name=v
            \\  v. ! queue leaky=1 ! autovideosink sync=false
            \\  v. ! queue ! ws.
            \\  pulsesrc device={s} ! queue ! audiomixer name=a
            \\  pulsesrc device={s} ! queue ! a.
            \\  a. ! ws.
        , .{ url, video_device, mic_device, hdmi_device }), null);
        if (pipeline == null) {
            // The description is a compile-time constant, so retrying the
            // parse cannot succeed; report and give up.
            gst.g_printerr("Could not create the pipeline.\n");
            return error.PipelineCreationFailed;
        }
        defer gst.gst_object_unref(pipeline);

        const bus = gst.gst_element_get_bus(pipeline);
        defer gst.gst_object_unref(bus);

        // "ws" is the whipclientsink element named in the description above.
        const whip_sink = gst.gst_bin_get_by_name(@ptrCast(pipeline), "ws");
        defer gst.gst_object_unref(whip_sink);

        // Hook the handler up before the pipeline starts so that no
        // "consumer-added" emission can be missed.
        _ = gst.g_signal_connect_data(
            whip_sink,
            "consumer-added",
            @ptrCast(&consumer_added_notify),
            null,
            null,
            0,
        );

        _ = gst.gst_element_set_state(pipeline, gst.GST_STATE_PLAYING);

        while (true) {
            // Poll the bus with a 100 ms timeout so the connectivity flags
            // are re-checked regularly even when the bus is quiet.
            const msg =
                gst.gst_bus_timed_pop_filtered(
                bus,
                100 * std.time.ns_per_ms,
                gst.GST_MESSAGE_ERROR | gst.GST_MESSAGE_EOS,
            );
            if (msg != null) {
                defer gst.gst_message_unref(msg);

                // Only ERROR and EOS pass the filter; both end this session.
                if (gst.GST_MESSAGE_TYPE(msg) == gst.GST_MESSAGE_ERROR) {
                    gst.g_printerr("An error occurred While running the pipeline.\n");
                } else {
                    gst.g_printerr("End of stream reached, restarting\n");
                }
                break;
            }
            // If ping is bad or ICE is bad, restart
            if (ice_state.load(.unordered) != .Good or
                ping_state.load(.unordered) != .Good)
            {
                gst.g_printerr("bad connection, restarting\n");
                break;
            }
        }
        _ = gst.gst_element_set_state(pipeline, gst.GST_STATE_NULL);
    }
}

/// Handler for `notify::ice-connection-state`: reads the property from `bin`
/// and records the verdict in `ice_state`.
///
/// Enum values up to and including 3 count as `.Good`, anything above as
/// `.Bad`. In `GstWebRTCICEConnectionState` 3 is presumably `COMPLETED`,
/// with the failure states numbered above it; confirm against the headers.
///
/// `spec` and `data` are not used.
fn ice_notify(
    bin: *gst.GstElement,
    spec: *gst.GParamSpec,
    data: gst.gpointer,
) callconv(.C) void {
    _ = data;
    _ = spec;

    // A zeroed GValue is passed in for g_object_get_property to fill.
    var current = std.mem.zeroes(gst.GValue);
    gst.g_object_get_property(@ptrCast(bin), "ice-connection-state", &current);
    defer gst.g_value_unset(&current);

    const verdict: State = if (gst.g_value_get_enum(&current) > 3) .Bad else .Good;
    ice_state.store(verdict, .unordered);
}

/// Handler for the sink's `consumer-added` signal: subscribes `ice_notify`
/// to ICE connection-state changes of the consumer's webrtcbin.
///
/// Parameters follow the order the signal delivers them in:
/// - `sink`: the element that emitted the signal (unused).
/// - `consumer_id`: identifier string of the new consumer (unused).
/// - `webrtcbin`: the element whose `ice-connection-state` is watched.
/// - `data`: user data given at connect time (unused; `main` passes null).
fn consumer_added_notify(
    sink: *gst.GstElement,
    consumer_id: gst.gchararray,
    webrtcbin: *gst.GstElement,
    data: gst.gpointer,
) callconv(.C) void {
    _ = sink;
    _ = consumer_id;
    _ = data;
    _ = gst.g_signal_connect_data(
        webrtcbin,
        "notify::ice-connection-state",
        @ptrCast(&ice_notify),
        null,
        null,
        0,
    );
}

/// Background connectivity probe; never returns.
///
/// Once per second it runs a single `ping` to 1.1.1.1 and publishes the
/// outcome in `ping_state`: `.Good` only when the command exits with status
/// 0, `.Bad` in every other case (non-zero exit, killed by a signal, or the
/// command could not be run at all).
fn ping() void {
    const allocator = std.heap.page_allocator;

    while (true) {
        std.time.sleep(1 * std.time.ns_per_s);

        const result = std.process.Child.run(.{
            .allocator = allocator,
            .argv = &.{
                "ping", "1.1.1.1", "-n", "-c", "1", "-w", "1", "-q",
            },
        }) catch |err| {
            // Being unable to run the probe is treated as "no connectivity"
            // rather than crashing the monitoring thread.
            std.debug.print("ping could not be run: {s}\n", .{@errorName(err)});
            ping_state.store(.Bad, .unordered);
            continue;
        };
        // The captured output is owned by us; release it on every iteration
        // so the loop does not leak memory for as long as the process runs.
        defer allocator.free(result.stdout);
        defer allocator.free(result.stderr);

        // `term` is only `.Exited` on a normal exit; any other termination
        // counts as a failed probe.
        const reachable = switch (result.term) {
            .Exited => |code| code == 0,
            else => false,
        };
        const verdict: State = if (reachable) .Good else .Bad;
        ping_state.store(verdict, .unordered);
    }
}

I have another issue. It is not directly related to the auto reconnect, so I created another thread: Encoder settings in whipclientsink

For the record, the audio problem was caused by the OBS client. I switched to the SRT protocol in OBS instead of RTSP, and that fixed the issue.

There is definitely a critical need for automatic reconnection in our field deployments. For now, here’s my first shot at a workaround:

static void webrtc_watchdog(GstElement *signaller, GParamSpec *pspec, GstElement *webrtcbin, GstElement *whipclientsink) {
    std::thread([webrtcbin, whipclientsink]() {
        bool restart = false;

        while (restart == false) {
            std::this_thread::sleep_for(std::chrono::seconds(10));

            if (G_IS_OBJECT(webrtcbin) == false) {
                restart = true;
            } else {
                GstWebRTCICEConnectionState state;
                g_object_get(webrtcbin, "ice-connection-state", &state, NULL);
                restart = (state != GST_WEBRTC_ICE_CONNECTION_STATE_COMPLETED);
            }
        }

        // restart whipclientsink here
    }).detach();
}
g_signal_connect(signaller, "webrtcbin-ready", G_CALLBACK(webrtc_watchdog), whipclientsink);
1 Like