How do I get the EOS message when using filesrc with the Python API?

Hi, I am currently trying to make my program decode a local H.264-encoded video file with the Python API. The program uses the bus to get messages, but it seems that the message type is never Gst.MessageType.EOS when the video ends. Maybe I should be using Gst.ProgressType.COMPLETE instead?
Here is a slice of my code:

message = self.bus.timed_pop_filtered(10000, Gst.MessageType.ANY)
        self.current_message = message
        if self.framedata.img is not None:
           self.image = np.ascontiguousarray(self.framedata.img)
           return True
        if message.type == Gst.MessageType.ERROR:
            err, debug = message.parse_error()
            print(("Error received from element %s: %s \n" % (
                message.src.get_name(), err)))
            print(("Debugging information: %s \n" % debug))
            return False
        elif message.type == Gst.MessageType.EOS:
            print("End-Of-Stream reached.\n")
            self.framedata.data_invalid()
            # self.start_decode()
            return False
        elif message.type == Gst.MessageType.STATE_CHANGED:
            if isinstance(message.src, Gst.Pipeline):
                old_state, new_state, pending_state = message.parse_state_changed()
                print(("Pipeline state changed from %s to %s." %
                       (old_state.value_nick, new_state.value_nick)))
        elif message.type == Gst.MessageType.PROGRESS:
            if message.type == Gst.
            return True
        else:
            print("Gstream message " + Gst.message_type_get_name(message.type)) 
            return True

Could you share more details about your pipeline?

Are you using an appsink in your pipeline?

Hello tpm:
Yes, I am using appsink in my pipeline, here is my pipeline

    def create_file_pipeline(self, name):
        """Build the file-playback pipeline and store it in ``self.pipe``.

        Layout:
            filesrc -> qtdemux -+-> queue -> h264parse -> nvv4l2decoder
                                |     -> nvvidconv -> caps(BGRx, NVMM)
                                |     -> nvvidconv -> caps(video/x-raw)
                                |     -> appsink (new-sample -> new_buffer)
                                +-> queue -> faad -> audioconvert -> fakesink

        The demuxer pads are linked at runtime by ``cb_demuxer_newpad``.
        ``name`` is the GStreamer name given to the pipeline object.
        """
        self.pipe = Gst.Pipeline.new(name)
        src = Gst.ElementFactory.make("filesrc", "src")
        demuxer = Gst.ElementFactory.make("qtdemux", "demux")

        # --- video branch ---
        queuev = Gst.ElementFactory.make("queue", "queue")
        # NOTE: this is an H.264 parser; the original local name "parse265"
        # was misleading.
        parse264 = Gst.ElementFactory.make("h264parse", "parse")
        decoder = Gst.ElementFactory.make("nvv4l2decoder", "decode")
        conv = Gst.ElementFactory.make("nvvidconv", "conv")
        caps = Gst.caps_from_string("video/x-raw(memory:NVMM), format=(string)BGRx")
        # Renamed from "filter" to avoid shadowing the Python builtin.
        capsfilter = Gst.ElementFactory.make("capsfilter", "filter")
        capsfilter.set_property("caps", caps)
        conv2 = Gst.ElementFactory.make("nvvidconv", "conv2")
        caps2 = Gst.caps_from_string("video/x-raw")
        fil2 = Gst.ElementFactory.make("capsfilter", "fil2")
        fil2.set_property("caps", caps2)
        sink = Gst.ElementFactory.make("appsink", "sink")
        # Configure the appsink: push each decoded frame to new_buffer().
        sink.set_property("emit-signals", True)
        sink.connect("new-sample", new_buffer, self.framedata)

        # --- audio branch (decoded but discarded via fakesink) ---
        decodebina = Gst.ElementFactory.make("faad", "decodea")
        queuea = Gst.ElementFactory.make("queue", "queuea")
        conva = Gst.ElementFactory.make("audioconvert", "conva")
        sinka = Gst.ElementFactory.make("fakesink", "sinka")

        elements = [src, demuxer, queuev, parse264, decoder, conv,
                    capsfilter, conv2, fil2, sink,
                    queuea, decodebina, conva, sinka]
        # ElementFactory.make() returns None when a plugin is missing
        # (e.g. the NVIDIA elements on a non-Jetson machine); fail loudly
        # instead of crashing later with an opaque AttributeError.
        if any(e is None for e in elements):
            raise RuntimeError(
                "Failed to create one or more GStreamer elements; "
                "check that all required plugins are installed.")

        # Source file and runtime pad hookup for the demuxer.
        src.set_property("location", self.file_path)
        demuxer.connect("pad-added", cb_demuxer_newpad, queuev, queuea)

        # Add everything to the pipeline.
        for element in elements:
            self.pipe.add(element)

        # Static links (demuxer pads are linked later in cb_demuxer_newpad).
        src.link(demuxer)
        queuev.link(parse264)
        parse264.link(decoder)
        decoder.link(conv)
        conv.link(capsfilter)
        capsfilter.link(conv2)
        conv2.link(fil2)
        fil2.link(sink)

        queuea.link(decodebina)
        decodebina.link(conva)
        conva.link(sinka)

and this is my callback function for pulling frames from the appsink:

def new_buffer(sink, data):
    """appsink "new-sample" callback: copy the latest frame into *data*.

    Pulls one sample from the appsink, wraps its buffer in a numpy array
    shaped (height, width, 4) — the pipeline delivers BGRx, so 4 bytes per
    pixel — then stores the first three channels (BGR) on ``data.img``.

    Returns Gst.FlowReturn.OK on success, Gst.FlowReturn.ERROR when no
    sample is available.
    """
    sample = sink.emit("pull-sample")
    # pull-sample returns None at EOS/flush; the original code would have
    # crashed on sample.get_buffer() here.
    if sample is None:
        return Gst.FlowReturn.ERROR

    buf = sample.get_buffer()
    structure = sample.get_caps().get_structure(0)
    # extract_dup() copies the whole buffer into Python-owned memory, so
    # the array stays valid after the GstBuffer is recycled.
    arr = np.ndarray(
        (structure.get_value('height'),
         structure.get_value('width'),
         4),
        buffer=buf.extract_dup(0, buf.get_size()),
        dtype=np.uint8)

    # Drop the padding channel: keep BGR only.
    data.img = arr[:, :, 0:3]
    data.data_valid()
    data.frame_shape = data.img.shape
    # Frame counter that wraps back to 0 after 10000.
    data.frame = data.frame + 1 if data.frame < 10000 else 0
    return Gst.FlowReturn.OK

data is an object holding a cv2 image (which is also a numpy array) reference that works as a frame buffer

Still, no solution :smiling_face_with_tear:

OK, I have figured out what happened. If the RTSP camera goes offline, it does not trigger an EOS message. Instead, the message from rtspsrc comes up as GstRTSPSrcTimeout.