From d88fe4d2064f83bc2448b20ff8268b657d68d168 Mon Sep 17 00:00:00 2001 From: Andy Casey Date: Fri, 3 Jan 2025 14:40:33 -0700 Subject: [PATCH] max concurrent loading --- src/astra/pipelines/aspcap/__init__.py | 96 +++++++++++++------------- 1 file changed, 47 insertions(+), 49 deletions(-) diff --git a/src/astra/pipelines/aspcap/__init__.py b/src/astra/pipelines/aspcap/__init__.py index 00a1501..d9cee3e 100644 --- a/src/astra/pipelines/aspcap/__init__.py +++ b/src/astra/pipelines/aspcap/__init__.py @@ -91,10 +91,13 @@ def aspcap( yield ASPCAP.from_spectrum(spectrum, flag_no_suitable_initial_guess=True) # If we load too many processes at once, the disk I/O kills us. - enforce_sequential_grid_loading = True + max_concurrent_loading = 1 - current_threads, current_workers, currently_loading_grid = (0, 0, False) - at_capacity = lambda t, w, c: t >= max_threads or w >= max_workers or (enforce_sequential_grid_loading and c) + parent, child = Pipe() + n_executions, n_executions_total = (0, len(plans)) + current_threads, current_workers, currently_loading = (0, 0, 0) + + at_capacity = lambda t, w, c: t >= max_threads or w >= max_workers or max_concurrent_loading >= c with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as pre_and_post_process_executor: pre_process_futures = [ @@ -105,60 +108,57 @@ def aspcap( ) for plan in plans ] + + results = [] + ferre_futures = [] + with tqdm(total=sum(len(plan["spectra"]) for plan in plans), desc="Running ASPCAP") as pb: + + def check_capacity(current_threads, current_workers, currently_loading): + pb.set_description( + f"ASPCAP (" + f"{current_threads}/{max_threads}; " + f"{current_workers}/{max_workers}; " + f"{'loading' if currently_loading > 0 else ''} {n_executions}/{n_executions_total})" + ) + + while parent.poll(): + child_directory, n = parent.recv() + if n == 0: + currently_loading -= 1 + current_threads -= n + pb.update(n) + + try: + ferre_future = next(concurrent.futures.as_completed(ferre_futures, timeout=1)) + except TimeoutError: + None + else: + completed_directory, return_code = ferre_future.result() + #pre_and_post_process_executor.submit(post_process_ferre, directory=completed_directory) + ferre_futures.remove(ferre_future) + current_workers -= 1 - parent, child = Pipe() + return (current_threads, current_workers, currently_loading) - n_spectra = {} - results = [] - ferre_futures = [] - with tqdm(total=len(spectra) - len(spectra_with_no_initial_guess), desc="Running ASPCAP") as pb: - with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as ferre_executor: for future in concurrent.futures.as_completed(pre_process_futures): directory, n_obj, skipped = future.result() - n_spectra[directory] = n_obj # Spectra might be skipped because the file could not be found, or if there were too many bad pixels. for spectrum, kwds in skipped: pb.update(1) #yield ASPCAP.from_spectrum(spectrum, **kwds) - # We could be at thread or worker capacity. - while at_capacity(current_threads, current_workers, currently_loading_grid): - pb.set_description(f"ASPCAP ({current_threads}/{max_threads}; {current_workers}/{max_workers}; {'loading' if currently_loading_grid else ''})") - - while parent.poll(): - child_directory, n = parent.recv() - if n == 0: - currently_loading_grid = False - current_threads -= n - pb.update(n) - - try: - ferre_future = next(concurrent.futures.as_completed(ferre_futures, timeout=1)) - except TimeoutError: - continue - else: - completed_directory, return_code = ferre_future.result() - #pre_and_post_process_executor.submit(post_process_ferre, directory=completed_directory) - - ferre_futures.remove(ferre_future) - current_workers -= 1 - #print(f"Finished {completed_directory} with {n_spectra[completed_directory]} ({current_threads} threads, {current_workers} workers)") + while at_capacity(current_threads, current_workers, currently_loading): + current_threads, current_workers, currently_loading = check_capacity(current_threads, current_workers, currently_loading) ferre_futures.append(ferre_executor.submit(ferre, directory, n_obj, child)) - currently_loading_grid = True - current_threads += n_obj - current_workers += 1 - #print(f"Submitting {directory} with {n_obj} ({current_threads} threads, {current_workers} workers)") - - #print("All jobs submitted") - for ferre_future in concurrent.futures.as_completed(ferre_futures): - directory, return_code = ferre_future.result() - current_workers -= 1 - #print(f"Finished {directory} with {n_spectra[directory]} ({current_threads} threads, {current_workers} workers)") - + n_executions, currently_loading, current_threads, current_workers = (n_executions + 1, currently_loading + 1, current_threads + n_obj, current_workers + 1) + + while current_workers: + current_threads, current_workers, currently_loading = check_capacity(current_threads, current_workers, currently_loading_grid) + t_full += time() print(f"Full time: {t_full:.0f} s") raise a @@ -166,7 +166,7 @@ def aspcap( regex_next_object = re.compile(r"next object #\s+(\d+)") regex_completed = re.compile(r"\s+(\d+)\s(\d+_[\d\w_]+)") # assumes input ids start with an integer and underscore - + def ferre(directory, n_obj, child): @@ -174,7 +174,6 @@ def ferre(directory, n_obj, child): t_start, t_overhead, t_elapsed = (time(), None, {}) process = subprocess.Popen(["ferre.x"], cwd=directory, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) - for line in iter(process.stdout.readline, ""): # if an object is completed, note the completion time and send back some information to the parent. if match := regex_next_object.search(line): @@ -184,18 +183,17 @@ def ferre(directory, n_obj, child): child.send((directory, 0)) if match := regex_completed.search(line): - t_elapsed[int(match.group(1))] += time() + t_elapsed[match.group(2)] = t_elapsed.pop(int(match.group(1))) + time() n_complete += 1 child.send((directory, 1)) stdout.append(line) # In case there were some we did not pick up the timings for - if n_missed := n_complete - n_obj: - child.send((directory, n_missed)) + if n_complete != n_obj: + child.send((directory, n_obj - n_complete)) process.stdout.close() return_code = process.wait() - return (directory, 0)