Output RTMP with audio

I created a pipeline that takes an RTSP stream and publishes it to RTMP. That code works without issues. Now I need the RTMP output to include audio as well. I can't find any reference for this — could you help me resolve the issue?

CODE:

#!/usr/bin/env python3

import sys
# NOTE(review): the pasted code had a smart-quoted, ellipsized path here
# (‘…/’); restoring the conventional parent-directory path — confirm
# against the original script.
sys.path.append('../')
import gi
gi.require_version('Gst', '1.0')
gi.require_version('GstBase', '1.0')
from gi.repository import GLib, Gst, GstBase
from common.bus_call import bus_call
from common.FPS import PERF_DATA
from common.is_aarch_64 import is_aarch64
from common.create_element_or_error import create_element_or_error
import pyds

tiler_sink_pad_buffer_probe extracts the metadata received on the tiler sink pad and updates params for drawing rectangles, object information, etc.

def tiler_sink_pad_buffer_probe(pad, info, u_data):
    """Buffer probe attached to the demuxer sink pad.

    Walks the NvDs batch metadata attached by nvstreammux and updates
    the per-stream FPS counter for each frame.

    Args:
        pad: the Gst.Pad the probe is attached to (unused).
        info: Gst.PadProbeInfo carrying the buffer.
        u_data: opaque user data (unused).

    Returns:
        Gst.PadProbeReturn.OK so buffers keep flowing downstream.
    """
    gst_buffer = info.get_buffer()
    if not gst_buffer:
        print("Unable to get GstBuffer ")
        # A probe callback must return a Gst.PadProbeReturn value;
        # the original bare `return` handed None back to GStreamer.
        return Gst.PadProbeReturn.OK

    # Retrieve batch metadata from the gst_buffer.
    # Note that pyds.gst_buffer_get_nvds_batch_meta() expects the
    # C address of gst_buffer as input, which is obtained with hash(gst_buffer).
    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
    l_frame = batch_meta.frame_meta_list
    while l_frame is not None:
        try:
            # l_frame.data needs a cast to pyds.NvDsFrameMeta.
            # The casting keeps ownership of the underlying memory in
            # the C code, so the Python garbage collector leaves it alone.
            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
        except StopIteration:
            break

        # Update frame rate through this probe.
        stream_index = "stream{0}".format(frame_meta.pad_index)
        global perf_data
        perf_data.update_fps(stream_index)

        try:
            l_frame = l_frame.next
        except StopIteration:
            break

    return Gst.PadProbeReturn.OK

def cb_newpad(decodebin, decoder_src_pad, data):
    """pad-added callback for the uri-decode bin.

    Links the decoder's video src pad to the source bin's ghost pad,
    but only when the pad carries video in NVMM memory (i.e. the
    NVIDIA hardware decoder was selected).

    Args:
        decodebin: the element that emitted the signal.
        decoder_src_pad: the newly created src pad.
        data: the enclosing source Gst.Bin (passed at connect time).
    """
    caps = decoder_src_pad.get_current_caps()
    if not caps:
        caps = decoder_src_pad.query_caps()
    gststruct = caps.get_structure(0)
    gstname = gststruct.get_name()
    source_bin = data
    features = caps.get_features(0)

    # Need to check if the pad created by the decodebin is for video
    # and not audio.
    if gstname.find("video") != -1:
        # Link the decodebin pad only if decodebin has picked the nvidia
        # decoder plugin nvdec_*. We do this by checking if the pad caps
        # contain NVMM memory features.
        if features.contains("memory:NVMM"):
            # Get the source bin ghost pad and point it at the decoder pad.
            bin_ghost_pad = source_bin.get_static_pad("src")
            if not bin_ghost_pad.set_target(decoder_src_pad):
                sys.stderr.write("Failed to link decoder src pad to source bin ghost pad\n")
        else:
            sys.stderr.write(" Error: Decodebin did not pick nvidia decoder plugin.\n")

def decodebin_child_added(child_proxy, Object, name, user_data):
    """child-added callback: tune child elements as decodebin builds itself.

    Recurses into nested decodebins, enables max-performance mode on the
    Jetson hardware decoder, and turns on drop-on-latency for the RTSP
    source when the property exists.
    """
    # The pasted code used smart quotes around these literals, which is a
    # Python syntax error; restored to plain ASCII quotes.
    if name.find("decodebin") != -1:
        Object.connect("child-added", decodebin_child_added, user_data)

    if is_aarch64() and name.find("nvv4l2decoder") != -1:
        # Object.set_property("drop-frame-interval", 2)
        Object.set_property("enable-max-performance", True)

    if "source" in name:
        source_element = child_proxy.get_by_name("source")
        # Only set the property when the element actually exposes it
        # (matches the stock DeepStream sample).
        if source_element.find_property('drop-on-latency') is not None:
            Object.set_property("drop-on-latency", True)

def create_source_bin(index, uri):
    """Create a source Gst.Bin wrapping an nvurisrcbin for one camera.

    The bin exposes a single ghost src pad with no target; the target is
    set later, in cb_newpad, once the decoder's video pad appears.

    Args:
        index: zero-based camera index, used to build unique element names.
        uri: the RTSP (or other) URI to decode.

    Returns:
        The populated Gst.Bin, or None if the ghost pad could not be added.
    """
    bin_name = "source-bin-%02d" % index
    nbin = Gst.Bin.new(bin_name)
    if not nbin:
        sys.stderr.write(" Unable to create source bin \n")

    # Source element for reading from the uri. nvurisrcbin figures out
    # the container format / codec and plugs the appropriate demux and
    # decode plugins (uridecodebin drop-in with extra DeepStream knobs).
    # uri_decode_bin = Gst.ElementFactory.make("uridecodebin", "uri-decode-bin-%02d" % index)
    uri_decode_bin = Gst.ElementFactory.make("nvurisrcbin", "uri-decode-bin-%02d" % index)
    if not uri_decode_bin:
        sys.stderr.write(" Unable to create uri decode bin \n")
    # Reconnect dropped RTSP sessions every 60 s; smart-record disabled.
    uri_decode_bin.set_property("rtsp-reconnect-interval", 60)
    uri_decode_bin.set_property("smart-record", 0)
    # uri_decode_bin.set_property("smart-rec-dir-path", '.')
    # uri_decode_bin.set_property("smart-rec-default-duration", 600)
    # uri_decode_bin.set_property("smart-rec-cache", 60)
    # uri_decode_bin.set_property("drop-frame-interval", 5)

    # We set the input uri to the source element.
    uri_decode_bin.set_property("uri", uri)
    # Connect to the "pad-added" signal of the decodebin which generates
    # a callback once a new pad for raw data has been created.
    uri_decode_bin.connect("pad-added", cb_newpad, nbin)
    uri_decode_bin.connect("child-added", decodebin_child_added, nbin)

    # Create a targetless ghost pad for the bin acting as a proxy for the
    # video decoder src pad; cb_newpad sets its target once the decoder
    # pad exists.
    Gst.Bin.add(nbin, uri_decode_bin)
    bin_pad = nbin.add_pad(Gst.GhostPad.new_no_target("src", Gst.PadDirection.SRC))
    if not bin_pad:
        sys.stderr.write(" Failed to add ghost pad in source bin \n")
        return None
    return nbin

def main():
    """Build and run the pipeline.

    Topology: N source bins -> nvstreammux -> nvvideoconvert ->
    nvstreamdemux -> per-stream (queue -> nvvideoconvert -> nvv4l2h264enc
    -> h264parse -> flvmux -> leaky queue -> rtmpsink).

    NOTE(review): this is video-only — the source bins expose only the
    decoded video pad, so no audio reaches flvmux. To publish audio,
    each flvmux also needs an audio branch (e.g. decoded audio ->
    audioconvert -> voaacenc/avenc_aac -> aacparse -> flvmux audio pad).
    """
    cameras_list = [
        'rtsp://username:password@ipaddress:554/Streaming/Channels/101?transportmode=unicast&profile=Profile_1',
        'rtsp://username:password@ipaddress:554/Streaming/Channels/101?transportmode=unicast&profile=Profile_1',
        'rtsp://username:password@ipaddress:554/Streaming/Channels/101?transportmode=unicast&profile=Profile_1',
        'rtsp://username:password@ipaddress:554/profile1',
    ]

    global perf_data
    perf_data = PERF_DATA(len(cameras_list))

    # Standard GStreamer initialization.
    Gst.init(None)

    # Pipeline element that will form a connection of other elements.
    pipeline = Gst.Pipeline()

    # Muxer: batches the N live sources into one 1280x720 batch.
    streammux = create_element_or_error("nvstreammux", "muxer")
    streammux.set_property('live-source', True)
    # streammux.set_property('sync-inputs', True)
    streammux.set_property('width', 1280)
    streammux.set_property('height', 720)
    streammux.set_property('batch-size', len(cameras_list))
    streammux.set_property('batched-push-timeout', 33000)
    pipeline.add(streammux)

    # Videoconvert between the muxer and demuxer.
    nvvidconv1 = create_element_or_error("nvvideoconvert", "to-convertor1")
    pipeline.add(nvvidconv1)
    streammux.link(nvvidconv1)

    # Demuxer: splits the batch back into one stream per camera.
    demux = create_element_or_error("nvstreamdemux", "demuxer")
    pipeline.add(demux)
    nvvidconv1.link(demux)

    # Sources: one source bin per camera, linked to a muxer request pad.
    for i, uri_name in enumerate(cameras_list):
        source_bin = create_source_bin(i, uri_name)
        if not source_bin:
            sys.stderr.write("Unable to create source bin \n")
        pipeline.add(source_bin)
        padname = "sink_%u" % i
        sinkpad = streammux.get_request_pad(padname)
        if not sinkpad:
            sys.stderr.write("Unable to create sink pad bin \n")
        srcpad = source_bin.get_static_pad("src")
        if not srcpad:
            sys.stderr.write("Unable to create src pad bin \n")
        srcpad.link(sinkpad)

    # Outputs: one encode + RTMP branch per camera.
    for i in range(len(cameras_list)):

        queue = create_element_or_error("queue", "queue-" + str(i))
        pipeline.add(queue)

        _srcpad = demux.get_request_pad("src_" + str(i))
        if not _srcpad:
            print("Unable to create output src pad")
            exit(0)

        _sinkpad = queue.get_static_pad('sink')
        if not _sinkpad:
            print("Unable to create output sink pad")
            exit(0)

        _srcpad.link(_sinkpad)

        # Converter: NVMM -> format the encoder accepts.
        converter = create_element_or_error("nvvideoconvert", "converter-" + str(i))
        pipeline.add(converter)
        queue.link(converter)

        # Hardware H.264 encoder.
        encoder = create_element_or_error("nvv4l2h264enc", "h264-encoder-" + str(i))
        encoder.set_property('bitrate', 1800000)
        encoder.set_property('maxperf-enable', True)
        pipeline.add(encoder)
        converter.link(encoder)
        if is_aarch64():
            # Jetson-only encoder tuning.
            encoder.set_property('preset-level', 1)
            encoder.set_property('insert-sps-pps', 1)
            encoder.set_property('bufapi-version', 1)

        # Parse the H.264 elementary stream for the muxer.
        h264parse = create_element_or_error("h264parse", "h264-parse-" + str(i))
        pipeline.add(h264parse)
        encoder.link(h264parse)

        # Live sink: FLV mux -> leaky queue -> RTMP.
        muxer = create_element_or_error("flvmux", "livemuxer" + str(i))
        pipeline.add(muxer)
        h264parse.link(muxer)

        exp_queue = Gst.ElementFactory.make("queue", "expqueue-" + str(i))
        # leaky=1 (upstream): drop old buffers rather than stall the
        # pipeline when the RTMP server is slow.
        exp_queue.set_property('leaky', 1)
        pipeline.add(exp_queue)
        muxer.link(exp_queue)

        live_sink = create_element_or_error("rtmpsink", "rtmpsink-" + str(i))
        live_sink.set_property("sync", 0)
        live_sink.set_property('location', f'rtmp://ip_address:1935/LiveStream/TV001_stream{i}')
        pipeline.add(live_sink)
        exp_queue.link(live_sink)

    # Create an event loop and feed gstreamer bus messages to it.
    loop = GLib.MainLoop()
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect("message", bus_call, loop)

    demux_sink_pad = demux.get_static_pad("sink")
    if not demux_sink_pad:
        sys.stderr.write(" Unable to get demuxer sink pad \n")
    else:
        demux_sink_pad.add_probe(Gst.PadProbeType.BUFFER, tiler_sink_pad_buffer_probe, 0)
        # Perf callback function to print fps every 5 sec.
        GLib.timeout_add(5000, perf_data.perf_print_callback)

    # Start playing and run until interrupted.
    pipeline.set_state(Gst.State.PLAYING)

    try:
        loop.run()
    except KeyboardInterrupt:
        pass

    # Cleanup.
    pipeline.set_state(Gst.State.NULL)

    print("Exiting app")

# Script entry point. The pasted `if name == ‘main’:` was mangled by the
# forum formatter; restore the standard guard.
if __name__ == '__main__':
    sys.exit(main())