diff --git a/baseHandler.py b/baseHandler.py index 61532e4..94c8ca2 100644 --- a/baseHandler.py +++ b/baseHandler.py @@ -31,7 +31,7 @@ def run(self): input = self.queue_in.get() if isinstance(input, bytes) and input == b"END": # sentinelle signal to avoid queue deadlock - logger.debug("Stopping thread") + logger.debug(f"{self.__class__.__name__}: Received END message, stopping thread") break start_time = perf_counter() for output in self.process(input): @@ -44,6 +44,10 @@ def run(self): self.cleanup() self.queue_out.put(b"END") + def stop(self): + logger.debug(f"{self.__class__.__name__}: Stopping pipeline component..") + self.stop_event.set() + @property def last_time(self): return self._times[-1] diff --git a/connections/socket_receiver.py b/connections/socket_receiver.py index 6fb734c..dc5b985 100644 --- a/connections/socket_receiver.py +++ b/connections/socket_receiver.py @@ -45,7 +45,7 @@ def run(self): self.socket.listen(1) logger.info("Receiver waiting to be connected...") self.conn, _ = self.socket.accept() - logger.info("receiver connected") + logger.info("Receiver connected") self.should_listen.set() while not self.stop_event.is_set(): @@ -58,3 +58,8 @@ def run(self): self.queue_out.put(audio_chunk) self.conn.close() logger.info("Receiver closed") + + def stop(self): + logger.debug("Receiver is in stopping process. Sending END message to the next component to initiate server termination..") + self.queue_out.put(b"END") + self.stop_event.set() diff --git a/connections/socket_sender.py b/connections/socket_sender.py index 11ed210..d0e98b8 100644 --- a/connections/socket_sender.py +++ b/connections/socket_sender.py @@ -16,6 +16,8 @@ def __init__(self, stop_event, queue_in, host="0.0.0.0", port=12346): self.stop_event = stop_event self.queue_in = queue_in self.host = host + self.socket = None + self.conn = None self.port = port def run(self): @@ -24,13 +26,26 @@ def run(self): self.socket.bind((self.host, self.port)) self.socket.listen(1) logger.info("Sender waiting to be connected...") - self.conn, _ = self.socket.accept() - logger.info("sender connected") - - while not self.stop_event.is_set(): - audio_chunk = self.queue_in.get() - self.conn.sendall(audio_chunk) - if isinstance(audio_chunk, bytes) and audio_chunk == b"END": - break - self.conn.close() - logger.info("Sender closed") + + try: + self.conn, _ = self.socket.accept() + logger.info("Sender connected") + + while not self.stop_event.is_set(): + audio_chunk = self.queue_in.get() + self.conn.sendall(audio_chunk) + if isinstance(audio_chunk, bytes) and audio_chunk == b"END": + break + except OSError as e: + # Handle exception due to socket shutdown + logger.debug(f"SocketSender received exception: {e}. Possibly the sever is in termination process..") + + finally: + if self.conn is not None: + self.conn.close() + logger.info("Sender closed") + + def stop(self): + self.stop_event.set() + logger.debug("SocketSender: shutdown socket") + self.socket.shutdown(socket.SHUT_RDWR) # Shutdown the socket to overcome blocking socket calls (e.g. accept()) diff --git a/s2s_pipeline.py b/s2s_pipeline.py index 6ca0b8a..8732df4 100644 --- a/s2s_pipeline.py +++ b/s2s_pipeline.py @@ -475,9 +475,13 @@ def main(): try: pipeline_manager.start() + pipeline_manager.join_all() except KeyboardInterrupt: - pipeline_manager.stop() + print("Stopping server..") + finally: + pipeline_manager.stop() + print("Server closed.") if __name__ == "__main__": main() diff --git a/utils/thread_manager.py b/utils/thread_manager.py index fc1ca4a..23906a0 100644 --- a/utils/thread_manager.py +++ b/utils/thread_manager.py @@ -1,5 +1,7 @@ import threading +import logging +logger = logging.getLogger(__name__) class ThreadManager: """ @@ -15,9 +17,18 @@ def start(self): thread = threading.Thread(target=handler.run) self.threads.append(thread) thread.start() + logger.debug(f'Thread {thread.ident} has started. Target: {thread.name}, type: {type(handler)}') + + def join_all(self): + for thread in self.threads: + logger.debug(f'Thread {thread.ident} attempt to join. Target: {thread.name}') + # Allow the main thread to remain responsive to KeyboardInterrupt while waiting for a child thread to finish + while thread.is_alive(): + thread.join(timeout=0.5) + logger.debug(f'Thread {thread.ident} has joined. Target: {thread.name}') def stop(self): + logger.debug("Server stop was invoked") for handler in self.handlers: - handler.stop_event.set() - for thread in self.threads: - thread.join() + handler.stop() + self.join_all()