Skip to content

Commit

Permalink
Add MPI test (UW-Hydro#567)
Browse files Browse the repository at this point in the history
* Add mpi multiprocessing test - test whether the flux and state results
using different number of processors are exactly identical for image
driver

* Add instruction on running VIC image driver using multiple processors to
the documentation

* Remove state output in mpi test - for debugging purpose

* Add back outputing state file in mpi test

* Add debug lines in VIC source code to debug failing MPI when writing
state file

* fixed bug of missing statesec in mpi global parameter file

* Remove some debugging lines in the VIC code for detecting MPI bug (bug
fixed)
  • Loading branch information
yixinmao authored and bartnijssen committed Jul 28, 2016
1 parent eea8118 commit 5310516
Show file tree
Hide file tree
Showing 8 changed files with 322 additions and 8 deletions.
6 changes: 6 additions & 0 deletions docs/Documentation/Drivers/Image/RunVIC.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ At the command prompt, type:

where `global_parameter_filename` = name of the global parameter file corresponding to your project.

To run VIC image driver using multiple processor, type the following instead:

`mpiexec -np n_proc vic_image.exe -g global_parameter_filename`

where `n_proc` = number of processors to be used

## Other Command Line Options

VIC has a few other command line options:
Expand Down
61 changes: 56 additions & 5 deletions tests/run_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
find_global_param_value,
check_multistream_classic)
from test_image_driver import (test_image_driver_no_output_file_nans,
check_multistream_image)
check_multistream_image,
setup_subdirs_and_fill_in_global_param_mpi_test,
check_mpi_fluxes, check_mpi_states)
from test_restart import (prepare_restart_run_periods,
setup_subdirs_and_fill_in_global_param_restart_test,
check_exact_restart_fluxes,
Expand Down Expand Up @@ -338,6 +340,13 @@ def run_system(config_file, vic_exe, test_data_dir, out_dir, driver):
run_periods = prepare_restart_run_periods(
test_dict['restart'],
dirs['state'], statesec)
# If mpi test, prepare a list of number of processors to be run
elif 'mpi' in test_dict['check']:
if not isinstance(test_dict['mpi']['n_proc'], list):
print('Error: need at least two values in n_proc to run'
'mpi test!')
raise
list_n_proc = test_dict['mpi']['n_proc']

# create template string
s = string.Template(global_param)
Expand All @@ -351,7 +360,15 @@ def run_system(config_file, vic_exe, test_data_dir, out_dir, driver):
setup_subdirs_and_fill_in_global_param_restart_test(
s, run_periods, driver, dirs['results'], dirs['state'],
test_data_dir)
# else, single run
# --- if mpi test, multiple runs --- #
elif 'mpi' in test_dict['check']:
# Set up subdirectories and output directories in global file for
# multiprocessor testing
list_global_param = \
setup_subdirs_and_fill_in_global_param_mpi_test(
s, list_n_proc, dirs['results'], dirs['state'],
test_data_dir)
# --- else, single run --- #
else:
global_param = s.safe_substitute(test_data_dir=test_data_dir,
result_dir=dirs['results'],
Expand All @@ -368,14 +385,15 @@ def run_system(config_file, vic_exe, test_data_dir, out_dir, driver):
if 'STATE_FORMAT' in replacements:
state_format = replacements['STATE_FORMAT']
# --- replace global options --- #
if 'exact_restart' in test_dict['check']:
if 'exact_restart' in test_dict['check'] or\
'mpi' in test_dict['check']: # if multiple runs
for j, gp in enumerate(list_global_param):
# save a copy of replacements for the next global file
replacements_cp = replacements.copy()
# replace global options for this global file
list_global_param[j] = replace_global_values(gp, replacements)
replacements = replacements_cp
else:
else: # if single run
global_param = replace_global_values(global_param, replacements)

# write global parameter file
Expand All @@ -392,6 +410,17 @@ def run_system(config_file, vic_exe, test_data_dir, out_dir, driver):
with open(test_global_file, mode='w') as f:
for line in gp:
f.write(line)
elif 'mpi' in test_dict['check']:
list_test_global_file = []
for j, gp in enumerate(list_global_param):
test_global_file = os.path.join(
dirs['test'],
'{}_globalparam_processors_{}.txt'.format(
testname, list_n_proc[j]))
list_test_global_file.append(test_global_file)
with open(test_global_file, mode='w') as f:
for line in gp:
f.write(line)
else:
test_global_file = os.path.join(
dirs['test'],
Expand All @@ -413,7 +442,23 @@ def run_system(config_file, vic_exe, test_data_dir, out_dir, driver):
if 'exact_restart' in test_dict['check']:
for j, test_global_file in enumerate(list_test_global_file):
returncode = vic_exe.run(test_global_file,
logdir=dirs['logs'])
logdir=dirs['logs'],
**run_kwargs)
# Check return code
check_returncode(vic_exe,
test_dict.pop('expected_retval', 0))
if 'mpi' in test_dict['check']:
for j, test_global_file in enumerate(list_test_global_file):
# Overwrite mpi_proc in option kwargs
n_proc = list_n_proc[j]
if n_proc == 1:
run_kwargs['mpi_proc'] = None
else:
run_kwargs['mpi_proc'] = list_n_proc[j]
# Run VIC
returncode = vic_exe.run(test_global_file,
logdir=dirs['logs'],
**run_kwargs)
# Check return code
check_returncode(vic_exe,
test_dict.pop('expected_retval', 0))
Expand Down Expand Up @@ -460,6 +505,7 @@ def run_system(config_file, vic_exe, test_data_dir, out_dir, driver):
check_exact_restart_states(dirs['state'], driver,
run_periods, statesec)

# check for multistream output
if 'multistream' in test_dict['check']:
fnames = glob.glob(os.path.join(dirs['results'], '*'))
if driver == 'classic':
Expand All @@ -468,6 +514,11 @@ def run_system(config_file, vic_exe, test_data_dir, out_dir, driver):
warnings.warn('Skipping multistream image driver test')
# TODO: check_multistream_image(fnames)

# check for mpi multiprocessor results
if 'mpi' in test_dict['check']:
check_mpi_fluxes(dirs['results'], list_n_proc)
check_mpi_states(dirs['state'], list_n_proc)

# if we got this far, the test passed.
test_passed = True

Expand Down
71 changes: 71 additions & 0 deletions tests/system/global.image.STEHE.mpi.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
NODES 3
MODEL_STEPS_PER_DAY 24
SNOW_STEPS_PER_DAY 24
RUNOFF_STEPS_PER_DAY 24
STARTYEAR 1949
STARTMONTH 1
STARTDAY 1
ENDYEAR 1949
ENDMONTH 1
ENDDAY 10
CALENDAR PROLEPTIC_GREGORIAN
FULL_ENERGY FALSE
FROZEN_SOIL FALSE

DOMAIN $test_data_dir/image/Stehekin/parameters/domain.stehekin.20151028.nc
DOMAIN_TYPE LAT lat
DOMAIN_TYPE LON lon
DOMAIN_TYPE MASK mask
DOMAIN_TYPE AREA area
DOMAIN_TYPE FRAC frac
DOMAIN_TYPE YDIM lat
DOMAIN_TYPE XDIM lon

#INIT_STATE
STATENAME $state_dir/states
STATEYEAR 1949
STATEMONTH 1
STATEDAY 10
STATESEC 82800

FORCING1 $test_data_dir/image/Stehekin/forcings/Stehekin_image_test.forcings_10days.
FORCE_TYPE AIR_TEMP tas
FORCE_TYPE PREC prcp
FORCE_TYPE PRESSURE pres
FORCE_TYPE SWDOWN dswrf
FORCE_TYPE LWDOWN dlwrf
FORCE_TYPE VP shum
FORCE_TYPE WIND wind
WIND_H 10.0

PARAMETERS $test_data_dir/image/Stehekin/parameters/Stehekin_test_params_20160327.nc
BASEFLOW ARNO
JULY_TAVG_SUPPLIED FALSE
ORGANIC_FRACT FALSE
LAI_SRC FROM_VEGPARAM
SNOW_BAND TRUE

RESULT_DIR $result_dir

OUTFILE fluxes
AGGFREQ NHOURS 1
OUTVAR OUT_PREC
OUTVAR OUT_RAINF
OUTVAR OUT_SNOWF
OUTVAR OUT_AIR_TEMP
OUTVAR OUT_SWDOWN
OUTVAR OUT_LWDOWN
OUTVAR OUT_PRESSURE
OUTVAR OUT_WIND
OUTVAR OUT_DENSITY
OUTVAR OUT_REL_HUMID
OUTVAR OUT_QAIR
OUTVAR OUT_VP
OUTVAR OUT_VPD
OUTVAR OUT_RUNOFF
OUTVAR OUT_BASEFLOW
OUTVAR OUT_EVAP
OUTVAR OUT_SWE
OUTVAR OUT_SOIL_MOIST
OUTVAR OUT_ALBEDO
OUTVAR OUT_SOIL_TEMP
11 changes: 11 additions & 0 deletions tests/system/system_tests.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -225,3 +225,14 @@ driver = image
global_parameter_file = global.image.STEHE.allhistvars.txt
expected_retval = 0
check = nonans

[System-mpi_image_check_identical_results]
test_description = check that multi-processor runs produce identical results - image driver
driver = image
global_parameter_file = global.image.STEHE.mpi.txt
expected_retval = 0
check = mpi
[[mpi]]
# A list of number of processors to run and compare (need at least a list of two numbers)
n_proc = 1,4

161 changes: 161 additions & 0 deletions tests/test_image_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import pandas as pd
import numpy as np
import numpy.testing as npt
import glob


def test_image_driver_no_output_file_nans(fnames, domain_file):
Expand Down Expand Up @@ -133,3 +134,163 @@ def reindex_xr_obj_timedim(obj, freq):
print('actual=%s\nexpected=%s' % (actual, expected))
print(np.abs(actual-expected).max())
raise e


def setup_subdirs_and_fill_in_global_param_mpi_test(
s, list_n_proc, result_basedir, state_basedir, test_data_dir):
''' Fill in global parameter output directories for multiple runs for mpi
testing, image driver
Parameters
----------
s: <string.Template>
Template of the global param file to be filled in
list_n_proc: <list>
A list of number of processors to run and compare
result_basedir: <str>
Base directory of output fluxes results; runs with different number of
processors are output to subdirectories under the base directory
state_basedir: <str>
Base directory of output state results; runs with different number of
processors are output to subdirectories under the base directory
test_data_dir: <str>
Base directory of test data
Returns
----------
list_global_param: <list>
A list of global parameter strings to be run with parameters filled in
Require
----------
os
'''

list_global_param = []
for j, n_proc in enumerate(list_n_proc):
# Set up subdirectories for results and states
result_dir = os.path.join(
result_basedir,
'processors_{}'.format(n_proc))
state_dir = os.path.join(
state_basedir,
'processors_{}'.format(n_proc))
os.makedirs(result_dir, exist_ok=True)
os.makedirs(state_dir, exist_ok=True)

# Fill in global parameter options
list_global_param.append(s.safe_substitute(
test_data_dir=test_data_dir,
result_dir=result_dir,
state_dir=state_dir))

return(list_global_param)


def check_mpi_fluxes(result_basedir, list_n_proc):
''' Check whether all the fluxes are the same with different number of
processors, image driver
Parameters
----------
result_basedir: <str>
Base directory of output fluxes results; runs with different number of
processors are output to subdirectories under the base directory
list_n_proc: <list>
A list of number of processors to run and compare
Require
----------
os
glob
numpy
'''

# Read the first run - as base
n_proc = list_n_proc[0]
result_dir = os.path.join(
result_basedir,
'processors_{}'.format(n_proc))
if len(glob.glob(os.path.join(result_dir, '*.nc'))) > 1:
print(
'Warning: more than one netCDF file found under directory {}'.
format(result_dir))
fname = glob.glob(os.path.join(result_dir, '*.nc'))[0]
ds_first_run = xr.open_dataset(fname)

# Loop over all rest runs and compare fluxes with the base run
for i, n_proc in enumerate(list_n_proc):
# Skip the first run
if i == 0:
continue
# Read flux results for this run
result_dir = os.path.join(
result_basedir,
'processors_{}'.format(n_proc))
if len(glob.glob(os.path.join(result_dir, '*.nc'))) > 1:
print(
'Warning: more than one netCDF file found under directory {}'.
format(result_dir))
fname = glob.glob(os.path.join(result_dir, '*.nc'))[0]
ds_current_run = xr.open_dataset(fname)
# Compare current run with base run
for var in ds_first_run.data_vars:
npt.assert_array_equal(
ds_current_run[var].values,
ds_first_run[var].values,
err_msg='Fluxes are not an exact match')


def check_mpi_states(state_basedir, list_n_proc):
''' Check whether all the output states are the same with different number
of processors, image driver
Parameters
----------
state_basedir: <str>
Base directory of output states; runs with different number of
processors are output to subdirectories under the base directory
list_n_proc: <list>
A list of number of processors to run and compare
Require
----------
os
glob
numpy
'''

# Read the first run - as base
n_proc = list_n_proc[0]
state_dir = os.path.join(
state_basedir,
'processors_{}'.format(n_proc))
if len(glob.glob(os.path.join(state_dir, '*.nc'))) > 1:
print(
'Warning: more than one netCDF file found under directory {}'.
format(state_dir))
fname = glob.glob(os.path.join(state_dir, '*.nc'))[0]
ds_first_run = xr.open_dataset(fname)

# Loop over all rest runs and compare fluxes with the base run
for i, n_proc in enumerate(list_n_proc):
# Skip the first run
if i == 0:
continue
# Read output states for this run
state_dir = os.path.join(
state_basedir,
'processors_{}'.format(n_proc))
if len(glob.glob(os.path.join(state_dir, '*.nc'))) > 1:
print(
'Warning: more than one netCDF file found under directory {}'.
format(result_dir))
fname = glob.glob(os.path.join(state_dir, '*.nc'))[0]
ds_current_run = xr.open_dataset(fname)
# Compare current run with base run
for var in ds_first_run.data_vars:
npt.assert_array_equal(
ds_current_run[var].values,
ds_first_run[var].values,
err_msg='States are not an exact match')

Loading

0 comments on commit 5310516

Please sign in to comment.