Skip to content

Commit

Permalink
abundances stage omg ferre -l
Browse files Browse the repository at this point in the history
  • Loading branch information
andycasey committed Jan 10, 2025
1 parent 72d1f22 commit d22e3f6
Show file tree
Hide file tree
Showing 7 changed files with 311 additions and 250 deletions.
438 changes: 205 additions & 233 deletions src/astra/pipelines/aspcap/__init__.py

Large diffs are not rendered by default.

23 changes: 19 additions & 4 deletions src/astra/pipelines/aspcap/abundances.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

STAGE = "abundances"

# NOTE to Future Andy:
# ferre often (but not always; ie nthreads>1) needs the NOBJ keyword to be set for doing abundances in list (-l) mode


@task
def abundances(
Expand Down Expand Up @@ -144,9 +147,11 @@ def post_abundances(parent_dir, relative_mode=True, skip_pixel_arrays=True, **kw
for kwds in post_process_ferre(dir, ref_dir, skip_pixel_arrays=skip_pixel_arrays, **kwargs):
yield FerreChemicalAbundances(**kwds)



def plan_abundances_stage(
spectra: Iterable[Spectrum],
parent_dir: str,
stellar_parameter_results,
element_weight_paths: str,
continuum_order: Optional[int] = -1,
Expand Down Expand Up @@ -195,7 +200,7 @@ def plan_abundances_stage(

spectrum = lookup_spectrum_by_primary_key[result["spectrum_pk"]]

prefix = f"{result['pwd']}/params/{result['short_grid_name']}"
prefix = f"{result['pwd']}"#/params/{result['short_grid_name']}"

try:
continuum_cache[prefix]
Expand Down Expand Up @@ -280,7 +285,7 @@ def plan_abundances_stage(
weight_path, frozen_parameters, ferre_kwds = details
kwds = grid_kwds.copy()
kwds.update(
relative_dir=os.path.join(STAGE, short_grid_name, species),
pwd=os.path.join(parent_dir, STAGE, short_grid_name, species),
header_path=header_path,
weight_path=weight_path,
frozen_parameters=frozen_parameters,
Expand All @@ -297,11 +302,21 @@ def plan_abundances_stage(
continue
plans.append(kwds)


# Group together as necessary
grouped = {}
for plan in plans:
short_grid_name = parse_header_path(plan["header_path"])["short_grid_name"]

grouped.setdefault(short_grid_name, [])
grouped[short_grid_name].append(plan)

#spectra_with_no_stellar_parameters -= set(grid_kwds["spectra"])


grouped_plans = list(grouped.values())
#spectra_with_no_stellar_parameters = tuple(spectra_with_no_stellar_parameters)
#return (plans, spectra_with_no_stellar_parameters)
return plans
return grouped_plans



Expand Down
7 changes: 4 additions & 3 deletions src/astra/pipelines/aspcap/coarse.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def penalize_coarse_stellar_parameter_result(result: FerreCoarse, warn_multiplie

def plan_coarse_stellar_parameters_stage(
spectra: Iterable[Spectrum],
parent_dir: str,
header_paths: Optional[Union[List[str], Tuple[str], str]] = "$MWM_ASTRA/pipelines/aspcap/synspec_dr17_marcs_header_paths.list",
initial_guess_callable: Optional[Callable] = None,
weight_path: Optional[str] = "$MWM_ASTRA/pipelines/aspcap/masks/global.mask",
Expand Down Expand Up @@ -244,17 +245,17 @@ def plan_coarse_stellar_parameters_stage(

short_grid_name = parse_header_path(header_path)["short_grid_name"]

relative_dir = f"{STAGE}/{short_grid_name}"
pwd = f"{parent_dir}/{STAGE}/{short_grid_name}"

grouped_task_kwds[header_path].update(
header_path=header_path,
relative_dir=relative_dir,
pwd=pwd,
weight_path=weight_path,
# Frozen parameters are common to the header path, so just set as the first value.
frozen_parameters=grouped_task_kwds[header_path]["frozen_parameters"][0],
)
grouped_task_kwds[header_path].update(kwargs)
return_list_of_kwds.append(grouped_task_kwds[header_path])
return_list_of_kwds.append([grouped_task_kwds[header_path]])

return (return_list_of_kwds, spectra_with_no_initial_guess)

Expand Down
6 changes: 3 additions & 3 deletions src/astra/pipelines/aspcap/stellar_parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def _pre_compute_continuum(coarse_result, spectrum, pre_continuum):
return (spectrum.spectrum_pk, pre_computed_continuum)


def plan_stellar_parameters_stage(spectra, coarse_results, weight_path, pre_continuum=MedianFilter, **kwargs):
def plan_stellar_parameters_stage(spectra, parent_dir, coarse_results, weight_path, pre_continuum=MedianFilter, **kwargs):

best_coarse_results = {}
for kwds in coarse_results:
Expand Down Expand Up @@ -109,10 +109,10 @@ def plan_stellar_parameters_stage(spectra, coarse_results, weight_path, pre_cont
group_task_kwds[header_path].update(
header_path=header_path,
weight_path=weight_path,
relative_dir=f"{STAGE}/{short_grid_name}",
pwd=f"{parent_dir}/{STAGE}/{short_grid_name}",
**kwargs
)
stellar_parameter_plans.append(group_task_kwds[header_path])
stellar_parameter_plans.append([group_task_kwds[header_path]])

return (stellar_parameter_plans, best_coarse_results)

Expand Down
30 changes: 27 additions & 3 deletions src/astra/pipelines/ferre/post_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,33 @@ def write_pixel_array_with_names(path, names, data):

LARGE = 1e10 # TODO: This is also defined in pre_process, move it common

def post_process_ferre(dir, pwd=None, skip_pixel_arrays=False, **kwargs) -> list[dict]:
post_execution_interpolation(dir)
v = list(_post_process_ferre(dir, pwd=pwd, skip_pixel_arrays=skip_pixel_arrays, **kwargs))
def post_process_ferre(input_nml_path, **kwargs) -> list[dict]:

"""
if relative_mode:
ref_dir = os.path.dirname(dir)
else:
ref_dir = None
log.info(f"Post-processing FERRE results in {dir} {'with FERRE list mode' if relative_mode else 'in standard mode'}")
for kwds in post_process_ferre(dir, ref_dir, skip_pixel_arrays=skip_pixel_arrays, **kwargs):
yield FerreChemicalAbundances(**kwds)
"""
is_abundance_mode = input_nml_path.endswith("input_list.nml")
if is_abundance_mode:
abundance_dir = os.path.dirname(input_nml_path)
with open(input_nml_path, "r") as fp:
dirs = [os.path.join(abundance_dir, line.split("/")[0]) for line in fp.read().strip().split("\n")]

# TODO: this might be slow we can probably use -l mode
for d in dirs:
post_execution_interpolation(d)

ref_dir = os.path.dirname(input_nml_path)
v = [list(_post_process_ferre(d, ref_dir, skip_pixel_arrays=True, **kwargs)) for d in dirs]
else:
directory = os.path.dirname(input_nml_path)
post_execution_interpolation(directory)
v = list(_post_process_ferre(directory, **kwargs))
return v

def _post_process_ferre(dir, pwd=None, skip_pixel_arrays=False, **kwargs) -> Iterable[dict]:
Expand Down
51 changes: 50 additions & 1 deletion src/astra/pipelines/ferre/pre_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,51 @@

# FERRE v4.8.8 src trunk : /uufs/chpc.utah.edu/common/home/sdss09/software/apogee/Linux/apogee/trunk/external/ferre/src

def pre_process_ferres(plans):
processed = [pre_process_ferre(**plan) for plan in plans]
if len(plans) == 1:
return processed[0]
else:
# Abundance mode.
abundance_dir = os.path.dirname(os.path.dirname(processed[0][0]))

input_nml_paths, total, skipped = ([], 0, [])
for input_nml_path_, n_obj_, n_threads, skipped_ in processed:
input_nml_paths.append(input_nml_path_[len(abundance_dir) + 1:]) # ppaths too long
total += n_obj_
for spectrum, kwds in skipped_:
skipped.append((spectrum, kwds))


# Create a FERRE list file.
input_nml_path = os.path.join(abundance_dir, "input_list.nml")
with open(input_nml_path, "w") as fp:
fp.write("\n".join(input_nml_paths) + "\n")

return (input_nml_path, total, n_threads, skipped)

""""
def group_abundance_plans_in_list_mode(plans):
group = {}
for kwd in plans:
pre_process_ferre(**kwd)
pwd = kwd["pwd"].rstrip("/")
group_dir = "/".join(pwd.split("/")[:-1])
group.setdefault(group_dir, [])
group[group_dir].append(pwd[1 + len(group_dir):] + "/input.nml")
if ferre_list_mode:
# Create a parent input_list.nml file to use with the ferre.x -l flag.
for pwd, items in group.items():
input_list_path = f"{pwd}/input_list.nml"
log.info(f"Created grouped FERRE input file with {len(items)} dirs: {input_list_path}")
with open(expand_path(input_list_path), "w") as fp:
# Sometimes `wc` would not give the right amount of lines in a file, so we add a \n to the end
# https://unix.stackexchange.com/questions/314256/wc-l-not-returning-correct-value
fp.write("\n".join(items) + "\n")
"""

def pre_process_ferre(
pwd: str,
header_path: str,
Expand Down Expand Up @@ -55,8 +100,12 @@ def pre_process_ferre(
reference_pixel_arrays_for_abundance_run=False,
write_input_pixel_arrays=True,
max_num_bad_pixels=2000,
remove_existing_output_files=True,
**kwargs
):

if remove_existing_output_files:
os.system(f"rm -f {pwd}/*.output {pwd}/stdout {pwd}/stderr")

if kwargs:
log.warning(f"astra.pipelines.ferre.pre_process.pre_process ignoring kwargs: {kwargs}")
Expand Down Expand Up @@ -240,7 +289,7 @@ def pre_process_ferre(
np.savetxt(e_flux_path, batch_e_flux, **savetxt_kwds)

n_obj = len(batch_names)
return (pwd, n_obj, min(n_threads, n_obj), skipped)
return (f"{pwd}/input.nml", n_obj, min(n_threads, n_obj), skipped)

'''
bad_pixel_flux_value: float = 1e-4,
Expand Down
6 changes: 3 additions & 3 deletions src/astra/pipelines/ferre/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ def validate_ferre_control_keywords(
return (kwds, headers, segment_headers, frozen_parameters)


def format_ferre_control_keywords(ferre_kwds: dict, n_obj=None) -> str:
def format_ferre_control_keywords(ferre_kwds: dict, n_obj) -> str:
r"""
Format control keywords for FERRE to digest.
Expand Down Expand Up @@ -564,8 +564,8 @@ def format_ferre_control_keywords(ferre_kwds: dict, n_obj=None) -> str:
)

contents = " &LISTA\n"
#if n_obj is not None:
# contents += f" NOBJ = {n_obj}\n"
if n_obj is not None:
contents += f" NOBJ = {n_obj}\n"
remaining_keys = set(map(str.lower, ferre_kwds)).difference(preferred_order)
keys = list(preferred_order)
for k in remaining_keys:
Expand Down

0 comments on commit d22e3f6

Please sign in to comment.