diff --git a/eessi/testsuite/eessi_mixin.py b/eessi/testsuite/eessi_mixin.py index 692fbbca..3dd5b224 100644 --- a/eessi/testsuite/eessi_mixin.py +++ b/eessi/testsuite/eessi_mixin.py @@ -1,4 +1,4 @@ -from reframe.core.builtins import parameter, run_after, variable +from reframe.core.builtins import parameter, run_after, run_before, variable from reframe.core.exceptions import ReframeFatalError from reframe.core.pipeline import RegressionMixin from reframe.utility.sanity import make_performance_function @@ -44,6 +44,7 @@ class EESSI_Mixin(RegressionMixin): scale = parameter(SCALES.keys()) bench_name = None bench_name_ci = None + num_tasks_per_compute_unit = 1 # Create ReFrame variables for logging runtime environment information cvmfs_repo_name = variable(str, value='None') @@ -119,7 +120,7 @@ def run_after_init(self): # Set scales as tags hooks.set_tag_scale(self) - @run_after('init') + @run_before('setup', always_last=True) def measure_mem_usage(self): if self.measure_memory_usage: hooks.measure_memory_usage(self) @@ -164,7 +165,8 @@ def validate_setup(self): @run_after('setup') def assign_tasks_per_compute_unit(self): """Call hooks to assign tasks per compute unit, set OMP_NUM_THREADS, and set compact process binding""" - hooks.assign_tasks_per_compute_unit(test=self, compute_unit=self.compute_unit) + hooks.assign_tasks_per_compute_unit(test=self, compute_unit=self.compute_unit, + num_per=self.num_tasks_per_compute_unit) # Set OMP_NUM_THREADS environment variable hooks.set_omp_num_threads(self) diff --git a/eessi/testsuite/tests/apps/osu.py b/eessi/testsuite/tests/apps/osu.py index 83bbd0f4..3ea452f1 100644 --- a/eessi/testsuite/tests/apps/osu.py +++ b/eessi/testsuite/tests/apps/osu.py @@ -6,19 +6,20 @@ non-GPU nodes. Otherwise those tests will FAIL. """ import reframe as rfm -from reframe.core.builtins import parameter, run_after # added only to make the linter happy +from reframe.core.builtins import parameter, run_after from reframe.utility import reframe from hpctestlib.microbenchmarks.mpi.osu import osu_benchmark -from eessi.testsuite import hooks, utils -from eessi.testsuite.constants import * +from eessi.testsuite.constants import COMPUTE_UNIT, CPU, DEVICE_TYPES, INVALID_SYSTEM, GPU, NODE, SCALES +from eessi.testsuite.eessi_mixin import EESSI_Mixin from eessi.testsuite.utils import find_modules, log -def filter_scales_pt2pt(): +def filter_scales_pt2pt_cpu(): """ - Filtering function for filtering scales for the pt2pt OSU test + Filtering function for filtering scales for the pt2pt OSU test on CPUs + returns all scales with either 2 cores, 1 full node, or 2 full nodes """ return [ k for (k, v) in SCALES.items() @@ -28,9 +29,23 @@ def filter_scales_pt2pt(): ] +def filter_scales_pt2pt_gpu(): + """ + Filtering function for filtering scales for the pt2pt OSU test on GPUs + returns all scales with either a partial node, 1 full node, or 2 full nodes + """ + return [ + k for (k, v) in SCALES.items() + if (v['num_nodes'] == 1 and v.get('node_part', 0) > 1) + or (v['num_nodes'] == 2 and v.get('node_part', 0) == 1) + or (v['num_nodes'] == 1 and v.get('node_part', 0) == 1) + ] + + def filter_scales_coll(): """ - Filtering function for filtering scales for collective the OSU test + Filtering function for filtering scales for the collective OSU test + returns all scales with at least 2 cores """ return [ k for (k, v) in SCALES.items() @@ -39,18 +54,12 @@ def filter_scales_coll(): ] -@rfm.simple_test -class EESSI_OSU_Micro_Benchmarks_pt2pt(osu_benchmark): - ''' Run-only OSU test ''' - scale = parameter(filter_scales_pt2pt()) - valid_prog_environs = ['default'] - valid_systems = ['*'] +class EESSI_OSU_Base(osu_benchmark): + """ base class for OSU tests """ time_limit = '30m' module_name = parameter(find_modules('OSU-Micro-Benchmarks')) - # Device type for non-cuda OSU-Micro-Benchmarks should run on hosts of both node types. To do this the default - # device type is set to GPU. - device_type = parameter([DEVICE_TYPES[CPU], DEVICE_TYPES[GPU]]) - # unset num_tasks_per_node from the hpctestlib. + + # reset num_tasks_per_node from the hpctestlib: we handle it ourselves num_tasks_per_node = None # Set num_warmup_iters to 5 to reduce execution time, especially on slower interconnects @@ -58,6 +67,9 @@ class EESSI_OSU_Micro_Benchmarks_pt2pt(osu_benchmark): # Set num_iters to 10 to reduce execution time, especially on slower interconnects num_iters = 10 + def required_mem_per_node(self): + return self.num_tasks_per_node * 1024 + @run_after('init') def filter_scales_2gpus(self): """Filter out scales with < 2 GPUs if running on GPUs""" @@ -69,26 +81,6 @@ def filter_scales_2gpus(self): self.valid_systems = [INVALID_SYSTEM] log(f'valid_systems set to {self.valid_systems} for scale {self.scale} and device_type {self.device_type}') - @run_after('init') - def filter_benchmark_pt2pt(self): - """ Filter out all non-mpi.pt2pt benchmarks """ - if not self.benchmark_info[0].startswith('mpi.pt2pt'): - self.valid_systems = [INVALID_SYSTEM] - - @run_after('init') - def run_after_init(self): - """hooks to run after init phase""" - - # Filter on which scales are supported by the partitions defined in the ReFrame configuration - hooks.filter_supported_scales(self) - - hooks.filter_valid_systems_by_device_type(self, required_device_type=self.device_type) - - hooks.set_modules(self) - - # Set scales as tags - hooks.set_tag_scale(self) - @run_after('init') def set_device_buffers(self): """ @@ -98,32 +90,38 @@ def set_device_buffers(self): """ if self.device_type == DEVICE_TYPES[GPU]: self.device_buffers = 'cuda' - else: - # If the device_type is CPU then device_buffers should always be CPU. self.device_buffers = 'cpu' @run_after('init') - def set_tag_ci(self): - """ Setting tests under CI tag. """ - if (self.benchmark_info[0] in ['mpi.pt2pt.osu_latency', 'mpi.pt2pt.osu_bw']): - self.tags.add('CI') - log(f'tags set to {self.tags}') + def set_tags(self): + """ Setting custom tags """ + self.bench_name = self.benchmark_info[0] + self.tags.add(self.bench_name.split('.')[-1]) + - if (self.benchmark_info[0] == 'mpi.pt2pt.osu_bw'): - self.tags.add('osu_bw') +class EESSI_OSU_pt2pt_Base(EESSI_OSU_Base): + ''' point-to-point OSU test base class ''' + compute_unit = COMPUTE_UNIT[NODE] - if (self.benchmark_info[0] == 'mpi.pt2pt.osu_latency'): - self.tags.add('osu_latency') + @run_after('init') + def filter_benchmark_pt2pt(self): + """ Filter out all non-mpi.pt2pt benchmarks """ + if not self.benchmark_info[0].startswith('mpi.pt2pt'): + self.valid_systems = [INVALID_SYSTEM] @run_after('init') - def set_mem(self): - """ Setting an extra job option of memory. This test has only 4 possibilities: 1_node, 2_nodes, 2_cores and - 1cpn_2nodes. This is implemented for all cases including full node cases. The requested memory may seem large - and the test requires at least 4.5 GB per core at the minimum for the full test when run with validation (-c - option for osu_bw or osu_latency). We run till message size 8 (-m 8) which significantly reduces memory - requirement.""" - self.extra_resources = {'memory': {'size': '12GB'}} + def select_ci(self): + " Select the CI variants " + if (self.bench_name in ['mpi.pt2pt.osu_latency', 'mpi.pt2pt.osu_bw']): + self.bench_name_ci = self.bench_name + + @run_after('init') + def set_num_tasks_per_compute_unit(self): + """ Setting number of tasks per compute unit and cpus per task. This sets num_cpus_per_task + for 1 node and 2 node options where the request is for full nodes.""" + if SCALES.get(self.scale).get('num_nodes') == 1: + self.num_tasks_per_compute_unit = 2 @run_after('setup') def adjust_executable_opts(self): @@ -132,134 +130,63 @@ def adjust_executable_opts(self): Therefore we must override it *after* the 'setup' phase """ if self.device_type == DEVICE_TYPES[CPU]: - self.executable_opts = [ele for ele in self.executable_opts if ele != 'D'] + self.executable_opts = [x for x in self.executable_opts if x != 'D'] - @run_after('setup') - def set_num_tasks_per_node(self): - """ Setting number of tasks per node and cpus per task in this function. This function sets num_cpus_per_task - for 1 node and 2 node options where the request is for full nodes.""" - if SCALES.get(self.scale).get('num_nodes') == 1: - hooks.assign_tasks_per_compute_unit(self, COMPUTE_UNIT[NODE], 2) - else: - hooks.assign_tasks_per_compute_unit(self, COMPUTE_UNIT[NODE]) + +@rfm.simple_test +class EESSI_OSU_pt2pt_CPU(EESSI_OSU_pt2pt_Base, EESSI_Mixin): + ''' point-to-point OSU test on CPUs''' + scale = parameter(filter_scales_pt2pt_cpu()) + device_type = DEVICE_TYPES[CPU] + + +@rfm.simple_test +class EESSI_OSU_pt2pt_GPU(EESSI_OSU_pt2pt_Base, EESSI_Mixin): + ''' point-to-point OSU test on GPUs''' + scale = parameter(filter_scales_pt2pt_gpu()) + device_type = DEVICE_TYPES[GPU] @run_after('setup') - def set_num_gpus_per_node(self): - """ - Set number of GPUs per node for GPU-to-GPU tests - """ - if self.device_type == DEVICE_TYPES[GPU]: - # Skip single-node tests with less than 2 GPU devices in the node - self.skip_if( - SCALES[self.scale]['num_nodes'] == 1 and self.default_num_gpus_per_node < 2, - "There are < 2 GPU devices present in the node." - f" Skipping tests with device_type={DEVICE_TYPES[GPU]} involving < 2 GPUs and 1 node." - ) - if not self.num_gpus_per_node: - self.num_gpus_per_node = self.default_num_gpus_per_node - log(f'num_gpus_per_node set to {self.num_gpus_per_node} for partition {self.current_partition.name}') + def skip_test_1gpu(self): + num_gpus = self.num_gpus_per_node * self.num_nodes + self.skip_if( + num_gpus != 2 and self.scale not in ['1_node', '2_nodes'], + f"Skipping test : {num_gpus} GPU(s) available for this test case, need exactly 2" + ) @rfm.simple_test -class EESSI_OSU_Micro_Benchmarks_coll(osu_benchmark): - ''' Run-only OSU test ''' +class EESSI_OSU_coll(EESSI_OSU_Base, EESSI_Mixin): + ''' collective OSU test ''' scale = parameter(filter_scales_coll()) - valid_prog_environs = ['default'] - valid_systems = ['*'] - time_limit = '30m' - module_name = parameter(utils.find_modules('OSU-Micro-Benchmarks')) - # Device type for non-cuda OSU-Micro-Benchmarks should run on hosts of both node types. To do this the default - # device type is set to GPU. device_type = parameter([DEVICE_TYPES[CPU], DEVICE_TYPES[GPU]]) - # Unset num_tasks_per_node from hpctestlib - num_tasks_per_node = None - - # Set num_warmup_iters to 5 to reduce execution time, especially on slower interconnects - num_warmup_iters = 5 - # Set num_iters to 10 to reduce execution time, especially on slower interconnects - num_iters = 10 @run_after('init') - def run_after_init(self): - """hooks to run after init phase""" - # Note: device_buffers variable is inherited from the hpctestlib class and adds options to the launcher - # commands based on what device is set. - self.device_buffers = 'cpu' - # Filter on which scales are supported by the partitions defined in the ReFrame configuration - hooks.filter_supported_scales(self) - hooks.filter_valid_systems_by_device_type(self, required_device_type=self.device_type) - is_cuda_module = utils.is_cuda_required_module(self.module_name) - if is_cuda_module and self.device_type == DEVICE_TYPES[GPU]: - self.device_buffers = 'cuda' - - # If the device_type is CPU then device buffer should always be CPU. - if self.device_type == DEVICE_TYPES[CPU]: - self.device_buffers = 'cpu' - # This part of the code removes the collective communication calls out of the run list since this test is only - # meant for collective. + def filter_benchmark_coll(self): + """ Filter out all non-mpi.collective benchmarks """ if not self.benchmark_info[0].startswith('mpi.collective'): - self.valid_systems = [] - hooks.set_modules(self) - - @run_after('init') - def set_tag_ci(self): - if (self.benchmark_info[0] == 'mpi.collective.osu_allreduce' - or self.benchmark_info[0] == 'mpi.collective.osu_alltoall'): - self.tags.add('CI') - if (self.benchmark_info[0] == 'mpi.collective.osu_allreduce'): - self.tags.add('osu_allreduce') - if (self.benchmark_info[0] == 'mpi.collective.osu_alltoall'): - self.tags.add('osu_alltoall') + self.valid_systems = [INVALID_SYSTEM] @run_after('init') - def set_mem(self): - """ Setting an extra job option of memory. The alltoall operation takes maximum memory of 0.1 GB per core for a - message size of 8 and almost 0.5 GB per core for the maximum message size the test allows. But we limit the - message sizes to 8 and for a safety net we take 64 GB assuming dense nodes works for all the tests and node - types.""" - self.extra_resources = {'memory': {'size': '64GB'}} + def select_ci(self): + " Select the CI variants " + if (self.bench_name in ['mpi.collective.osu_allreduce', 'mpi.collective.osu_alltoall']): + self.bench_name_ci = self.bench_name @run_after('init') - def set_num_tasks(self): - hooks.set_tag_scale(self) + def set_compute_unit(self): + """ + Set the compute unit to which tasks will be assigned: + one task per core for CPU runs, and one task per GPU for GPU runs. + """ + device_to_compute_unit = { + DEVICE_TYPES[CPU]: COMPUTE_UNIT[CPU], + DEVICE_TYPES[GPU]: COMPUTE_UNIT[GPU], + } + self.compute_unit = device_to_compute_unit.get(self.device_type) @run_after('setup') - def set_num_tasks_per_node(self): - """ Setting number of tasks per node, cpus per task and gpus per node in this function. This function sets - num_cpus_per_task for 1 node and 2 node options where the request is for full nodes.""" - max_avail_cpus_per_node = self.current_partition.processor.num_cpus - if self.device_buffers == 'cpu': - # Setting num_tasks and num_tasks_per_node for the CPU tests - if SCALES.get(self.scale).get('num_cpus_per_node', 0): - hooks.assign_tasks_per_compute_unit(self, COMPUTE_UNIT[NODE], - self.default_num_cpus_per_node) - elif SCALES.get(self.scale).get('node_part', 0): - pass_num_per = int(max_avail_cpus_per_node / SCALES.get(self.scale).get('node_part', 0)) - if pass_num_per > 1: - hooks.assign_tasks_per_compute_unit(self, COMPUTE_UNIT[NODE], pass_num_per) - else: - self.skip(msg="Too few cores available for a collective operation.") - - if FEATURES[GPU] in self.current_partition.features: - max_avail_gpus_per_node = utils.get_max_avail_gpus_per_node(self) - # Setting number of GPU for a cpu test on a GPU node. - if SCALES.get(self.scale).get('num_nodes') == 1: - self.num_gpus_per_node = 1 - else: - self.num_gpus_per_node = max_avail_gpus_per_node - elif self.device_buffers == 'cuda': - max_avail_gpus_per_node = utils.get_max_avail_gpus_per_node(self) - # Setting num_tasks and num_tasks_per_node for the GPU tests - if max_avail_gpus_per_node == 1 and SCALES.get(self.scale).get('num_nodes') == 1: - self.skip(msg="There is only 1 device in the node. Skipping collective tests involving only 1 node.") - else: - if SCALES.get(self.scale).get('num_gpus_per_node', 0) * SCALES.get(self.scale).get('num_nodes', 0) > 1: - hooks.assign_tasks_per_compute_unit(self, COMPUTE_UNIT.get(GPU, FEATURES[GPU])) - elif SCALES.get(self.scale).get('node_part', 0): - pass_num_per = int(max_avail_gpus_per_node / SCALES.get(self.scale).get('node_part', 0)) - if pass_num_per > 1: - hooks.assign_tasks_per_compute_unit(self, COMPUTE_UNIT.get(GPU, FEATURES[GPU])) - else: - self.skip(msg="Total GPUs (max_avail_gpus_per_node / node_part) is 1 less.") - else: - self.skip(msg="Total GPUs (num_nodes * num_gpus_per_node) = 1") + def skip_test_1gpu(self): + if self.device_type == DEVICE_TYPES[GPU]: + num_gpus = self.num_gpus_per_node * self.num_nodes + self.skip_if(num_gpus < 2, "Skipping GPU test : only 1 GPU available for this test case")