From 2558d820fd568de1fd9222e1678fc91be99c1a84 Mon Sep 17 00:00:00 2001 From: Chris van Run Date: Mon, 29 Jul 2024 13:17:36 +0200 Subject: [PATCH 01/14] Address deprecation warning for ENV notation in Dockerfiles --- .../example-algorithm{{cookiecutter._}}/Dockerfile | 2 +- .../example-evaluation-method{{cookiecutter._}}/Dockerfile | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/grand_challenge_forge/partials/example-algorithm/example-algorithm{{cookiecutter._}}/Dockerfile b/grand_challenge_forge/partials/example-algorithm/example-algorithm{{cookiecutter._}}/Dockerfile index 721515f..7953241 100644 --- a/grand_challenge_forge/partials/example-algorithm/example-algorithm{{cookiecutter._}}/Dockerfile +++ b/grand_challenge_forge/partials/example-algorithm/example-algorithm{{cookiecutter._}}/Dockerfile @@ -2,7 +2,7 @@ FROM --platform=linux/amd64 pytorch/pytorch # Use a 'large' base container to show-case how to load pytorch and use the GPU (when enabled) # Ensures that Python output to stdout/stderr is not buffered: prevents missing information when terminating -ENV PYTHONUNBUFFERED 1 +ENV PYTHONUNBUFFERED=1 RUN groupadd -r user && useradd -m --no-log-init -r -g user user USER user diff --git a/grand_challenge_forge/partials/example-evaluation-method/example-evaluation-method{{cookiecutter._}}/Dockerfile b/grand_challenge_forge/partials/example-evaluation-method/example-evaluation-method{{cookiecutter._}}/Dockerfile index 7e8976c..6bb0424 100644 --- a/grand_challenge_forge/partials/example-evaluation-method/example-evaluation-method{{cookiecutter._}}/Dockerfile +++ b/grand_challenge_forge/partials/example-evaluation-method/example-evaluation-method{{cookiecutter._}}/Dockerfile @@ -1,7 +1,7 @@ FROM --platform=linux/amd64 docker.io/library/python:3.11-slim # Ensures that Python output to stdout/stderr is not buffered: prevents missing information when terminating -ENV PYTHONUNBUFFERED 1 +ENV PYTHONUNBUFFERED=1 RUN groupadd -r user && useradd -m --no-log-init 
-r -g user user USER user From 5259630f8cff2c0cc59a0f567b3d721540b9be6b Mon Sep 17 00:00:00 2001 From: Chris van Run Date: Mon, 29 Jul 2024 16:26:27 +0200 Subject: [PATCH 02/14] Fix ignoring any children suddenly exiting --- .../evaluate.py.j2 | 7 ++++-- .../helpers.py | 22 ++++++++++++++++++- 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/grand_challenge_forge/partials/example-evaluation-method/example-evaluation-method{{cookiecutter._}}/evaluate.py.j2 b/grand_challenge_forge/partials/example-evaluation-method/example-evaluation-method{{cookiecutter._}}/evaluate.py.j2 index ca9ffcc..bc146c9 100644 --- a/grand_challenge_forge/partials/example-evaluation-method/example-evaluation-method{{cookiecutter._}}/evaluate.py.j2 +++ b/grand_challenge_forge/partials/example-evaluation-method/example-evaluation-method{{cookiecutter._}}/evaluate.py.j2 @@ -26,7 +26,7 @@ import random from statistics import mean from pathlib import Path from pprint import pformat, pprint -from helpers import run_prediction_processing +from helpers import run_prediction_processing, listen_to_children_errors INPUT_DIRECTORY = Path("/input") OUTPUT_DIRECTORY = Path("/output") @@ -42,7 +42,10 @@ def process(job): report += pformat(job) report += "\n" - # Firstly, find the location of the results + # Before we start, ensure we catch any child-process crashes + listen_to_children_errors() + + # Firstly, find the location of the results {% for ci in cookiecutter.phase.algorithm_outputs %} {%- set py_slug = ci.slug | replace("-", "_") -%} location_{{ py_slug }} = get_file_location( diff --git a/grand_challenge_forge/partials/example-evaluation-method/example-evaluation-method{{cookiecutter._}}/helpers.py b/grand_challenge_forge/partials/example-evaluation-method/example-evaluation-method{{cookiecutter._}}/helpers.py index 20dc325..d6883c2 100644 --- a/grand_challenge_forge/partials/example-evaluation-method/example-evaluation-method{{cookiecutter._}}/helpers.py +++ 
b/grand_challenge_forge/partials/example-evaluation-method/example-evaluation-method{{cookiecutter._}}/helpers.py @@ -153,7 +153,7 @@ def _terminate_child_processes(): pass # Not a problem # Wait for processes to terminate - gone, still_alive = psutil.wait_procs(children, timeout=5) + _, still_alive = psutil.wait_procs(children, timeout=5) # Forcefully kill any remaining processes for p in still_alive: @@ -162,3 +162,23 @@ def _terminate_child_processes(): p.kill() except psutil.NoSuchProcess: pass # That is fine + + _reap_all_children() + + +def _reap_all_children(): + """Reaps all child processes that have terminated""" + try: + while True: + pid, _ = os.waitpid(-1, os.WNOHANG) + if pid == 0: # No more terminated child processes + break + except ChildProcessError: + pass # No more child processes + + +def listen_to_children_errors(): + def handler(*_, **__): + _reap_all_children() + + signal.signal(signal.SIGCHLD, handler) From 4472a745b345570a5103dcaaeab68fb7d90f76e9 Mon Sep 17 00:00:00 2001 From: Chris van Run Date: Mon, 29 Jul 2024 16:57:52 +0200 Subject: [PATCH 03/14] Terminate all child process when it fails --- .../example-evaluation-method{{cookiecutter._}}/helpers.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/grand_challenge_forge/partials/example-evaluation-method/example-evaluation-method{{cookiecutter._}}/helpers.py b/grand_challenge_forge/partials/example-evaluation-method/example-evaluation-method{{cookiecutter._}}/helpers.py index d6883c2..0aaba40 100644 --- a/grand_challenge_forge/partials/example-evaluation-method/example-evaluation-method{{cookiecutter._}}/helpers.py +++ b/grand_challenge_forge/partials/example-evaluation-method/example-evaluation-method{{cookiecutter._}}/helpers.py @@ -179,6 +179,7 @@ def _reap_all_children(): def listen_to_children_errors(): def handler(*_, **__): - _reap_all_children() + print("A child failed, terminating all other children") + _terminate_child_processes() signal.signal(signal.SIGCHLD, 
handler) From 13059144d2f65ee80ed9df8d4198b29d0f26c757 Mon Sep 17 00:00:00 2001 From: Chris van Run Date: Tue, 30 Jul 2024 10:04:22 +0200 Subject: [PATCH 04/14] Remove debug print statement printenv --- .../example-algorithm{{cookiecutter._}}/save.sh.j2 | 2 -- 1 file changed, 2 deletions(-) diff --git a/grand_challenge_forge/partials/example-algorithm/example-algorithm{{cookiecutter._}}/save.sh.j2 b/grand_challenge_forge/partials/example-algorithm/example-algorithm{{cookiecutter._}}/save.sh.j2 index 906d758..b1ad3e2 100755 --- a/grand_challenge_forge/partials/example-algorithm/example-algorithm{{cookiecutter._}}/save.sh.j2 +++ b/grand_challenge_forge/partials/example-algorithm/example-algorithm{{cookiecutter._}}/save.sh.j2 @@ -27,8 +27,6 @@ formatted_build_info=$(date -d "$build_timestamp" +"%Y%m%d_%H%M%S") # Set the output filename with timestamp and build information output_filename="${SCRIPT_DIR}/${container_tag}_${formatted_build_info}.tar.gz" -printenv - # Save the Docker container and gzip it docker save "$container_tag" | gzip -c > "$output_filename" From 1165dacc6e89c2f1e45c71290a00d38ee940af3b Mon Sep 17 00:00:00 2001 From: Chris van Run Date: Wed, 31 Jul 2024 06:41:48 +0200 Subject: [PATCH 05/14] Prevent multiple shutdowns happening at the same time --- .../helpers.py | 24 ++++++++++++------- tests/test_evaluate_helpers.py | 18 ++++++++++---- 2 files changed, 29 insertions(+), 13 deletions(-) diff --git a/grand_challenge_forge/partials/example-evaluation-method/example-evaluation-method{{cookiecutter._}}/helpers.py b/grand_challenge_forge/partials/example-evaluation-method/example-evaluation-method{{cookiecutter._}}/helpers.py index 0aaba40..a90036a 100644 --- a/grand_challenge_forge/partials/example-evaluation-method/example-evaluation-method{{cookiecutter._}}/helpers.py +++ b/grand_challenge_forge/partials/example-evaluation-method/example-evaluation-method{{cookiecutter._}}/helpers.py @@ -100,15 +100,21 @@ def _start_pool_worker(fn, predictions, 
max_workers, results, errors): def _pool_worker(*, fn, predictions, max_workers, results, errors): terminating_child_processes = False - + executor_shutting_down = False with ProcessPoolExecutor(max_workers=max_workers) as executor: try: def handle_error(error, prediction="Unknown"): - executor.shutdown(wait=False, cancel_futures=True) + nonlocal terminating_child_processes + if terminating_child_processes: + return + + nonlocal executor_shutting_down + if not executor_shutting_down: + executor_shutting_down = True + executor.shutdown(wait=False, cancel_futures=True) errors.append((prediction, error)) - nonlocal terminating_child_processes terminating_child_processes = True _terminate_child_processes() @@ -139,13 +145,14 @@ def sigchld_handler(*_, **__): except Exception as e: handle_error(e, prediction=future_to_predictions[future]) finally: - terminating_child_processes = True - _terminate_child_processes() + if not terminating_child_processes: + terminating_child_processes = True + _terminate_child_processes() -def _terminate_child_processes(): - current_process = psutil.Process(os.getpid()) - children = current_process.children(recursive=True) +def _terminate_child_processes(pid=None): + process = psutil.Process(pid or os.getpid()) + children = process.children(recursive=True) for child in children: try: child.terminate() @@ -157,7 +164,6 @@ def _terminate_child_processes(): # Forcefully kill any remaining processes for p in still_alive: - print(f"Forcefully killing child process {p.pid}") try: p.kill() except psutil.NoSuchProcess: diff --git a/tests/test_evaluate_helpers.py b/tests/test_evaluate_helpers.py index b8da03d..b51e4f9 100644 --- a/tests/test_evaluate_helpers.py +++ b/tests/test_evaluate_helpers.py @@ -1,11 +1,11 @@ import os -import signal import sys import time from functools import partial from multiprocessing import Process from unittest import mock +import psutil import pytest # Do some creating path hacking to be able to import the helpers @@ 
-66,6 +66,18 @@ def send_signals_to_process(process, signal_to_send, interval): time.sleep(interval) +def terminate_children(process, interval): + while True: + process = psutil.Process(process.pid) + children = process.children(recursive=True) + for child in children: + try: + child.kill() + except psutil.NoSuchProcess: + pass # Not a problem + time.sleep(interval) + + def test_prediction_processing(): predictions = ["prediction1", "prediction2"] result = run_prediction_processing( @@ -110,9 +122,7 @@ def add_child_terminator(*args, **kwargs): process = _start_pool_worker(*args, **kwargs) nonlocal child_terminator child_terminator = Process( - target=partial( - send_signals_to_process, process, signal.SIGCHLD, 0.5 - ) + target=partial(terminate_children, process, 0.5) ) child_terminator.start() # Hasta la vista, baby return process From 2e6dfbee02d6340774145c63fa04f3905e1f5102 Mon Sep 17 00:00:00 2001 From: Chris van Run Date: Thu, 1 Aug 2024 09:43:57 +0200 Subject: [PATCH 06/14] DEBUG add a state check for executor shutdown --- .../example-evaluation-method{{cookiecutter._}}/helpers.py | 2 ++ tests/test_evaluate_helpers.py | 1 + 2 files changed, 3 insertions(+) diff --git a/grand_challenge_forge/partials/example-evaluation-method/example-evaluation-method{{cookiecutter._}}/helpers.py b/grand_challenge_forge/partials/example-evaluation-method/example-evaluation-method{{cookiecutter._}}/helpers.py index a90036a..092a13f 100644 --- a/grand_challenge_forge/partials/example-evaluation-method/example-evaluation-method{{cookiecutter._}}/helpers.py +++ b/grand_challenge_forge/partials/example-evaluation-method/example-evaluation-method{{cookiecutter._}}/helpers.py @@ -112,7 +112,9 @@ def handle_error(error, prediction="Unknown"): nonlocal executor_shutting_down if not executor_shutting_down: executor_shutting_down = True + print("### TEST Pre-shutdown") executor.shutdown(wait=False, cancel_futures=True) + print("### TEST Post-shutdown") errors.append((prediction, error)) 
terminating_child_processes = True diff --git a/tests/test_evaluate_helpers.py b/tests/test_evaluate_helpers.py index b51e4f9..007f2ab 100644 --- a/tests/test_evaluate_helpers.py +++ b/tests/test_evaluate_helpers.py @@ -119,6 +119,7 @@ def test_prediction_processing_catching_killing_of_child_processes(): # Set up the fake child murder scene def add_child_terminator(*args, **kwargs): + print("max workers: ", kwargs.get("max_workers")) process = _start_pool_worker(*args, **kwargs) nonlocal child_terminator child_terminator = Process( From ef9827083d53787db4e15d59787ce60b81dd788f Mon Sep 17 00:00:00 2001 From: Chris van Run Date: Thu, 1 Aug 2024 09:49:10 +0200 Subject: [PATCH 07/14] Revert "DEBUG add a state check for executor shutdown" This reverts commit 2e6dfbee02d6340774145c63fa04f3905e1f5102. --- .../example-evaluation-method{{cookiecutter._}}/helpers.py | 2 -- tests/test_evaluate_helpers.py | 1 - 2 files changed, 3 deletions(-) diff --git a/grand_challenge_forge/partials/example-evaluation-method/example-evaluation-method{{cookiecutter._}}/helpers.py b/grand_challenge_forge/partials/example-evaluation-method/example-evaluation-method{{cookiecutter._}}/helpers.py index 092a13f..a90036a 100644 --- a/grand_challenge_forge/partials/example-evaluation-method/example-evaluation-method{{cookiecutter._}}/helpers.py +++ b/grand_challenge_forge/partials/example-evaluation-method/example-evaluation-method{{cookiecutter._}}/helpers.py @@ -112,9 +112,7 @@ def handle_error(error, prediction="Unknown"): nonlocal executor_shutting_down if not executor_shutting_down: executor_shutting_down = True - print("### TEST Pre-shutdown") executor.shutdown(wait=False, cancel_futures=True) - print("### TEST Post-shutdown") errors.append((prediction, error)) terminating_child_processes = True diff --git a/tests/test_evaluate_helpers.py b/tests/test_evaluate_helpers.py index 007f2ab..b51e4f9 100644 --- a/tests/test_evaluate_helpers.py +++ b/tests/test_evaluate_helpers.py @@ -119,7 +119,6 @@ 
def test_prediction_processing_catching_killing_of_child_processes(): # Set up the fake child murder scene def add_child_terminator(*args, **kwargs): - print("max workers: ", kwargs.get("max_workers")) process = _start_pool_worker(*args, **kwargs) nonlocal child_terminator child_terminator = Process( From 0cda1d93c13b8ee2a0fa5fc1f4dd567b5b1d7c3c Mon Sep 17 00:00:00 2001 From: Chris van Run Date: Thu, 1 Aug 2024 09:58:20 +0200 Subject: [PATCH 08/14] Rename child stopper to cover both terminate and kill --- tests/test_evaluate_helpers.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/tests/test_evaluate_helpers.py b/tests/test_evaluate_helpers.py index b51e4f9..9f20f5a 100644 --- a/tests/test_evaluate_helpers.py +++ b/tests/test_evaluate_helpers.py @@ -66,7 +66,7 @@ def send_signals_to_process(process, signal_to_send, interval): time.sleep(interval) -def terminate_children(process, interval): +def stop_children(process, interval): while True: process = psutil.Process(process.pid) children = process.children(recursive=True) @@ -115,16 +115,14 @@ def test_prediction_processing_killing_of_child_processes(): def test_prediction_processing_catching_killing_of_child_processes(): predictions = ["prediction1", "prediction2"] - child_terminator = None + child_stopper = None # Set up the fake child murder scene def add_child_terminator(*args, **kwargs): process = _start_pool_worker(*args, **kwargs) - nonlocal child_terminator - child_terminator = Process( - target=partial(terminate_children, process, 0.5) - ) - child_terminator.start() # Hasta la vista, baby + nonlocal child_stopper + child_stopper = Process(target=partial(stop_children, process, 0.5)) + child_stopper.start() # Hasta la vista, baby return process try: @@ -134,8 +132,8 @@ def add_child_terminator(*args, **kwargs): fn=forever_process, predictions=predictions ) finally: - if child_terminator: - child_terminator.terminate() + if child_stopper: + child_stopper.terminate() assert 
"Child process was terminated unexpectedly" in str( excinfo.value.error From 6c8f8b1a034b3388716852f6af411e7a1f3f2191 Mon Sep 17 00:00:00 2001 From: Chris van Run Date: Thu, 1 Aug 2024 09:58:39 +0200 Subject: [PATCH 09/14] Remove now redudant kwargs to function --- .../example-evaluation-method{{cookiecutter._}}/helpers.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/grand_challenge_forge/partials/example-evaluation-method/example-evaluation-method{{cookiecutter._}}/helpers.py b/grand_challenge_forge/partials/example-evaluation-method/example-evaluation-method{{cookiecutter._}}/helpers.py index a90036a..ea98da3 100644 --- a/grand_challenge_forge/partials/example-evaluation-method/example-evaluation-method{{cookiecutter._}}/helpers.py +++ b/grand_challenge_forge/partials/example-evaluation-method/example-evaluation-method{{cookiecutter._}}/helpers.py @@ -150,8 +150,8 @@ def sigchld_handler(*_, **__): _terminate_child_processes() -def _terminate_child_processes(pid=None): - process = psutil.Process(pid or os.getpid()) +def _terminate_child_processes(): + process = psutil.Process(os.getpid()) children = process.children(recursive=True) for child in children: try: From 42da7320ed233d5b26067d917b094b1c6b10c3e8 Mon Sep 17 00:00:00 2001 From: Chris van Run Date: Thu, 1 Aug 2024 10:59:02 +0200 Subject: [PATCH 10/14] Replace reaping with less involved process --- .../helpers.py | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/grand_challenge_forge/partials/example-evaluation-method/example-evaluation-method{{cookiecutter._}}/helpers.py b/grand_challenge_forge/partials/example-evaluation-method/example-evaluation-method{{cookiecutter._}}/helpers.py index ea98da3..cff8805 100644 --- a/grand_challenge_forge/partials/example-evaluation-method/example-evaluation-method{{cookiecutter._}}/helpers.py +++ b/grand_challenge_forge/partials/example-evaluation-method/example-evaluation-method{{cookiecutter._}}/helpers.py @@ 
-169,18 +169,11 @@ def _terminate_child_processes(): except psutil.NoSuchProcess: pass # That is fine - _reap_all_children() - - -def _reap_all_children(): - """Reaps all child processes that have terminated""" + # Finally, prevent zombies by waiting for all child processes try: - while True: - pid, _ = os.waitpid(-1, os.WNOHANG) - if pid == 0: # No more terminated child processes - break + os.waitpid(-1, 0) except ChildProcessError: - pass # No more child processes + pass # No child processes, that if fine def listen_to_children_errors(): From 55d92c6a36903ba1fdbaf172528768b80b4425c9 Mon Sep 17 00:00:00 2001 From: Chris van Run Date: Thu, 1 Aug 2024 14:42:23 +0200 Subject: [PATCH 11/14] Remove signal listener as it made the executor unstable --- .../helpers.py | 20 ++----------- tests/test_evaluate_helpers.py | 30 +++++++------------ 2 files changed, 14 insertions(+), 36 deletions(-) diff --git a/grand_challenge_forge/partials/example-evaluation-method/example-evaluation-method{{cookiecutter._}}/helpers.py b/grand_challenge_forge/partials/example-evaluation-method/example-evaluation-method{{cookiecutter._}}/helpers.py index cff8805..da84ecd 100644 --- a/grand_challenge_forge/partials/example-evaluation-method/example-evaluation-method{{cookiecutter._}}/helpers.py +++ b/grand_challenge_forge/partials/example-evaluation-method/example-evaluation-method{{cookiecutter._}}/helpers.py @@ -100,35 +100,21 @@ def _start_pool_worker(fn, predictions, max_workers, results, errors): def _pool_worker(*, fn, predictions, max_workers, results, errors): terminating_child_processes = False - executor_shutting_down = False + with ProcessPoolExecutor(max_workers=max_workers) as executor: try: - def handle_error(error, prediction="Unknown"): + def handle_error(error, prediction): nonlocal terminating_child_processes if terminating_child_processes: return - nonlocal executor_shutting_down - if not executor_shutting_down: - executor_shutting_down = True - executor.shutdown(wait=False, 
cancel_futures=True) + executor.shutdown(wait=False, cancel_futures=True) errors.append((prediction, error)) terminating_child_processes = True _terminate_child_processes() - def sigchld_handler(*_, **__): - if not terminating_child_processes: - handle_error( - RuntimeError( - "Child process was terminated unexpectedly" - ) - ) - - # Register the SIGCHLD handler - signal.signal(signal.SIGCHLD, sigchld_handler) - # Submit the processing tasks of the predictions futures = [ executor.submit(fn, prediction) for prediction in predictions diff --git a/tests/test_evaluate_helpers.py b/tests/test_evaluate_helpers.py index 9f20f5a..20d75dd 100644 --- a/tests/test_evaluate_helpers.py +++ b/tests/test_evaluate_helpers.py @@ -27,7 +27,7 @@ # Some of the test below, if things go wrong, can potentially deadlock. # So we set a maximum runtime -pytestmark = pytest.mark.timeout(5) +pytestmark = pytest.mark.timeout(4) def working_process(p): @@ -55,26 +55,18 @@ def forever_process(*_): time.sleep(1) -def send_signals_to_process(process, signal_to_send, interval): - while True: - try: - os.kill(process.pid, signal_to_send) - except ProcessLookupError: - # Race conditions sometimes have this try and send a signal even though - # the process is already terminated - pass - time.sleep(interval) - - def stop_children(process, interval): - while True: + stopped = False + while not stopped: process = psutil.Process(process.pid) children = process.children(recursive=True) - for child in children: - try: - child.kill() - except psutil.NoSuchProcess: - pass # Not a problem + if children: + for child in children: + try: + child.kill() + except psutil.NoSuchProcess: + pass # Not a problem + stopped = True time.sleep(interval) @@ -135,6 +127,6 @@ def add_child_terminator(*args, **kwargs): if child_stopper: child_stopper.terminate() - assert "Child process was terminated unexpectedly" in str( + assert "A process in the process pool was terminated abruptly" in str( excinfo.value.error ) From 
cd3f0e198a6b00e99720016653c48d3db7b7d401 Mon Sep 17 00:00:00 2001 From: Chris van Run Date: Thu, 1 Aug 2024 15:09:28 +0200 Subject: [PATCH 12/14] Simplify the pool executor handling --- .../helpers.py | 28 +++++++------------ 1 file changed, 10 insertions(+), 18 deletions(-) diff --git a/grand_challenge_forge/partials/example-evaluation-method/example-evaluation-method{{cookiecutter._}}/helpers.py b/grand_challenge_forge/partials/example-evaluation-method/example-evaluation-method{{cookiecutter._}}/helpers.py index da84ecd..731b47c 100644 --- a/grand_challenge_forge/partials/example-evaluation-method/example-evaluation-method{{cookiecutter._}}/helpers.py +++ b/grand_challenge_forge/partials/example-evaluation-method/example-evaluation-method{{cookiecutter._}}/helpers.py @@ -99,22 +99,9 @@ def _start_pool_worker(fn, predictions, max_workers, results, errors): def _pool_worker(*, fn, predictions, max_workers, results, errors): - terminating_child_processes = False - + caught_exception = False with ProcessPoolExecutor(max_workers=max_workers) as executor: try: - - def handle_error(error, prediction): - nonlocal terminating_child_processes - if terminating_child_processes: - return - - executor.shutdown(wait=False, cancel_futures=True) - errors.append((prediction, error)) - - terminating_child_processes = True - _terminate_child_processes() - # Submit the processing tasks of the predictions futures = [ executor.submit(fn, prediction) for prediction in predictions @@ -129,11 +116,16 @@ def handle_error(error, prediction): result = future.result() results.append(result) except Exception as e: - handle_error(e, prediction=future_to_predictions[future]) + errors.append((future_to_predictions[future], e)) + + if not caught_exception: # Hard stop + caught_exception = True + + executor.shutdown(wait=False, cancel_futures=True) + _terminate_child_processes() finally: - if not terminating_child_processes: - terminating_child_processes = True - _terminate_child_processes() + # 
Be aggresive in cleaning up any left-over processes + _terminate_child_processes() def _terminate_child_processes(): From e37a835828a909f09d879a16888af57964390564 Mon Sep 17 00:00:00 2001 From: Chris van Run Date: Thu, 1 Aug 2024 15:29:29 +0200 Subject: [PATCH 13/14] Fix tests expecting an explicit result ordering --- .../example-evaluation-method{{cookiecutter._}}/helpers.py | 2 ++ tests/test_evaluate_helpers.py | 4 +++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/grand_challenge_forge/partials/example-evaluation-method/example-evaluation-method{{cookiecutter._}}/helpers.py b/grand_challenge_forge/partials/example-evaluation-method/example-evaluation-method{{cookiecutter._}}/helpers.py index 731b47c..b69a06b 100644 --- a/grand_challenge_forge/partials/example-evaluation-method/example-evaluation-method{{cookiecutter._}}/helpers.py +++ b/grand_challenge_forge/partials/example-evaluation-method/example-evaluation-method{{cookiecutter._}}/helpers.py @@ -44,6 +44,8 @@ def run_prediction_processing(*, fn, predictions): - if any child process is terminated, all prediction processing will abort - after prediction processing is done, all child processes are terminated + Note that the results are returned in completing order. 
+ Parameters ---------- fn : function diff --git a/tests/test_evaluate_helpers.py b/tests/test_evaluate_helpers.py index 20d75dd..d0856b6 100644 --- a/tests/test_evaluate_helpers.py +++ b/tests/test_evaluate_helpers.py @@ -31,6 +31,8 @@ def working_process(p): + if p == "prediction1": + time.sleep(2) return f"{p} result" @@ -75,7 +77,7 @@ def test_prediction_processing(): result = run_prediction_processing( fn=working_process, predictions=predictions ) - assert ["prediction1 result", "prediction2 result"] == result + assert {"prediction1 result", "prediction2 result"} == set(result) def test_prediction_processing_error(): From 5baa601ff6f4094e64623bcb25f4c15a1f01bafe Mon Sep 17 00:00:00 2001 From: Chris van Run Date: Mon, 12 Aug 2024 13:51:21 +0200 Subject: [PATCH 14/14] Remove listen_to_children function --- .../evaluate.py.j2 | 5 +---- .../helpers.py | 9 --------- 2 files changed, 1 insertion(+), 13 deletions(-) diff --git a/grand_challenge_forge/partials/example-evaluation-method/example-evaluation-method{{cookiecutter._}}/evaluate.py.j2 b/grand_challenge_forge/partials/example-evaluation-method/example-evaluation-method{{cookiecutter._}}/evaluate.py.j2 index bc146c9..41bbf5d 100644 --- a/grand_challenge_forge/partials/example-evaluation-method/example-evaluation-method{{cookiecutter._}}/evaluate.py.j2 +++ b/grand_challenge_forge/partials/example-evaluation-method/example-evaluation-method{{cookiecutter._}}/evaluate.py.j2 @@ -26,7 +26,7 @@ import random from statistics import mean from pathlib import Path from pprint import pformat, pprint -from helpers import run_prediction_processing, listen_to_children_errors +from helpers import run_prediction_processing INPUT_DIRECTORY = Path("/input") OUTPUT_DIRECTORY = Path("/output") @@ -42,9 +42,6 @@ def process(job): report += pformat(job) report += "\n" - # Before we start, ensure we catch any child-process crashes - listen_to_children_errors() - # Firstly, find the location of the results {% for ci in 
cookiecutter.phase.algorithm_outputs %} {%- set py_slug = ci.slug | replace("-", "_") -%} diff --git a/grand_challenge_forge/partials/example-evaluation-method/example-evaluation-method{{cookiecutter._}}/helpers.py b/grand_challenge_forge/partials/example-evaluation-method/example-evaluation-method{{cookiecutter._}}/helpers.py index b69a06b..de077c6 100644 --- a/grand_challenge_forge/partials/example-evaluation-method/example-evaluation-method{{cookiecutter._}}/helpers.py +++ b/grand_challenge_forge/partials/example-evaluation-method/example-evaluation-method{{cookiecutter._}}/helpers.py @@ -1,6 +1,5 @@ import multiprocessing import os -import signal from concurrent.futures import ProcessPoolExecutor, as_completed from multiprocessing import Manager, Process @@ -154,11 +153,3 @@ def _terminate_child_processes(): os.waitpid(-1, 0) except ChildProcessError: pass # No child processes, that if fine - - -def listen_to_children_errors(): - def handler(*_, **__): - print("A child failed, terminating all other children") - _terminate_child_processes() - - signal.signal(signal.SIGCHLD, handler)