Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consolidate python configuration dictionaries #2839

Merged
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 57 additions & 58 deletions workflow/applications/applications.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from typing import Dict, List, Any
from datetime import timedelta
from hosts import Host
from pathlib import Path
from wxflow import Configuration, to_timedelta
from abc import ABC, ABCMeta, abstractmethod

Expand Down Expand Up @@ -32,57 +31,50 @@ def __init__(self, conf: Configuration) -> None:

self.scheduler = Host().scheduler

# Save the configuration so we can source the config files when
# determining task resources
self.conf = conf
base = conf.parse_config('config.base')

_base = self.conf.parse_config('config.base')
# Define here so the child __init__ functions can use it; will
# be overwritten later during _init_finalize().
self._base = _base

self.mode = _base['MODE']
self.mode = base['MODE']

if self.mode not in self.VALID_MODES:
raise NotImplementedError(f'{self.mode} is not a valid application mode.\n' +
'Valid application modes are:\n' +
f'{", ".join(self.VALID_MODES)}')

self.net = _base['NET']
self.model_app = _base.get('APP', 'ATM')
self.do_atm = _base.get('DO_ATM', True)
self.do_wave = _base.get('DO_WAVE', False)
self.do_wave_bnd = _base.get('DOBNDPNT_WAVE', False)
self.do_ocean = _base.get('DO_OCN', False)
self.do_ice = _base.get('DO_ICE', False)
self.do_aero = _base.get('DO_AERO', False)
self.do_prep_obs_aero = _base.get('DO_PREP_OBS_AERO', False)
self.do_bufrsnd = _base.get('DO_BUFRSND', False)
self.do_gempak = _base.get('DO_GEMPAK', False)
self.do_awips = _base.get('DO_AWIPS', False)
self.do_verfozn = _base.get('DO_VERFOZN', True)
self.do_verfrad = _base.get('DO_VERFRAD', True)
self.do_vminmon = _base.get('DO_VMINMON', True)
self.do_tracker = _base.get('DO_TRACKER', True)
self.do_genesis = _base.get('DO_GENESIS', True)
self.do_genesis_fsu = _base.get('DO_GENESIS_FSU', False)
self.do_metp = _base.get('DO_METP', False)
self.do_upp = not _base.get('WRITE_DOPOST', True)
self.do_goes = _base.get('DO_GOES', False)
self.do_mos = _base.get('DO_MOS', False)
self.do_extractvars = _base.get('DO_EXTRACTVARS', False)

self.do_hpssarch = _base.get('HPSSARCH', False)

self.nens = _base.get('NMEM_ENS', 0)
self.fcst_segments = _base.get('FCST_SEGMENTS', None)
raise NotImplementedError(f'{self.mode} is not a valid application mode.\n'
f'Valid application modes are:\n'
f'{", ".join(self.VALID_MODES)}\n')

self.net = base['NET']
self.model_app = base.get('APP', 'ATM')
self.do_atm = base.get('DO_ATM', True)
self.do_wave = base.get('DO_WAVE', False)
self.do_wave_bnd = base.get('DOBNDPNT_WAVE', False)
self.do_ocean = base.get('DO_OCN', False)
self.do_ice = base.get('DO_ICE', False)
self.do_aero = base.get('DO_AERO', False)
self.do_prep_obs_aero = base.get('DO_PREP_OBS_AERO', False)
self.do_bufrsnd = base.get('DO_BUFRSND', False)
self.do_gempak = base.get('DO_GEMPAK', False)
self.do_awips = base.get('DO_AWIPS', False)
self.do_verfozn = base.get('DO_VERFOZN', True)
self.do_verfrad = base.get('DO_VERFRAD', True)
self.do_vminmon = base.get('DO_VMINMON', True)
self.do_tracker = base.get('DO_TRACKER', True)
self.do_genesis = base.get('DO_GENESIS', True)
self.do_genesis_fsu = base.get('DO_GENESIS_FSU', False)
self.do_metp = base.get('DO_METP', False)
self.do_upp = not base.get('WRITE_DOPOST', True)
self.do_goes = base.get('DO_GOES', False)
self.do_mos = base.get('DO_MOS', False)
self.do_extractvars = base.get('DO_EXTRACTVARS', False)

self.do_hpssarch = base.get('HPSSARCH', False)

self.nens = base.get('NMEM_ENS', 0)
self.fcst_segments = base.get('FCST_SEGMENTS', None)

if not AppConfig.is_monotonic(self.fcst_segments):
raise ValueError(f'Forecast segments do not increase monotonically: {",".join(self.fcst_segments)}')

self.wave_runs = None
if self.do_wave:
wave_run = _base.get('WAVE_RUN', 'BOTH').lower()
wave_run = base.get('WAVE_RUN', 'BOTH').lower()
if wave_run in ['both']:
self.wave_runs = ['gfs', 'gdas']
elif wave_run in ['gfs', 'gdas']:
Expand All @@ -91,45 +83,52 @@ def __init__(self, conf: Configuration) -> None:
self.aero_anl_runs = None
self.aero_fcst_runs = None
if self.do_aero:
aero_anl_run = _base.get('AERO_ANL_RUN', 'BOTH').lower()
aero_anl_run = base.get('AERO_ANL_RUN', 'BOTH').lower()
if aero_anl_run in ['both']:
self.aero_anl_runs = ['gfs', 'gdas']
elif aero_anl_run in ['gfs', 'gdas']:
self.aero_anl_runs = [aero_anl_run]
aero_fcst_run = _base.get('AERO_FCST_RUN', None).lower()
aero_fcst_run = base.get('AERO_FCST_RUN', None).lower()
if aero_fcst_run in ['both']:
self.aero_fcst_runs = ['gfs', 'gdas']
elif aero_fcst_run in ['gfs', 'gdas']:
self.aero_fcst_runs = [aero_fcst_run]

def _init_finalize(self, *args):
def _init_finalize(self, conf: Configuration):
print("Finalizing initialize")

# Get a list of all possible config_files that would be part of the application
self.configs_names = self._get_app_configs()

# Source the config_files for the jobs in the application
self.configs = self.source_configs()
# Source the config files for the jobs in the application without specifying a RUN
self.configs = {'_no_run': self._source_configs(conf)}
WalterKolczynski-NOAA marked this conversation as resolved.
Show resolved Hide resolved

# Update the base config dictionary base on application
self.configs['base'] = self.update_base(self.configs['base'])
# Update the base config dictionary based on application
self.configs['_no_run']['base'] = self._update_base(self.configs['_no_run']['base'])

# Save base in the internal state since it is often needed
self._base = self.configs['base']
base = self.configs['_no_run']['base']

# Get more configuration options into the class attributes
self.gfs_cyc = self._base.get('gfs_cyc')
self.gfs_cyc = base.get('gfs_cyc')

# Finally get task names for the application
# Get task names for the application
self.task_names = self.get_task_names()

# Finally, source the configuration files for each valid `RUN`
for run in self.task_names.keys():
self.configs[run] = self._source_configs(conf, run=run, log=False)

# Update the base config dictionary based on application and RUN
self.configs[run]['base'] = self._update_base(self.configs[run]['base'])

@abstractmethod
def _get_app_configs(self):
pass

@staticmethod
@abstractmethod
def update_base(base_in: Dict[str, Any]) -> Dict[str, Any]:
def _update_base(base_in: Dict[str, Any]) -> Dict[str, Any]:
'''
Make final updates to base and return an updated copy

Expand All @@ -146,7 +145,7 @@ def update_base(base_in: Dict[str, Any]) -> Dict[str, Any]:
'''
pass

def source_configs(self, run: str = "gfs", log: bool = True) -> Dict[str, Any]:
def _source_configs(self, conf: Configuration, run: str = "gfs", log: bool = True) -> Dict[str, Any]:
"""
Given the configuration object used to initialize this application,
source the configurations for each config and return a dictionary
Expand All @@ -156,7 +155,7 @@ def source_configs(self, run: str = "gfs", log: bool = True) -> Dict[str, Any]:
configs = dict()

# Return config.base as well
configs['base'] = self.conf.parse_config('config.base')
configs['base'] = conf.parse_config('config.base', RUN=run)

# Source the list of all config_files involved in the application
for config in self.configs_names:
Expand All @@ -180,12 +179,12 @@ def source_configs(self, run: str = "gfs", log: bool = True) -> Dict[str, Any]:
files += [f'config.{config}']

print(f'sourcing config.{config}') if log else 0
configs[config] = self.conf.parse_config(files, RUN=run)
configs[config] = conf.parse_config(files, RUN=run)

return configs

@abstractmethod
def get_task_names(self) -> Dict[str, List[str]]:
def get_task_names(self, run="_no_run") -> Dict[str, List[str]]:
'''
Create a list of task names for each RUN valid for the configuation.

Expand Down
7 changes: 5 additions & 2 deletions workflow/applications/gefs.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ class GEFSAppConfig(AppConfig):
def __init__(self, conf: Configuration):
super().__init__(conf)

base = conf.parse_config('config.base')
self.run = base.get('RUN', 'gefs')

def _get_app_configs(self):
"""
Returns the config_files that are involved in gefs
Expand All @@ -36,7 +39,7 @@ def _get_app_configs(self):
return configs

@staticmethod
def update_base(base_in):
def _update_base(base_in):

base_out = base_in.copy()
base_out['INTERVAL_GFS'] = AppConfig.get_gfs_interval(base_in['gfs_cyc'])
Expand Down Expand Up @@ -81,4 +84,4 @@ def get_task_names(self):

tasks += ['arch']

return {f"{self._base['RUN']}": tasks}
return {f"{self.run}": tasks}
23 changes: 12 additions & 11 deletions workflow/applications/gfs_cycled.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,21 @@ class GFSCycledAppConfig(AppConfig):

def __init__(self, conf: Configuration):
super().__init__(conf)
self.do_hybvar = self._base.get('DOHYBVAR', False)
self.do_fit2obs = self._base.get('DO_FIT2OBS', True)
self.do_jediatmvar = self._base.get('DO_JEDIATMVAR', False)
self.do_jediatmens = self._base.get('DO_JEDIATMENS', False)
self.do_jediocnvar = self._base.get('DO_JEDIOCNVAR', False)
self.do_jedisnowda = self._base.get('DO_JEDISNOWDA', False)
self.do_mergensst = self._base.get('DO_MERGENSST', False)
self.do_vrfy_oceanda = self._base.get('DO_VRFY_OCEANDA', False)
base = conf.parse_config('config.base')
self.do_hybvar = base.get('DOHYBVAR', False)
self.do_fit2obs = base.get('DO_FIT2OBS', True)
self.do_jediatmvar = base.get('DO_JEDIATMVAR', False)
self.do_jediatmens = base.get('DO_JEDIATMENS', False)
self.do_jediocnvar = base.get('DO_JEDIOCNVAR', False)
self.do_jedisnowda = base.get('DO_JEDISNOWDA', False)
self.do_mergensst = base.get('DO_MERGENSST', False)
self.do_vrfy_oceanda = base.get('DO_VRFY_OCEANDA', False)

self.lobsdiag_forenkf = False
self.eupd_runs = None
if self.do_hybvar:
self.lobsdiag_forenkf = self._base.get('lobsdiag_forenkf', False)
eupd_run = self._base.get('EUPD_CYC', 'gdas').lower()
self.lobsdiag_forenkf = base.get('lobsdiag_forenkf', False)
eupd_run = base.get('EUPD_CYC', 'gdas').lower()
if eupd_run in ['both']:
self.eupd_runs = ['gfs', 'gdas']
elif eupd_run in ['gfs', 'gdas']:
Expand Down Expand Up @@ -125,7 +126,7 @@ def _get_app_configs(self):
return configs

@staticmethod
def update_base(base_in):
def _update_base(base_in):

return GFSCycledAppConfig.get_gfs_cyc_dates(base_in)

Expand Down
18 changes: 11 additions & 7 deletions workflow/applications/gfs_forecast_only.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ class GFSForecastOnlyAppConfig(AppConfig):
def __init__(self, conf: Configuration):
super().__init__(conf)

base = conf.parse_config('config.base')
self.aero_fcst_run = base.get('AERO_FCST_RUN', 'BOTH').lower()
self.run = base.get('RUN', 'gfs')
self.exp_warm_start = base.get('EXP_WARM_START', False)

def _get_app_configs(self):
"""
Returns the config_files that are involved in the forecast-only app
Expand All @@ -25,7 +30,7 @@ def _get_app_configs(self):
configs += ['atmos_products']

if self.do_aero:
if not self._base['EXP_WARM_START']:
if not self.exp_warm_start:
configs += ['aerosol_init']

if self.do_tracker:
Expand Down Expand Up @@ -70,11 +75,10 @@ def _get_app_configs(self):
return configs

@staticmethod
def update_base(base_in):
def _update_base(base_in):

base_out = base_in.copy()
base_out['INTERVAL_GFS'] = AppConfig.get_gfs_interval(base_in['gfs_cyc'])
base_out['RUN'] = 'gfs'

return base_out

Expand All @@ -88,9 +92,9 @@ def get_task_names(self):
tasks = ['stage_ic']

if self.do_aero:
aero_fcst_run = self._base.get('AERO_FCST_RUN', 'BOTH').lower()
if self._base['RUN'] in aero_fcst_run or aero_fcst_run == "both":
if not self._base['EXP_WARM_START']:
aero_fcst_run = self.aero_fcst_run
if self.run in aero_fcst_run or aero_fcst_run == "both":
if not self.exp_warm_start:
tasks += ['aerosol_init']

if self.do_wave:
Expand Down Expand Up @@ -153,4 +157,4 @@ def get_task_names(self):

tasks += ['arch', 'cleanup'] # arch and cleanup **must** be the last tasks

return {f"{self._base['RUN']}": tasks}
return {f"{self.run}": tasks}
15 changes: 7 additions & 8 deletions workflow/rocoto/tasks.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#!/usr/bin/env python3

import copy
import numpy as np
from applications.applications import AppConfig
import rocoto.rocoto as rocoto
Expand Down Expand Up @@ -39,15 +38,16 @@ class Tasks:

def __init__(self, app_config: AppConfig, run: str) -> None:

self.app_config = copy.deepcopy(app_config)
self.app_config = app_config
self.run = run
# Re-source the configs with RUN specified
print(f"Source configs with RUN={run}")
self._configs = self.app_config.source_configs(run=run, log=False)

# Get the configs for the specified RUN
self._configs = self.app_config.configs[run]

# Update the base config for the application
self._configs['base'] = self.app_config.update_base(self._configs['base'])
# Save dict_configs and base in the internal state (never know where it may be needed)
self._configs['base'] = self.app_config._update_base(self._configs['base'])
WalterKolczynski-NOAA marked this conversation as resolved.
Show resolved Hide resolved

# Save base in the internal state (never know where it may be needed)
self._base = self._configs['base']

self.HOMEgfs = self._base['HOMEgfs']
Expand Down Expand Up @@ -134,7 +134,6 @@ def _template_to_rocoto_cycstring(self, template: str, subs_dict: dict = {}) ->
def _get_forecast_hours(run, config, component='atmos') -> List[str]:
# Make a local copy of the config to avoid modifying the original
local_config = config.copy()

# Ocean/Ice components do not have a HF output option like the atmosphere
if component in ['ocean', 'ice']:
local_config['FHMAX_HF_GFS'] = 0
Expand Down
3 changes: 2 additions & 1 deletion workflow/rocoto/workflow_xml.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ def __init__(self, app_config: AppConfig, rocoto_config: Dict) -> None:
self._app_config = app_config
self.rocoto_config = rocoto_config

self._base = self._app_config.configs['base']
# Use the generic config.base (without RUN specified)
self._base = self._app_config.configs['_no_run']['base']

self.preamble = self._get_preamble()
self.definitions = self._get_definitions()
Expand Down
Loading