Skip to content

Commit

Permalink
bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
andycasey committed Jan 3, 2025
1 parent d88fe4d commit 5079e52
Showing 1 changed file with 35 additions and 28 deletions.
63 changes: 35 additions & 28 deletions src/astra/pipelines/aspcap/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def aspcap(
n_executions, n_executions_total = (0, len(plans))
current_threads, current_workers, currently_loading = (0, 0, 0)

at_capacity = lambda t, w, c: t >= max_threads or w >= max_workers or max_concurrent_loading >= c
at_capacity = lambda t, w, c: t >= max_threads or w >= max_workers or c >= max_concurrent_loading

with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as pre_and_post_process_executor:
pre_process_futures = [
Expand Down Expand Up @@ -133,7 +133,7 @@ def check_capacity(current_threads, current_workers, currently_loading):
except TimeoutError:
None
else:
completed_directory, return_code = ferre_future.result()
(completed_directory, return_code, t_overhead, t_elapsed) = ferre_future.result()
#pre_and_post_process_executor.submit(post_process_ferre, directory=completed_directory)
ferre_futures.remove(ferre_future)
current_workers -= 1
Expand Down Expand Up @@ -169,32 +169,39 @@ def check_capacity(current_threads, current_workers, currently_loading):


def ferre(directory, n_obj, child):

stdout, n_complete = ([], 0)
t_start, t_overhead, t_elapsed = (time(), None, {})

process = subprocess.Popen(["ferre.x"], cwd=directory, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
for line in iter(process.stdout.readline, ""):
# if an object is completed, note the completion time and send back some information to the parent.
if match := regex_next_object.search(line):
t_elapsed[int(match.group(1))] = -time()
if t_overhead is None:
t_overhead = time() - t_start
child.send((directory, 0))

if match := regex_completed.search(line):
t_elapsed[match.group(2)] = t_elapsed.pop(int(match.group(1))) + time()
n_complete += 1
child.send((directory, 1))
stdout.append(line)

# In case there were some we did not pick up the timings for
if n_complete != n_obj:
child.send((directory, n_obj - n_complete))

process.stdout.close()
return_code = process.wait()
return (directory, 0)
try:

stdout, n_complete = ([], 0)
t_start, t_overhead, t_elapsed = (time(), None, {})

process = subprocess.Popen(["ferre.x"], cwd=directory, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
for line in iter(process.stdout.readline, ""):
# if an object is completed, note the completion time and send back some information to the parent.
if match := regex_next_object.search(line):
t_elapsed[int(match.group(1))] = -time()
if t_overhead is None:
t_overhead = time() - t_start
child.send((directory, 0))

if match := regex_completed.search(line):
t_elapsed[match.group(2)] = t_elapsed.pop(int(match.group(1))) + time()
n_complete += 1
child.send((directory, 1))
stdout.append(line)

# In case there were some we did not pick up the timings for
if n_complete != n_obj:
child.send((directory, n_obj - n_complete))

with open(os.path.join(directory, "stdout"), "w") as fp:
fp.write("\n".join(stdout))

process.stdout.close()
return_code = process.wait()
except:
print(f"FAILED ON {directory}")
raise
return (directory, return_code, t_overhead, t_elapsed)



Expand Down

0 comments on commit 5079e52

Please sign in to comment.