Building a pipeline with tsdemux

I am using Rust to develop an application that receives MPEG-TS data over a network stream. I use an AppSrc to feed the data into GStreamer (it is multiplexed with other traffic on the same stream, so udpsrc isn’t an option here).

This is how I am attempting to build my pipeline in Rust. Currently the connect_pad_added callback never seems to fire.

        let source = gstreamer_app::AppSrc::builder()
            .name("emulator_av_mpeg")
            .build();

        source.set_block(false);
        source.set_do_timestamp(true);
        source.set_is_live(true);

        let queue = gstreamer::ElementFactory::make("queue")
            .name("queue")
            .build()
            .expect("Could not create element.");

        let tsparse = gstreamer::ElementFactory::make("tsparse")
            .name("tsparse")
            .build()
            .expect("Could not create element.");

        let demux = gstreamer::ElementFactory::make("tsdemux")
            .name("tsdemux")
            .build()
            .expect("Could not create element.");

        let vdecoder = gstreamer::ElementFactory::make("openh264dec")
            .name("vdecode")
            .build()
            .expect("Could not create element.");

        let adecoder = gstreamer::ElementFactory::make("avdec_ac3")
            .name("adecode")
            .build()
            .expect("Could not create element.");

        let asink = gstreamer_app::AppSink::builder().name("audio_sink").build();
        let vsink = gstreamer_app::AppSink::builder().name("video_sink").build();

        let pipeline = gstreamer::Pipeline::with_name("receiving-pipeline");
        pipeline
            .add_many([
                source.upcast_ref(),
                &queue,
                &tsparse,
                &demux,
                &vdecoder,
                &adecoder,
                asink.upcast_ref(),
                vsink.upcast_ref(),
            ])
            .unwrap();
        gstreamer::Element::link_many([source.upcast_ref(), &queue, &tsparse, &demux]).unwrap();
        adecoder.link(&asink).expect("Failed to link to asink");
        vdecoder.link(&vsink).expect("Failed to link to vsink");

        let video_sink_pad = vdecoder
            .static_pad("sink")
            .expect("could not get sink pad from vdecoder");
        let audio_sink_pad = adecoder
            .static_pad("sink")
            .expect("could not get sink pad from adecoder");
        demux.connect_pad_added(move |_src, src_pad| {
            println!("connect pad added");
            let is_video = src_pad.name().starts_with("video");
            let is_audio = src_pad.name().starts_with("audio");

            let connect_demux = || -> Result<(), gstreamer::PadLinkError> {
                src_pad.link(&video_sink_pad)?;
                println!("linked tsdemux to video decoder");
                Ok(())
            };

            let connect_demux2 = || -> Result<(), gstreamer::PadLinkError> {
                src_pad.link(&audio_sink_pad)?;
                println!("linked tsdemux to audio decoder");
                Ok(())
            };

            if is_video {
                match connect_demux() {
                    Ok(_) => println!("video connected"),
                    Err(e) => println!("could not connect video e:{}", e),
                }
            }
            if is_audio {
                match connect_demux2() {
                    Ok(_) => println!("audio connected"),
                    Err(e) => println!("could not connect audio e:{}", e),
                }
            }
        });

        pipeline
            .set_state(gstreamer::State::Playing)
            .expect("Unable to set the pipeline to the `Playing` state");

I don’t think you need the tsparse before the demuxer.

Have you considered using appsrc ! decodebin3 here? It will automatically plug all the required parsers and decoders (and a multiqueue) and will also pop up pads.

Your code looks like it’s missing parsers (h264parse and ac3parse) and also queues or a multiqueue after the demuxer.

Pads will only be added once the demuxer receives enough data (PMT etc.), so only once you actually push data into the appsrc .

It turns out I wasn’t pushing data into the appsrc. Once I started pushing data, the pipeline builds as expected. Now on to figuring out why I’m not getting samples out of my appsink for video.