I added a ping monitor and now the reconnection logic works.
I wrote the code in zig, but it is similar to C.
It’s quick and dirty, but I works.
const std = @import("std");
const gst = @import("gst.zig");
const State = enum(u8) { Good, Bad };
var ping_state = std.atomic.Value(State).init(.Good);
var ice_state = std.atomic.Value(State).init(.Good);
pub fn main() !void {
var argc: c_int = 0;
var argv: [*c][*c]u8 = null;
gst.gst_init(&argc, &argv);
const video_device = "/dev/v4l/by-id/usb-Video_Grabber_HDMI_to_U3_capture_20000130041415-video-index0";
const mic_device = "alsa_input.pci-0000_05_00.6.3.analog-stereo";
const hdmi_device = "alsa_input.usb-Video_Grabber_HDMI_to_U3_capture_20000130041415-02.analog-stereo";
const url = "http://kuon:xxx@xxx:8889/live/fly/whip";
_ = try std.process.Child.run(.{ .allocator = std.heap.page_allocator, .argv = &.{
"pactl",
"set-source-volume",
"alsa_input.usb-Video_Grabber_HDMI_to_U3_capture_20000130041415-02.analog-stereo",
"3%",
} });
_ = try std.process.Child.run(.{ .allocator = std.heap.page_allocator, .argv = &.{
"pactl",
"set-source-volume",
"alsa_input.pci-0000_05_00.6.analog-stereo",
"10%",
} });
var ping_thread = try std.Thread.spawn(.{}, ping, .{});
ping_thread.detach();
while (true) {
// Wait for the ping to be good
while (true) {
if (ping_state.load(.unordered) != .Good) {
std.time.sleep(1 * std.time.ns_per_s);
} else {
break;
}
}
// Start pipeline
const pipeline =
gst.gst_parse_launch(std.fmt.comptimePrint(
\\ whipclientsink signaller::whip-endpoint="{s}" name=ws video-caps="video/x-h264"
\\ max-bitrate=5000000 start-bitrate=4000000 min-bitrate=1000000
\\ v4l2src device={s} !
\\ video/x-raw,format=YUY2,width=1920,height=1080,framerate=30/1
\\ ! tee name=v
\\ v. ! queue leaky=1 ! autovideosink sync=false
\\ v. ! queue ! ws.
\\ pulsesrc device={s} ! queue ! audiomixer name=a
\\ pulsesrc device={s} ! queue ! a.
\\ a. ! ws.
, .{ url, video_device, mic_device, hdmi_device }), null);
defer gst.gst_object_unref(pipeline);
_ = gst.gst_element_set_state(pipeline, gst.GST_STATE_PLAYING);
const bus = gst.gst_element_get_bus(pipeline);
defer gst.gst_object_unref(bus);
const webrtcbin = gst.gst_bin_get_by_name(@ptrCast(pipeline), "ws");
defer gst.gst_object_unref(webrtcbin);
_ = gst.g_signal_connect_data(
webrtcbin,
"consumer-added",
@ptrCast(&consumer_added_notify),
null,
null,
0,
);
while (true) {
const msg =
gst.gst_bus_timed_pop_filtered(
bus,
100 * std.time.ns_per_ms,
gst.GST_MESSAGE_ERROR | gst.GST_MESSAGE_EOS,
);
if (msg != null) {
defer gst.gst_message_unref(msg);
if (gst.GST_MESSAGE_TYPE(msg) == gst.GST_MESSAGE_ERROR) {
gst.g_printerr("An error occurred While running the pipeline.\n");
break;
}
}
// If ping is bad or ICE is bad, restart
if (ice_state.load(.unordered) != .Good or
ping_state.load(.unordered) != .Good)
{
gst.g_printerr("bad connection, restarting\n");
break;
}
}
_ = gst.gst_element_set_state(pipeline, gst.GST_STATE_NULL);
}
}
fn ice_notify(
bin: *gst.GstElement,
spec: *gst.GParamSpec,
data: gst.gpointer,
) callconv(.C) void {
_ = spec;
_ = data;
var value = std.mem.zeroes(gst.GValue);
gst.g_object_get_property(@ptrCast(bin), "ice-connection-state", &value);
defer gst.g_value_unset(&value);
if (gst.g_value_get_enum(&value) <= 3) {
ice_state.store(.Good, .unordered);
} else {
ice_state.store(.Bad, .unordered);
}
}
fn consumer_added_notify(
consumer_id: *gst.GstElement,
webrtcbin: gst.gchararray,
arg1: *gst.GstElement,
data: gst.gpointer,
) callconv(.C) void {
_ = consumer_id;
_ = data;
_ = webrtcbin;
_ = gst.g_signal_connect_data(
arg1,
"notify::ice-connection-state",
@ptrCast(&ice_notify),
null,
null,
0,
);
}
fn ping() void {
while (true) {
std.time.sleep(1 * std.time.ns_per_s);
const result = std.process.Child.run(.{
.allocator = std.heap.page_allocator,
.argv = &.{
"ping", "1.1.1.1", "-n", "-c", "1", "-w", "1", "-q",
},
}) catch unreachable;
if (result.term.Exited != 0) {
ping_state.store(.Bad, .unordered);
} else {
ping_state.store(.Good, .unordered);
}
}
}