diff --git a/src/astra/pipelines/aspcap/__init__.py b/src/astra/pipelines/aspcap/__init__.py index d9cee3e..c37e660 100644 --- a/src/astra/pipelines/aspcap/__init__.py +++ b/src/astra/pipelines/aspcap/__init__.py @@ -97,7 +97,7 @@ def aspcap( n_executions, n_executions_total = (0, len(plans)) current_threads, current_workers, currently_loading = (0, 0, 0) - at_capacity = lambda t, w, c: t >= max_threads or w >= max_workers or max_concurrent_loading >= c + at_capacity = lambda t, w, c: t >= max_threads or w >= max_workers or c >= max_concurrent_loading with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as pre_and_post_process_executor: pre_process_futures = [ @@ -133,7 +133,7 @@ def check_capacity(current_threads, current_workers, currently_loading): except TimeoutError: None else: - completed_directory, return_code = ferre_future.result() + (completed_directory, return_code, t_overhead, t_elapsed) = ferre_future.result() #pre_and_post_process_executor.submit(post_process_ferre, directory=completed_directory) ferre_futures.remove(ferre_future) current_workers -= 1 @@ -169,32 +169,39 @@ def check_capacity(current_threads, current_workers, currently_loading): def ferre(directory, n_obj, child): - - stdout, n_complete = ([], 0) - t_start, t_overhead, t_elapsed = (time(), None, {}) - - process = subprocess.Popen(["ferre.x"], cwd=directory, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) - for line in iter(process.stdout.readline, ""): - # if an object is completed, note the completion time and send back some information to the parent. - if match := regex_next_object.search(line): - t_elapsed[int(match.group(1))] = -time() - if t_overhead is None: - t_overhead = time() - t_start - child.send((directory, 0)) - - if match := regex_completed.search(line): - t_elapsed[match.group(2)] = t_elapsed.pop(int(match.group(1))) + time() - n_complete += 1 - child.send((directory, 1)) - stdout.append(line) - - # In case there were some we did not pick up the timings for - if n_complete != n_obj: - child.send((directory, n_obj - n_complete)) - - process.stdout.close() - return_code = process.wait() - return (directory, 0) + try: + + stdout, n_complete = ([], 0) + t_start, t_overhead, t_elapsed = (time(), None, {}) + + process = subprocess.Popen(["ferre.x"], cwd=directory, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) + for line in iter(process.stdout.readline, ""): + # if an object is completed, note the completion time and send back some information to the parent. + if match := regex_next_object.search(line): + t_elapsed[int(match.group(1))] = -time() + if t_overhead is None: + t_overhead = time() - t_start + child.send((directory, 0)) + + if match := regex_completed.search(line): + t_elapsed[match.group(2)] = t_elapsed.pop(int(match.group(1))) + time() + n_complete += 1 + child.send((directory, 1)) + stdout.append(line) + + # In case there were some we did not pick up the timings for + if n_complete != n_obj: + child.send((directory, n_obj - n_complete)) + + with open(os.path.join(directory, "stdout"), "w") as fp: + fp.write("\n".join(stdout)) + + process.stdout.close() + return_code = process.wait() + except: + print(f"FAILED ON {directory}") + raise + return (directory, return_code, t_overhead, t_elapsed)