From be8ef9cf3dff2fe7c664fa21e64284e0ec8c0dbb Mon Sep 17 00:00:00 2001 From: Gabo Date: Fri, 25 Oct 2024 12:25:08 +0200 Subject: [PATCH] Add restart condition --- src/queue_processor/QueueProcessor.py | 30 ++++++++++++++++++--------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/src/queue_processor/QueueProcessor.py b/src/queue_processor/QueueProcessor.py index 2183cd4..20b1323 100644 --- a/src/queue_processor/QueueProcessor.py +++ b/src/queue_processor/QueueProcessor.py @@ -1,4 +1,5 @@ import logging +from collections.abc import Callable from time import sleep import redis @@ -45,23 +46,31 @@ def create_queues(self): self.queue_processor_logger.info(f"Creating queue {queue_name}") self.get_queue(queue_name).createQueue().vt(120).exceptions(False).execute() - def start(self, process: callable, run_once: bool = False): + def start(self, process: callable, restart_condition: Callable = None): self.queue_processor_logger.info("QueueProcessor running") while True: - executed_once = False + restart = False for task_queue_name, results_queue_name in zip(self.task_queues_names, self.results_queues_names): try: self.create_queues() task_queue = self.get_queue(task_queue_name) message = task_queue.receiveMessage().execute() task_queue.deleteMessage(qname=task_queue_name, id=message["id"]).execute() - results = process(utils.decode_message(message["message"])) - executed_once = True - if results: - self.get_queue(results_queue_name).sendMessage(delay=self.delay_time_for_results).message( - results - ).execute() - break + message = utils.decode_message(message["message"]) + results = process(message) + + if not results: + continue + + try: + restart = True if restart else restart_condition(message) + except: + restart = False + + self.get_queue(results_queue_name).sendMessage(delay=self.delay_time_for_results).message( + results + ).execute() + break except NoMessageInQueue: sleep(2) @@ -74,5 +83,6 @@ def start(self, process: callable, run_once: bool = False): self.queue_processor_logger.error(f"Error: {e}", exc_info=True) sleep(60) - if run_once and executed_once: + if restart: + sleep(self.delay_time_for_results + 5) break \ No newline at end of file