From 9ad7d3e923103c888f10fa02dbc895b67f83b2b6 Mon Sep 17 00:00:00 2001 From: RussTreadon-NOAA <26926959+RussTreadon-NOAA@users.noreply.github.com> Date: Wed, 28 Aug 2024 12:23:30 -0400 Subject: [PATCH] Add JEDI ATM lgetkf observer and solver jobs (#2833) This PR adds JEDI ATM lgetkf observer and solver jobs to global-workflow. This is approach is akin GSI-based eobs and eupd. Splitting the single JEDI ATM lgetkf job into separate observer and solver jobs improves memory and computational efficiency. Resolves #2415 --- ci/cases/pr/C96C48_ufs_hybatmDA.yaml | 2 - env/HERA.env | 10 ++ env/HERCULES.env | 10 ++ env/JET.env | 10 ++ env/ORION.env | 9 ++ env/S4.env | 10 ++ env/WCOSS2.env | 37 ++++++ jobs/JGLOBAL_ATMENS_ANALYSIS_OBS | 35 ++++++ jobs/JGLOBAL_ATMENS_ANALYSIS_SOL | 35 ++++++ jobs/rocoto/atmensanlobs.sh | 18 +++ jobs/rocoto/atmensanlsol.sh | 18 +++ parm/archive/enkf.yaml.j2 | 29 +++-- parm/config/gfs/config.atmensanlobs | 13 +++ parm/config/gfs/config.atmensanlsol | 13 +++ parm/config/gfs/config.base | 1 + parm/config/gfs/config.resources | 28 ++++- parm/config/gfs/config.resources.HERA | 13 +++ parm/config/gfs/config.resources.HERCULES | 11 ++ parm/config/gfs/config.resources.WCOSS2 | 11 ++ parm/config/gfs/yaml/defaults.yaml | 6 + parm/stage/analysis.yaml.j2 | 7 ++ scripts/exglobal_atmens_analysis_obs.py | 23 ++++ scripts/exglobal_atmens_analysis_sol.py | 23 ++++ scripts/exglobal_stage_ic.py | 2 +- sorc/gdas.cd | 2 +- ush/python/pygfs/task/atmens_analysis.py | 130 ++++++++++++++++++++-- ush/python/pygfs/task/stage_ic.py | 4 + workflow/applications/applications.py | 2 +- workflow/applications/gfs_cycled.py | 5 +- workflow/rocoto/gfs_tasks.py | 57 +++++++++- workflow/rocoto/tasks.py | 2 +- 31 files changed, 544 insertions(+), 32 deletions(-) create mode 100755 jobs/JGLOBAL_ATMENS_ANALYSIS_OBS create mode 100755 jobs/JGLOBAL_ATMENS_ANALYSIS_SOL create mode 100755 jobs/rocoto/atmensanlobs.sh create mode 100755 jobs/rocoto/atmensanlsol.sh create mode 100644 parm/config/gfs/config.atmensanlobs create mode 100644 parm/config/gfs/config.atmensanlsol create mode 100755 scripts/exglobal_atmens_analysis_obs.py create mode 100755 scripts/exglobal_atmens_analysis_sol.py diff --git a/ci/cases/pr/C96C48_ufs_hybatmDA.yaml b/ci/cases/pr/C96C48_ufs_hybatmDA.yaml index f835b34593..0b5aa7b6ac 100644 --- a/ci/cases/pr/C96C48_ufs_hybatmDA.yaml +++ b/ci/cases/pr/C96C48_ufs_hybatmDA.yaml @@ -18,9 +18,7 @@ arguments: yaml: {{ HOMEgfs }}/ci/cases/yamls/ufs_hybatmDA_defaults.ci.yaml skip_ci_on_hosts: - - hera - gaea - orion - hercules - - wcoss2 diff --git a/env/HERA.env b/env/HERA.env index 697cf21965..8d59c870cc 100755 --- a/env/HERA.env +++ b/env/HERA.env @@ -72,6 +72,16 @@ elif [[ "${step}" = "atmanlvar" ]]; then export NTHREADS_ATMANLVAR=${NTHREADSmax} export APRUN_ATMANLVAR="${APRUN} --cpus-per-task=${NTHREADS_ATMANLVAR}" +elif [[ "${step}" = "atmensanlobs" ]]; then + + export NTHREADS_ATMENSANLOBS=${NTHREADSmax} + export APRUN_ATMENSANLOBS="${APRUN} --cpus-per-task=${NTHREADS_ATMENSANLOBS}" + +elif [[ "${step}" = "atmensanlsol" ]]; then + + export NTHREADS_ATMENSANLSOL=${NTHREADSmax} + export APRUN_ATMENSANLSOL="${APRUN} --cpus-per-task=${NTHREADS_ATMENSANLSOL}" + elif [[ "${step}" = "atmensanlletkf" ]]; then export NTHREADS_ATMENSANLLETKF=${NTHREADSmax} diff --git a/env/HERCULES.env b/env/HERCULES.env index 83d934c91a..79ff8391bd 100755 --- a/env/HERCULES.env +++ b/env/HERCULES.env @@ -76,6 +76,16 @@ case ${step} in export NTHREADS_ATMANLFV3INC=${NTHREADSmax} export APRUN_ATMANLFV3INC="${APRUN} --cpus-per-task=${NTHREADS_ATMANLFV3INC}" ;; + "atmensanlobs") + + export NTHREADS_ATMENSANLOBS=${NTHREADSmax} + export APRUN_ATMENSANLOBS="${APRUN} --cpus-per-task=${NTHREADS_ATMENSANLOBS}" + ;; + "atmensanlsol") + + export NTHREADS_ATMENSANLSOL=${NTHREADSmax} + export APRUN_ATMENSANLSOL="${APRUN} --cpus-per-task=${NTHREADS_ATMENSANLSOL}" + ;; "atmensanlletkf") export NTHREADS_ATMENSANLLETKF=${NTHREADSmax} diff --git a/env/JET.env b/env/JET.env index 473539ded1..4294a00de1 100755 --- a/env/JET.env +++ b/env/JET.env @@ -60,6 +60,16 @@ elif [[ "${step}" = "atmanlvar" ]]; then export NTHREADS_ATMANLVAR=${NTHREADSmax} export APRUN_ATMANLVAR="${APRUN}" +elif [[ "${step}" = "atmensanlobs" ]]; then + + export NTHREADS_ATMENSANLOBS=${NTHREADSmax} + export APRUN_ATMENSANLOBS="${APRUN}" + +elif [[ "${step}" = "atmensanlsol" ]]; then + + export NTHREADS_ATMENSANLSOL=${NTHREADSmax} + export APRUN_ATMENSANLSOL="${APRUN}" + elif [[ "${step}" = "atmensanlletkf" ]]; then export NTHREADS_ATMENSANLLETKF=${NTHREADSmax} diff --git a/env/ORION.env b/env/ORION.env index 65a8871cdd..7838d28640 100755 --- a/env/ORION.env +++ b/env/ORION.env @@ -68,6 +68,15 @@ elif [[ "${step}" = "atmanlvar" ]]; then export NTHREADS_ATMANLVAR=${NTHREADSmax} export APRUN_ATMANLVAR="${APRUN} --cpus-per-task=${NTHREADS_ATMANLVAR}" +elif [[ "${step}" = "atmensanlobs" ]]; then + + export NTHREADS_ATMENSANLOBS=${NTHREADSmax} + export APRUN_ATMENSANLOBS="${APRUN} --cpus-per-task=${NTHREADS_ATMENSANLOBS}" + +elif [[ "${step}" = "atmensanlsol" ]]; then + + export NTHREADS_ATMENSANLSOL=${NTHREADSmax} + export APRUN_ATMENSANLSOL="${APRUN} --cpus-per-task=${NTHREADS_ATMENSANLSOL}" elif [[ "${step}" = "atmensanlletkf" ]]; then export NTHREADS_ATMENSANLLETKF=${NTHREADSmax} diff --git a/env/S4.env b/env/S4.env index d0985e44ca..a29ec49fb4 100755 --- a/env/S4.env +++ b/env/S4.env @@ -60,6 +60,16 @@ elif [[ "${step}" = "atmanlvar" ]]; then export NTHREADS_ATMANLVAR=${NTHREADSmax} export APRUN_ATMANLVAR="${APRUN}" +elif [[ "${step}" = "atmensanlobs" ]]; then + + export NTHREADS_ATMENSANLOBS=${NTHREADSmax} + export APRUN_ATMENSANLOBS="${APRUN}" + +elif [[ "${step}" = "atmensanlsol" ]]; then + + export NTHREADS_ATMENSANLSOL=${NTHREADSmax} + export APRUN_ATMENSANLSOL="${APRUN}" + elif [[ "${step}" = "atmensanlletkf" ]]; then export NTHREADS_ATMENSANLLETKF=${NTHREADSmax} diff --git a/env/WCOSS2.env b/env/WCOSS2.env index cf9feeca83..adff54d636 100755 --- a/env/WCOSS2.env +++ b/env/WCOSS2.env @@ -53,6 +53,16 @@ elif [[ "${step}" = "atmanlvar" ]]; then export NTHREADS_ATMANLVAR=${NTHREADSmax} export APRUN_ATMANLVAR="${APRUN}" +elif [[ "${step}" = "atmensanlobs" ]]; then + + export NTHREADS_ATMENSANLOBS=${NTHREADSmax} + export APRUN_ATMENSANLOBS="${APRUN}" + +elif [[ "${step}" = "atmensanlsol" ]]; then + + export NTHREADS_ATMENSANLSOL=${NTHREADSmax} + export APRUN_ATMENSANLSOL="${APRUN}" + elif [[ "${step}" = "atmensanlletkf" ]]; then export NTHREADS_ATMENSANLLETKF=${NTHREADSmax} @@ -89,6 +99,33 @@ elif [[ "${step}" = "esnowrecen" ]]; then export APRUN_APPLY_INCR="${launcher} -n 6" +elif [[ "${step}" = "marinebmat" ]]; then + + export APRUNCFP="${launcher} -n \$ncmd --multi-prog" + export APRUN_MARINEBMAT="${APRUN}" + +elif [[ "${step}" = "ocnanalrun" ]]; then + + export APRUNCFP="${launcher} -n \$ncmd --multi-prog" + + export APRUN_OCNANAL="${APRUN}" + +elif [[ "${step}" = "ocnanalchkpt" ]]; then + + export APRUNCFP="${launcher} -n \$ncmd --multi-prog" + + export APRUN_OCNANAL="${APRUN}" + +elif [[ "${step}" = "ocnanalecen" ]]; then + + export NTHREADS_OCNANALECEN=${NTHREADSmax} + export APRUN_OCNANALECEN="${APRUN} --cpus-per-task=${NTHREADS_OCNANALECEN}" + +elif [[ "${step}" = "marineanalletkf" ]]; then + + export NTHREADS_MARINEANALLETKF=${NTHREADSmax} + export APRUN_MARINEANALLETKF="${APRUN} --cpus-per-task=${NTHREADS_MARINEANALLETKF}" + elif [[ "${step}" = "atmanlfv3inc" ]]; then export NTHREADS_ATMANLFV3INC=${NTHREADSmax} diff --git a/jobs/JGLOBAL_ATMENS_ANALYSIS_OBS b/jobs/JGLOBAL_ATMENS_ANALYSIS_OBS new file mode 100755 index 0000000000..9d858a8a37 --- /dev/null +++ b/jobs/JGLOBAL_ATMENS_ANALYSIS_OBS @@ -0,0 +1,35 @@ +#! /usr/bin/env bash + +source "${HOMEgfs}/ush/preamble.sh" +export WIPE_DATA="NO" +export DATA=${DATA:-${DATAROOT}/${RUN}atmensanl_${cyc}} +source "${HOMEgfs}/ush/jjob_header.sh" -e "atmensanlobs" -c "base atmensanl atmensanlobs" + +############################################## +# Set variables used in the script +############################################## + +############################################## +# Begin JOB SPECIFIC work +############################################## + +############################################################### +# Run relevant script + +EXSCRIPT=${GDASATMENSOBSSH:-${SCRgfs}/exglobal_atmens_analysis_obs.py} +${EXSCRIPT} +status=$? +[[ ${status} -ne 0 ]] && exit "${status}" + +############################################## +# End JOB SPECIFIC work +############################################## + +############################################## +# Final processing +############################################## +if [[ -e "${pgmout}" ]] ; then + cat "${pgmout}" +fi + +exit 0 diff --git a/jobs/JGLOBAL_ATMENS_ANALYSIS_SOL b/jobs/JGLOBAL_ATMENS_ANALYSIS_SOL new file mode 100755 index 0000000000..415791cdd0 --- /dev/null +++ b/jobs/JGLOBAL_ATMENS_ANALYSIS_SOL @@ -0,0 +1,35 @@ +#! /usr/bin/env bash + +source "${HOMEgfs}/ush/preamble.sh" +export WIPE_DATA="NO" +export DATA=${DATA:-${DATAROOT}/${RUN}atmensanl_${cyc}} +source "${HOMEgfs}/ush/jjob_header.sh" -e "atmensanlsol" -c "base atmensanl atmensanlsol" + +############################################## +# Set variables used in the script +############################################## + +############################################## +# Begin JOB SPECIFIC work +############################################## + +############################################################### +# Run relevant script + +EXSCRIPT=${GDASATMENSSOLSH:-${SCRgfs}/exglobal_atmens_analysis_sol.py} +${EXSCRIPT} +status=$? +[[ ${status} -ne 0 ]] && exit "${status}" + +############################################## +# End JOB SPECIFIC work +############################################## + +############################################## +# Final processing +############################################## +if [[ -e "${pgmout}" ]] ; then + cat "${pgmout}" +fi + +exit 0 diff --git a/jobs/rocoto/atmensanlobs.sh b/jobs/rocoto/atmensanlobs.sh new file mode 100755 index 0000000000..d02d013bcd --- /dev/null +++ b/jobs/rocoto/atmensanlobs.sh @@ -0,0 +1,18 @@ +#! /usr/bin/env bash + +source "${HOMEgfs}/ush/preamble.sh" + +############################################################### +# Source UFSDA workflow modules +. "${HOMEgfs}/ush/load_ufsda_modules.sh" +status=$? +[[ ${status} -ne 0 ]] && exit "${status}" + +export job="atmensanlobs" +export jobid="${job}.$$" + +############################################################### +# Execute the JJOB +"${HOMEgfs}/jobs/JGLOBAL_ATMENS_ANALYSIS_OBS" +status=$? +exit "${status}" diff --git a/jobs/rocoto/atmensanlsol.sh b/jobs/rocoto/atmensanlsol.sh new file mode 100755 index 0000000000..e1fe59d986 --- /dev/null +++ b/jobs/rocoto/atmensanlsol.sh @@ -0,0 +1,18 @@ +#! /usr/bin/env bash + +source "${HOMEgfs}/ush/preamble.sh" + +############################################################### +# Source UFSDA workflow modules +. "${HOMEgfs}/ush/load_ufsda_modules.sh" +status=$? +[[ ${status} -ne 0 ]] && exit "${status}" + +export job="atmensanlsol" +export jobid="${job}.$$" + +############################################################### +# Execute the JJOB +"${HOMEgfs}/jobs/JGLOBAL_ATMENS_ANALYSIS_SOL" +status=$? +exit "${status}" diff --git a/parm/archive/enkf.yaml.j2 b/parm/archive/enkf.yaml.j2 index 92ed0095af..d3f16e8e69 100644 --- a/parm/archive/enkf.yaml.j2 +++ b/parm/archive/enkf.yaml.j2 @@ -15,17 +15,21 @@ enkf: - "logs/{{ cycle_YMDH }}/{{ RUN }}ecen{{ '%03d' % grp }}.log" {% endfor %} - {% if DO_JEDIATMENS %} - {% set steps = ["atmensanlinit", "atmensanlletkf", "atmensanlfv3inc", "atmensanlfinal"] %} - {% else %} - {% set steps = ["eobs", "eupd"] %} {% if lobsdiag_forenkf %} - {% do steps.append("ediag") %} + {% if DO_JEDIATMENS %} + {% set steps = ["atmensanlinit", "atmensanlobs", "atmensanlsol", "atmensanlfv3inc", "atmensanlfinal"] %} + {% else %} + {% set steps = ["eobs", "ediag", "eupd"] %} + {% endif %} {% else %} - {% for mem in range(1, nmem_ens + 1) %} - {% do steps.append("eomg_mem{{ '%03d' % mem }}") %} - {% endfor %} - {% endif %} + {% if DO_JEDIATMENS %} + {% set steps = ["atmensanlinit", "atmensanlletkf", "atmensanlfv3inc", "atmensanlfinal"] %} + {% else %} + {% set steps = ["eobs", "eupd"] %} + {% for mem in range(1, nmem_ens + 1) %} + {% do steps.append("eomg_mem{{ '%03d' % mem }}") %} + {% endfor %} + {% endif %} {% endif %} {% for step in steps %} @@ -49,10 +53,17 @@ enkf: "oznstat.ensmean", "radstat.ensmean"] %} {% else %} + {% if lobsdiag_forenkf %} + {% set da_files = ["atmens_observer.yaml", + "atmens_solver.yaml", + "atminc.ensmean.nc", + "atmensstat"] %} + {% else %} {% set da_files = ["atmens.yaml", "atminc.ensmean.nc", "atmensstat"] %} {% endif %} + {% endif %} {% for file in da_files %} - "{{ COMIN_ATMOS_ANALYSIS_ENSSTAT | relpath(ROTDIR) }}/{{ head }}{{ file }}" {% endfor %} diff --git a/parm/config/gfs/config.atmensanlobs b/parm/config/gfs/config.atmensanlobs new file mode 100644 index 0000000000..dff3fa3095 --- /dev/null +++ b/parm/config/gfs/config.atmensanlobs @@ -0,0 +1,13 @@ +#! /usr/bin/env bash + +########## config.atmensanlobs ########## +# Pre Atm Ens Analysis specific + +echo "BEGIN: config.atmensanlobs" + +# Get task specific resources +. "${EXPDIR}/config.resources" atmensanlobs + +export JCB_ALGO_YAML=@JCB_ALGO_YAML@ + +echo "END: config.atmensanlobs" diff --git a/parm/config/gfs/config.atmensanlsol b/parm/config/gfs/config.atmensanlsol new file mode 100644 index 0000000000..dac161373b --- /dev/null +++ b/parm/config/gfs/config.atmensanlsol @@ -0,0 +1,13 @@ +#! /usr/bin/env bash + +########## config.atmensanlsol ########## +# Pre Atm Ens Analysis specific + +echo "BEGIN: config.atmensanlsol" + +# Get task specific resources +. "${EXPDIR}/config.resources" atmensanlsol + +export JCB_ALGO_YAML=@JCB_ALGO_YAML@ + +echo "END: config.atmensanlsol" diff --git a/parm/config/gfs/config.base b/parm/config/gfs/config.base index 544113f942..517e74ebff 100644 --- a/parm/config/gfs/config.base +++ b/parm/config/gfs/config.base @@ -470,6 +470,7 @@ export ARCH_FCSTICFREQ=1 # Archive frequency in days for gdas and gfs foreca # The monitor jobs are not yet supported for JEDIATMVAR. if [[ ${DO_JEDIATMVAR} = "YES" ]]; then + export DO_FIT2OBS="NO" # Run fit to observations package export DO_VERFOZN="NO" # Ozone data assimilation monitoring export DO_VERFRAD="NO" # Radiance data assimilation monitoring export DO_VMINMON="NO" # GSI minimization monitoring diff --git a/parm/config/gfs/config.resources b/parm/config/gfs/config.resources index 978dca6d51..2f541ff945 100644 --- a/parm/config/gfs/config.resources +++ b/parm/config/gfs/config.resources @@ -14,7 +14,7 @@ if (( $# != 1 )); then echo "stage_ic aerosol_init" echo "prep prepsnowobs prepatmiodaobs" echo "atmanlinit atmanlvar atmanlfv3inc atmanlfinal" - echo "atmensanlinit atmensanlletkf atmensanlfv3inc atmensanlfinal" + echo "atmensanlinit atmensanlobs atmensanlsol atmensanlletkf atmensanlfv3inc atmensanlfinal" echo "snowanl esnowrecen" echo "prepobsaero aeroanlinit aeroanlrun aeroanlfinal" echo "anal sfcanl analcalc analdiag fcst echgres" @@ -286,7 +286,7 @@ case ${step} in ntasks=1 threads_per_task=1 tasks_per_node=$(( max_tasks_per_node / threads_per_task )) - memory="3072M" + memory="4GB" ;; "atmanlvar") @@ -1004,6 +1004,30 @@ case ${step} in memory="3072M" ;; + "atmensanlobs") + export layout_x=${layout_x_atmensanl} + export layout_y=${layout_y_atmensanl} + + walltime="00:30:00" + ntasks=$(( layout_x * layout_y * 6 )) + threads_per_task=1 + tasks_per_node=$(( max_tasks_per_node / threads_per_task )) + memory="96GB" + export is_exclusive=True + ;; + + "atmensanlsol") + export layout_x=${layout_x_atmensanl} + export layout_y=${layout_y_atmensanl} + + walltime="00:30:00" + ntasks=$(( layout_x * layout_y * 6 )) + threads_per_task=1 + tasks_per_node=$(( max_tasks_per_node / threads_per_task )) + memory="96GB" + export is_exclusive=True + ;; + "atmensanlletkf") export layout_x=${layout_x_atmensanl} export layout_y=${layout_y_atmensanl} diff --git a/parm/config/gfs/config.resources.HERA b/parm/config/gfs/config.resources.HERA index 36f50508c3..e79d4c5b0a 100644 --- a/parm/config/gfs/config.resources.HERA +++ b/parm/config/gfs/config.resources.HERA @@ -11,6 +11,19 @@ case ${step} in fi ;; + "atmanlvar") + export tasks_per_node_gdas=12 + export tasks_per_node_gfs=12 + ;; + + "atmensanlobs") + export tasks_per_node=12 + ;; + + "atmensanlsol") + export tasks_per_node=12 + ;; + "eupd") case ${CASE} in "C384") diff --git a/parm/config/gfs/config.resources.HERCULES b/parm/config/gfs/config.resources.HERCULES index 7a5a74f69c..65ea508e01 100644 --- a/parm/config/gfs/config.resources.HERCULES +++ b/parm/config/gfs/config.resources.HERCULES @@ -11,6 +11,17 @@ case ${step} in export tasks_per_node=20 fi ;; + "atmanlvar") + export tasks_per_node_gdas=48 + export tasks_per_node_gfs=48 + export memory="400GB" + ;; + + "atmensanlobs") + export tasks_per_node=48 + export memory="400GB" + ;; + *) ;; esac diff --git a/parm/config/gfs/config.resources.WCOSS2 b/parm/config/gfs/config.resources.WCOSS2 index a0a69fa8d1..3ff019068c 100644 --- a/parm/config/gfs/config.resources.WCOSS2 +++ b/parm/config/gfs/config.resources.WCOSS2 @@ -18,6 +18,17 @@ case ${step} in fi ;; + "atmanlvar") + export tasks_per_node_gdas=48 + export tasks_per_node_gfs=48 + export memory="400GB" + ;; + + "atmensanlobs") + export tasks_per_node=48 + export memory="400GB" + ;; + "fit2obs") export tasks_per_node=3 ;; diff --git a/parm/config/gfs/yaml/defaults.yaml b/parm/config/gfs/yaml/defaults.yaml index 24729ac43e..b423601df3 100644 --- a/parm/config/gfs/yaml/defaults.yaml +++ b/parm/config/gfs/yaml/defaults.yaml @@ -37,6 +37,12 @@ atmensanl: IO_LAYOUT_X: 1 IO_LAYOUT_Y: 1 +atmensanlobs: + JCB_ALGO_YAML: "${PARMgfs}/gdas/atm/jcb-prototype_lgetkf_observer.yaml.j2" + +atmensanlsol: + JCB_ALGO_YAML: "${PARMgfs}/gdas/atm/jcb-prototype_lgetkf_solver.yaml.j2" + aeroanl: IO_LAYOUT_X: 1 IO_LAYOUT_Y: 1 diff --git a/parm/stage/analysis.yaml.j2 b/parm/stage/analysis.yaml.j2 index e014313b6d..d30389644a 100644 --- a/parm/stage/analysis.yaml.j2 +++ b/parm/stage/analysis.yaml.j2 @@ -15,5 +15,12 @@ analysis: - ["{{ ICSDIR }}/{{ COMOUT_ATMOS_ANALYSIS_MEM | relpath(ROTDIR) }}/{{ RUN }}.t{{ current_cycle_HH }}z.{{ ftype }}", "{{ COMOUT_ATMOS_ANALYSIS_MEM }}"] {% endif %} {% endfor %} + {% if DO_JEDIATMVAR %} + {% for ftype in ["satbias.nc", "satbias_cov.nc", "tlapse.txt"] %} + {% for file in glob(ICSDIR ~ "/" ~ COMOUT_ATMOS_ANALYSIS_MEM | relpath(ROTDIR) ~ "/" ~ RUN ~ ".t" ~ current_cycle_HH ~ "z.atms_*." ~ ftype) %} + - ["{{ file }}", "{{ COMOUT_ATMOS_ANALYSIS_MEM }}"] + {% endfor %} + {% endfor %} + {% endif %} {% endfor %} # mem loop {% endif %} diff --git a/scripts/exglobal_atmens_analysis_obs.py b/scripts/exglobal_atmens_analysis_obs.py new file mode 100755 index 0000000000..e4b5c98952 --- /dev/null +++ b/scripts/exglobal_atmens_analysis_obs.py @@ -0,0 +1,23 @@ +#!/usr/bin/env python3 +# exglobal_atmens_analysis_obs.py +# This script creates an AtmEnsAnalysis object +# and runs the execute method +# which executes the global atm local ensemble analysis in observer mode +import os + +from wxflow import Logger, cast_strdict_as_dtypedict +from pygfs.task.atmens_analysis import AtmEnsAnalysis + +# Initialize root logger +logger = Logger(level='DEBUG', colored_log=True) + + +if __name__ == '__main__': + + # Take configuration from environment and cast it as python dictionary + config = cast_strdict_as_dtypedict(os.environ) + + # Instantiate the atmens analysis task + AtmEnsAnl = AtmEnsAnalysis(config) + AtmEnsAnl.init_observer() + AtmEnsAnl.observe() diff --git a/scripts/exglobal_atmens_analysis_sol.py b/scripts/exglobal_atmens_analysis_sol.py new file mode 100755 index 0000000000..db55959753 --- /dev/null +++ b/scripts/exglobal_atmens_analysis_sol.py @@ -0,0 +1,23 @@ +#!/usr/bin/env python3 +# exglobal_atmens_analysis_sol.py +# This script creates an AtmEnsAnalysis object +# and runs the execute method +# which executes the global atm local ensemble analysis in solver mode +import os + +from wxflow import Logger, cast_strdict_as_dtypedict +from pygfs.task.atmens_analysis import AtmEnsAnalysis + +# Initialize root logger +logger = Logger(level='DEBUG', colored_log=True) + + +if __name__ == '__main__': + + # Take configuration from environment and cast it as python dictionary + config = cast_strdict_as_dtypedict(os.environ) + + # Instantiate the atmens analysis task + AtmEnsAnl = AtmEnsAnalysis(config) + AtmEnsAnl.init_solver() + AtmEnsAnl.solve() diff --git a/scripts/exglobal_stage_ic.py b/scripts/exglobal_stage_ic.py index 5efc1bca96..d4c212a297 100755 --- a/scripts/exglobal_stage_ic.py +++ b/scripts/exglobal_stage_ic.py @@ -20,7 +20,7 @@ def main(): # Pull out all the configuration keys needed to run stage job keys = ['RUN', 'MODE', 'EXP_WARM_START', 'NMEM_ENS', 'assim_freq', 'current_cycle', 'previous_cycle', - 'ROTDIR', 'ICSDIR', 'STAGE_IC_YAML_TMPL', + 'ROTDIR', 'ICSDIR', 'STAGE_IC_YAML_TMPL', 'DO_JEDIATMVAR', 'OCNRES', 'waveGRD', 'ntiles', 'DOIAU', 'DO_JEDIOCNVAR', 'REPLAY_ICS', 'DO_WAVE', 'DO_OCN', 'DO_ICE', 'DO_NEST'] diff --git a/sorc/gdas.cd b/sorc/gdas.cd index 0431b26650..09594d1c03 160000 --- a/sorc/gdas.cd +++ b/sorc/gdas.cd @@ -1 +1 @@ -Subproject commit 0431b26650c5e5d4eb741304a05c841d3fda0ddc +Subproject commit 09594d1c032fd187f9869ac74b2b5b351112e93c diff --git a/ush/python/pygfs/task/atmens_analysis.py b/ush/python/pygfs/task/atmens_analysis.py index bd5112050e..2e51f82d59 100644 --- a/ush/python/pygfs/task/atmens_analysis.py +++ b/ush/python/pygfs/task/atmens_analysis.py @@ -17,6 +17,7 @@ WorkflowException, Template, TemplateConstants) from pygfs.task.analysis import Analysis +from jcb import render logger = getLogger(__name__.split('.')[-1]) @@ -93,9 +94,10 @@ def initialize(self: Analysis) -> None: FileHandler(bkg_staging_dict).sync() # generate ensemble da YAML file - logger.debug(f"Generate ensemble da YAML file: {self.task_config.jedi_yaml}") - save_as_yaml(self.task_config.jedi_config, self.task_config.jedi_yaml) - logger.info(f"Wrote ensemble da YAML to: {self.task_config.jedi_yaml}") + if not self.task_config.lobsdiag_forenkf: + logger.debug(f"Generate ensemble da YAML file: {self.task_config.jedi_yaml}") + save_as_yaml(self.task_config.jedi_config, self.task_config.jedi_yaml) + logger.info(f"Wrote ensemble da YAML to: {self.task_config.jedi_yaml}") # need output dir for diags and anl logger.debug("Create empty output [anl, diags] directories to receive output from executable") @@ -105,6 +107,80 @@ def initialize(self: Analysis) -> None: ] FileHandler({'mkdir': newdirs}).sync() + @logit(logger) + def observe(self: Analysis) -> None: + """Execute a global atmens analysis in observer mode + + This method will execute a global atmens analysis in observer mode using JEDI. + This includes: + - changing to the run directory + - running the global atmens analysis executable in observer mode + + Parameters + ---------- + Analysis: parent class for GDAS task + + Returns + ---------- + None + """ + chdir(self.task_config.DATA) + + exec_cmd = Executable(self.task_config.APRUN_ATMENSANLOBS) + exec_name = os.path.join(self.task_config.DATA, 'gdas.x') + + exec_cmd.add_default_arg(exec_name) + exec_cmd.add_default_arg('fv3jedi') + exec_cmd.add_default_arg('localensembleda') + exec_cmd.add_default_arg(self.task_config.jedi_yaml) + + try: + logger.debug(f"Executing {exec_cmd}") + exec_cmd() + except OSError: + raise OSError(f"Failed to execute {exec_cmd}") + except Exception: + raise WorkflowException(f"An error occured during execution of {exec_cmd}") + + pass + + @logit(logger) + def solve(self: Analysis) -> None: + """Execute a global atmens analysis in solver mode + + This method will execute a global atmens analysis in solver mode using JEDI. + This includes: + - changing to the run directory + - running the global atmens analysis executable in solver mode + + Parameters + ---------- + Analysis: parent class for GDAS task + + Returns + ---------- + None + """ + chdir(self.task_config.DATA) + + exec_cmd = Executable(self.task_config.APRUN_ATMENSANLSOL) + exec_name = os.path.join(self.task_config.DATA, 'gdas.x') + + exec_cmd.add_default_arg(exec_name) + exec_cmd.add_default_arg('fv3jedi') + exec_cmd.add_default_arg('localensembleda') + exec_cmd.add_default_arg(self.task_config.jedi_yaml) + + try: + logger.debug(f"Executing {exec_cmd}") + exec_cmd() + except OSError: + raise OSError(f"Failed to execute {exec_cmd}") + except Exception: + raise WorkflowException(f"An error occured during execution of {exec_cmd}") + + pass + @logit(logger) def letkf(self: Analysis) -> None: """Execute a global atmens analysis @@ -142,6 +218,34 @@ def letkf(self: Analysis) -> None: pass + @logit(logger) + def init_observer(self: Analysis) -> None: + # Setup JEDI YAML file + jcb_config = parse_j2yaml(self.task_config.JCB_BASE_YAML, self.task_config) + jcb_algo_config = parse_j2yaml(self.task_config.JCB_ALGO_YAML, self.task_config) + jcb_config.update(jcb_algo_config) + jedi_config = render(jcb_config) + + self.task_config.jedi_yaml = os.path.join(self.task_config.DATA, f"{self.task_config.RUN}.t{self.task_config.cyc:02d}z.atmens_observer.yaml") + + logger.debug(f"Generate ensemble da observer YAML file: {self.task_config.jedi_yaml}") + save_as_yaml(jedi_config, self.task_config.jedi_yaml) + logger.info(f"Wrote ensemble da observer YAML to: {self.task_config.jedi_yaml}") + + @logit(logger) + def init_solver(self: Analysis) -> None: + # Setup JEDI YAML file + jcb_config = parse_j2yaml(self.task_config.JCB_BASE_YAML, self.task_config) + jcb_algo_config = parse_j2yaml(self.task_config.JCB_ALGO_YAML, self.task_config) + jcb_config.update(jcb_algo_config) + jedi_config = render(jcb_config) + + self.task_config.jedi_yaml = os.path.join(self.task_config.DATA, f"{self.task_config.RUN}.t{self.task_config.cyc:02d}z.atmens_solver.yaml") + + logger.debug(f"Generate ensemble da solver YAML file: {self.task_config.jedi_yaml}") + save_as_yaml(jedi_config, self.task_config.jedi_yaml) + logger.info(f"Wrote ensemble da solver YAML to: {self.task_config.jedi_yaml}") + @logit(logger) def init_fv3_increment(self: Analysis) -> None: # Setup JEDI YAML file @@ -207,16 +311,18 @@ def finalize(self: Analysis) -> None: diaggzip = f"{diagfile}.gz" archive.add(diaggzip, arcname=os.path.basename(diaggzip)) + # get list of yamls to cop to ROTDIR + yamls = glob.glob(os.path.join(self.task_config.DATA, '*atmens*yaml')) + # copy full YAML from executable to ROTDIR - logger.info(f"Copying {self.task_config.jedi_yaml} to {self.task_config.COM_ATMOS_ANALYSIS_ENS}") - src = os.path.join(self.task_config.DATA, f"{self.task_config.RUN}.t{self.task_config.cyc:02d}z.atmens.yaml") - dest = os.path.join(self.task_config.COM_ATMOS_ANALYSIS_ENS, f"{self.task_config.RUN}.t{self.task_config.cyc:02d}z.atmens.yaml") - logger.debug(f"Copying {src} to {dest}") - yaml_copy = { - 'mkdir': [self.task_config.COM_ATMOS_ANALYSIS_ENS], - 'copy': [[src, dest]] - } - FileHandler(yaml_copy).sync() + for src in yamls: + logger.info(f"Copying {src} to {self.task_config.COM_ATMOS_ANALYSIS_ENS}") + dest = os.path.join(self.task_config.COM_ATMOS_ANALYSIS_ENS, os.path.basename(src)) + logger.debug(f"Copying {src} to {dest}") + yaml_copy = { + 'copy': [[src, dest]] + } + FileHandler(yaml_copy).sync() # create template dictionaries template_inc = self.task_config.COM_ATMOS_ANALYSIS_TMPL diff --git a/ush/python/pygfs/task/stage_ic.py b/ush/python/pygfs/task/stage_ic.py index d4d9dc3e19..37cc4138f3 100644 --- a/ush/python/pygfs/task/stage_ic.py +++ b/ush/python/pygfs/task/stage_ic.py @@ -1,5 +1,6 @@ #!/usr/bin/env python3 +import glob import os from logging import getLogger from typing import Any, Dict, List @@ -52,6 +53,9 @@ def execute_stage(self, stage_dict: Dict[str, Any]) -> None: # Add the os.path.exists function to the dict for yaml parsing stage_dict['path_exists'] = os.path.exists + # Add the glob.glob function for capturing filenames + stage_dict['glob'] = glob.glob + # Parse stage yaml to get list of files to copy stage_set = parse_j2yaml(self.task_config.STAGE_IC_YAML_TMPL, stage_dict, allow_missing=False) diff --git a/workflow/applications/applications.py b/workflow/applications/applications.py index 8c1f69735e..d6d7453c3c 100644 --- a/workflow/applications/applications.py +++ b/workflow/applications/applications.py @@ -172,7 +172,7 @@ def source_configs(self, run: str = "gfs", log: bool = True) -> Dict[str, Any]: files += ['config.fcst', 'config.efcs'] elif config in ['atmanlinit', 'atmanlvar', 'atmanlfv3inc']: files += ['config.atmanl', f'config.{config}'] - elif config in ['atmensanlinit', 'atmensanlletkf', 'atmensanlfv3inc']: + elif config in ['atmensanlinit', 'atmensanlobs', 'atmensanlsol', 'atmensanlletkf', 'atmensanlfv3inc']: files += ['config.atmensanl', f'config.{config}'] elif 'wave' in config: files += ['config.wave', f'config.{config}'] diff --git a/workflow/applications/gfs_cycled.py b/workflow/applications/gfs_cycled.py index f534764245..8c3bca0fd7 100644 --- a/workflow/applications/gfs_cycled.py +++ b/workflow/applications/gfs_cycled.py @@ -57,7 +57,7 @@ def _get_app_configs(self): if self.do_hybvar: if self.do_jediatmens: - configs += ['atmensanlinit', 'atmensanlletkf', 'atmensanlfv3inc', 'atmensanlfinal'] + configs += ['atmensanlinit', 'atmensanlobs', 'atmensanlsol', 'atmensanlletkf', 'atmensanlfv3inc', 'atmensanlfinal'] else: configs += ['eobs', 'eomg', 'ediag', 'eupd'] configs += ['ecen', 'esfc', 'efcs', 'echgres', 'epos', 'earc'] @@ -165,7 +165,8 @@ def get_task_names(self): hybrid_after_eupd_tasks = [] if self.do_hybvar: if self.do_jediatmens: - hybrid_tasks += ['atmensanlinit', 'atmensanlletkf', 'atmensanlfv3inc', 'atmensanlfinal', 'echgres'] + hybrid_tasks += ['atmensanlinit', 'atmensanlfv3inc', 'atmensanlfinal', 'echgres'] + hybrid_tasks += ['atmensanlobs', 'atmensanlsol'] if self.lobsdiag_forenkf else ['atmensanlletkf'] else: hybrid_tasks += ['eobs', 'eupd', 'echgres'] hybrid_tasks += ['ediag'] if self.lobsdiag_forenkf else ['eomg'] diff --git a/workflow/rocoto/gfs_tasks.py b/workflow/rocoto/gfs_tasks.py index f60ac9a549..23f334549a 100644 --- a/workflow/rocoto/gfs_tasks.py +++ b/workflow/rocoto/gfs_tasks.py @@ -2454,6 +2454,58 @@ def atmensanlinit(self): return task + def atmensanlobs(self): + + deps = [] + dep_dict = {'type': 'task', 'name': f'{self.run}atmensanlinit'} + deps.append(rocoto.add_dependency(dep_dict)) + dep_dict = {'type': 'metatask', 'name': 'enkfgdasepmn', 'offset': f"-{timedelta_to_HMS(self._base['cycle_interval'])}"} + deps.append(rocoto.add_dependency(dep_dict)) + dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) + + resources = self.get_resource('atmensanlobs') + task_name = f'{self.run}atmensanlobs' + task_dict = {'task_name': task_name, + 'resources': resources, + 'dependency': dependencies, + 'envars': self.envars, + 'cycledef': self.run.replace('enkf', ''), + 'command': f'{self.HOMEgfs}/jobs/rocoto/atmensanlobs.sh', + 'job_name': f'{self.pslot}_{task_name}_@H', + 'log': f'{self.rotdir}/logs/@Y@m@d@H/{task_name}.log', + 'maxtries': '&MAXTRIES;' + } + + task = rocoto.create_task(task_dict) + + return task + + def atmensanlsol(self): + + deps = [] + dep_dict = {'type': 'task', 'name': f'{self.run}atmensanlobs'} + deps.append(rocoto.add_dependency(dep_dict)) + dep_dict = {'type': 'metatask', 'name': 'enkfgdasepmn', 'offset': f"-{timedelta_to_HMS(self._base['cycle_interval'])}"} + deps.append(rocoto.add_dependency(dep_dict)) + dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) + + resources = self.get_resource('atmensanlsol') + task_name = f'{self.run}atmensanlsol' + task_dict = {'task_name': task_name, + 'resources': resources, + 'dependency': dependencies, + 'envars': self.envars, + 'cycledef': self.run.replace('enkf', ''), + 'command': f'{self.HOMEgfs}/jobs/rocoto/atmensanlsol.sh', + 'job_name': f'{self.pslot}_{task_name}_@H', + 'log': f'{self.rotdir}/logs/@Y@m@d@H/{task_name}.log', + 'maxtries': '&MAXTRIES;' + } + + task = rocoto.create_task(task_dict) + + return task + def atmensanlletkf(self): deps = [] @@ -2483,7 +2535,10 @@ def atmensanlletkf(self): def atmensanlfv3inc(self): deps = [] - dep_dict = {'type': 'task', 'name': f'{self.run}atmensanlletkf'} + if self.app_config.lobsdiag_forenkf: + dep_dict = {'type': 'task', 'name': f'{self.run}atmensanlsol'} + else: + dep_dict = {'type': 'task', 'name': f'{self.run}atmensanlletkf'} deps.append(rocoto.add_dependency(dep_dict)) dep_dict = {'type': 'metatask', 'name': 'enkfgdasepmn', 'offset': f"-{timedelta_to_HMS(self._base['cycle_interval'])}"} deps.append(rocoto.add_dependency(dep_dict)) diff --git a/workflow/rocoto/tasks.py b/workflow/rocoto/tasks.py index ab89247fbb..d943cd130c 100644 --- a/workflow/rocoto/tasks.py +++ b/workflow/rocoto/tasks.py @@ -19,7 +19,7 @@ class Tasks: 'ocnanalprep', 'marinebmat', 'ocnanalrun', 'ocnanalecen', 'ocnanalchkpt', 'ocnanalpost', 'ocnanalvrfy', 'earc', 'ecen', 'echgres', 'ediag', 'efcs', 'eobs', 'eomg', 'epos', 'esfc', 'eupd', - 'atmensanlinit', 'atmensanlletkf', 'atmensanlfv3inc', 'atmensanlfinal', + 'atmensanlinit', 'atmensanlobs', 'atmensanlsol', 'atmensanlletkf', 'atmensanlfv3inc', 'atmensanlfinal', 'aeroanlinit', 'aeroanlrun', 'aeroanlfinal', 'prepsnowobs', 'snowanl', 'esnowrecen', 'fcst',