How do I control the behaviour of the pad probes

I have a pipeline. I cant upload the code as file, so I putting it here itself.



def face_search_face_registry_pipeline():
    Gst.init(None)
    print("Creating Pipeline \n ")
    pipeline = Gst.Pipeline()

    if not pipeline:
        sys.stderr.write(" Unable to create Pipeline \n")


    print("Creating Source \n ")
    source = Gst.ElementFactory.make("appsrc", "source")
    if not source:
        sys.stderr.write(" Unable to create Source \n")
    
    jpegparse = Gst.ElementFactory.make("jpegparse", "jpegparse")
    if not jpegparse:
        sys.stderr.write(" Unable to create jpegparse \n")

    print("Creating Decoder \n")
    decoder = Gst.ElementFactory.make("nvv4l2decoder", "nvv4l2-decoder")
    if not decoder:
        sys.stderr.write(" Unable to create Nvv4l2 Decoder \n")

    streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
    if not streammux:
        sys.stderr.write(" Unable to create NvStreamMux \n")

    pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
    if not pgie:
        sys.stderr.write(" Unable to create pgie \n")

    tracker = Gst.ElementFactory.make("nvtracker", "tracker")
    if not tracker:
        sys.stderr.write(" Unable to create tracker \n")
    
    queue1 = Gst.ElementFactory.make("queue", "queue1")
    if not queue1:
        sys.stderr.write(" Unable to create queue1 \n")
    
    
    sgie0 = Gst.ElementFactory.make("nvinfer", "landmarks_model")
    if not sgie0:
        sys.stderr.write(" Unable to make sgie0 \n")
    
    sgie1 = Gst.ElementFactory.make("nvinfer", "secondary1-nvinference-engine")
    if not sgie1:
        sys.stderr.write(" Unable to make sgie1 \n")

    nvvidconv = Gst.ElementFactory.make("nvvideoconvert", "convertor")
    if not nvvidconv:
        sys.stderr.write(" Unable to create nvvidconv \n")


    nvosd = Gst.ElementFactory.make("nvdsosd", "onscreendisplay")

    if not nvosd:
        sys.stderr.write(" Unable to create nvosd \n")

    # Finally render the osd output
    if is_aarch64():
        print("Creating nv3dsink \n")
        sink = Gst.ElementFactory.make("nv3dsink", "nv3d-sink")
        if not sink:
            sys.stderr.write(" Unable to create nv3dsink \n")
    else:
        print("Creating EGLSink \n")
        sink = Gst.ElementFactory.make("nveglglessink", "nvvideo-renderer")
        if not sink:
            sys.stderr.write(" Unable to create egl sink \n")


    streammux.set_property('width', 640)
    streammux.set_property('height', 480)
    streammux.set_property('batch-size', 1)
    streammux.set_property('batched-push-timeout', 400)
    streammux.set_property('live-source', 0)
    sink.set_property("sync",0)
    sink.set_property("qos",0)
    #Set properties of pgie and sgie
    pgie.set_property('config-file-path', "<path>/config_Face_Detection.txt")
    sgie0.set_property('config-file-path',"<path>/config_Facial_Landmarks.txt")
    sgie1.set_property('config-file-path',"<path>/config_Facial_Embeddings.txt")
    tracker.set_property('ll-config-file',"config_tracker_NvDCF_perf.yml")
    tracker.set_property('ll-lib-file', "/opt/nvidia/deepstream/deepstream/lib/libnvds_nvmultiobjecttracker.so")
    source.connect('need-data', need_data_cb)
    source.set_property("format", Gst.Format.TIME)
    source.set_property("do-timestamp", True)
    source.set_property("is-live", False)
    print("Adding elements to Pipeline \n")
    pipeline.add(source)
    pipeline.add(jpegparse)
    pipeline.add(decoder)
    pipeline.add(streammux)
    pipeline.add(pgie)
    pipeline.add(tracker)
    pipeline.add(sgie0) 
    pipeline.add(queue1)
    pipeline.add(sgie1)
    pipeline.add(nvvidconv)
    pipeline.add(nvosd)
    pipeline.add(sink)
    print("Linking elements in the Pipeline \n")
    source.link(jpegparse)
    jpegparse.link(decoder)
    sinkpad = streammux.get_request_pad("sink_0")
    if not sinkpad:
        sys.stderr.write(" Unable to get the sink pad of streammux \n")
    srcpad = decoder.get_static_pad("src")
    if not srcpad:
        sys.stderr.write(" Unable to get source pad of decoder \n")
    srcpad.link(sinkpad)
    streammux.link(pgie)
    #pgie.link(sgie0)
    #sgie0.link(queue1)
    pgie.link(sgie1)
    sgie1.link(nvvidconv)
    nvvidconv.link(nvosd)
    nvosd.link(sink)
    loop = GLib.MainLoop()
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect ("message", bus_call, loop)

    osdsinkpad = nvosd.get_static_pad("sink")
    if not osdsinkpad:
        sys.stderr.write(" Unable to get sink pad of nvosd \n")
    osdsinkpad.add_probe(Gst.PadProbeType.BUFFER, osd_sink_pad_buffer_probe, 0)

    nvvidconv_sinkpad = nvvidconv.get_static_pad("sink")
    if not nvvidconv_sinkpad:
        sys.stderr.write(" Unable to get sink pad of nvosd \n")
    nvvidconv_sinkpad.add_probe(Gst.PadProbeType.BUFFER, pgie_src_pad_buffer_probe, custom_data)

    print("Starting pipeline \n")
    
    # start play back and listed to events
    pipeline.set_state(Gst.State.PLAYING)
    try:
      print("strarting ")
      loop.run()
    except:
      print("in except")
      pass

    # cleanup
    pipeline.set_state(Gst.State.NULL)

if __name__ == "__main__":
    face_search_face_registry_pipeline()

I’m using appsrc plugin, that feeds the images to the pipeline.

what I’m trying to do :- Just have a three model pipeline, Face Detector, Face Landmarks Estimation and Facial Embeddings prediction.

I get the facial embeddings and use them for other functionality in my project, like registering a face in database or searching for a face.

the problem that I am facing :- say, I send face_N into the pipeline, I get the face embedding of face_(N-1).

Some More info :- I have a probe function pgie_src_pad_buffer_probe this is attached on sinkpad of nvosd element, this probe function takes out or reads. embeddings.

I am wondering what is causing this. Is it because of how the pad probes work?? is it like, the moment face_N ( or say buffer_N) entered the pipeline, the pad probe callback ( assume attached on pad P1) got triggered even if buffer_N was yet to reach the pad P1. And at the time, when buffer_N entered the pipeline, buffer_N-1 was there on pad P1 and because of this I got the embeddings of the buffer_N-1??

or it is something else ?

def pgie_src_pad_buffer_probe(pad, info, u_data):
    event.wait()
    #print("in probe ")
    gst_buffer = info.get_buffer()
    if not gst_buffer:
        #print("Unable to get GstBuffer ")
        return

    # Retrieve batch metadata from the gst_buffer
    # Note that pyds.gst_buffer_get_nvds_batch_meta() expects the
    # C address of gst_buffer as input, which is obtained with hash(gst_buffer)
    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
    l_frame = batch_meta.frame_meta_list
    
    

    detection_params = DetectionParam(CLASS_NB, ACCURACY_ALL_CLASS)
    box_size_param = BoxSizeParam(IMAGE_HEIGHT, IMAGE_WIDTH,
                                  MIN_BOX_WIDTH, MIN_BOX_HEIGHT)
    nms_param = NmsParam(TOP_K, IOU_THRESHOLD)

    #label_names = get_label_names_from_file("labels.txt")

    while l_frame is not None:
        try:
            # Note that l_frame.data needs a cast to pyds.NvDsFrameMeta
            # The casting also keeps ownership of the underlying memory
            # in the C code, so the Python garbage collector will leave
            # it alone.
            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
            #print("got frame meta ")
        except StopIteration:
            break
        #print("number of objects {}".format(frame_meta.num_obj_meta) )
        if (frame_meta.num_obj_meta == 1 ): 
            frame_number=frame_meta.frame_num
            l_obj=frame_meta.obj_meta_list
            num_rects = frame_meta.num_obj_meta
            obj_counter = {
            PGIE_CLASS_ID_VEHICLE:0,
            PGIE_CLASS_ID_PERSON:0,
            PGIE_CLASS_ID_BICYCLE:0,
            PGIE_CLASS_ID_ROADSIGN:0
            }
            #print("#"*50)
            while l_obj:
                try: 
                    # Note that l_obj.data needs a cast to pyds.NvDsObjectMeta
                    # The casting is done by pyds.NvDsObjectMeta.cast()
                    obj_meta=pyds.NvDsObjectMeta.cast(l_obj.data)
                except StopIteration:
                    break
                obj_counter[obj_meta.class_id] += 1
                l_user_meta = obj_meta.obj_user_meta_list
                # Extract object level meta data from NvDsAnalyticsObjInfo
                object_conf = obj_meta.confidence
                if object_conf < 0.6:
                    #do not register face 
                
                if object_conf >= 0.6:
                    
                    while l_user_meta:
                        try:
                            user_meta = pyds.NvDsUserMeta.cast(l_user_meta.data)
                            #print("base meta is ",user_meta.base_meta.meta_type )
                            if user_meta.base_meta.meta_type == pyds.NvDsMetaType.NVDSINFER_TENSOR_OUTPUT_META:
                                #print("found tensor meta") 
                                tensor_meta = pyds.NvDsInferTensorMeta.cast(user_meta.user_meta_data)
                                layer = pyds.get_nvds_LayerInfo(tensor_meta, 0)
                                # Convert NvDsInferLayerInfo buffer to numpy array
                                ptr = ctypes.cast(pyds.get_ptr(layer.buffer), ctypes.POINTER(ctypes.c_float))
                                v = np.ctypeslib.as_array(ptr, shape=(512,))
                                #print(v)
                                emb_list = []
                                for x in v: 
                                    emb_list.append(x)
                                #print(len(emb_list))
                                embedding_ = get_embedding(emb_list)
                                if len(u_data[0]) > 0: #shold the len(u_data[1]) also be checked here? 
                                    #this is face registry part 
                                    print("Handelling Face Registry ")
                                    #dump_name_em_to_json(u_data[0][-1][0], v, "./face_registry_embedding_{}.json".format(u_data[0][-1][1]))
                                    # data_to_write = {"name":u_data[-1],"embedding":emb_list}
                                    # write_to_text(data_to_write, "./face_registry_embedding_{}.txt".format(u_data[-1]))
                                    # dump_name_em_to_json(u_data[-1], emb_list, "./face_registry_embedding_{}.json".format(u_data[-1]))
                                    insert_data = {
                                        'name': u_data[0][0]["name"],
                                        'em': embedding_,
                                        'face_registry_id': str(u_data[0][0]["face_registry_id"]),
                                        }
                                    
                                    
                                    insert_into_postgresql_table(
                                        host='localhost',
                                        port='5432',
                                        database='postgres',
                                        user='postgres',
                                        password='password',
                                        table='public.embeddings111',
                                        data=insert_data
                                    )
                                    face_registry_success_payload = {
                                        "replyFlag":"SUCCESS",
                                        "payloadType":"face_regsitry",
                                        "reply_queue":u_data[0][0]["replyQueue"]
                                    }
                                    lpush_to_redis_queue("localhost", 6379, "face_search_pipeline_out_queue", face_registry_success_payload)
                                    popped_val = u_data[0].pop(0)
                                    #print("popped from face registry list :- {} and length of face registry list is {} ".format("not_displaying", len(u_data[0]))) 
                                    #restart the code of op consumer 
                                    #print("&"*50)
                                    kill_process_by_name2("multi_faiss_op")
                                    #print("&"*50)
                                    #run_cpp_executable("../../../../src/multi_faiss_op")
                                if len(u_data[1]) > 0:
                                    #this is the face search part 
                                    print("Handelling Face Search ")
                                    face_search_payload = {
                                        "embedding":str(emb_list),
                                        "payloadType":"face_search",
                                        "reply_queue":u_data[1][0]["replyQueue"],
                                        "replyFlag":"C"
                                    }
                                    lpush_to_redis_queue("localhost", 6379, "face_search_pipeline_out_queue", face_search_payload)
                                    popped_val = u_data[1].pop(0)
                                    #print("popped from face search list :- {} and length of face search list is {} ".format("not_displaying", len(u_data[1]))) 
                                    #handle_face_search_payload()

                        except StopIteration:
                            break

                        try:
                            l_user_meta = l_user_meta.next
                        except StopIteration:
                            break
                    try: 
                        l_obj=l_obj.next
                    except StopIteration:
                        break
                else:
                    print("Detection Conf is low ")
                    break
        elif frame_meta.num_obj_meta == 0:
            print("No face detected")
            if len(u_data[1]) > 0:
                face_search_failuer_payload_zero_faces = {
                                    "replyFlag":"FNF",
                                    "payloadType":"face_search",
                                    "reply_queue":u_data[1][0]["replyQueue"]
                                }
                lpush_to_redis_queue("localhost", 6379, "face_search_pipeline_out_queue", face_search_failuer_payload_zero_faces)
                popped_val = u_data[1].pop(0)

            if len(u_data[0]) > 0:
                face_registry_failuer_payload_zero_faces = {
                                    "replyFlag":"FNF",
                                    "payloadType":"face_regsitry",
                                    "reply_queue":u_data[0][0]["replyQueue"]
                                    
                                }
                lpush_to_redis_queue("localhost", 6379, "face_search_pipeline_out_queue", face_registry_failuer_payload_zero_faces)
                popped_val = u_data[0].pop(0)
                


        
        elif frame_meta.num_obj_meta > 1:
            print("more than one face, not handelling.")
            print(len(u_data), len(u_data[1]))
            if len(u_data[1]) > 0 : 
                face_search_failuer_payload_multiple_faces = {
                                    "replyFlag":"MFD",
                                    "payloadType":"face_search",
                                    "reply_queue":u_data[1][0]["replyQueue"]
                                    
                                }
                lpush_to_redis_queue("localhost", 6379, "face_search_pipeline_out_queue", face_search_failuer_payload_multiple_faces)
                popped_val = u_data[1].pop(0)
            elif len(u_data[0]) > 0: 
                face_registry_failuer_payload_multiple_faces = {
                                    "replyFlag":"MFD",
                                    "payloadType":"face_regsitry",
                                    "reply_queue":u_data[0][0]["replyQueue"]
                                    
                                }
                lpush_to_redis_queue("localhost", 6379, "face_search_pipeline_out_queue", face_registry_failuer_payload_multiple_faces)
                popped_val = u_data[0].pop(0)

        try:
            # indicate inference is performed on the frame
            frame_meta.bInferDone = True
            l_frame = l_frame.next
        except StopIteration:
            break
    #print("#"*50)
    #print("infer status ",  frame_meta.bInferDone )
    return Gst.PadProbeReturn.OK ```

from more debugging, what I have found out is that appsrc is not pushing all the buffers in the pipeline.