From cf4880a711c0444b0341a9d258aea7b996461184 Mon Sep 17 00:00:00 2001
From: Leo Schick
Date: Tue, 31 Jan 2023 11:59:53 +0100
Subject: [PATCH] fix endless loop when command queue is full from FeedWorkerProcess and a Worker node failed

add basic unit testing for FeedWorkerProcess logic

add unit test for when command queue is full
---
 mara_pipelines/execution.py            | 15 ++++
 mara_pipelines/pipelines.py            |  1 +
 tests/test_parallel_read_feedworker.py | 96 +++++++++++++++++++++++++
 3 files changed, 112 insertions(+)
 create mode 100644 tests/test_parallel_read_feedworker.py

diff --git a/mara_pipelines/execution.py b/mara_pipelines/execution.py
index 0ce0846..5db034f 100644
--- a/mara_pipelines/execution.py
+++ b/mara_pipelines/execution.py
@@ -257,6 +257,7 @@ def track_finished_pipelines():
 
                                     for _, node in sub_pipeline.nodes.items():
                                         if isinstance(node, pipelines.Worker):
+                                            node.origin_parent = next_node
                                             node.command_queue = command_queue
 
                                     process = FeedWorkersProcess(next_node, command_queue, event_queue, multiprocessing_context)
@@ -312,6 +313,20 @@ def track_finished_pipelines():
                             for parent in task_process.task.parents()[:-1]:
                                 failed_pipelines.add(parent)
 
+                        if not task_process.succeeded:
+                            # Note: We do not support 'task_process.task.parent.ignore_errors' here to avoid endless loops:
+                            #       It could happen that all worker nodes fail. For this case there is currently no control
+                            #       implemented to kill the feed worker process.
+
+                            if isinstance(task_process, WorkerProcess) and \
+                               isinstance(task_process.task.origin_parent, pipelines.ParallelTask) and \
+                               task_process.task.origin_parent.use_workers:
+                                # A worker task failed. We check if the 'FeedWorkersProcess' is still running ...
+                                for node, process in running_task_processes.items():
+                                    if node == task_process.task.origin_parent and isinstance(process, FeedWorkersProcess):
+                                        # Feed worker process found --> terminate it
+                                        process.terminate()
+
                         end_time = datetime.datetime.now(tz.utc)
                         event_queue.put(
                             pipeline_events.Output(task_process.task.path(),
diff --git a/mara_pipelines/pipelines.py b/mara_pipelines/pipelines.py
index 3a3d77f..dae5815 100644
--- a/mara_pipelines/pipelines.py
+++ b/mara_pipelines/pipelines.py
@@ -113,6 +113,7 @@ class Worker(Node):
     def __init__(self, id: str, description: str) -> None:
         super().__init__(id, description)
         self.command_queue: multiprocessing.Queue = None
+        self.origin_parent: Node = None  # ParallelTask nodes get converted during the execution via the ParallelTask.launch() method. This property returns the original parent of the worker node.
 
     def run(self):
         import time
diff --git a/tests/test_parallel_read_feedworker.py b/tests/test_parallel_read_feedworker.py
new file mode 100644
index 0000000..2756642
--- /dev/null
+++ b/tests/test_parallel_read_feedworker.py
@@ -0,0 +1,96 @@
+"""Various tests in connection with the ParallelTask where 'use_workers' is True. This activates the FeedWorkersProcess."""
+
+import pytest
+import inspect
+
+
+from mara_pipelines.commands.python import RunFunction
+from mara_app.monkey_patch import patch
+from mara_pipelines.pipelines import ParallelTask, Command
+import mara_pipelines.config
+
+
+# the tests are executed without database
+import mara_db.config
+patch(mara_db.config.databases)(lambda: {})
+
+
+
+def method_name():
+    return inspect.stack()[1][3]
+
+
+class SimpleParallelProcess(ParallelTask):
+    def __init__(self, id: str, description: str,
+                 function: callable, number_of_tasks: int = 10, max_number_of_parallel_tasks: int = None, commands_before: [Command] = None, commands_after: [Command] = None, context: str = None) -> None:
+        super().__init__(id, description, max_number_of_parallel_tasks, commands_before, commands_after, context)
+        self.use_workers = True
+        self.function = function
+        self.number_of_tasks = number_of_tasks
+
+    def feed_workers(self):
+        for n in range(1, self.number_of_tasks):
+            yield RunFunction(function=self.function, args=[n])
+
+
+def simple_print_call(n: int):
+    print(f"Task {n}")
+    return True
+
+
+def test_simple_parallel_process_succeeded():
+    from mara_pipelines.pipelines import Pipeline
+    from mara_pipelines.ui.cli import run_pipeline
+
+    pipeline = Pipeline(id=method_name(), description="")
+
+    pipeline.add(
+        SimpleParallelProcess('simple_parallel_task', description="", function=simple_print_call)
+    )
+
+    assert run_pipeline(pipeline)
+
+
+def simple_print_call_fail(n: int):
+    print(f"Task {n}")
+    return False
+
+def test_simple_parallel_process_fail():
+    from mara_pipelines.pipelines import Pipeline
+    from mara_pipelines.ui.cli import run_pipeline
+
+    pipeline = Pipeline(id=method_name(), description="")
+
+    pipeline.add(
+        SimpleParallelProcess('simple_parallel_task', description="", function=simple_print_call_fail))
+
+    assert not run_pipeline(pipeline)
+
+
+def test_queue_stress_fail():
+    """
+    Sometimes the queue for tasks is full and the FeedWorkersProcess tries and tries
+    again to fill in the received commands into the internal queue, but it cannot
+    succeed because all worker nodes failed.
+
+    This is a test for this case: The main execution loop in this case must
+    react on the worker process failed message and kill the FeedWorkersProcess.
+    """
+
+    from mara_pipelines.pipelines import Pipeline
+    from mara_pipelines.ui.cli import run_pipeline
+
+    pipeline = Pipeline(id=method_name(), description="")
+
+    # we stress the number of tasks to be above the maximum of
+    # the defined queue. We want to stress to come in an endless loop inside
+    # the feed worker since 1. all workers will fail and 2. the queue will
+    # be overloaded. The feed worker will retry and retry again to fill in
+    # the tasks into the queue.
+    number_of_tasks = 110 * mara_pipelines.config.max_number_of_parallel_tasks()
+
+    pipeline.add(
+        SimpleParallelProcess('simple_parallel_task', description="", function=simple_print_call_fail,
+                              number_of_tasks=number_of_tasks))
+
+    assert not run_pipeline(pipeline)