diff --git a/src/astra/pipelines/aspcap/__init__.py b/src/astra/pipelines/aspcap/__init__.py
index 27ae460..ea1c442 100644
--- a/src/astra/pipelines/aspcap/__init__.py
+++ b/src/astra/pipelines/aspcap/__init__.py
@@ -4,54 +4,36 @@
 import subprocess
 import re
 import heapq
-from multiprocessing import Pipe
+from multiprocessing import Pipe, Lock
 from datetime import datetime
 from tempfile import mkdtemp
 from typing import Optional, Iterable, List, Tuple, Callable, Union
 from peewee import JOIN, fn
 from tqdm import tqdm
 from time import time, sleep
+from threading import Timer
 from astra import __version__, task
-from astra.utils import log, expand_path
+from astra.utils import log, expand_path, list_to_dict
 from astra.models.apogee import ApogeeCoaddedSpectrumInApStar
 from astra.models.aspcap import ASPCAP, FerreCoarse, FerreStellarParameters, FerreChemicalAbundances, Source
 from astra.models.spectrum import Spectrum, SpectrumMixin
 from astra.pipelines.ferre.pre_process import pre_process_ferre
+from astra.pipelines.ferre.post_process import post_process_ferre
+from astra.pipelines.ferre.utils import parse_header_path
 from astra.pipelines.aspcap.initial import get_initial_guesses
-from astra.pipelines.aspcap.coarse import coarse_stellar_parameters, post_coarse_stellar_parameters, plan_coarse_stellar_parameters
+from astra.pipelines.aspcap.coarse import coarse_stellar_parameters, post_coarse_stellar_parameters, plan_coarse_stellar_parameters, penalize_coarse_stellar_parameter_result
+from astra.pipelines.aspcap.continuum import MedianFilter
+
 #from astra.pipelines.aspcap.stellar_parameters import stellar_parameters, post_stellar_parameters
 #from astra.pipelines.aspcap.abundances import abundances, get_species, post_abundances
 #from astra.pipelines.aspcap.utils import ABUNDANCE_RELATIVE_TO_H
 
-def yield_largest_finished_future(futures_dict):
-    """
-    Yields the finished future with the largest value of the specified attribute.
-    """
-
-    futures = list(futures_dict.keys())
-
-    finished_futures = []
-    while futures:
-        # Check for finished futures
-        for future in futures:
-            if future.done():
-                try:
-                    result = future.result()
-                    # Calculate attribute value
-                    # Add future and attribute value to heap
-                    heapq.heappush(finished_futures, (-futures_dict[future], future))
-                except Exception as e:
-                    print(f"Exception in future: {e}")
-
-        # Yield the future with the largest attribute value (if any)
-        if finished_futures:
-            _, largest_future = heapq.heappop(finished_futures)
-            yield largest_future
-
-        # Remove finished futures from the main list
-        futures = [f for f in futures if not f.done()]
+import signal
+from threading import Event, Lock, Thread
+from subprocess import Popen, PIPE, STDOUT
@@ -62,9 +44,10 @@ def aspcap(
     weight_path: Optional[str] = "$MWM_ASTRA/pipelines/aspcap/masks/global.mask",
     element_weight_paths: str = "$MWM_ASTRA/pipelines/aspcap/masks/elements.list",
     parent_dir: Optional[str] = None,
-    max_workers: Optional[int] = 16,
+    max_processes: Optional[int] = 16,
     max_threads: Optional[int] = 128,
-    max_concurrent_loading: Optional[int] = 1,
+    max_concurrent_loading: Optional[int] = 4,
+    soft_thread_ratio: Optional[float] = 2,
     **kwargs
 ) -> Iterable[ASPCAP]:
     """
@@ -103,7 +86,6 @@ def aspcap(
         continuum_observations_flag: int = 1,
     """
 
-    t_full = -time()
 
     if parent_dir is None:
        _dir = expand_path(f"$MWM_ASTRA/{__version__}/pipelines/aspcap/")
@@ -120,36 +102,154 @@ def aspcap(
     for spectrum in spectra_with_no_initial_guess:
         yield ASPCAP.from_spectrum(spectrum, flag_no_suitable_initial_guess=True)
 
-    # If we load too many processes at once, the disk I/O kills us.
+    coarse_results = _aspcap_stage("coarse", plans, parent_dir, max_processes, max_threads, max_concurrent_loading, soft_thread_ratio)
+
+    STAGE = "params"
+    # Plan stellar parameter stage.
+    coarse_results_by_spectrum = {}
+    for kwds in coarse_results:
+        this = FerreCoarse(**kwds)
+        # TODO: Make the penalized rchi2 a property of the FerreCoarse class.
+        this.penalized_rchi2 = penalize_coarse_stellar_parameter_result(this)
+
+        best = None
+        try:
+            existing = coarse_results_by_spectrum[this.spectrum_pk]
+        except KeyError:
+            best = this
+        else:
+            if this.penalized_rchi2 < existing.penalized_rchi2:
+                best = this
+            elif this.penalized_rchi2 > existing.penalized_rchi2:
+                best = existing
+            elif this.penalized_rchi2 == existing.penalized_rchi2:
+                best = existing
+                best.flag_multiple_equally_good_coarse_results = True
+        finally:
+            coarse_results_by_spectrum[this.spectrum_pk] = best
+
+    spectra_by_pk = {s.spectrum_pk: s for s in spectra}
+
+    pre_continuum = MedianFilter()
+
+    futures = []
+    with concurrent.futures.ProcessPoolExecutor(128) as executor:
+        for r in tqdm(coarse_results_by_spectrum.values(), desc="Distributing work"):
+            spectrum = spectra_by_pk[r.spectrum_pk]
+            futures.append(executor.submit(_pre_compute_continuum, r, spectrum, pre_continuum))
+
+    pre_computed_continuum = {}
+    with tqdm(total=len(futures), desc="Pre-computing continuum") as pb:
+        for future in concurrent.futures.as_completed(futures):
+            spectrum_pk, continuum = future.result()
+            pre_computed_continuum[spectrum_pk] = continuum
+            pb.update()
+
+    group_task_kwds = {}
+    for r in tqdm(coarse_results_by_spectrum.values(), desc="Grouping results"):
+        group_task_kwds.setdefault(r.header_path, [])
+        spectrum = spectra_by_pk[r.spectrum_pk]
+
+        group_task_kwds[r.header_path].append(
+            dict(
+                spectra=spectrum,
+                pre_computed_continuum=pre_computed_continuum[r.spectrum_pk],
+                initial_teff=r.teff,
+                initial_logg=r.logg,
+                initial_m_h=r.m_h,
+                initial_log10_v_sini=r.log10_v_sini,
+                initial_log10_v_micro=r.log10_v_micro,
+                initial_alpha_m=r.alpha_m,
+                initial_c_m=r.c_m,
+                initial_n_m=r.n_m,
+                initial_flags=r.initial_flags,
+                upstream_pk=r.task_pk,
+            )
+        )
 
-    parent, child = Pipe()
-    n_executions, n_executions_total = (0, len(plans))
-    current_threads, current_workers, currently_loading = (0, 0, 0)
+    stellar_parameter_plans = []
+    for header_path in group_task_kwds.keys():
+        short_grid_name = parse_header_path(header_path)["short_grid_name"]
+        group_task_kwds[header_path] = list_to_dict(group_task_kwds[header_path])
+        group_task_kwds[header_path].update(
+            header_path=header_path,
+            weight_path=weight_path,
+            relative_dir=f"{STAGE}/{short_grid_name}",
+            **kwargs
+        )
+        stellar_parameter_plans.append(group_task_kwds[header_path])
 
-    at_capacity = lambda t, w, c: t >= max_threads or w >= max_workers or c >= max_concurrent_loading
+    stellar_parameter_results = _aspcap_stage("params", stellar_parameter_plans, parent_dir, max_processes, max_threads, max_concurrent_loading, soft_thread_ratio)
+
+    # yeet back some ASPCAP results
+
+    raise a
+    """
+    for kwds in post_process_ferre(pwd):
+        result = FerreCoarse(**kwds)
+        penalize_coarse_stellar_parameter_result(result)
+        yield result
+    """
 
-    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as pre_and_post_process_executor:
-        pre_process_futures = {
-            pre_and_post_process_executor.submit(
-                pre_process_ferre,
-                pwd=os.path.join(parent_dir, plan.pop("relative_dir")),
-                **plan
-            ): len(plan["spectra"])
-            for plan in plans
-        }
-
-        results = []
-        ferre_futures = []
-        with tqdm(total=sum(len(plan["spectra"]) for plan in plans), desc="Running ASPCAP") as pb:
+# TODO: remove from coarse.py
+
+def _pre_compute_continuum(coarse_result, spectrum, pre_continuum):
+    try:
+        # Apply continuum normalization.
+        pre_computed_continuum = pre_continuum.fit(spectrum, coarse_result)
+    except:
+        log.exception(f"Exception when computing continuum for spectrum {spectrum} from coarse result {coarse_result}:")
+        return (spectrum.spectrum_pk, None)
+    else:
+        return (spectrum.spectrum_pk, pre_computed_continuum)
+
+
+
+
+def _aspcap_stage(stage, plans, parent_dir, max_processes, max_threads, max_concurrent_loading, soft_thread_ratio):
+    # FERRE can be limited by at least three mechanisms:
+    #   1. Too many threads requested (CPU limited).
+    #   2. Too many processes started (RAM limited).
+    #   3. Too many grids load at once (disk I/O limited).
+    parent, child = Pipe()
+    current_processes, current_threads, currently_loading = (0, 0, 0)
+    pre_processed_futures, ferre_futures, post_processed_futures = ([], [], [])
+    n_executions, n_executions_total, timings, t_full = (0, len(plans), {}, -time())
+
+    # TODO: the soft thread ratio is to account for the fact that it takes time to load the grid, and we can load the grid while
+    #       we are still thread limited. By the time the grid is loaded, we won't be thread limited anymore.
+    #       It's a massive hack that we should revisit.
+    at_capacity = lambda p, t, c: (
+        p >= max_processes,
+        t >= (soft_thread_ratio * max_threads),
+        c >= max_concurrent_loading
+    )
+
+    max_workers = max(max_processes, max_threads)
+    with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
+
+        for plan in plans:
+            # Remove any output files in the expected directory.
+            pwd = os.path.join(parent_dir, plan.pop("relative_dir"))
+            os.system(f"rm -f {pwd}/*.output {pwd}/stdout {pwd}/stderr")
+            (
+                executor
+                .submit(pre_process_ferre, pwd=pwd, **plan)
+                .add_done_callback(lambda future: pre_processed_futures.insert(0, future))
+            )
+
+        with tqdm(total=sum(len(plan["spectra"]) for plan in plans)) as pb:
 
-            def check_capacity(current_threads, current_workers, currently_loading):
+            def check_capacity(current_processes, current_threads, currently_loading):
+                worker_limit, thread_limit, loading_limit = at_capacity(current_processes, current_threads, currently_loading)
                 pb.set_description(
-                    f"ASPCAP ("
-                    f"{current_threads}/{max_threads}; "
-                    f"{current_workers}/{max_workers}; "
-                    f"{'loading' if currently_loading > 0 else ''} {n_executions}/{n_executions_total})"
+                    f"ASPCAP {stage} ("
+                    f"thread {current_threads}/{max_threads}{'*' if thread_limit else ''}; "
+                    f"proc {current_processes}/{max_processes}{'*' if worker_limit else ''}; "
+                    f"load {currently_loading}/{max_concurrent_loading}{'*' if loading_limit else ''}; "
+                    f"job {n_executions}/{n_executions_total})"
                 )
 
                 while parent.poll():
@@ -160,21 +260,32 @@ def check_capacity(current_threads, current_workers, currently_loading):
                    pb.update(n)
 
                try:
-                    ferre_future = next(concurrent.futures.as_completed(ferre_futures, timeout=1))
+                    ferre_future = next(concurrent.futures.as_completed(ferre_futures, timeout=0))
                except TimeoutError:
                    None
                else:
                    (completed_directory, return_code, t_overhead, t_elapsed) = ferre_future.result()
-                    #pre_and_post_process_executor.submit(post_process_ferre, directory=completed_directory)
+                    if return_code not in (0, 1):
+                        log.exception(f"FERRE failed on {completed_directory}: {return_code}")
+                    timings[completed_directory] = (t_overhead, t_elapsed)
+                    post_processed_futures.append(
+                        executor.submit(
+                            post_process_ferre,
+                            completed_directory
+                        )
+                    )
                    ferre_futures.remove(ferre_future)
-                    current_workers -= 1
-
-                return (current_threads, current_workers, currently_loading)
-
-            with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as ferre_executor:
-
-                for future in yield_largest_finished_future(pre_process_futures):
+                    current_processes -= 1
+                return (current_processes, current_threads, currently_loading)
+            while n_executions_total > n_executions:
+                try:
+                    # Let's oscillate between the first (largest) and last (smallest) elements: (-0 and -1)
+                    # This means we are distributing the grid loading time while other threads are doing useful things.
+                    future = pre_processed_futures.pop(-((n_executions + 1) % 2))
+                except IndexError:
+                    continue
+                else:
                    directory, n_obj, skipped = future.result()
 
                    # Spectra might be skipped because the file could not be found, or if there were too many bad pixels.
@@ -182,32 +293,89 @@ def check_capacity(current_threads, current_workers, currently_loading):
                        pb.update(1)
                        #yield ASPCAP.from_spectrum(spectrum, **kwds)
 
-                    while at_capacity(current_threads, current_workers, currently_loading):
-                        current_threads, current_workers, currently_loading = check_capacity(current_threads, current_workers, currently_loading)
-
-                    ferre_futures.append(ferre_executor.submit(ferre, directory, n_obj, child))
-                    n_executions, currently_loading, current_threads, current_workers = (n_executions + 1, currently_loading + 1, current_threads + n_obj, current_workers + 1)
+                    while any(at_capacity(current_processes, current_threads, currently_loading)):
+                        current_processes, current_threads, currently_loading = check_capacity(current_processes, current_threads, currently_loading)
+                    ferre_futures.append(executor.submit(ferre, directory, n_obj, child))
+                    n_executions, currently_loading, current_threads, current_processes = (n_executions + 1, currently_loading + 1, current_threads + n_obj, current_processes + 1)
 
-                while current_workers:
-                    current_threads, current_workers, currently_loading = check_capacity(current_threads, current_workers, currently_loading)
+            # All submitted. Now wait for them to finish.
+            while current_processes:
+                current_processes, current_threads, currently_loading = check_capacity(current_processes, current_threads, currently_loading)
 
-    t_full += time()
-    print(f"Full time: {t_full:.0f} s")
-    raise a
+        t_full += time()
+        print(f"Full time: {t_full}")
+
+        parent.close()
+        child.close()
+        print("closed parent/child")
+        results = []
+        for future in concurrent.futures.as_completed(post_processed_futures):
+            for result in future.result():
+                # Assign timings to the results.
+                try:
+                    t_overhead, t_elapsed_all = timings[result["pwd"]]
+                    t_elapsed = t_elapsed_all[result["ferre_name"]]
+                except:
+                    t_elapsed = t_overhead = np.nan
+                finally:
+                    result["t_overhead"] = t_overhead
+                    result["t_elapsed"] = t_elapsed
+                results.append(result)
+
+        print("closing ferre executor")
+        executor.shutdown(wait=False, cancel_futures=True)
+        print("closed ferre executor")
+
+    return results
+
+
 regex_next_object = re.compile(r"next object #\s+(\d+)")
 regex_completed = re.compile(r"\s+(\d+)\s(\d+_[\d\w_]+)") # assumes input ids start with an integer and underscore
 
-def ferre(directory, n_obj, child):
-    try:
+def ferre(directory, n_obj, pipe, timeout_line=10, timeout_grid_load=60):
+    rid = os.getpid()
+    #print(f"FERRE starting {rid} {directory}")
+    try:
        stdout, n_complete = ([], 0)
        t_start, t_overhead, t_elapsed = (time(), None, {})
        process = subprocess.Popen(["ferre.x"], cwd=directory, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
+
+        def callback(*args):
+            print(f"callback hit: {args}")
+            raise KeyboardInterrupt
+
+        #watchdog = WatchdogTimer(timeout_line, callback=callback, daemon=True)
+        #watchdog.start()
+
+        for line in iter(process.stdout.readline, ""):
+            #except KeyboardInterrupt:
+            #    # watchdog hit.
+            #    print(f"{rid} {directory} watchdog. time_elapsed: {time() - t_start:.0f} s, n_complete: {n_complete}/{n_obj} (t_overhead: {t_overhead})")
+            #else:
+            #with watchdog.blocked:
+            # if an object is completed, note the completion time and send back some information to the parent.
+            if match := regex_next_object.search(line):
+                t_elapsed[int(match.group(1))] = -time()
+                if t_overhead is None:
+                    t_overhead = time() - t_start
+                    pipe.send((directory, 0))
+
+            if match := regex_completed.search(line):
+                t_elapsed[match.group(2)] = t_elapsed.pop(int(match.group(1))) + time()
+                n_complete += 1
+                pipe.send((directory, 1))
+            stdout.append(line)
+        #finally:
+        #    watchdog.restart()
+
+        """
        for line in iter(process.stdout.readline, ""):
+            # if an object is completed, note the completion time and send back some information to the parent.
            if match := regex_next_object.search(line):
                t_elapsed[int(match.group(1))] = -time()
@@ -221,15 +389,46 @@ def ferre(directory, n_obj, child):
                child.send((directory, 1))
            stdout.append(line)
-        # In case there were some we did not pick up the timings for
+            print(f"{rid} still waiting in {directory} after {timeout_line}")
+            # Check which objects we are waiting on, and how long we have been waiting.
+            t_since_start = time() - t_start
+            is_grid_loaded = t_overhead is not None
+            if (t_overhead is None) and (time() - t_start) > timeout_grid_load:
+                print(f"{rid} timeout on grid load: {t_since_start:.0f} s so far")
+
+            currently_running = {k: (time() + v) for k, v in t_elapsed.items() if v < 0}
+            if len(currently_running) > 0:
+                print(f"{rid} currently running {len(currently_running)} objects:")
+                for k, v in currently_running.items():
+                    print(f"    {rid} {k}: {v:.0f} s")
+        """
+
+
+        #finally:
+        #    timeout.cancel()
+
+        # In case there were some we did not pick up the timings for, or we somehow sent false-positives.
        if n_complete != n_obj:
-            child.send((directory, n_obj - n_complete))
+            pipe.send((directory, n_obj - n_complete))
 
        with open(os.path.join(directory, "stdout"), "w") as fp:
-            fp.write("\n".join(stdout))
+            fp.write("".join(stdout))
 
        process.stdout.close()
-        return_code = process.wait()
+        return_code = int(process.wait())
+        stderr = process.stderr.read()
+        process.stderr.close()
+        try:
+            process.kill()
+            os.killpg(os.getpgid(process.pid), signal.SIGTERM)
+        except:
+            None
+
+        if return_code not in (0, 1): # FERRE returns return code 1 even if everything is OK.
+            log.error(f"FERRE returned code {return_code} in {directory}:")
+            log.error(stderr)
+            with open(os.path.join(directory, "stderr"), "w") as fp:
+                fp.write(stderr)
    except:
        print(f"FAILED ON {directory}")
        raise
diff --git a/src/astra/pipelines/aspcap/coarse.py b/src/astra/pipelines/aspcap/coarse.py
index c6d8558..637d427 100644
--- a/src/astra/pipelines/aspcap/coarse.py
+++ b/src/astra/pipelines/aspcap/coarse.py
@@ -164,30 +164,30 @@ def penalize_coarse_stellar_parameter_result(result: FerreCoarse, warn_multiplie
    """
 
    # Penalize GK-esque things at cool temperatures.
-    result.penalized_rchi2 = 0 + result.rchi2
+    penalized_rchi2 = 0 + result.rchi2
    if result.teff < 3900 and "GK_200921" in result.header_path:
-        result.penalized_rchi2 *= cool_star_in_gk_grid_multiplier
+        penalized_rchi2 *= cool_star_in_gk_grid_multiplier
 
    if result.flag_logg_grid_edge_warn:
-        result.penalized_rchi2 *= warn_multiplier
+        penalized_rchi2 *= warn_multiplier
 
    if result.flag_teff_grid_edge_warn:
-        result.penalized_rchi2 *= warn_multiplier
+        penalized_rchi2 *= warn_multiplier
 
    if result.flag_logg_grid_edge_bad:
-        result.penalized_rchi2 *= bad_multiplier
+        penalized_rchi2 *= bad_multiplier
 
    if result.flag_teff_grid_edge_bad:
-        result.penalized_rchi2 *= bad_multiplier
+        penalized_rchi2 *= bad_multiplier
 
    # Add penalization terms for if FERRE failed.
    if result.flag_teff_ferre_fail:
-        result.penalized_rchi2 *= fail_multiplier
+        penalized_rchi2 *= fail_multiplier
 
    if result.flag_logg_ferre_fail:
-        result.penalized_rchi2 *= fail_multiplier
+        penalized_rchi2 *= fail_multiplier
 
-    return None
+    return penalized_rchi2
 
 
 def plan_coarse_stellar_parameters(
diff --git a/src/astra/pipelines/ferre/post_process.py b/src/astra/pipelines/ferre/post_process.py
index 1e11926..e31918c 100644
--- a/src/astra/pipelines/ferre/post_process.py
+++ b/src/astra/pipelines/ferre/post_process.py
@@ -1,8 +1,9 @@
 import os
 import numpy as np
 from typing import Iterable
-
+from time import time
 from astra.utils import log, expand_path
+from astra.pipelines.ferre.operator import post_execution_interpolation
 from astra.pipelines.ferre.utils import (
    read_ferre_headers,
    read_control_file,
@@ -25,7 +26,12 @@ def write_pixel_array_with_names(path, names, data):
 
 LARGE = 1e10 # TODO: This is also defined in pre_process, move it common
 
-def post_process_ferre(dir, pwd=None, skip_pixel_arrays=False, **kwargs) -> Iterable[dict]:
+def post_process_ferre(dir, pwd=None, skip_pixel_arrays=False, **kwargs) -> list[dict]:
+    post_execution_interpolation(dir)
+    v = list(_post_process_ferre(dir, pwd=pwd, skip_pixel_arrays=skip_pixel_arrays, **kwargs))
+    return v
+
+def _post_process_ferre(dir, pwd=None, skip_pixel_arrays=False, **kwargs) -> Iterable[dict]:
    """
    Post-process results from a FERRE execution.
@@ -49,6 +55,7 @@ def post_process_ferre(dir, pwd=None, skip_pixel_arrays=False, **kwargs) -> Iter
 
    # When finding paths, if the path is in the input.nml file, we should use `ref_dir`, otherwise `dir`.
    timing = {}
+    """
    try:
        raw_timing = np.atleast_2d(np.loadtxt(os.path.join(ref_dir, "timing.csv"), dtype=str, delimiter=","))
    except:
@@ -66,7 +73,7 @@ def post_process_ferre(dir, pwd=None, skip_pixel_arrays=False, **kwargs) -> Iter
            timing = timing["input.nml"]
    except:
        log.exception(f"Exception when trying to load timing for {ref_dir}")
-
+    """
    control_kwds = read_control_file(os.path.join(dir, "input.nml"))
 
    # Load input files.
@@ -106,9 +113,8 @@ def post_process_ferre(dir, pwd=None, skip_pixel_arrays=False, **kwargs) -> Iter
    is_missing_rectified_model_flux = ~np.all(np.isfinite(rectified_model_flux), axis=1)
    '''
 
-    print("WARNING: Using new verify_and_fix_output_flux_file")
    parameter_input_path = os.path.join(dir, "parameter.input")
-    os.system(f"verify_and_fix_ferre_output_flux_file {parameter_input_path} {offile_path}")
+    os.system(f"vaffoff {parameter_input_path} {offile_path}")
    is_missing_rectified_model_flux = ~np.isfinite(np.atleast_1d(np.loadtxt(offile_path, usecols=(1, ), dtype=float)))
 
    names_with_missing_rectified_model_flux = input_names[is_missing_rectified_model_flux]
@@ -131,7 +137,7 @@ def post_process_ferre(dir, pwd=None, skip_pixel_arrays=False, **kwargs) -> Iter
        rectified_flux = np.nan * np.ones_like(flux)
    '''
 
-    os.system(f"verify_and_fix_ferre_output_flux_file {parameter_input_path} {sffile_path}")
+    os.system(f"vaffoff {parameter_input_path} {sffile_path}")
    names_with_missing_rectified_flux = input_names[~np.isfinite(np.loadtxt(sffile_path, usecols=(1, ), dtype=float))]
 
    '''
@@ -148,7 +154,14 @@ def post_process_ferre(dir, pwd=None, skip_pixel_arrays=False, **kwargs) -> Iter
        model_flux = np.nan * np.ones_like(flux)
    '''
    model_flux_output_path = os.path.join(absolute_dir, "model_flux.output") # TODO: Should this be ref_dir?
-    os.system(f"verify_and_fix_ferre_output_flux_file {parameter_input_path} {model_flux_output_path}")
+    # We might have to wait some time for it to be written
+    #t_wait = time()
+    #while not os.path.exists(model_flux_output_path):
+    #    if time() > t_wait + 60:
+    #        log.warn(f"Cannot find model_flux output in {absolute_dir} ({model_flux_output_path})")
+    #        break
+
+    os.system(f"vaffoff {parameter_input_path} {model_flux_output_path}")
    is_missing_model_flux = ~np.isfinite(np.atleast_1d(np.loadtxt(model_flux_output_path, usecols=(1, ), dtype=float)))
 
    if len(names_with_missing_rectified_model_flux) > 0:
@@ -232,17 +245,7 @@ def post_process_ferre(dir, pwd=None, skip_pixel_arrays=False, **kwargs) -> Iter
        # Add correlation coefficients.
        #meta["cov"]
        # Add timing information, if we can.
-        try:
-            t_load, t_elapsed = timing[name]
-            result.update(
-                ferre_time_load_grid=t_load,
-                ferre_time_elapsed=t_elapsed,
-            )
-        except:
-            if len(timing) > 0:
-                # Only warn when there are specific timings missing
-                log.warning(f"No FERRE timing for spectrum_pk={name_meta['spectrum_pk']}")
-
+
        '''
        if not skip_pixel_arrays:
            snr = np.nanmedian(flux[i]/e_flux[i])
@@ -278,7 +281,5 @@ def post_process_ferre(dir, pwd=None, skip_pixel_arrays=False, **kwargs) -> Iter
                    f"flag_{parameter}_grid_edge_warn": flag_grid_edge_warn[i, j],
                })
 
-        # TODO: Load metadata from dir/meta.json (e.g., pre-continuum steps)
-        # TODO: Include correlation coefficients?
 
        yield result
\ No newline at end of file
diff --git a/src/astra/pipelines/ferre/pre_process.py b/src/astra/pipelines/ferre/pre_process.py
index adf28a9..ddeda44 100644
--- a/src/astra/pipelines/ferre/pre_process.py
+++ b/src/astra/pipelines/ferre/pre_process.py
@@ -99,7 +99,7 @@ def pre_process_ferre(
        control_kwds[key] = prefix + control_kwds[key]
 
    absolute_pwd = expand_path(pwd)
-    log.info(f"FERRE working directory: {absolute_pwd}")
+    #log.info(f"FERRE working directory: {absolute_pwd}")
 
    # Construct mask to match FERRE model grid.
 
@@ -141,8 +141,9 @@ def pre_process_ferre(
 
        # If this part fails, the spectrum doesn't exist and we should just continue
        try:
-            flux = np.copy(spectrum.flux)
-            e_flux = np.copy(spectrum.ivar)**-0.5
+            with np.errstate(divide="ignore"):
+                flux = np.copy(spectrum.flux)
+                e_flux = np.copy(spectrum.ivar)**-0.5
        except:
            log.warning(f"Exception accessing pixel arrays for spectrum {spectrum}")
            skipped.append((spectrum, {"flag_spectrum_io_error": True}))
diff --git a/src/astra/specutils/__init__.py b/src/astra/specutils/__init__.py
index 9400ab0..e69de29 100644
--- a/src/astra/specutils/__init__.py
+++ b/src/astra/specutils/__init__.py
@@ -1 +0,0 @@
-from .frizzle import frizzle
\ No newline at end of file
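
The new `_aspcap_stage` function in the patch above throttles FERRE executions on three axes at once (process count for RAM, outstanding object/thread count for CPU, and concurrent grid loads for disk I/O), and it drains per-object progress messages from a multiprocessing Pipe while it waits. The following standalone sketch illustrates that pattern under simplified assumptions; the limits, `fake_ferre`, and the job list are hypothetical stand-ins and are not part of the astra API.

from multiprocessing import Pipe, Process
from time import sleep

MAX_PROCESSES, MAX_THREADS, MAX_CONCURRENT_LOADING = 4, 16, 1

def fake_ferre(job_id, n_objects, pipe):
    # Stand-in for one FERRE execution: pretend to load a grid, then report
    # one message per finished object back to the parent.
    sleep(0.05)                 # "grid loading"
    pipe.send((job_id, 0))      # grid loaded, no objects finished yet
    for _ in range(n_objects):
        sleep(0.01)
        pipe.send((job_id, 1))  # one more object finished

def run(jobs):
    parent, child = Pipe()
    running = []
    processes = threads = loading = finished = 0

    def at_capacity(p, t, c):
        # Mirrors the three limits in _aspcap_stage: processes (RAM),
        # outstanding objects/threads (CPU), and concurrent grid loads (disk I/O).
        return (p >= MAX_PROCESSES, t >= MAX_THREADS, c >= MAX_CONCURRENT_LOADING)

    def drain():
        nonlocal threads, loading, finished
        while parent.poll():
            _, n = parent.recv()
            if n == 0:
                loading -= 1    # a grid finished loading
            else:
                threads -= n
                finished += n

    for job_id, n_objects in jobs:
        while any(at_capacity(processes, threads, loading)):
            drain()
            for p in [p for p in running if not p.is_alive()]:
                p.join()
                running.remove(p)
                processes -= 1
        p = Process(target=fake_ferre, args=(job_id, n_objects, child))
        p.start()
        running.append(p)
        processes, threads, loading = (processes + 1, threads + n_objects, loading + 1)

    for p in running:
        p.join()
    drain()
    return finished

if __name__ == "__main__":
    print(run([(job_id, 5) for job_id in range(10)]))  # expect 50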
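
The coarse-to-params hand-off in the aspcap diff keeps, for each spectrum, the coarse FERRE result with the lowest penalized reduced chi-square and flags ties. A minimal sketch of that selection using plain dicts is below; the penalty multipliers and flag set are made up for illustration and do not reproduce the FerreCoarse model or the pipeline's actual values.

def penalized_rchi2(result, warn_multiplier=5, bad_multiplier=10):
    # Inflate the reduced chi-square when the fit hit warning/bad grid edges.
    value = result["rchi2"]
    if result.get("flag_logg_grid_edge_warn"):
        value *= warn_multiplier
    if result.get("flag_logg_grid_edge_bad"):
        value *= bad_multiplier
    return value

def best_coarse_result_per_spectrum(coarse_results):
    best = {}
    for result in coarse_results:
        result["penalized_rchi2"] = penalized_rchi2(result)
        incumbent = best.get(result["spectrum_pk"])
        if incumbent is None or result["penalized_rchi2"] < incumbent["penalized_rchi2"]:
            best[result["spectrum_pk"]] = result
        elif result["penalized_rchi2"] == incumbent["penalized_rchi2"]:
            # Keep the incumbent but note that the choice was ambiguous.
            incumbent["flag_multiple_equally_good_coarse_results"] = True
    return best

if __name__ == "__main__":
    results = [
        {"spectrum_pk": 1, "rchi2": 2.0},
        {"spectrum_pk": 1, "rchi2": 1.5, "flag_logg_grid_edge_warn": True},
        {"spectrum_pk": 2, "rchi2": 0.9},
    ]
    print({k: v["rchi2"] for k, v in best_coarse_result_per_spectrum(results).items()})
    # -> {1: 2.0, 2: 0.9}  (the warn-flagged fit is penalized to 7.5)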
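
The new `ferre()` wrapper times each object by storing a negative start timestamp when FERRE announces "next object #N" and adding the completion time when the matching output row appears, so each dictionary entry flips from a negative timestamp to a positive elapsed time and is re-keyed by the input name. A short sketch of that bookkeeping is below; it reuses the two regular expressions from the patch, but the sample stdout lines are invented for illustration and do not reproduce real FERRE output.

import re
from time import time

regex_next_object = re.compile(r"next object #\s+(\d+)")
regex_completed = re.compile(r"\s+(\d+)\s(\d+_[\d\w_]+)")

def track_timings(lines):
    # Keyed by object index while running (value is -start_time), then re-keyed
    # by the input name with the elapsed time once the object completes.
    t_elapsed = {}
    for line in lines:
        if match := regex_next_object.search(line):
            t_elapsed[int(match.group(1))] = -time()
        if match := regex_completed.search(line):
            t_elapsed[match.group(2)] = t_elapsed.pop(int(match.group(1))) + time()
    return t_elapsed

if __name__ == "__main__":
    # Made-up stdout lines, for illustration only.
    fake_stdout = ["next object #    1", "   1 123_example_name   4500.  2.5"]
    print(track_timings(fake_stdout))  # {'123_example_name': ~0.0}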