diff --git a/docs/Documentation/Drivers/Image/RunVIC.md b/docs/Documentation/Drivers/Image/RunVIC.md index 05b2b9b44..65aa139ec 100644 --- a/docs/Documentation/Drivers/Image/RunVIC.md +++ b/docs/Documentation/Drivers/Image/RunVIC.md @@ -41,6 +41,12 @@ At the command prompt, type: where `global_parameter_filename` = name of the global parameter file corresponding to your project. +To run the VIC image driver using multiple processors, type the following instead: + +`mpiexec -np n_proc vic_image.exe -g global_parameter_filename` + +where `n_proc` = number of processors to be used. + ## Other Command Line Options VIC has a few other command line options: diff --git a/tests/run_tests.py b/tests/run_tests.py index cdda2d4aa..6bcc99d27 100755 --- a/tests/run_tests.py +++ b/tests/run_tests.py @@ -24,7 +24,9 @@ find_global_param_value, check_multistream_classic) from test_image_driver import (test_image_driver_no_output_file_nans, - check_multistream_image) + check_multistream_image, + setup_subdirs_and_fill_in_global_param_mpi_test, + check_mpi_fluxes, check_mpi_states) from test_restart import (prepare_restart_run_periods, setup_subdirs_and_fill_in_global_param_restart_test, check_exact_restart_fluxes, @@ -338,6 +340,13 @@ def run_system(config_file, vic_exe, test_data_dir, out_dir, driver): run_periods = prepare_restart_run_periods( test_dict['restart'], dirs['state'], statesec) + # If mpi test, prepare a list of number of processors to be run + elif 'mpi' in test_dict['check']: + if not isinstance(test_dict['mpi']['n_proc'], list): + print('Error: need at least two values in n_proc to run' + 'mpi test!') + raise + list_n_proc = test_dict['mpi']['n_proc'] # create template string s = string.Template(global_param) @@ -351,7 +360,15 @@ def run_system(config_file, vic_exe, test_data_dir, out_dir, driver): setup_subdirs_and_fill_in_global_param_restart_test( s, run_periods, driver, dirs['results'], dirs['state'], test_data_dir) - # else, single run + # --- if mpi test, multiple 
runs --- # + elif 'mpi' in test_dict['check']: + # Set up subdirectories and output directories in global file for + # multiprocessor testing + list_global_param = \ + setup_subdirs_and_fill_in_global_param_mpi_test( + s, list_n_proc, dirs['results'], dirs['state'], + test_data_dir) + # --- else, single run --- # else: global_param = s.safe_substitute(test_data_dir=test_data_dir, result_dir=dirs['results'], @@ -368,14 +385,15 @@ def run_system(config_file, vic_exe, test_data_dir, out_dir, driver): if 'STATE_FORMAT' in replacements: state_format = replacements['STATE_FORMAT'] # --- replace global options --- # - if 'exact_restart' in test_dict['check']: + if 'exact_restart' in test_dict['check'] or\ + 'mpi' in test_dict['check']: # if multiple runs for j, gp in enumerate(list_global_param): # save a copy of replacements for the next global file replacements_cp = replacements.copy() # replace global options for this global file list_global_param[j] = replace_global_values(gp, replacements) replacements = replacements_cp - else: + else: # if single run global_param = replace_global_values(global_param, replacements) # write global parameter file @@ -392,6 +410,17 @@ def run_system(config_file, vic_exe, test_data_dir, out_dir, driver): with open(test_global_file, mode='w') as f: for line in gp: f.write(line) + elif 'mpi' in test_dict['check']: + list_test_global_file = [] + for j, gp in enumerate(list_global_param): + test_global_file = os.path.join( + dirs['test'], + '{}_globalparam_processors_{}.txt'.format( + testname, list_n_proc[j])) + list_test_global_file.append(test_global_file) + with open(test_global_file, mode='w') as f: + for line in gp: + f.write(line) else: test_global_file = os.path.join( dirs['test'], @@ -413,7 +442,23 @@ def run_system(config_file, vic_exe, test_data_dir, out_dir, driver): if 'exact_restart' in test_dict['check']: for j, test_global_file in enumerate(list_test_global_file): returncode = vic_exe.run(test_global_file, - 
logdir=dirs['logs']) + logdir=dirs['logs'], + **run_kwargs) + # Check return code + check_returncode(vic_exe, + test_dict.pop('expected_retval', 0)) + if 'mpi' in test_dict['check']: + for j, test_global_file in enumerate(list_test_global_file): + # Overwrite mpi_proc in option kwargs + n_proc = list_n_proc[j] + if n_proc == 1: + run_kwargs['mpi_proc'] = None + else: + run_kwargs['mpi_proc'] = list_n_proc[j] + # Run VIC + returncode = vic_exe.run(test_global_file, + logdir=dirs['logs'], + **run_kwargs) # Check return code check_returncode(vic_exe, test_dict.pop('expected_retval', 0)) @@ -460,6 +505,7 @@ def run_system(config_file, vic_exe, test_data_dir, out_dir, driver): check_exact_restart_states(dirs['state'], driver, run_periods, statesec) + # check for multistream output if 'multistream' in test_dict['check']: fnames = glob.glob(os.path.join(dirs['results'], '*')) if driver == 'classic': @@ -468,6 +514,11 @@ def run_system(config_file, vic_exe, test_data_dir, out_dir, driver): warnings.warn('Skipping multistream image driver test') # TODO: check_multistream_image(fnames) + # check for mpi multiprocessor results + if 'mpi' in test_dict['check']: + check_mpi_fluxes(dirs['results'], list_n_proc) + check_mpi_states(dirs['state'], list_n_proc) + # if we got this far, the test passed. 
test_passed = True diff --git a/tests/system/global.image.STEHE.mpi.txt b/tests/system/global.image.STEHE.mpi.txt new file mode 100644 index 000000000..f247f6d16 --- /dev/null +++ b/tests/system/global.image.STEHE.mpi.txt @@ -0,0 +1,71 @@ +NODES 3 +MODEL_STEPS_PER_DAY 24 +SNOW_STEPS_PER_DAY 24 +RUNOFF_STEPS_PER_DAY 24 +STARTYEAR 1949 +STARTMONTH 1 +STARTDAY 1 +ENDYEAR 1949 +ENDMONTH 1 +ENDDAY 10 +CALENDAR PROLEPTIC_GREGORIAN +FULL_ENERGY FALSE +FROZEN_SOIL FALSE + +DOMAIN $test_data_dir/image/Stehekin/parameters/domain.stehekin.20151028.nc +DOMAIN_TYPE LAT lat +DOMAIN_TYPE LON lon +DOMAIN_TYPE MASK mask +DOMAIN_TYPE AREA area +DOMAIN_TYPE FRAC frac +DOMAIN_TYPE YDIM lat +DOMAIN_TYPE XDIM lon + +#INIT_STATE +STATENAME $state_dir/states +STATEYEAR 1949 +STATEMONTH 1 +STATEDAY 10 +STATESEC 82800 + +FORCING1 $test_data_dir/image/Stehekin/forcings/Stehekin_image_test.forcings_10days. +FORCE_TYPE AIR_TEMP tas +FORCE_TYPE PREC prcp +FORCE_TYPE PRESSURE pres +FORCE_TYPE SWDOWN dswrf +FORCE_TYPE LWDOWN dlwrf +FORCE_TYPE VP shum +FORCE_TYPE WIND wind +WIND_H 10.0 + +PARAMETERS $test_data_dir/image/Stehekin/parameters/Stehekin_test_params_20160327.nc +BASEFLOW ARNO +JULY_TAVG_SUPPLIED FALSE +ORGANIC_FRACT FALSE +LAI_SRC FROM_VEGPARAM +SNOW_BAND TRUE + +RESULT_DIR $result_dir + +OUTFILE fluxes +AGGFREQ NHOURS 1 +OUTVAR OUT_PREC +OUTVAR OUT_RAINF +OUTVAR OUT_SNOWF +OUTVAR OUT_AIR_TEMP +OUTVAR OUT_SWDOWN +OUTVAR OUT_LWDOWN +OUTVAR OUT_PRESSURE +OUTVAR OUT_WIND +OUTVAR OUT_DENSITY +OUTVAR OUT_REL_HUMID +OUTVAR OUT_QAIR +OUTVAR OUT_VP +OUTVAR OUT_VPD +OUTVAR OUT_RUNOFF +OUTVAR OUT_BASEFLOW +OUTVAR OUT_EVAP +OUTVAR OUT_SWE +OUTVAR OUT_SOIL_MOIST +OUTVAR OUT_ALBEDO +OUTVAR OUT_SOIL_TEMP diff --git a/tests/system/system_tests.cfg b/tests/system/system_tests.cfg index 8c295aacc..5927c81cb 100644 --- a/tests/system/system_tests.cfg +++ b/tests/system/system_tests.cfg @@ -225,3 +225,14 @@ driver = image global_parameter_file = global.image.STEHE.allhistvars.txt expected_retval = 0 check 
def setup_subdirs_and_fill_in_global_param_mpi_test(
        s, list_n_proc, result_basedir, state_basedir, test_data_dir):
    ''' Fill in global parameter output directories for multiple runs for mpi
        testing, image driver

    One ``processors_<n_proc>`` subdirectory is created under both the result
    and the state base directory for every entry of ``list_n_proc``, and one
    filled-in copy of the global parameter template is produced per entry.

    Parameters
    ----------
    s:
        Template of the global param file to be filled in; only its
        ``safe_substitute`` method is used (e.g. a ``string.Template``)
    list_n_proc:
        A list of number of processors to run and compare
    result_basedir:
        Base directory of output fluxes results; runs with different number of
        processors are output to subdirectories under the base directory
    state_basedir:
        Base directory of output state results; runs with different number of
        processors are output to subdirectories under the base directory
    test_data_dir:
        Base directory of test data

    Returns
    ----------
    list_global_param:
        A list of global parameter strings to be run with parameters filled
        in, in the same order as ``list_n_proc``

    Require
    ----------
    os
    '''

    list_global_param = []
    for n_proc in list_n_proc:
        # Set up subdirectories for results and states
        result_dir = _mpi_run_subdir(result_basedir, n_proc)
        state_dir = _mpi_run_subdir(state_basedir, n_proc)
        # exist_ok: re-running a test into the same output tree is allowed
        os.makedirs(result_dir, exist_ok=True)
        os.makedirs(state_dir, exist_ok=True)

        # Fill in global parameter options; safe_substitute leaves any
        # placeholder that is not listed here untouched
        list_global_param.append(s.safe_substitute(
            test_data_dir=test_data_dir,
            result_dir=result_dir,
            state_dir=state_dir))

    return list_global_param


def _mpi_run_subdir(basedir, n_proc):
    ''' Return the per-run subdirectory used for a given processor count '''
    return os.path.join(basedir, 'processors_{}'.format(n_proc))


def _find_mpi_netcdf_file(run_dir):
    ''' Return the netCDF file to be compared for one run directory

    The candidates are sorted so that the choice is deterministic when more
    than one file is present (a warning is printed in that case).

    Raises
    ----------
    IndexError
        If no ``*.nc`` file exists under ``run_dir``. The exception type is
        the one the unguarded ``glob(...)[0]`` lookup used to raise; only the
        message has been made explicit.
    '''
    fnames = sorted(glob.glob(os.path.join(run_dir, '*.nc')))
    if not fnames:
        raise IndexError(
            'No netCDF file found under directory {}'.format(run_dir))
    if len(fnames) > 1:
        print(
            'Warning: more than one netCDF file found under directory {}'.
            format(run_dir))
    return fnames[0]


def _check_mpi_outputs_identical(basedir, list_n_proc, err_msg):
    ''' Compare the netCDF output of every run against the first run

    The run for ``list_n_proc[0]`` is the base; every data variable of the
    base dataset must be exactly equal in each of the remaining runs,
    otherwise ``numpy.testing.assert_array_equal`` raises an AssertionError
    carrying ``err_msg``.
    '''
    # Read the first run - as base
    base_fname = _find_mpi_netcdf_file(
        _mpi_run_subdir(basedir, list_n_proc[0]))
    # Context managers make sure the datasets are closed again, also when a
    # comparison fails
    with xr.open_dataset(base_fname) as ds_first_run:
        # Loop over all rest runs and compare with the base run
        for n_proc in list_n_proc[1:]:
            fname = _find_mpi_netcdf_file(_mpi_run_subdir(basedir, n_proc))
            with xr.open_dataset(fname) as ds_current_run:
                for var in ds_first_run.data_vars:
                    npt.assert_array_equal(
                        ds_current_run[var].values,
                        ds_first_run[var].values,
                        err_msg=err_msg)


def check_mpi_fluxes(result_basedir, list_n_proc):
    ''' Check whether all the fluxes are the same with different number of
        processors, image driver

    Parameters
    ----------
    result_basedir:
        Base directory of output fluxes results; runs with different number of
        processors are output to subdirectories under the base directory
    list_n_proc:
        A list of number of processors to run and compare; the first entry
        is used as the base run

    Raises
    ----------
    AssertionError
        If any data variable differs between the base run and another run
    IndexError
        If a run directory contains no netCDF file

    Require
    ----------
    os
    glob
    numpy
    xarray
    '''

    _check_mpi_outputs_identical(
        result_basedir, list_n_proc, 'Fluxes are not an exact match')


def check_mpi_states(state_basedir, list_n_proc):
    ''' Check whether all the output states are the same with different number
        of processors, image driver

    Parameters
    ----------
    state_basedir:
        Base directory of output states; runs with different number of
        processors are output to subdirectories under the base directory
    list_n_proc:
        A list of number of processors to run and compare; the first entry
        is used as the base run

    Raises
    ----------
    AssertionError
        If any data variable differs between the base run and another run
    IndexError
        If a run directory contains no netCDF file

    Require
    ----------
    os
    glob
    numpy
    xarray
    '''

    # The shared helper reports the state directory in its warning; the
    # previous inline copy referenced an undefined `result_dir` here.
    _check_mpi_outputs_identical(
        state_basedir, list_n_proc, 'States are not an exact match')
%hu\n", gp->stateyear); + fprintf(LOG_DEST, "\tstatesec : %u\n", gp->statesec); } /****************************************************************************** diff --git a/vic/drivers/shared_image/src/vic_mpi_support.c b/vic/drivers/shared_image/src/vic_mpi_support.c index 4636975f1..0f73a5e84 100644 --- a/vic/drivers/shared_image/src/vic_mpi_support.c +++ b/vic/drivers/shared_image/src/vic_mpi_support.c @@ -103,7 +103,7 @@ create_MPI_global_struct_type(MPI_Datatype *mpi_type) MPI_Datatype *mpi_types; // nitems has to equal the number of elements in global_param_struct - nitems = 31; + nitems = 32; blocklengths = malloc(nitems * sizeof(*blocklengths)); check_alloc_status(blocklengths, "Memory allocation error."); @@ -232,6 +232,10 @@ create_MPI_global_struct_type(MPI_Datatype *mpi_type) offsets[i] = offsetof(global_param_struct, statemonth); mpi_types[i++] = MPI_UNSIGNED_SHORT; + // unsigned int statesec; + offsets[i] = offsetof(global_param_struct, statesec); + mpi_types[i++] = MPI_UNSIGNED; + // unsigned short int stateyear; offsets[i] = offsetof(global_param_struct, stateyear); mpi_types[i++] = MPI_UNSIGNED_SHORT;