diff --git a/eessi/testsuite/constants.py b/eessi/testsuite/constants.py
index 023ca0b5..3f3aacad 100644
--- a/eessi/testsuite/constants.py
+++ b/eessi/testsuite/constants.py
@@ -1,9 +1,11 @@
 """
 Constants for ReFrame tests
 """
+
 AMD = 'AMD'
 CI = 'CI'
 CPU = 'CPU'
+CPU_SOCKET = 'CPU_SOCKET'
 GPU = 'GPU'
 GPU_VENDOR = 'GPU_VENDOR'
 INTEL = 'INTEL'
@@ -14,6 +16,12 @@
     GPU: 'gpu',
 }
 
+COMPUTE_UNIT = {
+    CPU: 'cpu',
+    CPU_SOCKET: 'cpu_socket',
+    GPU: 'gpu',
+}
+
 TAGS = {
     CI: 'CI',
 }
diff --git a/eessi/testsuite/hooks.py b/eessi/testsuite/hooks.py
index e9b40b28..453d6305 100644
--- a/eessi/testsuite/hooks.py
+++ b/eessi/testsuite/hooks.py
@@ -7,30 +7,36 @@
 import reframe as rfm
 
 from eessi.testsuite.constants import *
-from eessi.testsuite.utils import get_max_avail_gpus_per_node, is_cuda_required_module, log
-
-PROCESSOR_INFO_MISSING = '''
-This test requires the number of CPUs to be known for the partition it runs on.
-Check that processor information is either autodetected
-(see https://reframe-hpc.readthedocs.io/en/stable/configure.html#proc-autodetection),
-or manually set in the ReFrame configuration file
-(see https://reframe-hpc.readthedocs.io/en/stable/config_reference.html#processor-info).
-'''
-
+from eessi.testsuite.utils import get_max_avail_gpus_per_node, is_cuda_required_module, log, check_proc_attribute_defined
+
 
 def assign_one_task_per_compute_unit(test: rfm.RegressionTest, compute_unit: str):
     """
-    Assign one task per compute unit (DEVICE_TYPES[CPU] or DEVICE_TYPES[GPU]).
+    Assign one task per compute unit (COMPUTE_UNIT[CPU], COMPUTE_UNIT[CPU_SOCKET] or COMPUTE_UNIT[GPU]).
     Automatically sets num_tasks, num_tasks_per_node, num_cpus_per_task, and num_gpus_per_node,
     based on the current scale and the current partition's num_cpus, max_avail_gpus_per_node and num_nodes.
     For GPU tests, one task per GPU is set, and num_cpus_per_task is based on the ratio of CPU-cores/GPUs.
     For CPU tests, one task per CPU is set, and num_cpus_per_task is set to 1.
     Total task count is determined based on the number of nodes to be used in the test.
     Behaviour of this function is (usually) sensible for MPI tests.
+
+    Arguments:
+    - test: the ReFrame test to which this hook should apply
+    - compute_unit: a device as listed in eessi.testsuite.constants.COMPUTE_UNIT
+
+    Examples:
+    On a single node with 2 sockets, 64 cores and 128 hyperthreads:
+    - assign_one_task_per_compute_unit(test, COMPUTE_UNIT[CPU]) will launch 64 tasks with 1 thread each
+    - assign_one_task_per_compute_unit(test, COMPUTE_UNIT[CPU_SOCKET]) will launch 2 tasks with 32 threads per task
+
+    Future work:
+    We would like to add an argument that controls whether tasks are also spawned for hyperthreads. On the same
+    node (2 sockets, 64 cores, 128 hyperthreads), that could look like:
+    - assign_one_task_per_compute_unit(test, COMPUTE_UNIT[CPU], True) launching 128 tasks with 1 thread each
+    - assign_one_task_per_compute_unit(test, COMPUTE_UNIT[CPU_SOCKET], True) launching 2 tasks with 64 threads per task
""" + check_proc_attribute_defined(test, 'num_cpus') test.max_avail_cpus_per_node = test.current_partition.processor.num_cpus - if test.max_avail_cpus_per_node is None: - raise AttributeError(PROCESSOR_INFO_MISSING) + log(f'max_avail_cpus_per_node set to {test.max_avail_cpus_per_node}') # Check if either node_part, or default_num_cpus_per_node and default_num_gpus_per_node are set correctly if not ( @@ -59,13 +65,68 @@ def assign_one_task_per_compute_unit(test: rfm.RegressionTest, compute_unit: str log(f'default_num_cpus_per_node set to {test.default_num_cpus_per_node}') - if compute_unit == DEVICE_TYPES[GPU]: + if compute_unit == COMPUTE_UNIT[GPU]: _assign_one_task_per_gpu(test) - elif compute_unit == DEVICE_TYPES[CPU]: + elif compute_unit == COMPUTE_UNIT[CPU]: _assign_one_task_per_cpu(test) + elif compute_unit == COMPUTE_UNIT[CPU_SOCKET]: + _assign_one_task_per_cpu_socket(test) else: raise ValueError(f'compute unit {compute_unit} is currently not supported') +def _assign_one_task_per_cpu_socket(test: rfm.RegressionTest): + """ + Determines the number of tasks per node by dividing the default_num_cpus_per_node by + the number of cpus available per socket, and rounding up. The result is that for full-node jobs the default + will spawn one task per socket, with a number of cpus per task equal to the number of cpus per socket. + Other examples: + - half a node (i.e. node_part=2) on a 4-socket system would result in 2 tasks per node, + with number of cpus per task equal to the number of cpus per socket. + - 2 cores (i.e. default_num_cpus_per_node=2) on a 16 core system with 2 sockets would result in + 1 task per node, with 2 cpus per task + + This default is set unless the test is run with: + --setvar num_tasks_per_node= and/or + --setvar num_cpus_per_task=. + In those cases, those take precedence, and the remaining variable (num_cpus_per task or + num_tasks_per_node respectively) is calculated based on the equality + test.num_tasks_per_node * test.num_cpus_per_task == test.default_num_cpus_per_node. + + Variables: + - default_num_cpus_per_node: default number of CPUs per node as defined in the test + (e.g. 
+
+    Default resources requested:
+    - num_tasks_per_node = ceil(default_num_cpus_per_node / num_cpus_per_socket)
+    - num_cpus_per_task = default_num_cpus_per_node / num_tasks_per_node
+    """
+    # neither num_tasks_per_node nor num_cpus_per_task are set
+    if not test.num_tasks_per_node and not test.num_cpus_per_task:
+        check_proc_attribute_defined(test, 'num_cpus')
+        check_proc_attribute_defined(test, 'num_sockets')
+        num_cpus_per_socket = test.current_partition.processor.num_cpus / test.current_partition.processor.num_sockets
+        test.num_tasks_per_node = math.ceil(test.default_num_cpus_per_node / num_cpus_per_socket)
+        test.num_cpus_per_task = int(test.default_num_cpus_per_node / test.num_tasks_per_node)
+
+    # num_tasks_per_node is not set, but num_cpus_per_task is
+    elif not test.num_tasks_per_node:
+        test.num_tasks_per_node = int(test.default_num_cpus_per_node / test.num_cpus_per_task)
+
+    # num_cpus_per_task is not set, but num_tasks_per_node is
+    elif not test.num_cpus_per_task:
+        test.num_cpus_per_task = int(test.default_num_cpus_per_node / test.num_tasks_per_node)
+
+    else:
+        pass  # both num_tasks_per_node and num_cpus_per_task are already set
+
+    test.num_tasks = test.num_nodes * test.num_tasks_per_node
+    log(f'Number of tasks per node set to: {test.num_tasks_per_node}')
+    log(f'Number of cpus per task set to: {test.num_cpus_per_task}')
+    log(f'num_tasks set to {test.num_tasks}')
+
 
 def _assign_one_task_per_cpu(test: rfm.RegressionTest):
     """
@@ -239,5 +300,60 @@ def check_custom_executable_opts(test: rfm.RegressionTest, num_default: int = 0):
     test.has_custom_executable_opts = False
     if len(test.executable_opts) > num_default:
         test.has_custom_executable_opts = True
-    log(f'has_custom_executable_opts set to {test.has_custom_executable_opts}')
+    log(f'has_custom_executable_opts set to {test.has_custom_executable_opts}')
+
+
+def set_compact_process_binding(test: rfm.RegressionTest):
+    """
+    This hook sets a binding policy for process binding.
+    More specifically, it will bind each process to subsequent domains of test.num_cpus_per_task cores.
+
+    A few examples:
+    - Pure MPI (test.num_cpus_per_task = 1) will result in binding 1 process to each core,
+      in a compact way, i.e. rank 0 to core 0, rank 1 to core 1, etc.
+    - Hybrid MPI-OpenMP, e.g. test.num_cpus_per_task = 4, will result in binding 1 process to subsequent
+      sets of 4 cores, i.e. rank 0 to cores 0-3, rank 1 to cores 4-7, rank 2 to cores 8-11, etc.
+
+    It is hard to do this in a portable way. Currently supported for process binding are:
+    - Intel MPI (through I_MPI_PIN_DOMAIN)
+    - OpenMPI (through OMPI_MCA_rmaps_base_mapping_policy)
+    - srun (LIMITED SUPPORT: through SLURM_CPU_BIND, but only effective if the task/affinity plugin is enabled)
+    """
+
+    # Check if hyperthreading is enabled. If so, divide the number of cpus per task by the number
+    # of hw threads per core to get a physical core count
+    check_proc_attribute_defined(test, 'num_cpus_per_core')
+    num_cpus_per_core = test.current_partition.processor.num_cpus_per_core
+    physical_cpus_per_task = int(test.num_cpus_per_task / num_cpus_per_core)
+
+    # Do binding for Intel MPI and OpenMPI's mpirun, and for srun
+    # Other launchers may or may not do the correct binding
+    test.env_vars['I_MPI_PIN_CELL'] = 'core'  # Don't bind to hyperthreads, only to physical cores
+    test.env_vars['I_MPI_PIN_DOMAIN'] = '%s:compact' % physical_cpus_per_task
+    test.env_vars['OMPI_MCA_rmaps_base_mapping_policy'] = 'node:PE=%s' % physical_cpus_per_task
+    # Default binding for SLURM. Only effective if the task/affinity plugin is enabled
+    # and when the number of tasks times cpus per task equals either the socket, core or thread count
+    test.env_vars['SLURM_CPU_BIND'] = 'verbose'
+    log(f'Set environment variable I_MPI_PIN_DOMAIN to {test.env_vars["I_MPI_PIN_DOMAIN"]}')
+    log(f'Set environment variable OMPI_MCA_rmaps_base_mapping_policy to {test.env_vars["OMPI_MCA_rmaps_base_mapping_policy"]}')
+    log(f'Set environment variable SLURM_CPU_BIND to {test.env_vars["SLURM_CPU_BIND"]}')
+
+
+def set_compact_thread_binding(test: rfm.RegressionTest):
+    """
+    This hook sets a binding policy for thread binding.
+    It sets a number of environment variables to try and set a sensible binding for OpenMP threads.
+
+    Thread binding is supported for:
+    - GNU OpenMP (through OMP_PLACES and OMP_PROC_BIND)
+    - Intel OpenMP (through KMP_AFFINITY)
+    """
+
+    # Set thread binding
+    test.env_vars['OMP_PLACES'] = 'cores'
+    test.env_vars['OMP_PROC_BIND'] = 'close'
+    # See https://www.intel.com/content/www/us/en/docs/cpp-compiler/developer-guide-reference/2021-8/thread-affinity-interface.html
+    test.env_vars['KMP_AFFINITY'] = 'granularity=fine,compact,1,0'
+    log(f'Set environment variable OMP_PLACES to {test.env_vars["OMP_PLACES"]}')
+    log(f'Set environment variable OMP_PROC_BIND to {test.env_vars["OMP_PROC_BIND"]}')
+    log(f'Set environment variable KMP_AFFINITY to {test.env_vars["KMP_AFFINITY"]}')
diff --git a/eessi/testsuite/tests/apps/tensorflow/src/mnist_setup.py b/eessi/testsuite/tests/apps/tensorflow/src/mnist_setup.py
new file mode 100644
index 00000000..8d47cf0e
--- /dev/null
+++ b/eessi/testsuite/tests/apps/tensorflow/src/mnist_setup.py
@@ -0,0 +1,31 @@
+import tensorflow as tf
+import numpy as np
+
+def mnist_dataset(batch_size, test_batch_size):
+    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
+    # The `x` arrays are in uint8 and have values in the [0, 255] range.
+    # You need to convert them to float32 with values in the [0, 1] range.
+    x_train = x_train / np.float32(255)
+    y_train = y_train.astype(np.int64)
+    x_test = x_test / np.float32(255)
+    y_test = y_test.astype(np.int64)
+    train_dataset = tf.data.Dataset.from_tensor_slices(
+        (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
+    test_dataset = tf.data.Dataset.from_tensor_slices(
+        (x_test, y_test)).batch(test_batch_size)
+    return train_dataset, test_dataset
+
+def build_and_compile_cnn_model():
+    model = tf.keras.Sequential([
+        tf.keras.layers.InputLayer(input_shape=(28, 28)),
+        tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
+        tf.keras.layers.Conv2D(32, 3, activation='relu'),
+        tf.keras.layers.Flatten(),
+        tf.keras.layers.Dense(128, activation='relu'),
+        tf.keras.layers.Dense(10)
+    ])
+    model.compile(
+        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
+        optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
+        metrics=['accuracy'])
+    return model
diff --git a/eessi/testsuite/tests/apps/tensorflow/src/tf_test.py b/eessi/testsuite/tests/apps/tensorflow/src/tf_test.py
new file mode 100644
index 00000000..ac41415b
--- /dev/null
+++ b/eessi/testsuite/tests/apps/tensorflow/src/tf_test.py
@@ -0,0 +1,192 @@
+# Author: Caspar van Leeuwen (SURF)
+# Based on the example for the MultiWorkerMirroredStrategy with Keras from
+# https://www.tensorflow.org/tutorials/distribute/multi_worker_with_keras
+import argparse
+import json
+import logging
+import os
+import sys
+import socket
+
+from contextlib import closing
+from mpi4py import MPI
+from timeit import default_timer as timer
+
+import tensorflow as tf
+
+def print0(msg, comm=MPI.COMM_WORLD):
+    '''Prints string "msg" from rank 0'''
+    output = comm.gather(msg, root=0)
+    rank = comm.Get_rank()
+    # Print the gathered message of each rank
+    if rank == 0:
+        for (rank, rank_out) in enumerate(output):
+            print(f'Rank {rank}: {rank_out}')
+
+def find_free_port():
+    '''Function that gets a free port for the current process'''
+    with closing(socket.socket()) as s:
+        # Set SO_REUSEADDR before binding, so the port can be reused right after we release it
+        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+        s.bind(('', 0))
+        return s.getsockname()[1]
+
+def get_local_rank(rank_info, rank_info_list):
+    '''Function that figures out the local rank based on a list of rank,
+    hostname, and port gathered from each of the workers'''
+    # Note that rank_info_list is sorted by rank, by definition of the MPI allgather routine.
+    # Thus, if our current rank is the n-th item in rank_info_list for which the hostname matches,
+    # our local rank is n
+    local_rank_info_list = [item for item in rank_info_list if item['hostname'] == rank_info['hostname']]
+    return local_rank_info_list.index(rank_info)
+
+def get_rank_info(comm=MPI.COMM_WORLD):
+    '''Create a dict for this worker containing rank, hostname and port to be used by this worker'''
+    rank = comm.Get_rank()
+    hostname = socket.gethostname()
+    port = find_free_port()
+
+    return {
+        'rank': rank,
+        'hostname': hostname,
+        'port': port,
+    }
+
+def set_tf_config(rank_info, rank_info_list):
+    '''Sets the TF_CONFIG environment variable for the current worker, based on the rank_info_list'''
+    worker_list = ['%s:%s' % (item['hostname'], item['port']) for item in rank_info_list]
+
+    tf_config = {
+        'cluster': {
+            'worker': worker_list,
+        },
+        'task': {'type': 'worker', 'index': rank_info['rank']}
+    }
+    os.environ["TF_CONFIG"] = json.dumps(tf_config)
+
+    # logger.info(f"Set TF_CONFIG for rank {rank_info['rank']} to {tf_config}.")
+    print0(f"Set TF_CONFIG for rank {rank_info['rank']} to {tf_config}.")
+    return tf_config
+
+parser = argparse.ArgumentParser(
+    prog='Tensorflow Distributed Test',
+    description='This program runs a distributed TensorFlow test using tf.distribute.MultiWorkerMirroredStrategy and the Keras fit API'
+)
+parser.add_argument('-d', '--device', type=str, default='cpu', choices=['cpu', 'gpu'], help='Device to use for training')
+parser.add_argument('--inter-op-parallelism', type=int, default=1, help='Sets tf.config.threading.set_inter_op_parallelism_threads')
+parser.add_argument('--intra-op-parallelism', type=int, default=0, help='Sets tf.config.threading.set_intra_op_parallelism_threads')
+parser.add_argument('--per-worker-batch-size', type=int, default=4096, help='Batch size processed by each worker')
+parser.add_argument('--per-worker-test-batch-size', type=int, default=512, help='Batch size for computing accuracy on the validation set')
+parser.add_argument('--epochs-to-train', type=int, default=4, help='Number of epochs to train')
+parser.add_argument('--steps-per-epoch', type=int, default=25, help='Number of steps to train per epoch')
+args = parser.parse_args()
+
+# Make sure we can import mnist_setup from the current dir
+if '.' not in sys.path:
+    sys.path.insert(0, '.')
+import mnist_setup
+
+os.environ.pop('TF_CONFIG', None)
+
+# Set number of threads to use. Needs to be set early, before initialization
+tf.config.threading.set_inter_op_parallelism_threads(args.inter_op_parallelism)
+tf.config.threading.set_intra_op_parallelism_threads(args.intra_op_parallelism)
+
+# Multi-worker config
+# We'll use mpi4py to figure out our rank, have each process pick a hostname and a free port,
+# and allgather that information to create a TF_CONFIG
+comm = MPI.COMM_WORLD
+rank_info = get_rank_info(comm)
+rank_info_list = comm.allgather(rank_info)
+local_rank = get_local_rank(rank_info, rank_info_list)
+
+# Create logger per rank
+# logging.basicConfig(
+#     filename='rank_%s.out' % local_rank,
+#     format='%(asctime)s %(levelname)s %(message)s',
+#     datefmt='%Y-%m-%d %H:%M:%S'
+# )
+# logging.info(f"Rank {rank_info['rank']} has local_rank {local_rank}, hostname {rank_info['hostname']} and port {rank_info['port']}")
+print0(f"Rank {rank_info['rank']} has local_rank {local_rank}, hostname {rank_info['hostname']} and port {rank_info['port']}")
+
+# Turn off TensorFlow info and warnings for rank != 0
+if local_rank != 0:
+    print("Turning off logging")
+    # tf.get_logger().setLevel('ERROR')
+    # tf.autograph.set_verbosity(1)
+    # tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)
+    os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
+
+tf_config = set_tf_config(rank_info, rank_info_list)
+num_workers = len(tf_config['cluster']['worker'])
+
+# Set visible devices and create the MultiWorkerMirroredStrategy
+# logging.info(f"Selecting device: {args.device}")
+print0(f"Selecting device: {args.device}")
+if args.device == 'gpu':
+    # Limit each local rank to its own GPU.
+    physical_devices = tf.config.list_physical_devices('GPU')
+    try:
+        # We could do local_rank % len(physical_devices) if we wanted to support running more than one rank per GPU
+        # The current code doesn't support that
+        tf.config.set_visible_devices(physical_devices[local_rank], 'GPU')
+        visible_devices = tf.config.get_visible_devices('GPU')
+        # logging.info("Local rank: %s, visible_devices: %s" % (local_rank, visible_devices))
+        print0("Local rank: %s, visible_devices: %s" % (local_rank, visible_devices))
+        assert len(visible_devices) == 1
+    except Exception:
+        # Don't reference visible_devices here: it may not have been assigned if set_visible_devices failed
+        print0("ERROR: Selection of GPU device based on local rank failed. Local rank: %s. Physical devices: %s"
+               % (local_rank, physical_devices))
+
+    # Should now have 1 GPU per process. Set memory growth for that device to avoid issues similar to
+    # https://github.com/tensorflow/tensorflow/issues/45044
+    # TODO: I have the feeling this is only needed because rank 0 somehow erroneously starts on CPU?
+    # The fact that rank 0 started on CPU was triggered by setting OMP_PROC_BIND. I need to test if the following
+    # line is still needed if we don't set OMP* binding variables
+    tf.config.experimental.set_memory_growth(physical_devices[local_rank], True)
+
+    # Set communication to NCCL explicitly
+    communication_options = tf.distribute.experimental.CommunicationOptions(
+        implementation=tf.distribute.experimental.CommunicationImplementation.NCCL)
+    strategy = tf.distribute.MultiWorkerMirroredStrategy(communication_options=communication_options)
+elif args.device == 'cpu':
+    # Run on CPU, so make sure no GPU devices are visible
+    tf.config.set_visible_devices([], 'GPU')
+    visible_devices = tf.config.get_visible_devices()
+    print0("Local rank: %s, visible_devices: %s" % (local_rank, visible_devices))
+    # logging.info("Local rank: %s, visible_devices: %s" % (local_rank, visible_devices))
+    strategy = tf.distribute.MultiWorkerMirroredStrategy()
+# logging.info("Multiworker strategy created")
+print0("Multiworker strategy created")
+
+# Get datasets
+global_batch_size = args.per_worker_batch_size * num_workers
+global_test_batch_size = args.per_worker_test_batch_size * num_workers
+multi_worker_dataset, multi_worker_test_dataset = mnist_setup.mnist_dataset(global_batch_size, global_test_batch_size)
+
+with strategy.scope():
+    multi_worker_model = mnist_setup.build_and_compile_cnn_model()
+
+# Run the training
+starttime = timer()
+multi_worker_model.fit(multi_worker_dataset, epochs=args.epochs_to_train, steps_per_epoch=args.steps_per_epoch, verbose=2)
+endtime = timer()
+# logging.info("Keras fit completed!")
+print0("Keras fit completed!")
+
+# Compute performance
+training_time = endtime - starttime
+total_samples_trained = global_batch_size * args.steps_per_epoch * args.epochs_to_train
+throughput = total_samples_trained / training_time
+if local_rank == 0:
+    print(f"Total training time: {training_time}")
+    print(f"Performance: {throughput} img/s")
+
+# Run evaluation to check the accuracy on the validation set
+print0("Run evaluation")
+metrics = multi_worker_model.evaluate(multi_worker_test_dataset, verbose=2)
+if local_rank == 0:
+    print(f"Final loss: {metrics[multi_worker_model.metrics_names.index('loss')]}")
+    print(f"Final accuracy: {metrics[multi_worker_model.metrics_names.index('accuracy')]}")
diff --git a/eessi/testsuite/tests/apps/tensorflow/tensorflow.py b/eessi/testsuite/tests/apps/tensorflow/tensorflow.py
new file mode 100644
index 00000000..d6a8a76a
--- /dev/null
+++ b/eessi/testsuite/tests/apps/tensorflow/tensorflow.py
@@ -0,0 +1,119 @@
+"""
+This module tests TensorFlow in available modules containing substring 'TensorFlow'.
+The test itself is based on the official multi-worker training with Keras tutorial at
+https://www.tensorflow.org/tutorials/distribute/multi_worker_with_keras
+"""
+
+import reframe as rfm
+import reframe.utility.sanity as sn
+
+from eessi.testsuite import hooks, utils
+from eessi.testsuite.constants import *
+
+@rfm.simple_test
+class TENSORFLOW_EESSI(rfm.RunOnlyRegressionTest):
+
+    # This test can run at any scale, so parameterize over all known SCALES
+    scale = parameter(SCALES.keys())
+    valid_prog_environs = ['default']
+    valid_systems = []
+
+    # Parameterize over all modules that start with TensorFlow
+    module_name = parameter(utils.find_modules('TensorFlow'))
+
+    # Make CPU and GPU versions of this test
+    device_type = parameter(['cpu', 'gpu'])
+
+    executable = 'python tf_test.py'
+
+    time_limit = '30m'
+
+    # This test should be run as part of EESSI CI
+    tags = {TAGS['CI']}
+
+    @deferrable
+    def assert_tf_config_ranks(self):
+        '''Assert that each rank sets a TF_CONFIG'''
+        n_ranks = sn.count(sn.extractall(r'^Rank [0-9]+: Set TF_CONFIG for rank (?P<rank>[0-9]+)', self.stdout, tag='rank'))
+        return sn.assert_eq(n_ranks, self.num_tasks)
+
+    @deferrable
+    def assert_completion(self):
+        '''Assert that the test ran until completion'''
+        n_fit_completed = sn.count(sn.extractall(r'^Rank [0-9]+: Keras fit completed', self.stdout))
+
+        return sn.all([
+            sn.assert_eq(n_fit_completed, self.num_tasks),
+        ])
+
+    @deferrable
+    def assert_convergence(self):
+        '''Assert that the network learned _something_ during training'''
+        accuracy = sn.extractsingle(r'^Final accuracy: (?P<accuracy>\S+)', self.stdout, 'accuracy', float)
+        # MNIST is a 10-class classification problem, so random guessing gives ~0.1 accuracy;
+        # accuracy well above that means the network 'learned' something
+        return sn.assert_gt(accuracy, 0.2)
+
+    @sanity_function
+    def assert_sanity(self):
+        '''Check all sanity criteria'''
+        return sn.all([
+            self.assert_tf_config_ranks(),
+            self.assert_completion(),
+            self.assert_convergence(),
+        ])
+
+    @performance_function('img/s')
+    def perf(self):
+        return sn.extractsingle(r'^Performance:\s+(?P<perf>\S+)', self.stdout, 'perf', float)
+
+    @run_after('init')
+    def run_after_init(self):
+        """Hooks to run after the init phase"""
+        hooks.filter_valid_systems_by_device_type(self, required_device_type=self.device_type)
+        hooks.set_modules(self)
+        hooks.set_tag_scale(self)
+
+    @run_after('init')
+    def set_executable_opts(self):
+        """Set executable opts based on device_type parameter"""
+        num_default = 0  # If this test already has executable opts, they must have come from the command line
+        hooks.check_custom_executable_opts(self, num_default=num_default)
+        if not self.has_custom_executable_opts:
+            self.executable_opts += ['--device', self.device_type]
+            utils.log(f'executable_opts set to {self.executable_opts}')
+
+    @run_after('init')
+    def set_test_descr(self):
+        self.descr = f'TensorFlow benchmark on {self.device_type}'
+
+    @run_after('setup')
+    def run_after_setup(self):
+        """Hooks to run after the setup phase"""
+        # TODO: implement
+        # It should bind to socket, but different MPIs may have different arguments to do that...
+        # We should at the very least prevent that it binds to a single core per process,
+        # as that results in many threads being scheduled to one core.
+        # Binding may also differ per launcher used. It'll be hard to support a wide range and still get proper binding
+        if self.device_type == 'cpu':
+            hooks.assign_one_task_per_compute_unit(test=self, compute_unit=COMPUTE_UNIT['CPU_SOCKET'])
+        elif self.device_type == 'gpu':
+            hooks.assign_one_task_per_compute_unit(test=self, compute_unit=COMPUTE_UNIT['GPU'])
+        else:
+            raise NotImplementedError(f'Failed to set number of tasks and cpus per task for device {self.device_type}')
+
+    @run_after('setup')
+    def set_thread_count_args(self):
+        """Set executable opts defining the thread count"""
+        if not self.has_custom_executable_opts:
+            self.executable_opts += ['--intra-op-parallelism', '%s' % self.num_cpus_per_task]
+            self.executable_opts += ['--inter-op-parallelism', '1']
+            utils.log(f'executable_opts set to {self.executable_opts}')
+
+    @run_after('setup')
+    def set_binding_policy(self):
+        """Sets a binding policy for tasks. We don't bind threads because of https://github.com/tensorflow/tensorflow/issues/60843"""
+        hooks.set_compact_process_binding(self)
diff --git a/eessi/testsuite/utils.py b/eessi/testsuite/utils.py
index 73ce8725..3b770d77 100644
--- a/eessi/testsuite/utils.py
+++ b/eessi/testsuite/utils.py
@@ -66,3 +66,42 @@ def find_modules(substr: str) -> Iterator[str]:
     modules = OrderedSet(ms.available_modules(substr))
     for m in modules:
         yield m
+
+
+def check_proc_attribute_defined(test: rfm.RegressionTest, attribute) -> bool:
+    """
+    Checks if a processor attribute is defined (i.e. if test.current_partition.processor.<attribute> is defined).
+    If not, raises an informative error.
+
+    Arguments:
+    - test: the ReFrame regression test instance for which the processor attribute should be checked
+    - attribute: attribute of the processor object, as defined by systems.partitions.processor
+
+    Return:
+    - True (bool) if the attribute is defined
+    - Function does not return (but raises an error) if the attribute is undefined
+
+    Currently known attributes in ReFrame are arch, num_cpus, num_cpus_per_core and topology,
+    but this may change in the future.
+
+    If ReFrame's autodetect feature is used, all of these should be properly defined, so that's what we advise.
+    """
+
+    if test.current_partition:
+        if getattr(test.current_partition.processor, attribute, None) is not None:
+            return True
+        else:
+            msg = (
+                f"Processor information ({attribute}) missing. "
+                "Check that processor information is either autodetected "
+                "(see https://reframe-hpc.readthedocs.io/en/stable/configure.html#proc-autodetection), "
+                "or manually set in the ReFrame configuration file "
+                "(see https://reframe-hpc.readthedocs.io/en/stable/config_reference.html#processor-info)."
+            )
+    else:
+        msg = (
+            "This test's current_partition is not set yet. "
+            "The function utils.check_proc_attribute_defined should only be called after the setup() phase of ReFrame. "
+            "This is a programming error, please report this issue."
+        )
+    raise AttributeError(msg)
\ No newline at end of file
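
Usage note: below is a minimal sketch (not part of the patch) of a ReFrame test that combines the new
socket-level task assignment with the compact process binding hook introduced above. The class name, the
'OSU-Micro-Benchmarks' module filter, the executable and the time limit are hypothetical placeholders; the
hooks, constants and utilities are the ones this patch adds or already provides.

# Hypothetical example test; EXAMPLE_SOCKET_TEST and the module filter are illustrative only
import reframe as rfm

from eessi.testsuite import hooks, utils
from eessi.testsuite.constants import *


@rfm.simple_test
class EXAMPLE_SOCKET_TEST(rfm.RunOnlyRegressionTest):
    # Run at all known scales, as the TensorFlow test above does
    scale = parameter(SCALES.keys())
    valid_prog_environs = ['default']
    valid_systems = []
    module_name = parameter(utils.find_modules('OSU-Micro-Benchmarks'))  # hypothetical module filter
    executable = 'echo hello'  # placeholder workload
    time_limit = '10m'

    @run_after('init')
    def run_after_init(self):
        hooks.set_modules(self)
        hooks.set_tag_scale(self)

    @run_after('setup')
    def set_resources_and_binding(self):
        # One task per CPU socket: on a 2-socket, 64-core node this yields
        # num_tasks_per_node = 2 and num_cpus_per_task = 32
        hooks.assign_one_task_per_compute_unit(test=self, compute_unit=COMPUTE_UNIT[CPU_SOCKET])
        # Bind each task to a contiguous block of num_cpus_per_task physical cores
        hooks.set_compact_process_binding(self)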