Skip to content

Commit

Permalink
Simplify the pool executor handling
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisvanrun committed Aug 1, 2024
1 parent 55d92c6 commit cd3f0e1
Showing 1 changed file with 10 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,22 +99,9 @@ def _start_pool_worker(fn, predictions, max_workers, results, errors):


def _pool_worker(*, fn, predictions, max_workers, results, errors):
terminating_child_processes = False

caught_exception = False
with ProcessPoolExecutor(max_workers=max_workers) as executor:
try:

def handle_error(error, prediction):
nonlocal terminating_child_processes
if terminating_child_processes:
return

executor.shutdown(wait=False, cancel_futures=True)
errors.append((prediction, error))

terminating_child_processes = True
_terminate_child_processes()

# Submit the processing tasks of the predictions
futures = [
executor.submit(fn, prediction) for prediction in predictions
Expand All @@ -129,11 +116,16 @@ def handle_error(error, prediction):
result = future.result()
results.append(result)
except Exception as e:
handle_error(e, prediction=future_to_predictions[future])
errors.append((future_to_predictions[future], e))

if not caught_exception: # Hard stop
caught_exception = True

executor.shutdown(wait=False, cancel_futures=True)
_terminate_child_processes()
finally:
if not terminating_child_processes:
terminating_child_processes = True
_terminate_child_processes()
# Be aggresive in cleaning up any left-over processes
_terminate_child_processes()


def _terminate_child_processes():
Expand Down

0 comments on commit cd3f0e1

Please sign in to comment.