Skip to content

Commit

Permalink
max concurrent loading
Browse files Browse the repository at this point in the history
  • Loading branch information
andycasey committed Jan 3, 2025
1 parent 58e4db0 commit d88fe4d
Showing 1 changed file with 47 additions and 49 deletions.
96 changes: 47 additions & 49 deletions src/astra/pipelines/aspcap/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,13 @@ def aspcap(
yield ASPCAP.from_spectrum(spectrum, flag_no_suitable_initial_guess=True)

# If we load too many processes at once, the disk I/O kills us.
enforce_sequential_grid_loading = True
max_concurrent_loading = 1

current_threads, current_workers, currently_loading_grid = (0, 0, False)
at_capacity = lambda t, w, c: t >= max_threads or w >= max_workers or (enforce_sequential_grid_loading and c)
parent, child = Pipe()
n_executions, n_executions_total = (0, len(plans))
current_threads, current_workers, currently_loading = (0, 0, 0)

at_capacity = lambda t, w, c: t >= max_threads or w >= max_workers or max_concurrent_loading >= c

with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as pre_and_post_process_executor:
pre_process_futures = [
Expand All @@ -105,76 +108,72 @@ def aspcap(
)
for plan in plans
]

results = []
ferre_futures = []
with tqdm(total=sum(len(plan["spectra"]) for plan in plans), desc="Running ASPCAP") as pb:

def check_capacity(current_threads, current_workers, currently_loading):
pb.set_description(
f"ASPCAP ("
f"{current_threads}/{max_threads}; "
f"{current_workers}/{max_workers}; "
f"{'loading' if currently_loading > 0 else ''} {n_executions}/{n_executions_total})"
)

while parent.poll():
child_directory, n = parent.recv()
if n == 0:
currently_loading -= 1
current_threads -= n
pb.update(n)

try:
ferre_future = next(concurrent.futures.as_completed(ferre_futures, timeout=1))
except TimeoutError:
None
else:
completed_directory, return_code = ferre_future.result()
#pre_and_post_process_executor.submit(post_process_ferre, directory=completed_directory)
ferre_futures.remove(ferre_future)
current_workers -= 1

parent, child = Pipe()
return (current_threads, current_workers, currently_loading)

n_spectra = {}
results = []
ferre_futures = []
with tqdm(total=len(spectra) - len(spectra_with_no_initial_guess), desc="Running ASPCAP") as pb:

with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as ferre_executor:

for future in concurrent.futures.as_completed(pre_process_futures):
directory, n_obj, skipped = future.result()
n_spectra[directory] = n_obj

# Spectra might be skipped because the file could not be found, or if there were too many bad pixels.
for spectrum, kwds in skipped:
pb.update(1)
#yield ASPCAP.from_spectrum(spectrum, **kwds)

# We could be at thread or worker capacity.
while at_capacity(current_threads, current_workers, currently_loading_grid):
pb.set_description(f"ASPCAP ({current_threads}/{max_threads}; {current_workers}/{max_workers}; {'loading' if currently_loading_grid else ''})")

while parent.poll():
child_directory, n = parent.recv()
if n == 0:
currently_loading_grid = False
current_threads -= n
pb.update(n)

try:
ferre_future = next(concurrent.futures.as_completed(ferre_futures, timeout=1))
except TimeoutError:
continue
else:
completed_directory, return_code = ferre_future.result()
#pre_and_post_process_executor.submit(post_process_ferre, directory=completed_directory)

ferre_futures.remove(ferre_future)
current_workers -= 1
#print(f"Finished {completed_directory} with {n_spectra[completed_directory]} ({current_threads} threads, {current_workers} workers)")
while at_capacity(current_threads, current_workers, currently_loading):
current_threads, current_workers, currently_loading = check_capacity(current_threads, current_workers, currently_loading)

ferre_futures.append(ferre_executor.submit(ferre, directory, n_obj, child))
currently_loading_grid = True
current_threads += n_obj
current_workers += 1
#print(f"Submitting {directory} with {n_obj} ({current_threads} threads, {current_workers} workers)")

#print("All jobs submitted")
for ferre_future in concurrent.futures.as_completed(ferre_futures):
directory, return_code = ferre_future.result()
current_workers -= 1
#print(f"Finished {directory} with {n_spectra[directory]} ({current_threads} threads, {current_workers} workers)")

n_executions, currently_loading, current_threads, current_workers = (n_executions + 1, currently_loading + 1, current_threads + n_obj, current_workers + 1)

while current_workers:
current_threads, current_workers, currently_loading = check_capacity(current_threads, current_workers, currently_loading_grid)

t_full += time()
print(f"Full time: {t_full:.0f} s")
raise a


regex_next_object = re.compile(r"next object #\s+(\d+)")
regex_completed = re.compile(r"\s+(\d+)\s(\d+_[\d\w_]+)") # assumes input ids start with an integer and underscore


def ferre(directory, n_obj, child):

stdout, n_complete = ([], 0)
t_start, t_overhead, t_elapsed = (time(), None, {})

process = subprocess.Popen(["ferre.x"], cwd=directory, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)

for line in iter(process.stdout.readline, ""):
# if an object is completed, note the completion time and send back some information to the parent.
if match := regex_next_object.search(line):
Expand All @@ -184,18 +183,17 @@ def ferre(directory, n_obj, child):
child.send((directory, 0))

if match := regex_completed.search(line):
t_elapsed[int(match.group(1))] += time()
t_elapsed[match.group(2)] = t_elapsed.pop(int(match.group(1))) + time()
n_complete += 1
child.send((directory, 1))
stdout.append(line)

# In case there were some we did not pick up the timings for
if n_missed := n_complete - n_obj:
child.send((directory, n_missed))
if n_complete != n_obj:
child.send((directory, n_obj - n_complete))

process.stdout.close()
return_code = process.wait()

return (directory, 0)


Expand Down

0 comments on commit d88fe4d

Please sign in to comment.