use mixin class for osu #222

Open
wants to merge 7 commits into base: main
8 changes: 5 additions & 3 deletions eessi/testsuite/eessi_mixin.py
@@ -1,4 +1,4 @@
from reframe.core.builtins import parameter, run_after, variable
from reframe.core.builtins import parameter, run_after, run_before, variable
from reframe.core.exceptions import ReframeFatalError
from reframe.core.pipeline import RegressionMixin
from reframe.utility.sanity import make_performance_function
@@ -44,6 +44,7 @@ class EESSI_Mixin(RegressionMixin):
scale = parameter(SCALES.keys())
bench_name = None
bench_name_ci = None
num_tasks_per_compute_unit = 1

# Create ReFrame variables for logging runtime environment information
cvmfs_repo_name = variable(str, value='None')
@@ -119,7 +120,7 @@ def run_after_init(self):
# Set scales as tags
hooks.set_tag_scale(self)

@run_after('init')
@run_before('setup', always_last=True)
def measure_mem_usage(self):
if self.measure_memory_usage:
hooks.measure_memory_usage(self)
@@ -164,7 +165,8 @@ def validate_setup(self):
@run_after('setup')
def assign_tasks_per_compute_unit(self):
"""Call hooks to assign tasks per compute unit, set OMP_NUM_THREADS, and set compact process binding"""
hooks.assign_tasks_per_compute_unit(test=self, compute_unit=self.compute_unit)
hooks.assign_tasks_per_compute_unit(test=self, compute_unit=self.compute_unit,
num_per=self.num_tasks_per_compute_unit)

# Set OMP_NUM_THREADS environment variable
hooks.set_omp_num_threads(self)
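For illustration (not part of the diff): a minimal sketch of how a test deriving from EESSI_Mixin might use the new num_tasks_per_compute_unit attribute. The class name is hypothetical, and a real test would have to define more (device type, memory requirements, and so on).

```python
import reframe as rfm

from eessi.testsuite.constants import COMPUTE_UNIT, NODE
from eessi.testsuite.eessi_mixin import EESSI_Mixin


@rfm.simple_test
class Example_Two_Tasks_Per_Node(rfm.RunOnlyRegressionTest, EESSI_Mixin):
    """Hypothetical test: with the change above, the mixin passes
    num_tasks_per_compute_unit on to hooks.assign_tasks_per_compute_unit()
    after the setup phase."""
    compute_unit = COMPUTE_UNIT[NODE]
    num_tasks_per_compute_unit = 2  # e.g. two tasks per node, as the pt2pt tests below use
```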
263 changes: 95 additions & 168 deletions eessi/testsuite/tests/apps/osu.py
@@ -6,19 +6,20 @@
non-GPU nodes. Otherwise those tests will FAIL.
"""
import reframe as rfm
from reframe.core.builtins import parameter, run_after # added only to make the linter happy
from reframe.core.builtins import parameter, run_after
from reframe.utility import reframe

from hpctestlib.microbenchmarks.mpi.osu import osu_benchmark

from eessi.testsuite import hooks, utils
from eessi.testsuite.constants import *
from eessi.testsuite.constants import COMPUTE_UNIT, CPU, DEVICE_TYPES, INVALID_SYSTEM, GPU, NODE, SCALES
from eessi.testsuite.eessi_mixin import EESSI_Mixin
from eessi.testsuite.utils import find_modules, log


def filter_scales_pt2pt():
def filter_scales_pt2pt_cpu():
"""
Filtering function for filtering scales for the pt2pt OSU test
Filtering function for filtering scales for the pt2pt OSU test on CPUs
returns all scales with either 2 cores, 1 full node, or 2 full nodes
Collaborator:
I was a bit surprised that it runs on 2 cores, a full node and 2 full nodes, but not e.g. on 2 GPUs (which on Snellius happens to be 1_2_node). This was already the case before your changes here, so it's not introduced in this PR, but maybe @satishskamath can comment on why this is the case? I do seem to remember him saying he thought exclusive nodes would be good for reproducibility for these tests, but by that argument the 2-cores size would also have been filtered - I'd say having 2 GPUs is the equivalent.

Now, I do think it is difficult to filter the GPUs down to 2-GPU setups, as you don't know this until after the setup phase. I.e. we'd basically have to accept everything single-node in the init phase, and then skip the ones that would provide more than 2 GPUs.
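(A rough sketch of the approach described above; this is not code from this PR, and the hook name and message are assumptions.)

```python
    # Sketch only: accept all single-node scales in the init phase, then skip after
    # setup, once the actual GPU count for this test case is known.
    @run_after('setup')
    def skip_unless_exactly_two_gpus(self):
        num_gpus = self.num_gpus_per_node * self.num_nodes
        self.skip_if(
            num_gpus != 2,
            f"Skipping: this pt2pt test needs exactly 2 GPUs, {num_gpus} would be used"
        )
```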

Collaborator Author:

Will fix this in a follow-up PR, see #224.

Collaborator Author:

Edit: I decided to add it to this PR anyway.

Collaborator:

My initial attempt on this, and the reason I kept the 2_cores and 1cpn_2nodes cases for GPUs for the point-to-point tests, was that these cases are valid and one needs at least one host CPU to assign to a device. But these scales are mainly meant for CPUs and can therefore be a bit misleading. I do lean towards the view that network-based tests are best run on two full exclusive nodes or one full node, which is the objective of this test as well.

"""
return [
k for (k, v) in SCALES.items()
@@ -28,9 +29,23 @@ def filter_scales_pt2pt():
]


def filter_scales_pt2pt_gpu():
"""
Filtering function for filtering scales for the pt2pt OSU test on GPUs
returns all scales with either a partial node, 1 full node, or 2 full nodes
"""
return [
k for (k, v) in SCALES.items()
if (v['num_nodes'] == 1 and v.get('node_part', 0) > 1)
or (v['num_nodes'] == 2 and v.get('node_part', 0) == 1)
or (v['num_nodes'] == 1 and v.get('node_part', 0) == 1)
]


def filter_scales_coll():
"""
Filtering function for filtering scales for collective the OSU test
Filtering function for filtering scales for the collective OSU test
returns all scales with at least 2 cores
"""
return [
k for (k, v) in SCALES.items()
@@ -39,25 +54,22 @@ def filter_scales_coll():
]


@rfm.simple_test
class EESSI_OSU_Micro_Benchmarks_pt2pt(osu_benchmark):
''' Run-only OSU test '''
scale = parameter(filter_scales_pt2pt())
valid_prog_environs = ['default']
valid_systems = ['*']
class EESSI_OSU_Base(osu_benchmark):
""" base class for OSU tests """
time_limit = '30m'
module_name = parameter(find_modules('OSU-Micro-Benchmarks'))
# Device type for non-cuda OSU-Micro-Benchmarks should run on hosts of both node types. To do this the default
# device type is set to GPU.
device_type = parameter([DEVICE_TYPES[CPU], DEVICE_TYPES[GPU]])
# unset num_tasks_per_node from the hpctestlib.

# reset num_tasks_per_node from the hpctestlib: we handle it ourselves
num_tasks_per_node = None

# Set num_warmup_iters to 5 to reduce execution time, especially on slower interconnects
num_warmup_iters = 5
# Set num_iters to 10 to reduce execution time, especially on slower interconnects
num_iters = 10

def required_mem_per_node(self):
return self.num_tasks_per_node * 1024

@run_after('init')
def filter_scales_2gpus(self):
"""Filter out scales with < 2 GPUs if running on GPUs"""
@@ -69,26 +81,6 @@ def filter_scales_2gpus(self):
self.valid_systems = [INVALID_SYSTEM]
log(f'valid_systems set to {self.valid_systems} for scale {self.scale} and device_type {self.device_type}')

@run_after('init')
def filter_benchmark_pt2pt(self):
""" Filter out all non-mpi.pt2pt benchmarks """
if not self.benchmark_info[0].startswith('mpi.pt2pt'):
self.valid_systems = [INVALID_SYSTEM]

@run_after('init')
def run_after_init(self):
"""hooks to run after init phase"""

# Filter on which scales are supported by the partitions defined in the ReFrame configuration
hooks.filter_supported_scales(self)

hooks.filter_valid_systems_by_device_type(self, required_device_type=self.device_type)

hooks.set_modules(self)

# Set scales as tags
hooks.set_tag_scale(self)

@run_after('init')
def set_device_buffers(self):
"""
@@ -98,32 +90,38 @@ def set_device_buffers(self):
"""
if self.device_type == DEVICE_TYPES[GPU]:
self.device_buffers = 'cuda'

else:
# If the device_type is CPU then device_buffers should always be CPU.
self.device_buffers = 'cpu'

@run_after('init')
def set_tag_ci(self):
""" Setting tests under CI tag. """
if (self.benchmark_info[0] in ['mpi.pt2pt.osu_latency', 'mpi.pt2pt.osu_bw']):
self.tags.add('CI')
log(f'tags set to {self.tags}')
def set_tags(self):
""" Setting custom tags """
self.bench_name = self.benchmark_info[0]
self.tags.add(self.bench_name.split('.')[-1])


if (self.benchmark_info[0] == 'mpi.pt2pt.osu_bw'):
self.tags.add('osu_bw')
class EESSI_OSU_pt2pt_Base(EESSI_OSU_Base):
''' point-to-point OSU test base class '''
compute_unit = COMPUTE_UNIT[NODE]

if (self.benchmark_info[0] == 'mpi.pt2pt.osu_latency'):
self.tags.add('osu_latency')
@run_after('init')
def filter_benchmark_pt2pt(self):
""" Filter out all non-mpi.pt2pt benchmarks """
if not self.benchmark_info[0].startswith('mpi.pt2pt'):
self.valid_systems = [INVALID_SYSTEM]

@run_after('init')
def set_mem(self):
""" Setting an extra job option of memory. This test has only 4 possibilities: 1_node, 2_nodes, 2_cores and
1cpn_2nodes. This is implemented for all cases including full node cases. The requested memory may seem large
and the test requires at least 4.5 GB per core at the minimum for the full test when run with validation (-c
option for osu_bw or osu_latency). We run till message size 8 (-m 8) which significantly reduces memory
requirement."""
self.extra_resources = {'memory': {'size': '12GB'}}
def select_ci(self):
" Select the CI variants "
if (self.bench_name in ['mpi.pt2pt.osu_latency', 'mpi.pt2pt.osu_bw']):
self.bench_name_ci = self.bench_name

@run_after('init')
def set_num_tasks_per_compute_unit(self):
""" Setting number of tasks per compute unit and cpus per task. This sets num_cpus_per_task
for 1 node and 2 node options where the request is for full nodes."""
if SCALES.get(self.scale).get('num_nodes') == 1:
self.num_tasks_per_compute_unit = 2

@run_after('setup')
def adjust_executable_opts(self):
@@ -132,134 +130,63 @@ def adjust_executable_opts(self):
Therefore we must override it *after* the 'setup' phase
"""
if self.device_type == DEVICE_TYPES[CPU]:
self.executable_opts = [ele for ele in self.executable_opts if ele != 'D']
self.executable_opts = [x for x in self.executable_opts if x != 'D']

@run_after('setup')
def set_num_tasks_per_node(self):
""" Setting number of tasks per node and cpus per task in this function. This function sets num_cpus_per_task
for 1 node and 2 node options where the request is for full nodes."""
if SCALES.get(self.scale).get('num_nodes') == 1:
hooks.assign_tasks_per_compute_unit(self, COMPUTE_UNIT[NODE], 2)
else:
hooks.assign_tasks_per_compute_unit(self, COMPUTE_UNIT[NODE])

@rfm.simple_test
class EESSI_OSU_pt2pt_CPU(EESSI_OSU_pt2pt_Base, EESSI_Mixin):
''' point-to-point OSU test on CPUs'''
scale = parameter(filter_scales_pt2pt_cpu())
device_type = DEVICE_TYPES[CPU]


@rfm.simple_test
class EESSI_OSU_pt2pt_GPU(EESSI_OSU_pt2pt_Base, EESSI_Mixin):
''' point-to-point OSU test on GPUs'''
scale = parameter(filter_scales_pt2pt_gpu())
device_type = DEVICE_TYPES[GPU]

@run_after('setup')
def set_num_gpus_per_node(self):
"""
Set number of GPUs per node for GPU-to-GPU tests
"""
if self.device_type == DEVICE_TYPES[GPU]:
# Skip single-node tests with less than 2 GPU devices in the node
self.skip_if(
SCALES[self.scale]['num_nodes'] == 1 and self.default_num_gpus_per_node < 2,
"There are < 2 GPU devices present in the node."
f" Skipping tests with device_type={DEVICE_TYPES[GPU]} involving < 2 GPUs and 1 node."
)
if not self.num_gpus_per_node:
self.num_gpus_per_node = self.default_num_gpus_per_node
log(f'num_gpus_per_node set to {self.num_gpus_per_node} for partition {self.current_partition.name}')
def skip_test_1gpu(self):
num_gpus = self.num_gpus_per_node * self.num_nodes
self.skip_if(
num_gpus != 2 and self.scale not in ['1_node', '2_nodes'],
f"Skipping test : {num_gpus} GPU(s) available for this test case, need exactly 2"
)


@rfm.simple_test
class EESSI_OSU_Micro_Benchmarks_coll(osu_benchmark):
''' Run-only OSU test '''
class EESSI_OSU_coll(EESSI_OSU_Base, EESSI_Mixin):
''' collective OSU test '''
scale = parameter(filter_scales_coll())
valid_prog_environs = ['default']
valid_systems = ['*']
time_limit = '30m'
module_name = parameter(utils.find_modules('OSU-Micro-Benchmarks'))
# Device type for non-cuda OSU-Micro-Benchmarks should run on hosts of both node types. To do this the default
# device type is set to GPU.
device_type = parameter([DEVICE_TYPES[CPU], DEVICE_TYPES[GPU]])
# Unset num_tasks_per_node from hpctestlib
num_tasks_per_node = None

# Set num_warmup_iters to 5 to reduce execution time, especially on slower interconnects
num_warmup_iters = 5
# Set num_iters to 10 to reduce execution time, especially on slower interconnects
num_iters = 10

@run_after('init')
def run_after_init(self):
"""hooks to run after init phase"""
# Note: device_buffers variable is inherited from the hpctestlib class and adds options to the launcher
# commands based on what device is set.
self.device_buffers = 'cpu'
# Filter on which scales are supported by the partitions defined in the ReFrame configuration
hooks.filter_supported_scales(self)
hooks.filter_valid_systems_by_device_type(self, required_device_type=self.device_type)
is_cuda_module = utils.is_cuda_required_module(self.module_name)
if is_cuda_module and self.device_type == DEVICE_TYPES[GPU]:
self.device_buffers = 'cuda'

# If the device_type is CPU then device buffer should always be CPU.
if self.device_type == DEVICE_TYPES[CPU]:
self.device_buffers = 'cpu'
# This part of the code removes the collective communication calls out of the run list since this test is only
# meant for collective.
def filter_benchmark_coll(self):
""" Filter out all non-mpi.collective benchmarks """
if not self.benchmark_info[0].startswith('mpi.collective'):
self.valid_systems = []
hooks.set_modules(self)

@run_after('init')
def set_tag_ci(self):
if (self.benchmark_info[0] == 'mpi.collective.osu_allreduce'
or self.benchmark_info[0] == 'mpi.collective.osu_alltoall'):
self.tags.add('CI')
if (self.benchmark_info[0] == 'mpi.collective.osu_allreduce'):
self.tags.add('osu_allreduce')
if (self.benchmark_info[0] == 'mpi.collective.osu_alltoall'):
self.tags.add('osu_alltoall')
self.valid_systems = [INVALID_SYSTEM]

@run_after('init')
def set_mem(self):
""" Setting an extra job option of memory. The alltoall operation takes maximum memory of 0.1 GB per core for a
message size of 8 and almost 0.5 GB per core for the maximum message size the test allows. But we limit the
message sizes to 8 and for a safety net we take 64 GB assuming dense nodes works for all the tests and node
types."""
self.extra_resources = {'memory': {'size': '64GB'}}
def select_ci(self):
" Select the CI variants "
if (self.bench_name in ['mpi.collective.osu_allreduce', 'mpi.collective.osu_alltoall']):
self.bench_name_ci = self.bench_name

@run_after('init')
def set_num_tasks(self):
hooks.set_tag_scale(self)
def set_compute_unit(self):
"""
Set the compute unit to which tasks will be assigned:
one task per core for CPU runs, and one task per GPU for GPU runs.
"""
device_to_compute_unit = {
DEVICE_TYPES[CPU]: COMPUTE_UNIT[CPU],
DEVICE_TYPES[GPU]: COMPUTE_UNIT[GPU],
}
self.compute_unit = device_to_compute_unit.get(self.device_type)

@run_after('setup')
def set_num_tasks_per_node(self):
""" Setting number of tasks per node, cpus per task and gpus per node in this function. This function sets
num_cpus_per_task for 1 node and 2 node options where the request is for full nodes."""
max_avail_cpus_per_node = self.current_partition.processor.num_cpus
if self.device_buffers == 'cpu':
# Setting num_tasks and num_tasks_per_node for the CPU tests
if SCALES.get(self.scale).get('num_cpus_per_node', 0):
hooks.assign_tasks_per_compute_unit(self, COMPUTE_UNIT[NODE],
self.default_num_cpus_per_node)
elif SCALES.get(self.scale).get('node_part', 0):
pass_num_per = int(max_avail_cpus_per_node / SCALES.get(self.scale).get('node_part', 0))
if pass_num_per > 1:
hooks.assign_tasks_per_compute_unit(self, COMPUTE_UNIT[NODE], pass_num_per)
else:
self.skip(msg="Too few cores available for a collective operation.")

if FEATURES[GPU] in self.current_partition.features:
max_avail_gpus_per_node = utils.get_max_avail_gpus_per_node(self)
# Setting number of GPU for a cpu test on a GPU node.
if SCALES.get(self.scale).get('num_nodes') == 1:
self.num_gpus_per_node = 1
else:
self.num_gpus_per_node = max_avail_gpus_per_node
elif self.device_buffers == 'cuda':
max_avail_gpus_per_node = utils.get_max_avail_gpus_per_node(self)
# Setting num_tasks and num_tasks_per_node for the GPU tests
if max_avail_gpus_per_node == 1 and SCALES.get(self.scale).get('num_nodes') == 1:
self.skip(msg="There is only 1 device in the node. Skipping collective tests involving only 1 node.")
else:
if SCALES.get(self.scale).get('num_gpus_per_node', 0) * SCALES.get(self.scale).get('num_nodes', 0) > 1:
hooks.assign_tasks_per_compute_unit(self, COMPUTE_UNIT.get(GPU, FEATURES[GPU]))
elif SCALES.get(self.scale).get('node_part', 0):
pass_num_per = int(max_avail_gpus_per_node / SCALES.get(self.scale).get('node_part', 0))
if pass_num_per > 1:
hooks.assign_tasks_per_compute_unit(self, COMPUTE_UNIT.get(GPU, FEATURES[GPU]))
else:
self.skip(msg="Total GPUs (max_avail_gpus_per_node / node_part) is 1 less.")
else:
self.skip(msg="Total GPUs (num_nodes * num_gpus_per_node) = 1")
def skip_test_1gpu(self):
Collaborator:

Didn't we agree that if it's a one-node test, we would check if we had exactly 2 GPUs and skip otherwise? I mean, the downside is that for nodes with very weird GPU counts (3, or 5), none of the standard scales would have this. The downside of the current situation is that I now essentially run the same test twice on a 4-GPU node: once at the 1_node scale, and once at the 1_2_node scale.

On the other hand, we also discussed the value of exclusively allocating, so that means we would also not skip 1_node in any case. So I guess the only nodes where this would make a difference are those with 8 GPUs per node. In that case, you might want to run 1_node and 1_4_node, but skip 1_2_node.

Well, let me know what you think. It might also be too messy to implement, in which case I'm happy to keep the current setup.

Collaborator Author:

(this comment is on the collective test, but you probably meant to add it to the pt2pt test ;))

Yes, I had the same thoughts, and I agree neither choice is ideal.

I am now thinking that maybe we should not include 1_node, and instead leave it up to the user to request an exclusive node (with sbatch --exclusive) if they want a full node.

There is also the question of what we should do with the 2_nodes scale. A solution could be to introduce extra scales 1_gpu, 2_gpus, and 1gpn_2nodes (which we may want to keep out of the default SCALES list?), which automatically set the number of cores so that the default number of cores per GPU is respected, based on the info in the site config file and autodetection. This solution would also avoid having to add the skip_if statements, because we could then select only the scales with 2 GPUs.

What do you think of this solution?
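(Editorial sketch of the proposal above: these scale entries are hypothetical and not part of constants.py or of this PR; the key names follow the existing SCALES entries referenced in the diff.)

```python
# Hypothetical GPU-oriented scales: fix the GPU count and let the number of cores
# follow the site's cores-per-GPU ratio from the config file and autodetection.
EXTRA_GPU_SCALES = {
    '1_gpu': {'num_nodes': 1, 'num_gpus_per_node': 1},
    '2_gpus': {'num_nodes': 1, 'num_gpus_per_node': 2},
    '1gpn_2nodes': {'num_nodes': 2, 'num_gpus_per_node': 1},
}
```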

Copy link
Collaborator

@casparvl casparvl Jan 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering what the impact will be on existing tests.

I.e. suppose you have a GPU test that just runs at any scale (say: GROMACS). Having both the scales 1_4_node and 1_gpu will generate two essentially identical test instances on a partition with 4 GPUs per node. This means that as an end-user, you pretty much have to select a set of tags. Whereas right now, it is completely reasonable to run the test suite without any tags, if you just want to run anything. I would like to keep that behavior tbh: if you just want to run everything (once), just run reframe without tags. I mean, that's essentially why I was complaining about this running 2 essentially identical test instances in the first place.

Similarly, suppose you have CPU tests. You then have to filter all of those gpu-related scales. That makes a straightforward case (a CPU test that runs at any scale) suddenly more complex, since every CPU test would need to implement some filtering (though we could probably define a filtered list in constants.py as well).

I don't know, maybe we should just leave it as you implemented it now, I feel the cure is getting worse than the disease :). Sure, the 1_2_node and 1_node scales are nearly identical on a 4-GPU node (with the exception that the second is, indeed, exclusive, which has some value at least). So I guess only on an 8-GPU node would you really get multiple tests that provide zero extra information (1_4_node and 1_2_node would be 'identical' there: only using 2 GPUs, but no exclusive node in either case).

if self.device_type == DEVICE_TYPES[GPU]:
num_gpus = self.num_gpus_per_node * self.num_nodes
self.skip_if(num_gpus < 2, "Skipping GPU test : only 1 GPU available for this test case")