From 1165dacc6e89c2f1e45c71290a00d38ee940af3b Mon Sep 17 00:00:00 2001 From: Chris van Run Date: Wed, 31 Jul 2024 06:41:48 +0200 Subject: [PATCH] Prevent multiple shutdowns happening at the same time --- .../helpers.py | 24 ++++++++++++------- tests/test_evaluate_helpers.py | 18 ++++++++++---- 2 files changed, 29 insertions(+), 13 deletions(-) diff --git a/grand_challenge_forge/partials/example-evaluation-method/example-evaluation-method{{cookiecutter._}}/helpers.py b/grand_challenge_forge/partials/example-evaluation-method/example-evaluation-method{{cookiecutter._}}/helpers.py index 0aaba40..a90036a 100644 --- a/grand_challenge_forge/partials/example-evaluation-method/example-evaluation-method{{cookiecutter._}}/helpers.py +++ b/grand_challenge_forge/partials/example-evaluation-method/example-evaluation-method{{cookiecutter._}}/helpers.py @@ -100,15 +100,21 @@ def _start_pool_worker(fn, predictions, max_workers, results, errors): def _pool_worker(*, fn, predictions, max_workers, results, errors): terminating_child_processes = False - + executor_shutting_down = False with ProcessPoolExecutor(max_workers=max_workers) as executor: try: def handle_error(error, prediction="Unknown"): - executor.shutdown(wait=False, cancel_futures=True) + nonlocal terminating_child_processes + if terminating_child_processes: + return + + nonlocal executor_shutting_down + if not executor_shutting_down: + executor_shutting_down = True + executor.shutdown(wait=False, cancel_futures=True) errors.append((prediction, error)) - nonlocal terminating_child_processes terminating_child_processes = True _terminate_child_processes() @@ -139,13 +145,14 @@ def sigchld_handler(*_, **__): except Exception as e: handle_error(e, prediction=future_to_predictions[future]) finally: - terminating_child_processes = True - _terminate_child_processes() + if not terminating_child_processes: + terminating_child_processes = True + _terminate_child_processes() -def _terminate_child_processes(): - current_process = psutil.Process(os.getpid()) - children = current_process.children(recursive=True) +def _terminate_child_processes(pid=None): + process = psutil.Process(pid or os.getpid()) + children = process.children(recursive=True) for child in children: try: child.terminate() @@ -157,7 +164,6 @@ def _terminate_child_processes(): # Forcefully kill any remaining processes for p in still_alive: - print(f"Forcefully killing child process {p.pid}") try: p.kill() except psutil.NoSuchProcess: diff --git a/tests/test_evaluate_helpers.py b/tests/test_evaluate_helpers.py index b8da03d..b51e4f9 100644 --- a/tests/test_evaluate_helpers.py +++ b/tests/test_evaluate_helpers.py @@ -1,11 +1,11 @@ import os -import signal import sys import time from functools import partial from multiprocessing import Process from unittest import mock +import psutil import pytest # Do some creating path hacking to be able to import the helpers @@ -66,6 +66,18 @@ def send_signals_to_process(process, signal_to_send, interval): time.sleep(interval) +def terminate_children(process, interval): + while True: + process = psutil.Process(process.pid) + children = process.children(recursive=True) + for child in children: + try: + child.kill() + except psutil.NoSuchProcess: + pass # Not a problem + time.sleep(interval) + + def test_prediction_processing(): predictions = ["prediction1", "prediction2"] result = run_prediction_processing( @@ -110,9 +122,7 @@ def add_child_terminator(*args, **kwargs): process = _start_pool_worker(*args, **kwargs) nonlocal child_terminator child_terminator = Process( - target=partial( - send_signals_to_process, process, signal.SIGCHLD, 0.5 - ) + target=partial(terminate_children, process, 0.5) ) child_terminator.start() # Hasta la vista, baby return process