Skip to content

Commit

Permalink
Consolidate python configuration dictionaries (#2839)
Browse files Browse the repository at this point in the history
This modifies the way the `config` dictionary is constructed and
referenced. Rather than updating a single configuration dictionary for
each `RUN`, a dictionary of `config` dictionaries keyed by `RUN` is created,
and the entry for the appropriate `RUN` is used when calculating resources.

This also restores the hidden (leading-underscore) names of the methods that were hidden before #2727.

Resolves #2783
  • Loading branch information
DavidHuber-NOAA authored Sep 10, 2024
1 parent 0953c0f commit b443915
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 88 deletions.
2 changes: 1 addition & 1 deletion parm/config/gfs/config.resources
Original file line number Diff line number Diff line change
Expand Up @@ -911,7 +911,7 @@ case ${step} in
;;

"verfozn")
walltime="00:05:00"
walltime="00:10:00"
ntasks=1
threads_per_task=1
tasks_per_node=1
Expand Down
115 changes: 57 additions & 58 deletions workflow/applications/applications.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from typing import Dict, List, Any
from datetime import timedelta
from hosts import Host
from pathlib import Path
from wxflow import Configuration, to_timedelta
from abc import ABC, ABCMeta, abstractmethod

Expand Down Expand Up @@ -32,57 +31,50 @@ def __init__(self, conf: Configuration) -> None:

self.scheduler = Host().scheduler

# Save the configuration so we can source the config files when
# determining task resources
self.conf = conf
base = conf.parse_config('config.base')

_base = self.conf.parse_config('config.base')
# Define here so the child __init__ functions can use it; will
# be overwritten later during _init_finalize().
self._base = _base

self.mode = _base['MODE']
self.mode = base['MODE']

if self.mode not in self.VALID_MODES:
raise NotImplementedError(f'{self.mode} is not a valid application mode.\n' +
'Valid application modes are:\n' +
f'{", ".join(self.VALID_MODES)}')

self.net = _base['NET']
self.model_app = _base.get('APP', 'ATM')
self.do_atm = _base.get('DO_ATM', True)
self.do_wave = _base.get('DO_WAVE', False)
self.do_wave_bnd = _base.get('DOBNDPNT_WAVE', False)
self.do_ocean = _base.get('DO_OCN', False)
self.do_ice = _base.get('DO_ICE', False)
self.do_aero = _base.get('DO_AERO', False)
self.do_prep_obs_aero = _base.get('DO_PREP_OBS_AERO', False)
self.do_bufrsnd = _base.get('DO_BUFRSND', False)
self.do_gempak = _base.get('DO_GEMPAK', False)
self.do_awips = _base.get('DO_AWIPS', False)
self.do_verfozn = _base.get('DO_VERFOZN', True)
self.do_verfrad = _base.get('DO_VERFRAD', True)
self.do_vminmon = _base.get('DO_VMINMON', True)
self.do_tracker = _base.get('DO_TRACKER', True)
self.do_genesis = _base.get('DO_GENESIS', True)
self.do_genesis_fsu = _base.get('DO_GENESIS_FSU', False)
self.do_metp = _base.get('DO_METP', False)
self.do_upp = not _base.get('WRITE_DOPOST', True)
self.do_goes = _base.get('DO_GOES', False)
self.do_mos = _base.get('DO_MOS', False)
self.do_extractvars = _base.get('DO_EXTRACTVARS', False)

self.do_hpssarch = _base.get('HPSSARCH', False)

self.nens = _base.get('NMEM_ENS', 0)
self.fcst_segments = _base.get('FCST_SEGMENTS', None)
raise NotImplementedError(f'{self.mode} is not a valid application mode.\n'
f'Valid application modes are:\n'
f'{", ".join(self.VALID_MODES)}\n')

self.net = base['NET']
self.model_app = base.get('APP', 'ATM')
self.do_atm = base.get('DO_ATM', True)
self.do_wave = base.get('DO_WAVE', False)
self.do_wave_bnd = base.get('DOBNDPNT_WAVE', False)
self.do_ocean = base.get('DO_OCN', False)
self.do_ice = base.get('DO_ICE', False)
self.do_aero = base.get('DO_AERO', False)
self.do_prep_obs_aero = base.get('DO_PREP_OBS_AERO', False)
self.do_bufrsnd = base.get('DO_BUFRSND', False)
self.do_gempak = base.get('DO_GEMPAK', False)
self.do_awips = base.get('DO_AWIPS', False)
self.do_verfozn = base.get('DO_VERFOZN', True)
self.do_verfrad = base.get('DO_VERFRAD', True)
self.do_vminmon = base.get('DO_VMINMON', True)
self.do_tracker = base.get('DO_TRACKER', True)
self.do_genesis = base.get('DO_GENESIS', True)
self.do_genesis_fsu = base.get('DO_GENESIS_FSU', False)
self.do_metp = base.get('DO_METP', False)
self.do_upp = not base.get('WRITE_DOPOST', True)
self.do_goes = base.get('DO_GOES', False)
self.do_mos = base.get('DO_MOS', False)
self.do_extractvars = base.get('DO_EXTRACTVARS', False)

self.do_hpssarch = base.get('HPSSARCH', False)

self.nens = base.get('NMEM_ENS', 0)
self.fcst_segments = base.get('FCST_SEGMENTS', None)

if not AppConfig.is_monotonic(self.fcst_segments):
raise ValueError(f'Forecast segments do not increase monotonically: {",".join(self.fcst_segments)}')

self.wave_runs = None
if self.do_wave:
wave_run = _base.get('WAVE_RUN', 'BOTH').lower()
wave_run = base.get('WAVE_RUN', 'BOTH').lower()
if wave_run in ['both']:
self.wave_runs = ['gfs', 'gdas']
elif wave_run in ['gfs', 'gdas']:
Expand All @@ -91,45 +83,52 @@ def __init__(self, conf: Configuration) -> None:
self.aero_anl_runs = None
self.aero_fcst_runs = None
if self.do_aero:
aero_anl_run = _base.get('AERO_ANL_RUN', 'BOTH').lower()
aero_anl_run = base.get('AERO_ANL_RUN', 'BOTH').lower()
if aero_anl_run in ['both']:
self.aero_anl_runs = ['gfs', 'gdas']
elif aero_anl_run in ['gfs', 'gdas']:
self.aero_anl_runs = [aero_anl_run]
aero_fcst_run = _base.get('AERO_FCST_RUN', None).lower()
aero_fcst_run = base.get('AERO_FCST_RUN', None).lower()
if aero_fcst_run in ['both']:
self.aero_fcst_runs = ['gfs', 'gdas']
elif aero_fcst_run in ['gfs', 'gdas']:
self.aero_fcst_runs = [aero_fcst_run]

def _init_finalize(self, *args):
def _init_finalize(self, conf: Configuration):
print("Finalizing initialize")

# Get a list of all possible config_files that would be part of the application
self.configs_names = self._get_app_configs()

# Source the config_files for the jobs in the application
self.configs = self.source_configs()
# Source the config files for the jobs in the application without specifying a RUN
self.configs = {'_no_run': self._source_configs(conf)}

# Update the base config dictionary base on application
self.configs['base'] = self.update_base(self.configs['base'])
# Update the base config dictionary based on application
self.configs['_no_run']['base'] = self._update_base(self.configs['_no_run']['base'])

# Save base in the internal state since it is often needed
self._base = self.configs['base']
base = self.configs['_no_run']['base']

# Get more configuration options into the class attributes
self.gfs_cyc = self._base.get('gfs_cyc')
self.gfs_cyc = base.get('gfs_cyc')

# Finally get task names for the application
# Get task names for the application
self.task_names = self.get_task_names()

# Finally, source the configuration files for each valid `RUN`
for run in self.task_names.keys():
self.configs[run] = self._source_configs(conf, run=run, log=False)

# Update the base config dictionary based on application and RUN
self.configs[run]['base'] = self._update_base(self.configs[run]['base'])

@abstractmethod
def _get_app_configs(self):
pass

@staticmethod
@abstractmethod
def update_base(base_in: Dict[str, Any]) -> Dict[str, Any]:
def _update_base(base_in: Dict[str, Any]) -> Dict[str, Any]:
'''
Make final updates to base and return an updated copy
Expand All @@ -146,7 +145,7 @@ def update_base(base_in: Dict[str, Any]) -> Dict[str, Any]:
'''
pass

def source_configs(self, run: str = "gfs", log: bool = True) -> Dict[str, Any]:
def _source_configs(self, conf: Configuration, run: str = "gfs", log: bool = True) -> Dict[str, Any]:
"""
Given the configuration object used to initialize this application,
source the configurations for each config and return a dictionary
Expand All @@ -156,7 +155,7 @@ def source_configs(self, run: str = "gfs", log: bool = True) -> Dict[str, Any]:
configs = dict()

# Return config.base as well
configs['base'] = self.conf.parse_config('config.base')
configs['base'] = conf.parse_config('config.base', RUN=run)

# Source the list of all config_files involved in the application
for config in self.configs_names:
Expand All @@ -180,12 +179,12 @@ def source_configs(self, run: str = "gfs", log: bool = True) -> Dict[str, Any]:
files += [f'config.{config}']

print(f'sourcing config.{config}') if log else 0
configs[config] = self.conf.parse_config(files, RUN=run)
configs[config] = conf.parse_config(files, RUN=run)

return configs

@abstractmethod
def get_task_names(self) -> Dict[str, List[str]]:
def get_task_names(self, run="_no_run") -> Dict[str, List[str]]:
'''
Create a list of task names for each RUN valid for the configuration.
Expand Down
7 changes: 5 additions & 2 deletions workflow/applications/gefs.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ class GEFSAppConfig(AppConfig):
def __init__(self, conf: Configuration):
super().__init__(conf)

base = conf.parse_config('config.base')
self.run = base.get('RUN', 'gefs')

def _get_app_configs(self):
"""
Returns the config_files that are involved in gefs
Expand All @@ -36,7 +39,7 @@ def _get_app_configs(self):
return configs

@staticmethod
def update_base(base_in):
def _update_base(base_in):

base_out = base_in.copy()
base_out['INTERVAL_GFS'] = AppConfig.get_gfs_interval(base_in['gfs_cyc'])
Expand Down Expand Up @@ -81,4 +84,4 @@ def get_task_names(self):

tasks += ['arch']

return {f"{self._base['RUN']}": tasks}
return {f"{self.run}": tasks}
23 changes: 12 additions & 11 deletions workflow/applications/gfs_cycled.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,21 @@ class GFSCycledAppConfig(AppConfig):

def __init__(self, conf: Configuration):
super().__init__(conf)
self.do_hybvar = self._base.get('DOHYBVAR', False)
self.do_fit2obs = self._base.get('DO_FIT2OBS', True)
self.do_jediatmvar = self._base.get('DO_JEDIATMVAR', False)
self.do_jediatmens = self._base.get('DO_JEDIATMENS', False)
self.do_jediocnvar = self._base.get('DO_JEDIOCNVAR', False)
self.do_jedisnowda = self._base.get('DO_JEDISNOWDA', False)
self.do_mergensst = self._base.get('DO_MERGENSST', False)
self.do_vrfy_oceanda = self._base.get('DO_VRFY_OCEANDA', False)
base = conf.parse_config('config.base')
self.do_hybvar = base.get('DOHYBVAR', False)
self.do_fit2obs = base.get('DO_FIT2OBS', True)
self.do_jediatmvar = base.get('DO_JEDIATMVAR', False)
self.do_jediatmens = base.get('DO_JEDIATMENS', False)
self.do_jediocnvar = base.get('DO_JEDIOCNVAR', False)
self.do_jedisnowda = base.get('DO_JEDISNOWDA', False)
self.do_mergensst = base.get('DO_MERGENSST', False)
self.do_vrfy_oceanda = base.get('DO_VRFY_OCEANDA', False)

self.lobsdiag_forenkf = False
self.eupd_runs = None
if self.do_hybvar:
self.lobsdiag_forenkf = self._base.get('lobsdiag_forenkf', False)
eupd_run = self._base.get('EUPD_CYC', 'gdas').lower()
self.lobsdiag_forenkf = base.get('lobsdiag_forenkf', False)
eupd_run = base.get('EUPD_CYC', 'gdas').lower()
if eupd_run in ['both']:
self.eupd_runs = ['gfs', 'gdas']
elif eupd_run in ['gfs', 'gdas']:
Expand Down Expand Up @@ -125,7 +126,7 @@ def _get_app_configs(self):
return configs

@staticmethod
def update_base(base_in):
def _update_base(base_in):

return GFSCycledAppConfig.get_gfs_cyc_dates(base_in)

Expand Down
18 changes: 11 additions & 7 deletions workflow/applications/gfs_forecast_only.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ class GFSForecastOnlyAppConfig(AppConfig):
def __init__(self, conf: Configuration):
super().__init__(conf)

base = conf.parse_config('config.base')
self.aero_fcst_run = base.get('AERO_FCST_RUN', 'BOTH').lower()
self.run = base.get('RUN', 'gfs')
self.exp_warm_start = base.get('EXP_WARM_START', False)

def _get_app_configs(self):
"""
Returns the config_files that are involved in the forecast-only app
Expand All @@ -25,7 +30,7 @@ def _get_app_configs(self):
configs += ['atmos_products']

if self.do_aero:
if not self._base['EXP_WARM_START']:
if not self.exp_warm_start:
configs += ['aerosol_init']

if self.do_tracker:
Expand Down Expand Up @@ -70,11 +75,10 @@ def _get_app_configs(self):
return configs

@staticmethod
def update_base(base_in):
def _update_base(base_in):

base_out = base_in.copy()
base_out['INTERVAL_GFS'] = AppConfig.get_gfs_interval(base_in['gfs_cyc'])
base_out['RUN'] = 'gfs'

return base_out

Expand All @@ -88,9 +92,9 @@ def get_task_names(self):
tasks = ['stage_ic']

if self.do_aero:
aero_fcst_run = self._base.get('AERO_FCST_RUN', 'BOTH').lower()
if self._base['RUN'] in aero_fcst_run or aero_fcst_run == "both":
if not self._base['EXP_WARM_START']:
aero_fcst_run = self.aero_fcst_run
if self.run in aero_fcst_run or aero_fcst_run == "both":
if not self.exp_warm_start:
tasks += ['aerosol_init']

if self.do_wave:
Expand Down Expand Up @@ -153,4 +157,4 @@ def get_task_names(self):

tasks += ['arch', 'cleanup'] # arch and cleanup **must** be the last tasks

return {f"{self._base['RUN']}": tasks}
return {f"{self.run}": tasks}
15 changes: 7 additions & 8 deletions workflow/rocoto/tasks.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#!/usr/bin/env python3

import copy
import numpy as np
from applications.applications import AppConfig
import rocoto.rocoto as rocoto
Expand Down Expand Up @@ -39,15 +38,16 @@ class Tasks:

def __init__(self, app_config: AppConfig, run: str) -> None:

self.app_config = copy.deepcopy(app_config)
self.app_config = app_config
self.run = run
# Re-source the configs with RUN specified
print(f"Source configs with RUN={run}")
self._configs = self.app_config.source_configs(run=run, log=False)

# Get the configs for the specified RUN
self._configs = self.app_config.configs[run]

# Update the base config for the application
self._configs['base'] = self.app_config.update_base(self._configs['base'])
# Save dict_configs and base in the internal state (never know where it may be needed)
self._configs['base'] = self.app_config._update_base(self._configs['base'])

# Save base in the internal state (never know where it may be needed)
self._base = self._configs['base']

self.HOMEgfs = self._base['HOMEgfs']
Expand Down Expand Up @@ -134,7 +134,6 @@ def _template_to_rocoto_cycstring(self, template: str, subs_dict: dict = {}) ->
def _get_forecast_hours(run, config, component='atmos') -> List[str]:
# Make a local copy of the config to avoid modifying the original
local_config = config.copy()

# Ocean/Ice components do not have a HF output option like the atmosphere
if component in ['ocean', 'ice']:
local_config['FHMAX_HF_GFS'] = 0
Expand Down
3 changes: 2 additions & 1 deletion workflow/rocoto/workflow_xml.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ def __init__(self, app_config: AppConfig, rocoto_config: Dict) -> None:
self._app_config = app_config
self.rocoto_config = rocoto_config

self._base = self._app_config.configs['base']
# Use the generic config.base (without RUN specified)
self._base = self._app_config.configs['_no_run']['base']

self.preamble = self._get_preamble()
self.definitions = self._get_definitions()
Expand Down

0 comments on commit b443915

Please sign in to comment.