Merge pull request #38 from casparvl/tensorflow
Tensorflow
smoors authored Jul 31, 2023
2 parents 465f1e5 + 9e1b5a2 commit 313645f
Showing 6 changed files with 521 additions and 16 deletions.
8 changes: 8 additions & 0 deletions eessi/testsuite/constants.py
@@ -1,9 +1,11 @@
"""
Constants for ReFrame tests
"""

AMD = 'AMD'
CI = 'CI'
CPU = 'CPU'
CPU_SOCKET = 'CPU_SOCKET'
GPU = 'GPU'
GPU_VENDOR = 'GPU_VENDOR'
INTEL = 'INTEL'
Expand All @@ -14,6 +16,12 @@
GPU: 'gpu',
}

COMPUTE_UNIT = {
CPU: 'cpu',
CPU_SOCKET: 'cpu_socket',
GPU: 'gpu',
}

TAGS = {
CI: 'CI',
}
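
For illustration (not part of this diff), hooks and tests select the task-assignment granularity through this new mapping:

from eessi.testsuite.constants import COMPUTE_UNIT, CPU, CPU_SOCKET, GPU

# The keys are the symbolic constants; the values are the strings
# that the hooks compare against and log
print(COMPUTE_UNIT[CPU])         # 'cpu'
print(COMPUTE_UNIT[CPU_SOCKET])  # 'cpu_socket'
print(COMPUTE_UNIT[GPU])         # 'gpu'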
148 changes: 132 additions & 16 deletions eessi/testsuite/hooks.py
@@ -7,30 +7,36 @@
import reframe as rfm

from eessi.testsuite.constants import *
-from eessi.testsuite.utils import get_max_avail_gpus_per_node, is_cuda_required_module, log
-
-PROCESSOR_INFO_MISSING = '''
-This test requires the number of CPUs to be known for the partition it runs on.
-Check that processor information is either autodetected
-(see https://reframe-hpc.readthedocs.io/en/stable/configure.html#proc-autodetection),
-or manually set in the ReFrame configuration file
-(see https://reframe-hpc.readthedocs.io/en/stable/config_reference.html#processor-info).
-'''
+from eessi.testsuite.utils import get_max_avail_gpus_per_node, is_cuda_required_module, log, check_proc_attribute_defined

def assign_one_task_per_compute_unit(test: rfm.RegressionTest, compute_unit: str):
    """
-    Assign one task per compute unit (DEVICE_TYPES[CPU] or DEVICE_TYPES[GPU]).
+    Assign one task per compute unit (COMPUTE_UNIT[CPU], COMPUTE_UNIT[CPU_SOCKET] or COMPUTE_UNIT[GPU]).
    Automatically sets num_tasks, num_tasks_per_node, num_cpus_per_task, and num_gpus_per_node,
    based on the current scale and the current partition's num_cpus, max_avail_gpus_per_node and num_nodes.
    For GPU tests, one task per GPU is set, and num_cpus_per_task is based on the ratio of CPU-cores/GPUs.
    For CPU tests, one task per CPU is set, and num_cpus_per_task is set to 1.
    Total task count is determined based on the number of nodes to be used in the test.
    Behaviour of this function is (usually) sensible for MPI tests.

    Arguments:
    - test: the ReFrame test to which this hook should apply
    - compute_unit: a device as listed in eessi.testsuite.constants.COMPUTE_UNIT

    Examples:
    On a single node with 2 sockets, 64 cores and 128 hyperthreads:
    - assign_one_task_per_compute_unit(test, COMPUTE_UNIT[CPU]) will launch 64 tasks with 1 thread
    - assign_one_task_per_compute_unit(test, COMPUTE_UNIT[CPU_SOCKET]) will launch 2 tasks with 32 threads per task

    Future work:
    Currently, on a single node with 2 sockets, 64 cores and 128 hyperthreads, this
    - assign_one_task_per_compute_unit(test, COMPUTE_UNIT[CPU], true) will launch 128 tasks with 1 thread
    - assign_one_task_per_compute_unit(test, COMPUTE_UNIT[CPU_SOCKET], true) will launch 2 tasks with 64 threads per task
    In the future, we'd like to add an argument that disables spawning tasks for hyperthreads.
    """
+    check_proc_attribute_defined(test, 'num_cpus')
    test.max_avail_cpus_per_node = test.current_partition.processor.num_cpus
-    if test.max_avail_cpus_per_node is None:
-        raise AttributeError(PROCESSOR_INFO_MISSING)
    log(f'max_avail_cpus_per_node set to {test.max_avail_cpus_per_node}')

    # Check if either node_part, or default_num_cpus_per_node and default_num_gpus_per_node are set correctly
    if not (
@@ -59,13 +65,68 @@ def assign_one_task_per_compute_unit(test: rfm.RegressionTest, compute_unit: str):

    log(f'default_num_cpus_per_node set to {test.default_num_cpus_per_node}')

-    if compute_unit == DEVICE_TYPES[GPU]:
+    if compute_unit == COMPUTE_UNIT[GPU]:
        _assign_one_task_per_gpu(test)
-    elif compute_unit == DEVICE_TYPES[CPU]:
+    elif compute_unit == COMPUTE_UNIT[CPU]:
        _assign_one_task_per_cpu(test)
+    elif compute_unit == COMPUTE_UNIT[CPU_SOCKET]:
+        _assign_one_task_per_cpu_socket(test)
    else:
        raise ValueError(f'compute unit {compute_unit} is currently not supported')
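
For illustration (hypothetical test class, not part of this commit), a test in the suite would typically invoke this hook once the partition is known, i.e. after ReFrame's setup phase:

import reframe as rfm
from reframe.core.builtins import run_after

from eessi.testsuite import hooks
from eessi.testsuite.constants import COMPUTE_UNIT, CPU_SOCKET

class ExampleEESSITest(rfm.RunOnlyRegressionTest):
    @run_after('setup')
    def assign_tasks(self):
        # One task per socket; num_tasks, num_tasks_per_node and
        # num_cpus_per_task are then derived by the hook
        hooks.assign_one_task_per_compute_unit(self, COMPUTE_UNIT[CPU_SOCKET])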

def _assign_one_task_per_cpu_socket(test: rfm.RegressionTest):
    """
    Determines the number of tasks per node by dividing the default_num_cpus_per_node by
    the number of cpus available per socket, and rounding up. The result is that for full-node jobs the default
    will spawn one task per socket, with a number of cpus per task equal to the number of cpus per socket.

    Other examples:
    - half a node (i.e. node_part=2) on a 4-socket system would result in 2 tasks per node,
      with the number of cpus per task equal to the number of cpus per socket.
    - 2 cores (i.e. default_num_cpus_per_node=2) on a 16-core system with 2 sockets would result in
      1 task per node, with 2 cpus per task.

    This default is set unless the test is run with:
    --setvar num_tasks_per_node=<x> and/or
    --setvar num_cpus_per_task=<y>.
    In those cases, those take precedence, and the remaining variable (num_cpus_per_task or
    num_tasks_per_node, respectively) is calculated based on the equality
    test.num_tasks_per_node * test.num_cpus_per_task == test.default_num_cpus_per_node.

    Variables:
    - default_num_cpus_per_node: default number of CPUs per node as defined in the test
      (e.g. by earlier hooks like set_tag_scale)

    Default resources requested:
    - num_tasks_per_node = ceil(default_num_cpus_per_node / num_cpus_per_socket)
    - num_cpus_per_task = default_num_cpus_per_node / num_tasks_per_node
    """
    # neither num_tasks_per_node nor num_cpus_per_task are set
    if not test.num_tasks_per_node and not test.num_cpus_per_task:
        check_proc_attribute_defined(test, 'num_cpus')
        check_proc_attribute_defined(test, 'num_sockets')
        num_cpus_per_socket = test.current_partition.processor.num_cpus / test.current_partition.processor.num_sockets
        test.num_tasks_per_node = math.ceil(test.default_num_cpus_per_node / num_cpus_per_socket)
        test.num_cpus_per_task = int(test.default_num_cpus_per_node / test.num_tasks_per_node)

    # num_tasks_per_node is not set, but num_cpus_per_task is
    elif not test.num_tasks_per_node:
        check_proc_attribute_defined(test, 'num_cpus')
        check_proc_attribute_defined(test, 'num_sockets')
        num_cpus_per_socket = test.current_partition.processor.num_cpus / test.current_partition.processor.num_sockets
        test.num_tasks_per_node = int(test.default_num_cpus_per_node / test.num_cpus_per_task)

    # num_cpus_per_task is not set, but num_tasks_per_node is
    elif not test.num_cpus_per_task:
        test.num_cpus_per_task = int(test.default_num_cpus_per_node / test.num_tasks_per_node)

    else:
        pass  # both num_tasks_per_node and num_cpus_per_task are already set

    test.num_tasks = test.num_nodes * test.num_tasks_per_node
    log(f'Number of tasks per node set to: {test.num_tasks_per_node}')
    log(f'Number of cpus per task set to {test.num_cpus_per_task}')
    log(f'num_tasks set to {test.num_tasks}')
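
As a worked example of the first branch (all values assumed for illustration): a half-node job on a 128-core, 2-socket node gives

import math

default_num_cpus_per_node = 64   # half of a 128-core node
num_cpus_per_socket = 128 / 2    # 64.0
num_tasks_per_node = math.ceil(default_num_cpus_per_node / num_cpus_per_socket)  # 1
num_cpus_per_task = int(default_num_cpus_per_node / num_tasks_per_node)          # 64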

def _assign_one_task_per_cpu(test: rfm.RegressionTest):
    """
@@ -239,5 +300,60 @@ def check_custom_executable_opts(test: rfm.RegressionTest, num_default: int = 0):
    test.has_custom_executable_opts = False
    if len(test.executable_opts) > num_default:
        test.has_custom_executable_opts = True

    log(f'has_custom_executable_opts set to {test.has_custom_executable_opts}')


def set_compact_process_binding(test: rfm.RegressionTest):
    """
    This hook sets a binding policy for process binding.
    More specifically, it will bind each process to subsequent domains of test.num_cpus_per_task cores.

    A few examples:
    - Pure MPI (test.num_cpus_per_task = 1) will result in binding 1 process to each core.
      This will happen in a compact way, i.e. rank 0 to core 0, rank 1 to core 1, etc.
    - Hybrid MPI-OpenMP, e.g. test.num_cpus_per_task = 4, will result in binding 1 process to subsequent sets of 4 cores.
      I.e. rank 0 to cores 0-3, rank 1 to cores 4-7, rank 2 to cores 8-11, etc.

    It is hard to do this in a portable way. Process binding is currently supported for:
    - Intel MPI (through I_MPI_PIN_DOMAIN)
    - OpenMPI (through OMPI_MCA_rmaps_base_mapping_policy)
    - srun (LIMITED SUPPORT: through SLURM_CPU_BIND, but only effective if the task/affinity plugin is enabled)
    """

    # Check if hyperthreading is enabled. If so, divide the number of cpus per task by the number
    # of hw threads per core to get a physical core count
    check_proc_attribute_defined(test, 'num_cpus_per_core')
    num_cpus_per_core = test.current_partition.processor.num_cpus_per_core
    physical_cpus_per_task = int(test.num_cpus_per_task / num_cpus_per_core)

    # Do binding for Intel MPI's and OpenMPI's mpirun, and for srun
    # Other launchers may or may not do the correct binding
    test.env_vars['I_MPI_PIN_CELL'] = 'core'  # Don't bind to hyperthreads, only to physical cores
    test.env_vars['I_MPI_PIN_DOMAIN'] = '%s:compact' % physical_cpus_per_task
    test.env_vars['OMPI_MCA_rmaps_base_mapping_policy'] = 'node:PE=%s' % physical_cpus_per_task
    # Default binding for SLURM. Only effective if the task/affinity plugin is enabled
    # and when the number of tasks times cpus per task equals either the socket, core or thread count
    test.env_vars['SLURM_CPU_BIND'] = 'verbose'
    log(f'Set environment variable I_MPI_PIN_DOMAIN to {test.env_vars["I_MPI_PIN_DOMAIN"]}')
    log(f'Set environment variable OMPI_MCA_rmaps_base_mapping_policy to {test.env_vars["OMPI_MCA_rmaps_base_mapping_policy"]}')
    log(f'Set environment variable SLURM_CPU_BIND to {test.env_vars["SLURM_CPU_BIND"]}')
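
For illustration (assumed values, not part of this diff): with test.num_cpus_per_task = 8 on a partition that has 2 hardware threads per core, physical_cpus_per_task = int(8 / 2) = 4, so the hook exports:

I_MPI_PIN_CELL=core
I_MPI_PIN_DOMAIN=4:compact
OMPI_MCA_rmaps_base_mapping_policy=node:PE=4
SLURM_CPU_BIND=verbose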


def set_compact_thread_binding(test: rfm.RegressionTest):
    """
    This hook sets a binding policy for thread binding.
    It sets a number of environment variables to try to set a sensible binding for OpenMP tasks.

    Thread binding is supported for:
    - GNU OpenMP (through OMP_NUM_THREADS, OMP_PLACES and OMP_PROC_BIND)
    - Intel OpenMP (through KMP_AFFINITY)
    """

    # Set thread binding
    test.env_vars['OMP_PLACES'] = 'cores'
    test.env_vars['OMP_PROC_BIND'] = 'close'
    # See https://www.intel.com/content/www/us/en/docs/cpp-compiler/developer-guide-reference/2021-8/thread-affinity-interface.html
    test.env_vars['KMP_AFFINITY'] = 'granularity=fine,compact,1,0'
    log(f'Set environment variable OMP_PLACES to {test.env_vars["OMP_PLACES"]}')
    log(f'Set environment variable OMP_PROC_BIND to {test.env_vars["OMP_PROC_BIND"]}')
    log(f'Set environment variable KMP_AFFINITY to {test.env_vars["KMP_AFFINITY"]}')
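
For illustration (hypothetical test class, not part of this commit), a hybrid MPI+OpenMP test would typically apply both binding hooks together after setup:

import reframe as rfm
from reframe.core.builtins import run_after

from eessi.testsuite import hooks

class ExampleHybridTest(rfm.RunOnlyRegressionTest):
    @run_after('setup')
    def set_binding(self):
        # Pin each rank to a contiguous block of physical cores, and its
        # OpenMP threads to the cores within that block
        hooks.set_compact_process_binding(self)
        hooks.set_compact_thread_binding(self)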
31 changes: 31 additions & 0 deletions eessi/testsuite/tests/apps/tensorflow/src/mnist_setup.py
@@ -0,0 +1,31 @@
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size, test_batch_size):
    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
    # The `x` arrays are in uint8 and have values in the [0, 255] range.
    # You need to convert them to float32 with values in the [0, 1] range.
    x_train = x_train / np.float32(255)
    y_train = y_train.astype(np.int64)
    x_test = x_test / np.float32(255)
    y_test = y_test.astype(np.int64)
    train_dataset = tf.data.Dataset.from_tensor_slices(
        (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
    test_dataset = tf.data.Dataset.from_tensor_slices(
        (x_test, y_test)).batch(test_batch_size)
    return train_dataset, test_dataset

def build_and_compile_cnn_model():
    model = tf.keras.Sequential([
        tf.keras.layers.InputLayer(input_shape=(28, 28)),
        tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
        tf.keras.layers.Conv2D(32, 3, activation='relu'),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dense(10)
    ])
    model.compile(
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
        metrics=['accuracy'])
    return model
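
For context, these helpers mirror TensorFlow's multi-worker training tutorial; the PR's actual test script is among the files not rendered here. A minimal driver sketch under that assumption (hyperparameters illustrative):

import json
import os

import tensorflow as tf

import mnist_setup

# Each worker reads its role from TF_CONFIG, which the launcher must set
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])

strategy = tf.distribute.MultiWorkerMirroredStrategy()

per_worker_batch_size = 64
global_batch_size = per_worker_batch_size * num_workers
train_dataset, test_dataset = mnist_setup.mnist_dataset(global_batch_size, global_batch_size)

with strategy.scope():
    # Model building/compiling must happen within strategy.scope()
    multi_worker_model = mnist_setup.build_and_compile_cnn_model()

multi_worker_model.fit(train_dataset, epochs=3, steps_per_epoch=70)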
(The remaining 3 changed files are not rendered in this view.)