Commit 37e9f01

load tested successfully with 5000

andycasey committed Jan 21, 2025
1 parent bf66885 commit 37e9f01

Showing 6 changed files with 145 additions and 82 deletions.
4 changes: 1 addition & 3 deletions src/astra/__init__.py
@@ -57,9 +57,7 @@ def task(
while True:
try:
result = next(timer)
# `Ellipsis` has a special meaning to Astra tasks.
# It is a marker that tells the Astra timer that the interval spent so far is related
# to common overheads, not specifically to the calculations of one result.

if result is Ellipsis:
continue

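For readers unfamiliar with the convention: a task generator yields `Ellipsis` to tell the timer that the elapsed interval is shared overhead (e.g. loading a model grid), not the cost of one result. A minimal standalone sketch of the idea (`example_task` and its inputs are hypothetical, not part of the Astra API):

```python
import time

def example_task(inputs):    # hypothetical task; not the Astra API
    time.sleep(0.1)          # stand-in for loading a shared model grid
    yield ...                # marks the interval so far as common overhead
    for x in inputs:
        yield x * 2          # each later interval is one result's cost

# Inside Astra the timer consumes the marker (`if result is Ellipsis:
# continue`); here we filter it out manually.
results = [r for r in example_task([1, 2, 3]) if r is not Ellipsis]
print(results)  # [2, 4, 6]
```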
17 changes: 14 additions & 3 deletions src/astra/cli/astra.py
@@ -172,14 +172,20 @@ def run(
from rich.live import Live
from rich.panel import Panel
from rich.table import Table
from rich.logging import RichHandler
from rich.console import Console
from logging import FileHandler

from astra import models, __version__, generate_queries_for_task
from astra.utils import resolve_task, accepts_live_renderable
from astra.utils import log, resolve_task, accepts_live_renderable

fun = resolve_task(task)
fun_accepts_live_renderable = accepts_live_renderable(fun)
live_renderable = Table.grid()

# Re-direct log handler
console = Console()

if not fun_accepts_live_renderable:
overall_progress = Progress(
TextColumn("[progress.description]{task.description}"),
@@ -188,8 +194,13 @@
TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
)
live_renderable.add_row(Panel(overall_progress, title=task))

with Live(live_renderable) as live:

with Live(live_renderable, console=console, redirect_stdout=False, redirect_stderr=False) as live:
log.handlers.clear()
log.handlers.extend([
RichHandler(console=live.console, markup=True, rich_tracebacks=True),
])

for model, q in generate_queries_for_task(fun, spectrum_model, limit, page=page):
if total := q.count():
if not fun_accepts_live_renderable:
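The change above re-routes Astra's logger through the `Live` console so log records render above the progress table instead of tearing it. The same pattern in isolation (logger name and message are illustrative):

```python
import logging
from rich.console import Console
from rich.live import Live
from rich.logging import RichHandler
from rich.table import Table

log = logging.getLogger("demo")  # illustrative; Astra supplies its own logger
log.setLevel(logging.INFO)

renderable = Table.grid()
with Live(renderable, console=Console(), redirect_stdout=False,
          redirect_stderr=False) as live:
    # Point the handler at the Live console so records are painted
    # above the live renderable rather than through it.
    log.handlers.clear()
    log.addHandler(RichHandler(console=live.console, markup=True,
                               rich_tracebacks=True))
    log.info("log lines now flow through the Live display")
```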
104 changes: 84 additions & 20 deletions src/astra/pipelines/aspcap/__init__.py
@@ -26,6 +26,11 @@
from astra.pipelines.aspcap.utils import ABUNDANCE_RELATIVE_TO_H


def do_logger(*args):
    # Debug logging is disabled; uncomment below to append to astra.log.
    pass
    #with open("astra.log", "a") as fp:
    #    fp.write(" ".join(map(str, args)) + "\n")

def _is_list_mode(path):
return "input_list.nml" in path

@@ -111,6 +116,7 @@ def aspcap(

parent, child = Pipe()
with concurrent.futures.ProcessPoolExecutor(max_workers=max(max_threads, max_processes)) as executor:

stage_args = [executor, parent, child, parent_dir, max_processes, max_threads, max_concurrent_loading, soft_thread_ratio]
if live_renderable is not None:
from rich.panel import Panel
@@ -135,9 +141,11 @@
for spectrum in spectra_with_no_initial_guess:
yield ASPCAP.from_spectrum(spectrum, flag_no_suitable_initial_guess=True)


coarse_results, coarse_failures = _aspcap_stage("coarse", coarse_plans, *stage_args)
yield from coarse_failures


stellar_parameter_plans, best_coarse_results = plan_stellar_parameters_stage(
spectra=spectra,
parent_dir=parent_dir,
@@ -157,7 +165,7 @@
use_ferre_list_mode=use_ferre_list_mode
)

abundance_results, abundance_failures = _aspcap_stage("abundances", abundance_plans, *stage_args)
abundance_results, abundance_failures = _aspcap_stage("abundances", abundance_plans, *stage_args, ferre_kwds=dict(max_sigma_outlier=10, max_t_elapsed=30))
parent.close()
child.close()
executor.shutdown(wait=False, cancel_futures=True)
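For orientation, the stage machinery above boils down to a process pool plus one `Pipe` shared by every worker for progress messages, with both pipe ends closed before the pool is torn down. A stripped-down sketch (the `worker` body is a stand-in for a FERRE execution, not Astra's implementation):

```python
import concurrent.futures
from multiprocessing import Pipe

def worker(child, n):
    # Stand-in for a FERRE run: report progress back over the pipe.
    child.send(dict(n_complete=n))
    return n

if __name__ == "__main__":
    parent, child = Pipe()
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
        # Connection objects survive submit() pickling on POSIX, which is
        # how the real code ships `child` to each FERRE worker.
        futures = [executor.submit(worker, child, n) for n in range(4)]
        for future in concurrent.futures.as_completed(futures):
            future.result()
        while parent.poll():         # drain any buffered messages
            print(parent.recv())
        parent.close()
        child.close()
        executor.shutdown(wait=False, cancel_futures=True)
```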
@@ -270,6 +278,7 @@ def _aspcap_stage(
max_concurrent_loading,
soft_thread_ratio,
progress=None,
ferre_kwds=None
):
pb = None
if progress is not None:
@@ -317,15 +326,17 @@ def get_task_name(path):

def check_capacity(current_processes, current_threads, currently_loading):

#do_logger("doing poll check")
while parent.poll():
#do_logger("awaiting message")
state = parent.recv()

delta_n_loading = state.get("n_loading", 0)
delta_n_complete = state.get("n_complete", 0)
currently_loading += delta_n_loading
current_threads += state.get("n_threads", 0)
current_processes += state.get("n_processes", 0)

do_logger(f"state: {state}")
if progress is not None:
progress_kwds = dict(advance=delta_n_complete)
task_name = get_task_name(state['input_nml_path'])
@@ -345,20 +356,49 @@ def check_capacity(current_processes, current_threads, currently_loading):
f"job {n_started_executions}/{n_planned_executions})"
)
pb.update(delta_n_complete)
#do_logger("ok")

#do_logger("getting ferre future")
try:
ferre_future = next(concurrent.futures.as_completed(ferre_futures, timeout=0))
except (concurrent.futures.TimeoutError, StopIteration):
pass
else:
do_logger(f"got a ferre future")
(input_nml_path, pwd, return_code, t_overhead, t_elapsed) = ferre_future.result()

do_logger(f"READY TO POST_PROCESS: {input_nml_path} {return_code} {pwd}")
"""
try:
if "input_list.nml" in input_nml_path:
from glob import glob
for sub_path in glob(os.path.dirname(input_nml_path) + "/*/input.nml"):
for basename in ("parameter.output", "rectified_flux.output", "rectified_model_flux.output"):
p = os.path.join(os.path.dirname(sub_path), basename)
do_logger(f"{os.path.dirname(sub_path)} {p} {os.path.exists(p)} {os.path.getsize(p) if os.path.exists(p) else None}")
else:
for basename in ("parameter.output", "rectified_flux.output", "rectified_model_flux.output"):
p = os.path.join(os.path.dirname(input_nml_path), basename)
do_logger(f"{p} {os.path.exists(p)} {os.path.getsize(p) if os.path.exists(p) else None}")
os.system("ps -ef | grep ferre")
post_process_ferre(input_nml_path, pwd)
do_logger("Did it")
except:
executor.shutdown(wait=False, cancel_futures=True)
raise
"""

# TODO: Should `timings` and `post_process_ferre` take directories or input_nml_paths?
task_name = get_task_name(os.path.dirname(input_nml_path))
timings[task_name] = (t_overhead, t_elapsed)
post_processed_futures.append(executor.submit(post_process_ferre, input_nml_path, pwd))
ferre_futures.remove(ferre_future)
do_logger("removed ferre futures")

#do_logger(f"CHECKING CAPACITY: {current_processes} {current_threads} {currently_loading}")
return (current_processes, current_threads, currently_loading)

while n_planned_executions > n_started_executions:
@@ -386,7 +426,7 @@ def check_capacity(current_processes, current_threads, currently_loading):
break

if n_obj > 0:
ferre_futures.append(executor.submit(ferre, input_nml_path, pwd, n_obj, n_ferre_threads, child, communicate_on_start=False))
ferre_futures.append(executor.submit(ferre, input_nml_path, pwd, n_obj, n_ferre_threads, child, communicate_on_start=False, **(ferre_kwds or {})))
# Do the communication here ourselves because otherwise we will submit too many jobs before they start.
if progress is not None:
task_name = get_task_name(input_nml_path)
Expand All @@ -397,25 +437,31 @@ def check_capacity(current_processes, current_threads, currently_loading):
n_started_executions += 1

# All submitted. Now wait for them to finish.
while current_processes:
while current_processes or len(ferre_futures) > 0:
current_processes, current_threads, currently_loading = check_capacity(current_processes, current_threads, currently_loading)


do_logger(f"waiting for ferre futures {len(ferre_futures)} {len(post_processed_futures)}")
for future in concurrent.futures.as_completed(post_processed_futures):
# If the number of spectra being processed in one job gets too large, we might need to write the timing information to a temporary file
# in the child thread, and have the parent pick it up.
do_logger("got a result")
for result in future.result():
do_logger(f"result -> {result}")
# Assign timings to the results.
try:
key = get_task_name(result["pwd"])
t_overhead, t_elapsed_all = timings[key]
t_elapsed = t_elapsed_all[result["ferre_name"]]
except:
do_logger("failure")
t_elapsed = t_overhead = np.nan
finally:
do_logger("ok")
result["t_overhead"] = t_overhead
result["t_elapsed"] = np.sum(np.atleast_1d(t_elapsed))
successes.append(result)

do_logger(f"doing thing {post_processed_futures}")
if progress is not None:
for task_id in ferre_tasks.values():
progress.update(task_id, completed=True, visible=False, refresh=True)
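The messages flowing over that pipe are signed deltas, which is why `check_capacity` can fold them into its running totals with plain additions: a worker announces `+1` loading at startup, then `-1` loading and `+N` threads once its grid is in memory. A toy distillation (field names mirror the messages above; the values are made up):

```python
def apply_deltas(messages, processes=0, threads=0, loading=0):
    # Fold signed increments into the scheduler's running totals.
    for state in messages:
        loading += state.get("n_loading", 0)
        threads += state.get("n_threads", 0)
        processes += state.get("n_processes", 0)
        # n_complete drives the progress bars in the real code,
        # not the capacity accounting.
    return processes, threads, loading

messages = [
    dict(input_nml_path="a/input.nml", n_processes=1, n_loading=1),  # started
    dict(input_nml_path="a/input.nml", n_loading=-1, n_threads=8),   # loaded
    dict(input_nml_path="a/input.nml", n_complete=5),                # progress
]
print(apply_deltas(messages))  # (1, 8, 0)
```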
@@ -433,12 +479,11 @@ def ferre(
n_obj,
n_threads,
pipe,
max_sigma_outlier=10,
max_t_elapsed=30,
max_sigma_outlier=None,
max_t_elapsed=None,
communicate_on_start=True
):
try:

try:
if communicate_on_start:
pipe.send(dict(input_nml_path=input_nml_path, n_processes=1, n_loading=1, n_threads=max(0, n_threads)))

@@ -460,7 +505,8 @@

def monitor():
while not ferre_hanging.is_set():
if t_overhead is not None and (max_sigma_outlier is not None or max_t_elapsed is not None) and t_awaiting:
do_logger(f"monitor {max_sigma_outlier} {max_t_elapsed} {t_awaiting} in {cwd}")
if (max_sigma_outlier is not None or max_t_elapsed is not None) and t_awaiting:
n_await = len(t_awaiting)
n_execution = 0 if len(t_elapsed) == 0 else max(list(map(len, t_elapsed.values())))
n_complete = sum([len(v) == n_execution for v in t_elapsed.values()])
@@ -469,20 +515,27 @@ def monitor():
for k, v in t_elapsed.items():
t_elapsed_per_spectrum_execution.extend(v)

do_logger(f"checking on {n_await} things {len(t_awaiting)} {len(t_elapsed_per_spectrum_execution)} {max_t_elapsed} {max_sigma_outlier}")

if (
(len(t_elapsed_per_spectrum_execution) > 1 and len(t_awaiting) > 0)
(len(t_awaiting) > 0)
and (max_t_elapsed is not None or max_sigma_outlier is not None)
):

#if (max_sigma_outlier is not None or max_t_elapsed is not None) and len(t_elapsed_per_spectrum_execution) > len(t_awaiting) and len(t_elapsed_per_spectrum_execution) > 0 and len(t_awaiting) > 0:
median = np.median(t_elapsed_per_spectrum_execution)
stddev = np.std(t_elapsed_per_spectrum_execution)
max_elapsed_this_execution = np.max(t_elapsed_per_spectrum_execution)

if len(t_elapsed_per_spectrum_execution) == 0:
median = 120.0
stddev = 10.0
else:
median = np.median(t_elapsed_per_spectrum_execution)
stddev = np.std(t_elapsed_per_spectrum_execution)
if stddev == 0:
stddev = 10.0

t_awaiting_elapsed = { k: (time() + v) for k, v in t_awaiting.items() }
waiting_elapsed = max(t_awaiting_elapsed.values())
sigma_outlier = (waiting_elapsed - median)/stddev



# We want to be sure that we have a reasonable estimate of the wait time for existing things.
# We can use previous executions to estimate this, if it is part of a list mode.
is_hanging = [
@@ -493,10 +546,13 @@ def monitor():
and (max_sigma_outlier is None or ((v - median)/stddev) > max_sigma_outlier)
)
]
do_logger(f"median / stddev {median:.2} {stddev:.2f} {t_awaiting_elapsed} {waiting_elapsed:.2f} {sigma_outlier} {is_hanging}")

# TODO: strace the process and check that it is waiting on FUTEX_PRIVATE_WAIT before killing it?
# Need to kill and re-run the process.
if is_hanging:
exclude_indices.extend(is_hanging)
do_logger(f"hanging {is_hanging}")
ferre_hanging.set()
try:
process.kill()
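Distilled, the monitor's hang test is a guarded sigma-outlier check on per-spectrum wall times, with fallback statistics covering the cold start where nothing has finished yet. A self-contained approximation (not the exact production logic; thresholds and timings are illustrative):

```python
import numpy as np

def looks_hanging(t_elapsed, waiting, max_sigma_outlier=10, max_t_elapsed=30):
    # Flag a wait time that is both long in absolute terms and an extreme
    # outlier relative to completed per-spectrum times.
    if len(t_elapsed) == 0:
        median, stddev = 120.0, 10.0          # cold-start fallbacks, as above
    else:
        median = np.median(t_elapsed)
        stddev = np.std(t_elapsed) or 10.0    # guard against zero spread
    return waiting > max_t_elapsed and (waiting - median) / stddev > max_sigma_outlier

print(looks_hanging([5.0, 6.0, 5.5], waiting=400.0))  # True: extreme outlier
print(looks_hanging([5.0, 6.0, 5.5], waiting=8.0))    # False
```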
@@ -536,6 +592,10 @@ def monitor():

stderr = process.stderr.read()
return_code = int(process.wait())
try:
process.kill()
except:
pass
process.stdout.close()
process.stderr.close()

@@ -565,7 +625,7 @@ def monitor():
for k, v in this_t_elapsed.items():
t_elapsed.setdefault(k, [])
t_elapsed[k].extend(v)
#print(f"DONE REPROCESSING {failed_input_nml_path} {updated_nml_path}")
#do_logger(f"DONE REPROCESSING {failed_input_nml_path} {updated_nml_path}")
"""

prefix, suffix = input_nml_path.split(".nml")
@@ -578,7 +638,8 @@

# Release these threads and process so it's balanced out when the sub-ferre process takes them
pipe.send(dict(input_nml_path=input_nml_path, n_threads=-n_threads))
*_, this_t_overhead, this_t_elapsed = ferre(new_path, cwd, n_obj - n_complete + n_spectra_done_in_last_execution, n_threads, pipe)

*_, this_t_overhead, this_t_elapsed = ferre(new_path, cwd, n_obj - n_complete + n_spectra_done_in_last_execution, n_threads, pipe, max_sigma_outlier, max_t_elapsed)

t_overhead = (t_overhead or 0) + this_t_overhead
for k, v in this_t_elapsed.items():
@@ -595,6 +656,9 @@ def monitor():
return foo
"""

# Set ferre_hanging to kill the daemon thread.
ferre_hanging.set()
return (input_nml_path, cwd, return_code, t_overhead, t_elapsed)
except:
ferre_hanging.set()
return (input_nml_path, cwd, -10, t_overhead, t_elapsed)
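Note that `ferre_hanging` serves double duty as the hang flag and the shutdown signal for the daemon `monitor` thread: every exit path from `ferre()` (normal return, detected hang, or exception) sets it, so the `while not ferre_hanging.is_set()` loop terminates. The pattern in isolation:

```python
import threading
import time

stop = threading.Event()

def monitor():
    while not stop.is_set():
        # ... periodically inspect worker progress here ...
        time.sleep(0.1)

thread = threading.Thread(target=monitor, daemon=True)
thread.start()
try:
    time.sleep(0.3)    # stand-in for running the subprocess
finally:
    stop.set()         # set on every exit path so the monitor loop ends
    thread.join()
```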
7 changes: 6 additions & 1 deletion src/astra/pipelines/aspcap/stellar_parameters.py
@@ -42,7 +42,12 @@ def plan_stellar_parameters_stage(spectra, parent_dir, coarse_results, weight_pa
elif this.penalized_rchi2 == existing.penalized_rchi2:
best = existing
best.flag_multiple_equally_good_coarse_results = True
best.ferre_time_coarse = this.t_elapsed + existing.t_elapsed

if best is None:
log.error(f"Error for {kwds} - best is None. {existing} {existing.penalized_rchi2} {this} {this.penalized_rchi2}")
else:
best.ferre_time_coarse = this.t_elapsed + existing.t_elapsed

finally:
best_coarse_results[this.spectrum_pk] = best

5 changes: 3 additions & 2 deletions src/astra/pipelines/ferre/processing.py
@@ -430,8 +430,8 @@ def _post_process_ferre(input_nml_path, pwd=None, skip_pixel_arrays=False, **kwa
if os.path.exists(path):
os.system(f"vaffoff {parameter_input_path} {path}")

try:
parameters, e_parameters, meta, names_with_missing_outputs = utils.read_output_parameter_file(pwd, control_kwds, input_names)
parameters, e_parameters, meta, names_with_missing_outputs = utils.read_output_parameter_file(pwd, control_kwds, input_names)
"""
except:
# This only happens if the file does not exist
D = int(control_kwds["NDIM"])
@@ -442,6 +442,7 @@ def _post_process_ferre(input_nml_path, pwd=None, skip_pixel_arrays=False, **kwa
"log_chisq_fit": np.nan * np.ones(N),
}
names_with_missing_outputs = input_names
"""

# Create some boolean flags.
header_path = control_kwds["SYNTHFILE(1)"]