diff --git a/src/queue_processor/QueueProcessor.py b/src/queue_processor/QueueProcessor.py index 9edf17f..2183cd4 100644 --- a/src/queue_processor/QueueProcessor.py +++ b/src/queue_processor/QueueProcessor.py @@ -45,9 +45,10 @@ def create_queues(self): self.queue_processor_logger.info(f"Creating queue {queue_name}") self.get_queue(queue_name).createQueue().vt(120).exceptions(False).execute() - def start(self, process: callable): + def start(self, process: callable, run_once: bool = False): self.queue_processor_logger.info("QueueProcessor running") while True: + executed_once = False for task_queue_name, results_queue_name in zip(self.task_queues_names, self.results_queues_names): try: self.create_queues() @@ -55,7 +56,7 @@ def start(self, process: callable): message = task_queue.receiveMessage().execute() task_queue.deleteMessage(qname=task_queue_name, id=message["id"]).execute() results = process(utils.decode_message(message["message"])) - + executed_once = True if results: self.get_queue(results_queue_name).sendMessage(delay=self.delay_time_for_results).message( results @@ -72,3 +73,6 @@ def start(self, process: callable): self.exists_queues = False self.queue_processor_logger.error(f"Error: {e}", exc_info=True) sleep(60) + + if run_once and executed_once: + break \ No newline at end of file